1 module geario.net.channel.iocp.AbstractStream;
2 
3 // dfmt off
4 version (HAVE_IOCP) : 
5 // dfmt on
6 
7 import geario.event.selector.Selector;
8 import geario.net.channel.AbstractSocketChannel;
9 import geario.net.channel.ChannelTask;
10 import geario.net.channel.Types;
11 import geario.net.channel.iocp.Common;
12 import geario.logging;
13 import geario.Functions;
14 import geario.event.selector.IOCP;
15 import geario.system.Error;
16 import geario.util.ThreadHelper;
17 import geario.util.worker;
18 import nbuff;
19 import core.atomic;
20 import core.sys.windows.windows;
21 import core.sys.windows.winsock2;
22 import core.sys.windows.mswsock;
23 import std.format;
24 import std.socket;
25 import std.string;
26 import core.stdc.string;
27 
28 /**
29 TCP Peer
30 */
31 abstract class AbstractStream : AbstractSocketChannel {
32 
33     // data event handlers
34     
35     /**
36     * Warning: The received data is stored a inner buffer. For a data safe, 
37     * you would make a copy of it. 
38     */
39     protected DataReceivedHandler dataReceivedHandler;
40     protected DataSendedHandler dataSendedHandler;
41     protected SimpleActionHandler dataWriteDoneHandler;
42 
43     protected NbuffChunk _bufferForRead;
44     protected AddressFamily _family;
45 
46     private size_t _bufferSize = 4096;
47     private ChannelTask _task = null;
48     
49 
50     this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
51         _bufferSize = bufferSize;
52         super(loop, ChannelType.TCP);
53         // setFlag(ChannelFlag.Read, true);
54         // setFlag(ChannelFlag.Write, true);
55 
56         // version (GEAR_IO_DEBUG)
57         //     Trace("Buffer size: ", bufferSize);
58         // _readBuffer = new ubyte[bufferSize];
59         // _readBytes = Nbuff.get(_bufferSize);
60         //_bufferForRead = BufferUtils.allocate(bufferSize);
61 
62         //_readBuffer = cast(ubyte[])_bufferForRead.toString();
63         // _writeQueue = new WritingBufferQueue();
64         // this.socket = new TcpSocket(family);
65 
66         loadWinsockExtension(this.handle);
67     }
68 
69     mixin CheckIocpError;
70 
71     abstract bool IsClient();
72 
73     override void OnRead() {
74         version (GEAR_IO_DEBUG)
75             Trace("ready to read");
76         super.OnRead();
77     }
78 
79     /**
80      * Should be thread-safe.
81      */
82     override void OnWrite() {  
83         version (GEAR_IO_DEBUG)
84             log.trace("checking write status, isWritting: %s, writeBytes: %s", _isWritting, writeBytes is null);
85 
86         //if(!_isWritting){
87         //    version (GEAR_IO_DEBUG) log.info("No data to write out. fd=%d", this.handle);
88         //    return;
89         //}
90 
91         if(IsClosing() && _isWriteCancelling) {
92             version (GEAR_IO_DEBUG) log.info("Write cancelled, fd=%d", this.handle);
93             ResetWriteStatus();
94             return;
95         }
96         TryNextBufferWrite();
97     }
98     
99     protected override void OnClose() {
100         _isWritting = false;
101         ResetWriteStatus();
102         if(this._socket is null) {
103             import core.sys.windows.winsock2;
104             .closesocket(this.handle);
105         } else {
106             // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 1:20:27 pm
107             //
108             //while(!_isSingleWriteBusy)
109             //{
110                 this._socket.shutdown(SocketShutdown.BOTH);
111                 this._socket.close();
112             //}
113         }
114         super.OnClose();
115     }
116 
117     void BeginRead() {
118         // https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsarecv
119         ///  _isSingleWriteBusy = true;
120         auto b = Nbuff.get(_bufferSize);
121 
122         _readBuffer = b.data();
123 
124         WSABUF _dataReadBuffer;
125         _dataReadBuffer.len = cast(uint) _readBuffer.length;
126         _dataReadBuffer.buf = cast(char*) _readBuffer.ptr;
127         memset( &_iocpread.overlapped , 0, _iocpread.overlapped.sizeof );
128         _iocpread.channel = this;
129         _iocpread.operation = IocpOperation.read;
130         DWORD dwReceived = 0;
131         DWORD dwFlags = 0;
132         version (GEAR_IO_DEBUG)
133             log.trace("start receiving [fd=%d] ", this.socket.handle);
134         // _isSingleWriteBusy = true;
135         int nRet = WSARecv(cast(SOCKET) this.socket.handle, &_dataReadBuffer, 1u, &dwReceived, &dwFlags, &_iocpread.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
136 
137         if (nRet == SOCKET_ERROR && (GetLastError() != ERROR_IO_PENDING)) {
138             _isSingleWriteBusy = false;
139             Close();
140         }
141         //checkErro(nRet, SOCKET_ERROR);
142     }
143 
144     protected bool DoConnect(Address addr) {
145         Address binded = CreateAddress(this.socket.addressFamily);
146         _isSingleWriteBusy = true;
147         this.socket.bind(binded);
148         _iocpread.channel = this;
149         _iocpread.operation = IocpOperation.connect;
150 
151         import std.datetime.stopwatch;
152         auto sw = StopWatch(AutoStart.yes);
153         sw.start();
154         scope(exit) {
155             sw.stop();
156         }
157 
158         // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex
159         int nRet = ConnectEx(cast(SOCKET) this.socket.handle(), cast(SOCKADDR*) addr.name(), 
160             addr.nameLen(), null, 0, null, &_iocpread.overlapped);
161         checkErro(nRet, SOCKET_ERROR);
162 
163         if(this._error) 
164             return false;
165 
166         // https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt
167         int seconds = 0;
168         int bytes = seconds.sizeof;
169         int iResult = 0;
170 
171         CHECK: 
172         iResult = getsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, SO_CONNECT_TIME,
173                             cast(void*)&seconds, cast(PINT)&bytes);
174 
175         bool result = false;
176         if ( iResult != NO_ERROR ) {
177             DWORD dwLastError = WSAGetLastError();
178             log.warn("getsockopt(SO_CONNECT_TIME) failed with Error: code=%d, message=%s", 
179                 dwLastError, GetErrorMessage(dwLastError));
180         } else {
181             if (seconds == 0xFFFFFFFF) {
182                 version(GEAR_IO_DEBUG) log.warn("Connection not established yet (destination: %s).", addr);
183                 // so to check again
184                 goto CHECK;
185             } else {
186                 result = true;
187                 version(GEAR_IO_DEBUG) {
188                     //
189                     log.info("Connection has been established in %d msecs, destination: %s", sw.peek.total!"msecs", addr);
190                 }
191                 // https://docs.microsoft.com/en-us/windows/win32/winsock/sol-socket-socket-options
192                 enum SO_UPDATE_CONNECT_CONTEXT = 0x7010;
193                 iResult = setsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, 
194                     SO_UPDATE_CONNECT_CONTEXT, NULL, 0 );
195             }
196         }
197         
198         return result;
199     }
200 
201     private uint DoWrite(const(ubyte)[] data) {
202         DWORD dwSent = 0;//cast(DWORD)data.length;
203         DWORD dwFlags = 0;
204 
205         memset(&_iocpwrite.overlapped , 0 ,_iocpwrite.overlapped.sizeof );
206         _iocpwrite.channel = this;
207         _iocpwrite.operation = IocpOperation.write;
208         // log.trace("To write %d bytes, fd=%d", data.length, this.socket.handle());
209         version (GEAR_IO_DEBUG) {
210             size_t bufferLength = data.length;
211             log.trace("To write %d bytes", bufferLength);
212             if (bufferLength > 32)
213                 log.trace("%(%02X %) ...", data[0 .. 32]);
214             else
215                 log.trace("%s", data);
216         }
217         // size_t bufferLength = data.length;
218         //     log.trace("To write %d bytes", bufferLength);
219         //     log.trace("%s", data);
220         WSABUF _dataWriteBuffer;
221 
222         //char[] bf = new char[data.length];
223         //memcpy(bf.ptr,data.ptr,data.length);
224         //_dataWriteBuffer.buf =  bf.ptr;
225         _dataWriteBuffer.buf = cast(char*) data.ptr;
226         _dataWriteBuffer.len = cast(uint) data.length;
227         // _isSingleWriteBusy = true;
228         int nRet = WSASend( cast(SOCKET) this.socket.handle(), &_dataWriteBuffer, 1, &dwSent,
229         dwFlags, &_iocpwrite.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
230         // if (nRet != NO_ERROR && (GetLastError() != ERROR_IO_PENDING))
231         // {
232         //     _isSingleWriteBusy = false;
233         //     // Close();
234         // }
235 
236         checkErro( nRet, SOCKET_ERROR);
237 
238         // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 12:18:20 pm
239         // Keep this to prevent the buffer corrupted. Why?
240         version (GEAR_IO_DEBUG) {
241             log.trace("sent: %d / %d bytes, fd=%d", dwSent, bufferLength, this.handle);
242         }
243 
244         if (this.IsError) {
245             log.error("Socket Error on write: fd=%d, message=%s", this.handle, this.ErrorMessage);
246             this.Close();
247         }
248 
249         return dwSent;
250     }
251 
252     protected void DoRead() {
253         //_isSingleWriteBusy = false;
254         this.ClearError();
255         version (GEAR_IO_DEBUG)
256             log.trace("start reading: %d nbytes", this.readLen);
257 
258         if (readLen > 0) {
259             // import std.stdio;
260             // writefln("length=%d, data: %(%02X %)", readLen, _readBuffer[0 .. readLen]);
261             HandleReceivedData(readLen);
262 
263             // Go on reading
264             this.BeginRead();
265 
266         } else if (readLen == 0) {
267             version (GEAR_IO_DEBUG) {
268                 if (_remoteAddress !is null)
269                     log.warn("connection broken: %s", _remoteAddress.toString());
270             }
271             OnDisconnected();
272             // if (_isClosed)
273             //     this.Close();
274         } else {
275             version (GEAR_IO_DEBUG) {
276                 log.warn("undefined behavior on thread %d", GetTid());
277             } else {
278                 this._error = true;
279                 this._errorMessage = "undefined behavior on thread";
280             }
281         }
282     }
283 
284     private void HandleReceivedData(ptrdiff_t len) {
285         version (GEAR_IO_DEBUG)
286             log.trace("reading done: %d nbytes", readLen);
287 
288         if (dataReceivedHandler is null) 
289             return;
290 
291         // _bufferForRead.limit(cast(int)readLen);
292         // _bufferForRead.position(0);
293         // dataReceivedHandler(_bufferForRead);
294 
295         // Bytes bufferCopy;
296         import std.algorithm : copy;
297         auto buffer = Nbuff.get(len);
298         copy((cast(string)_readBuffer[0 .. len]).representation, buffer.data);
299 
300         NbuffChunk bytes = NbuffChunk(buffer, len);
301 
302         // bufferCopy.opAssign(_bufferForRead);
303         if(taskWorker is null) {
304             dataReceivedHandler(bytes);
305         } else {
306             ChannelTask task = _task;
307 
308             // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
309             // More tests needed
310             if(task is null || task.IsFinishing()) {
311                 task = CreateChannelTask();
312                 _task = task;
313 
314             } else {
315                 version(GEAR_METRIC) {
316                     log.warn("Request peeding... Task status: %s", task.status);
317                 }
318             }
319 
320             task.put(bytes);
321         }        
322     }
323 
324     private ChannelTask CreateChannelTask() {
325         ChannelTask task = new ChannelTask();
326         task.dataReceivedHandler = dataReceivedHandler;
327         taskWorker.put(task);
328         return task;
329     }
330 
331     // try to write a block of data directly
332     protected size_t TryWrite(const ubyte[] data) {        
333         version (GEAR_IO_DEBUG)
334             log.trace("start to write, total=%d bytes, fd=%d", data.length, this.handle);
335         ClearError();
336         size_t nBytes;
337         //scope(exit) {
338         //    _isSingleWriteBusy = false;
339         //}
340         if (!_isSingleWriteBusy)
341         {
342              nBytes = DoWrite(data);
343         }
344 
345         return nBytes;
346     }
347 
348     // try to write a block of data from the write queue
349     private void TryNextBufferWrite() {
350         if(CheckAllWriteDone()){
351             _isSingleWriteBusy = false;
352             // if (!IsClient())
353             // {
354             //     this.BeginRead();
355             // }
356             return;
357         } 
358         
359         // keep thread-safe here
360         //if(!cas(&_isSingleWriteBusy, false, true)) {
361         //    version (GEAR_IO_DEBUG) log.warn("busy writing. fd=%d", this.handle);
362         //    return;
363         //}
364 
365         //scope(exit) {
366         //    _isSingleWriteBusy = false;
367         //}
368 
369         ClearError();
370 
371         // bool haveBuffer = _writeQueue.TryDequeue(writeBytes);
372         writeBytes = _senddingBuffer.frontChunk();
373         _senddingBuffer.popChunk();
374         WriteBufferRemaining();
375     }
376 
377     private void WriteBufferRemaining() {
378         if ( writeBytes.empty() )
379         {
380             return;
381         }
382         const(ubyte)[] data = cast(const(ubyte)[])writeBytes.data();
383 
384         size_t nBytes = DoWrite(data);
385 
386         version (GEAR_IO_DEBUG)
387             log.trace("written data: %d bytes, fd=%d", nBytes, this.handle);
388         if(nBytes == data.length) {
389             writeBytes.popBackN(writeBytes.length);
390         } else if (nBytes > 0) { 
391             writeBytes.popFrontN(nBytes);
392             version (GEAR_IO_DEBUG)
393                 log.warn("remaining data: %d / %d, fd=%d", data.length - nBytes, data.length, this.handle);
394         } else { 
395             version (GEAR_IO_DEBUG)
396             log.warn("I/O busy: writing. fd=%d", this.handle);
397         }   
398     }
399     
400     protected bool CheckAllWriteDone()
401     {
402         if ( _senddingBuffer.empty() && writeBytes.empty() )
403         {
404             ResetWriteStatus();
405 
406             version (GEAR_IO_DEBUG)
407                 log.trace("All data are written out. fd=%d", this.handle);
408 
409             if(dataWriteDoneHandler !is null)
410                 dataWriteDoneHandler(this);
411 
412             return true;
413         }
414 
415         return false;
416     }
417     
418     void ResetWriteStatus()
419     {
420         if(!_senddingBuffer.empty())
421             _senddingBuffer.clear();
422 
423         _isWritting = false;
424         _isWriteCancelling = false;
425         sendDataBuffer = null;
426         sendDataBackupBuffer = null;
427         if (!writeBytes.empty())
428             writeBytes.popBackN(writeBytes.length);
429         _isSingleWriteBusy = false;
430     }
431 
432     /**
433      * Called by selector after data sent
434      * Note: It's only for IOCP selector: 
435     */
436     void OnWriteDone(size_t nBytes) {
437         version (GEAR_IO_DEBUG) {
438             log.trace("write done once: %d bytes, isWritting: %s, writeBytes: %s, fd=%d",
439                  nBytes, _isWritting, writeBytes is null, this.handle);
440         }
441         //if (_isWriteCancelling) {
442         //    version (GEAR_IO_DEBUG) log.trace("write cancelled.");
443         //    ResetWriteStatus();
444         //    return;
445         //}
446 
447 
448         //while(_isSingleWriteBusy) {
449         //    version(GEAR_IO_DEBUG)
450         //    Info("waiting for last writting get finished...");
451         //}
452 
453         version (GEAR_IO_DEBUG) {
454             log.trace("write done once: %d bytes, isWritting: %s, writeBytes: %s, fd=%d",
455                  nBytes, _isWritting, writeBytes is null, this.handle);
456         }
457 
458         if (!writeBytes.empty()) {
459             version (GEAR_IO_DEBUG) log.trace("try to write the remaining in buffer.");
460             WriteBufferRemaining();
461         }  else {
462             version (GEAR_IO_DEBUG) log.trace("try to write next buffer.");
463             TryNextBufferWrite();
464         }
465     }
466 
467     private void NotifyDataWrittenDone() {
468         if(dataWriteDoneHandler !is null && _senddingBuffer.empty) {
469             dataWriteDoneHandler(this);
470         }
471     }
472     
473     DataReceivedHandler GetDataReceivedHandler() {
474         return dataReceivedHandler;
475     }
476 
477     void CancelWrite() {
478         _isWriteCancelling = true;
479     }
480 
481     abstract bool IsConnected() nothrow;
482     abstract protected void OnDisconnected();
483 
484     protected void InitializeWriteQueue() {
485         // if (_writeQueue is null) {
486         //     _writeQueue = new WritingBufferQueue();
487         // }
488     }
489 
490     SimpleEventHandler disconnectionHandler;
491     
492     // protected WritingBufferQueue _writeQueue;
493     protected Nbuff _senddingBuffer;
494     protected bool _isWriteCancelling = false;
495     private  bool _isSingleWriteBusy = false; // keep a single I/O write operation atomic
496     private NbuffChunk _readBytes;
497     private ubyte[] _readBuffer;
498     private const(ubyte)[] sendDataBuffer;
499     private const(ubyte)[] sendDataBackupBuffer;
500     private NbuffChunk writeBytes; 
501 
502     private IocpContext _iocpread;
503     private IocpContext _iocpwrite;
504 }