1 /*
2  * Archttp - A highly performant web framework written in D.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.net.TcpStream;
13 
14 import geario.net.channel.Types;
15 import geario.net.TcpStreamOptions;
16 import geario.net.IoError;
17 
18 import nbuff;
19 
20 import geario.event.selector.Selector;
21 import geario.event;
22 import geario.Functions;
23 import geario.logging;
24 
25 import std.exception;
26 import std.format;
27 import std.socket;
28 import std.string;
29 
30 import core.atomic;
31 import core.stdc.errno;
32 import core.thread;
33 import core.time;
34 
35 version (HAVE_EPOLL) {
36     import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
37 }
38 
39 
40 
41 /**
42  *
43  */
44 class TcpStream : AbstractStream {
45     SimpleEventHandler closeHandler;
46     protected shared bool _isConnected; // It's always true for server.
47 
48     protected TcpStreamOptions _tcpOption;
49     protected int retryCount = 0;
50 
51     // for client
52     this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
53         _isClient = true;
54         _isConnected = false;
55 
56         if (option is null)
57             _tcpOption = TcpStreamOptions.Create();
58         else
59             _tcpOption = option;
60         this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
61         // int value = 1;
62         // if (-1 == .setsockopt(socket.handle, cast(int) SocketOptionLevel.SOCKET, cast(int) SO_NOSIGPIPE, &value, cast(uint) value.sizeof))
63         //     throw new SocketOSException("Unable to set socket option");
64 
65         super(loop, family, _tcpOption.bufferSize);
66         version(GEAR_IO_DEBUG) log.trace("buffer size: %d bytes", _tcpOption.bufferSize);
67     }
68 
69     // for server
70     this(Selector loop, Socket socket, TcpStreamOptions option = null) {
71         if (option is null)
72             _tcpOption = TcpStreamOptions.Create();
73         else
74             _tcpOption = option;
75         this.socket = socket;
76         // int value = 1;
77         // if (-1 == .setsockopt(socket.handle, cast(int) SocketOptionLevel.SOCKET, cast(int) SO_NOSIGPIPE, &value, cast(uint) value.sizeof))
78         //     throw new SocketOSException("Unable to set socket option");
79 
80         super(loop, socket.addressFamily, _tcpOption.bufferSize);
81         _remoteAddress = socket.remoteAddress();
82         _localAddress = socket.localAddress();
83 
84         _isClient = false;
85         _isConnected = true;
86         SetKeepalive();
87     }
88 
89     void options(TcpStreamOptions option) @property {
90         assert(option !is null);
91         this._tcpOption = option;
92     }
93 
94     TcpStreamOptions options() @property {
95         return this._tcpOption;
96     }
97 
98     override bool IsBusy() {
99         return _isWritting;
100     }
101 
102     
103     override bool IsClient() {
104         return _isClient;
105     }
106 
107     void Connect(string hostname, ushort port) {
108         Address[] addresses = getAddress(hostname, port);
109         if(addresses is null) {
110             throw new SocketException("Can't resolve hostname: " ~ hostname);
111         }
112         Address selectedAddress;
113         foreach(Address addr; addresses) {
114             string ip = addr.toAddrString();
115             if(ip.startsWith("::")) // skip IPV6
116                 continue;
117             if(ip.length <= 16) {
118                 selectedAddress = addr;
119                 break;
120             }
121         }
122 
123         if(selectedAddress is null) {
124             log.warn("No IPV4 avaliable");
125             selectedAddress = addresses[0];
126         }
127         version(GEAR_IO_DEBUG) {
128             log.info("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
129         }
130         Connect(selectedAddress); // always select the first one.
131     }
132 
133     void Connect(Address addr) {
134         if (_isConnected)
135             return;
136 
137         _remoteAddress = addr;
138         import std.parallelism;
139 
140         auto connectionTask = task(&DoConnect, addr);
141         taskPool.put(connectionTask);
142         // DoConnect(addr);
143     }
144 
145     void ReConnect() {
146         if (!_isClient) {
147             throw new Exception("Only client can call this method.");
148         }
149 
150         if (_isConnected || retryCount >= _tcpOption.retryTimes)
151             return;
152 
153         retryCount++;
154         _isConnected = false;
155         this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);
156 
157         version (GEAR_DEBUG)
158             log.trace("reconnecting %d...", retryCount);
159         Connect(_remoteAddress);
160     }
161 
162     protected override bool DoConnect(Address addr)  {
163         try {
164             version (GEAR_DEBUG)
165                 log.trace("Connecting to %s...", addr);
166             // Address binded = CreateAddress(this.socket.addressFamily);
167             // this.socket.Bind(binded);
168             version (HAVE_IOCP) {
169                 this.socket.blocking = false;
170                 Start();
171                 if(super.DoConnect(addr)) {
172                     this.socket.blocking = false;
173                     SetKeepalive();
174                     _localAddress = this.socket.localAddress();
175                     _isConnected = true;
176                 } else {
177                     ErrorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
178                     _isConnected = false;
179                 }
180             } else {
181                 this.socket.blocking = true;
182                 if(super.DoConnect(addr)) {
183                     this.socket.blocking = false;
184                     SetKeepalive();
185                     _localAddress = this.socket.localAddress();
186                     Start();
187                     _isConnected = true;
188                 } else {
189                     ErrorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
190                     _isConnected = false;
191                 }
192             }
193         } catch (Throwable ex) {
194             // Must try the best to catch all the exceptions, because it will be executed in another thread.
195             debug log.warn(ex.msg);
196             version(GEAR_DEBUG) log.warn(ex);
197             ErrorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
198             _isConnected = false;
199         } 
200 
201         if (_connectionHandler !is null) {
202             try {
203                 _connectionHandler(_isConnected);
204 
205             } catch(Throwable ex) {
206                 debug log.warn(ex.msg);
207                 version(GEAR_DEBUG) log.warn(ex);
208             }
209         }
210         return true;
211     }
212 
213     // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
214     // http://www.importnew.com/27624.html
215     protected void SetKeepalive() {
216         version(GEAR_DEBUG) {
217             log.info("isKeepalive: %s, keepaliveTime: %d seconds, Interval: %d seconds", 
218                 _tcpOption.isKeepalive, _tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
219         }
220         version (HAVE_EPOLL) {
221             if (_tcpOption.isKeepalive) {
222                 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
223                 this.setOption(SocketOptionLevel.TCP,
224                         cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
225                 // version (GEAR_DEBUG) CheckKeepAlive();
226             }
227         } else version (HAVE_IOCP) {
228             if (_tcpOption.isKeepalive) {
229                 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
230                 // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
231                 //     _tcpOption.keepaliveProbes);
232                 // version (GEAR_DEBUG) CheckKeepAlive();
233             }
234         }
235     }
236 
237     version (GEAR_DEBUG) protected void CheckKeepAlive() {
238         version (HAVE_EPOLL) {
239             int time;
240             int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
241             log.trace("ret=%d, time=%d", ret1, time);
242 
243             int interval;
244             int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
245             log.trace("ret=%d, interval=%d", ret2, interval);
246 
247             int isKeep;
248             int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
249             log.trace("ret=%d, keepalive=%s", ret3, isKeep == 1);
250 
251             int probe;
252             int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
253             log.trace("ret=%d, interval=%d", ret4, probe);
254         }
255     }
256 
257     TcpStream Connected(ConnectionHandler handler) {
258         _connectionHandler = handler;
259         return this;
260     }
261 
262     TcpStream Received(DataReceivedHandler handler) {
263         dataReceivedHandler = handler;
264         return this;
265     }
266 
267     TcpStream Writed(DataSendedHandler handler) {
268         dataSendedHandler = handler;
269         return this;
270     }
271     
272     TcpStream Closed(SimpleEventHandler handler) {
273         closeHandler = handler;
274         return this;
275     }
276 
277     TcpStream Disconnected(SimpleEventHandler handler) {
278         disconnectionHandler = handler;
279         return this;
280     }
281 
282     TcpStream Error(ErrorEventHandler handler) {
283         errorHandler = handler;
284         return this;
285     }
286 
287     override bool IsConnected() nothrow {
288         return _isConnected;
289     }
290 
291     override void Start() {
292         if (_isRegistered)
293             return;
294         _loop.Register(this);
295         _isRegistered = true;
296         version (HAVE_IOCP)
297         {
298         //    this.BeginRead();
299         }
300     }
301 
302     void Write(NbuffChunk bytes) {
303         assert(!bytes.empty());
304 
305         if (!_isConnected) {
306             throw new Exception(format("The connection %s closed!",
307                 this.RemoteAddress.toString()));
308         }
309 
310         version (GEAR_IO_DEBUG)
311             log.info("data buffered (%s bytes): fd=%d", cast(string)bytes.data, this.handle);
312         _isWritting = true;
313         InitializeWriteQueue();
314         _senddingBuffer.append(bytes);
315         OnWrite();
316     }
317 
318     /**
319      * 
320      */
321     void Write(const(ubyte)[] data) {
322         
323         NbuffChunk bytes = NbuffChunk(cast(string) data);
324 
325         version (GEAR_IO_DEBUG_MORE) {
326             log.info("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
327         } else  version (GEAR_IO_DEBUG) {
328             if (data.length <= 32)
329                 log.info("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
330             else
331                 log.info("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
332         }
333 
334         if(data is null) {
335             version(GEAR_DEBUG) {
336                 log.warn("Writting a empty data on connection %s.", this.RemoteAddress.toString());
337             }
338             return;
339         }
340 
341         if (!_isConnected) {
342             string msg = format("The connection %s is closed!", this.RemoteAddress.toString());
343             throw new Exception(msg);
344         }
345 
346         version (HAVE_IOCP) {
347             return Write(bytes);
348         } else {
349 
350             if (_senddingBuffer.empty() && !_isWritting) {
351                 _isWritting = true;
352                 const(ubyte)[] d = data;
353 
354                 // while (!IsClosing() && !_isWriteCancelling && d.length > 0) {
355                 while(d !is null) {
356                     if(isWriteCancelling()) {
357                         _errorMessage = format("The connection %s is cancelled!", this.RemoteAddress.toString());
358                         _error = true;
359                         log.warn(_errorMessage);
360                         throw new Exception(_errorMessage);
361                         // break;
362                     }
363 
364                     if(IsClosing() || IsClosed()) {
365                         _errorMessage= format("The connection %s is closing or closed!", this.RemoteAddress.toString());
366                         _error = true;
367                         log.warn("%s, %s", IsClosing(), IsClosed());
368                         throw new Exception(_errorMessage);
369                         // break;
370                     }
371 
372                     version (GEAR_IO_DEBUG)
373                         log.info("to write directly %d bytes, fd=%d", d.length, this.handle);
374                     size_t nBytes = TryWrite(d);
375                     // call Writed handler?
376                     // dataSendedHandler(nBytes);
377 
378                     if (nBytes == d.length) {
379                         version (GEAR_IO_DEBUG)
380                             log.trace("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
381                         CheckAllWriteDone();
382                         break;
383                     } else if (nBytes > 0) {
384                         version (GEAR_IO_DEBUG)
385                             log.trace("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
386                         d = d[nBytes .. $];
387                     } else {
388                         version (GEAR_IO_DEBUG)
389                             log.warn("buffering data: %d bytes, fd=%d", d.length, this.handle);
390                         InitializeWriteQueue();
391                         _senddingBuffer.append(bytes);
392                         break;
393                     }
394                 }
395             } else {
396                 Write(bytes);
397             }
398         }
399     }
400 
401     void ShutdownInput() {
402         this.socket.shutdown(SocketShutdown.RECEIVE);
403     }
404 
405     void ShutdownOutput() {
406         this.socket.shutdown(SocketShutdown.SEND);
407     }
408 
409     override protected void OnDisconnected() {
410         version(GEAR_DEBUG) {
411             log.info("peer disconnected: fd=%d", this.handle);
412         }
413         if (disconnectionHandler !is null)
414             disconnectionHandler();
415 
416         this.Close();
417     }
418 
419 protected:
420     bool _isClient;
421     ConnectionHandler _connectionHandler;
422 
423     override void OnRead() {
424         version (GEAR_IO_DEBUG)
425             Trace("start to read");
426 
427         version (Posix) {
428             // todo: new buffer 
429             while (!_isClosed && !TryRead()) {
430                 version (GEAR_IO_DEBUG)
431                     Trace("continue reading...");
432             }
433             // onDataReceived
434         } else {
435             if (!_isClosed)
436             {
437                 DoRead();
438             }
439 
440         }
441 
442         //if (this.isError) {
443         //    string msg = format("Socket Error on read: fd=%d, code=%d, message: %s",
444         //            this.handle, errno, this.errorMessage);
445         //    debug errorf(msg);
446         //    if (!IsClosed())
447         //        ErrorOccurred(msg);
448         //}
449     }
450 
451     override void OnClose() {
452         bool lastConnectStatus = _isConnected;
453         super.OnClose();
454         if(lastConnectStatus) {
455             version (GEAR_IO_DEBUG) {
456                 if (!_senddingBuffer.empty()) {
457                     log.warn("Some data has not been sent yet: fd=%d", this.handle);
458                 }
459             }
460             version(GEAR_DEBUG) {
461                 log.info("Closing a connection with: %s, fd=%d", this.RemoteAddress, this.handle);
462             }
463 
464             ResetWriteStatus();
465             _isConnected = false;
466             version (GEAR_IO_DEBUG) {
467                 log.info("Raising a event on a TCP stream [%s] is down: fd=%d", 
468                     this.RemoteAddress.toString(), this.handle);
469             }
470 
471             if (closeHandler !is null)
472                 closeHandler();
473         }
474     }
475 
476 }