1 module geario.net.channel.posix.AbstractStream;
2 
3 // dfmt off
4 version(Posix):
5 // dfmt on
6 
7 import geario.event.selector.Selector;
8 import geario.Functions;
9 import geario.net.channel.AbstractSocketChannel;
10 import geario.net.channel.ChannelTask;
11 import geario.net.channel.Types;
12 import geario.net.IoError;
13 import geario.logging;
14 import geario.system.Error;
15 import geario.util.worker;
16 
17 import nbuff;
18 
19 import std.format;
20 import std.socket;
21 
22 import core.atomic;
23 import core.stdc.errno;
24 import core.stdc.string;
25 
26 import core.sys.posix.sys.socket : accept;
27 import core.sys.posix.unistd;
28 
29 /**
30 TCP Peer
31 */
32 abstract class AbstractStream : AbstractSocketChannel {
    // Number of bytes requested from read() on each TryRead call.
    protected size_t _bufferSize = 4096;
    // Chunk currently being written by OnWrite; stays non-empty while its write is unfinished.
    private NbuffChunk _writeBytes;
    // Task that collects incoming chunks when a taskWorker is set (see onDataReceived).
    private ChannelTask _task = null;
    // NOTE(review): not referenced by any code visible in this class -- confirm before relying on it.
    size_t _receivedLen = 0;

    /**
    * Warning: the received data is stored in an inner buffer. To keep the data
    * safe, make a copy of it.
    */
    protected DataReceivedHandler dataReceivedHandler;
    // NOTE(review): never invoked by the code visible in this class -- presumably used by subclasses.
    protected DataSendedHandler dataSendedHandler;
    // NOTE(review): never invoked by the code visible in this class -- presumably used by subclasses.
    protected SimpleEventHandler disconnectionHandler;
    // Invoked by CheckAllWriteDone once the sending buffer is empty.
    protected SimpleActionHandler dataWriteDoneHandler;

    // Address family given to the constructor.
    protected AddressFamily _family;
    // protected Buffer _bufferForRead;
    // protected WritingBufferQueue _writeQueue;
    // Outgoing data; OnWrite takes chunks from the front of it.
    protected Nbuff _senddingBuffer;
    // Set by CancelWrite and cleared by ResetWriteStatus; stops the loop in TryNextWrite.
    protected bool _isWriteCancelling = false;
52 
53     this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
54         this._family = family;
55         _bufferSize = bufferSize;
56         super(loop, ChannelType.TCP);
57         setFlag(ChannelFlag.Read, true);
58         setFlag(ChannelFlag.Write, true);
59         setFlag(ChannelFlag.ETMode, true);
60     }
61 
    /// Implemented by subclasses. NOTE(review): presumably tells whether this stream is the client side -- confirm.
    abstract bool IsClient();
    /// Implemented by subclasses. NOTE(review): presumably reports the current connection state -- confirm.
    abstract bool IsConnected() nothrow;
    /// Called by TryRead when read() returns 0 or fails with ECONNRESET, and by
    /// TryWrite when write() fails with ECONNRESET or EPIPE.
    abstract protected void OnDisconnected();
65 
66     private void onDataReceived(NbuffChunk bytes) {
67         if(taskWorker is null) {
68             // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
69             // Using memory pool
70             if (dataReceivedHandler !is null) {
71                 dataReceivedHandler(bytes);
72             }
73         } else {
74             ChannelTask task = _task;
75 
76             // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
77             // More tests needed
78             if(task is null || task.IsFinishing()) {
79                 task = CreateChannelTask();
80                 _task = task;
81 
82             } else {
83                 version(GEAR_METRIC) {
84                     log.warn("Request peeding... Task status: %s", task.Status);
85                 }
86             }
87 
88             task.put(bytes);
89         }
90     }
91 
92     private ChannelTask CreateChannelTask() {
93         ChannelTask task = new ChannelTask();
94         task.dataReceivedHandler = dataReceivedHandler;
95         taskWorker.put(task);
96         return task;
97     }
98 
99     /**
100      *
101      */
102     protected bool TryRead() {
103         bool isDone = true;
104         this.ClearError();
105 
106         // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
107         // Using memory pool        
108    
109         // auto readBuffer = Buffer.Get(_bufferSize);
110         // auto readBufferSpace =  readBuffer.data();
111         // NbuffChunk buffer = Nbuff.get(_bufferSize);
112         auto buffer = Nbuff.get(_bufferSize);
113         // ubyte[] readData = buffer.AsArray();
114 
115         // TODO : loop read data
116         ptrdiff_t len = read(this.handle, cast(void*)buffer.ptr, _bufferSize);
117 
118         // ubyte[] rb = new ubyte[BufferSize];
119         // ptrdiff_t len = read(this.handle, cast(void*) rb.ptr, rb.length);
120         version (GEAR_IO_DEBUG) log.trace("reading[fd=%d]: %d bytes", this.handle, len);
121 
122         if (len > 0)
123         {
124             version(GEAR_IO_DEBUG)
125             {
126                 if (len <= 32)
127                     log.info("fd: %d, %d bytes: %(%02X %)", this.handle, len, buffer[0 .. len]);
128                 else
129                     log.info("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, buffer[0 .. 32]);
130             }
131 
132             // buffer.ReaderIndex(0);
133             // buffer.WriterIndex(len);
134             onDataReceived(NbuffChunk(buffer, len));
135 
136             // It's prossible that there are more data waitting for read in the read I/O space.
137             if (len == _bufferSize) {
138                 version (GEAR_IO_DEBUG) log.info("Read buffer is full read %d bytes. Need to read again.", len);
139                 isDone = false;
140             }
141         }
142         else if (len == Socket.ERROR)
143         {
144             // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call
145             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
146             // check more Error status
147             this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK;
148             if (_error)
149             {
150                 this._errorMessage = GetErrorMessage(errno);
151 
152                 if(errno == ECONNRESET)
153                 {
154                     // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
155                     OnDisconnected();
156                     // ErrorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
157                 }
158                 else
159                 {
160                     ErrorOccurred(ErrorCode.INTERRUPTED , "Error occurred on read");
161                 }
162             }
163         }
164         else
165         {
166             version (GEAR_DEBUG) log.info("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle);
167 
168             OnDisconnected();
169         }
170 
171         return isDone;
172     }
173 
174     override protected void DoClose()
175     {
176         version (GEAR_IO_DEBUG) log.info("peer socket %s closing: fd=%d", this.RemoteAddress.toString(), this.handle);
177 
178         if(this.socket is null)
179         {
180             import core.sys.posix.unistd;
181             core.sys.posix.unistd.close(this.handle);
182         }
183         else
184         {
185             this.socket.shutdown(SocketShutdown.BOTH);
186             this.socket.close();
187         }
188 
189         version (GEAR_IO_DEBUG) log.info("peer socket %s closed: fd=%d", this.RemoteAddress.toString, this.handle);
190 
191         Task task = _task;
192         if(task !is null)
193         {
194             task.Stop();
195         }
196     }
197 
198     /**
199      * Try to write a block of data.
200      */
201     protected ptrdiff_t TryWrite(const(ubyte)[] data) 
202     {
203         ClearError();
204         // const nBytes = this.socket.send(data);
205         version (GEAR_IO_DEBUG) log.trace("try to write: %d bytes, fd=%d", data.length, this.handle);
206 
207         const nBytes = write(this.handle, data.ptr, data.length);
208 
209         version (GEAR_IO_DEBUG)
210             log.trace("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle);
211 
212         if (nBytes > 0) {
213             return nBytes;
214         }
215 
216         if (nBytes == Socket.ERROR) {
217             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
218             // check more Error status
219             // EPIPE/Broken pipe:
220             // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll
221 
222             if(errno == EAGAIN) {
223                 version (GEAR_IO_DEBUG) {
224                     log.warn("Warning on write: fd=%d, errno=%d, message=%s", this.handle,
225                         errno, GetErrorMessage(errno));
226                 }
227             } else if(errno == EINTR || errno == EWOULDBLOCK) {
228                 // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait
229                 debug log.warn("Warning on write: fd=%d, errno=%d, message=%s", this.handle,
230                         errno, GetErrorMessage(errno));
231                 // eventLoop.update(this);
232             } else {
233                 this._error = true;
234                 this._errorMessage = GetErrorMessage(errno);
235                 if(errno == ECONNRESET) {
236                     // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
237                     OnDisconnected();
238                     // ErrorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
239                 } else if(errno == EPIPE) {
240                     // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
241                     // Handle SIGPIPE signal
242                     OnDisconnected();
243                     ErrorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!");
244                 }
245 
246             }
247         } else {
248             version (GEAR_DEBUG) {
249                 log.warn("nBytes=%d, message: %s", nBytes, lastSocketError());
250                 assert(false, "Undefined behavior!");
251             } else {
252                 this._error = true;
253             }
254         }
255 
256         return 0;
257     }
258 
259     private bool TryNextWrite(NbuffChunk buffer) {
260         const(ubyte)[] data = cast(const(ubyte)[])buffer.data;
261         version (GEAR_IO_DEBUG) {
262             log.trace("writting from a buffer [fd=%d], %d bytes, buffer: %s",
263                 this.handle, data.length, buffer.data.ptr);
264         }
265 
266         ptrdiff_t remaining = data.length;
267         if(data.length == 0)
268             return true;
269 
270         while(remaining > 0 && !_error && !IsClosing() && !_isWriteCancelling) {
271             ptrdiff_t nBytes = TryWrite(data);
272             version (GEAR_IO_DEBUG)
273             {
274                 log.trace("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s",
275                     this.handle, nBytes, data.length, remaining, buffer.AsArray.ptr);
276             }
277 
278             if (nBytes > 0) {
279                 remaining -= nBytes;
280                 data = data[nBytes .. $];
281             }
282         }
283 
284         version (GEAR_IO_DEBUG) {
285             if(remaining == 0) {
286                     log.trace("A buffer is written out. fd=%d", this.handle);
287                 return true;
288             } else {
289                 log.warn("Writing cancelled or an Error ocurred. fd=%d", this.handle);
290                 return false;
291             }
292         } else {
293             return remaining == 0;
294         }
295     }
296 
297     void ResetWriteStatus() {
298         // if(_writeQueue !is null)
299         //     _writeQueue.Clear();
300         if(!_senddingBuffer.empty())
301             _senddingBuffer.clear();
302         atomicStore(_isWritting, false);
303         _isWriteCancelling = false;
304     }
305 
306     /**
307      * Should be thread-safe.
308      */
309     override void OnWrite() {
310         version (GEAR_IO_DEBUG)
311         {
312             log.trace("checking status, isWritting: %s, writeBytes: %s",
313                 _isWritting, _writeBytes.empty() ? "null" : cast(string)_writeBytes.data());
314         }
315 
316         if(!_isWritting) {
317             version (GEAR_IO_DEBUG)
318             log.info("No data needs to be written out. fd=%d", this.handle);
319             return;
320         }
321 
322         if(IsClosing() && _isWriteCancelling) {
323             version (GEAR_DEBUG) log.info("Write cancelled or closed, fd=%d", this.handle);
324             ResetWriteStatus();
325             return;
326         }
327 
328         // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00
329         // More tests are needed
330         // keep thread-safe here
331         if(!cas(&_isBusyWritting, false, true)) {
332             // version (GEAR_IO_DEBUG)
333             version(GEAR_DEBUG) log.warn("busy writing. fd=%d", this.handle);
334             return;
335         }
336 
337         scope(exit) {
338             _isBusyWritting = false;
339         }
340 
341         if(!_writeBytes.empty()) {
342             if(TryNextWrite(_writeBytes)) {
343                 _writeBytes.popBackN(_writeBytes.length);
344             } else {
345                 version (GEAR_IO_DEBUG)
346                 {
347                     log.info("waiting to try again... fd=%d, writeBytes: %s",
348                         this.handle, cast(string)_writeBytes.AsArray);
349                 }
350                 // eventLoop.update(this);
351                 return;
352             }
353             version (GEAR_IO_DEBUG)
354                 log.trace("running here, fd=%d", this.handle);
355         }
356 
357         if(CheckAllWriteDone()) {
358             return;
359         }
360 
361         version (GEAR_IO_DEBUG) {
362             log.trace("start to write [fd=%d], writeBytes %s empty", this.handle, _writeBytes.empty() ? "is" : "is not");
363         }
364 
365         _writeBytes = _senddingBuffer.frontChunk();
366         _senddingBuffer.popChunk();
367         if(!_writeBytes.empty()) {
368             if(TryNextWrite(_writeBytes)) {
369                 _writeBytes.popBackN(_writeBytes.length);
370                 CheckAllWriteDone();
371             } else {
372             version (GEAR_IO_DEBUG)
373                 log.info("waiting to try again: fd=%d, writeBytes: %s", this.handle, cast(string)_writeBytes.AsArray);
374 
375                 // eventLoop.update(this);
376             }
377             version (GEAR_IO_DEBUG) {
378                 log.warn("running here, fd=%d", this.handle);
379             }
380         }
381     }
382     private shared bool _isBusyWritting = false;
383 
384     protected bool CheckAllWriteDone() {
385         version (GEAR_IO_DEBUG) {
386             import std.conv;
387             log.trace("checking remaining: fd=%d, writeQueue empty: %s", this.handle,
388                _senddingBuffer.empty() ||  _senddingBuffer.empty().to!string());
389         }
390 
391         if(_senddingBuffer.empty()) {
392             ResetWriteStatus();
393             version (GEAR_IO_DEBUG)
394                 log.info("All data are written out: fd=%d", this.handle);
395             if(dataWriteDoneHandler !is null)
396                 dataWriteDoneHandler(this);
397             return true;
398         }
399 
400         return false;
401     }
402 
    /// Currently a no-op: the former write-queue setup below is commented out.
    protected void InitializeWriteQueue() {
        // if (_writeQueue is null) {
        //     _writeQueue = new WritingBufferQueue();
        // }
    }
408 
409     protected bool DoConnect(Address addr) {
410         try {
411             this.socket.connect(addr);
412         } catch (SocketOSException e) {
413             log.error(e.msg);
414             version(GEAR_DEBUG) error(e);
415             return false;
416         }
417         return true;
418     }
419 
    /// Requests cancellation of the current write: sets the flag that makes
    /// the loop in TryNextWrite stop. The flag is cleared by ResetWriteStatus.
    void CancelWrite() {
        _isWriteCancelling = true;
    }
423 
    /// Returns: true while a write cancel requested via CancelWrite has not
    /// yet been cleared by ResetWriteStatus.
    bool isWriteCancelling() {
        return _isWriteCancelling;
    }
427 
    /// Returns: the handler currently stored in dataReceivedHandler (may be null).
    DataReceivedHandler GetDataReceivedHandler() {
        return dataReceivedHandler;
    }
431 
432 }