/*
 * Archttp - A highly performant web framework written in D.
 *
 * Copyright (C) 2021-2022 Kerisy.com
 *
 * Website: https://www.kerisy.com
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module geario.net.TcpStream;

import geario.net.channel.Types;
import geario.net.TcpStreamOptions;
import geario.net.IoError;

import nbuff;

import geario.event.selector.Selector;
import geario.event;
import geario.Functions;
import geario.logging;

import std.exception;
import std.format;
import std.socket;
import std.string;

import core.atomic;
import core.stdc.errno;
import core.thread;
import core.time;

version (HAVE_EPOLL) {
    import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
}

/**
 * A TCP stream bound to a `Selector`.
 *
 * The same class is used for both sides of a connection: the client
 * constructor creates a fresh socket that must be connected with `Connect`,
 * while the server constructor wraps an already established socket.
 */
class TcpStream : AbstractStream {
    /// Invoked from `OnClose` when the stream was connected at the time of closing.
    SimpleEventHandler closeHandler;

    protected shared bool _isConnected; // It's always true for server.

    protected TcpStreamOptions _tcpOption;

    /// Number of `ReConnect` attempts made so far.
    protected int retryCount = 0;

    /**
     * Creates a client-side stream with a new, unconnected TCP socket.
     *
     * Params:
     *   loop   = selector the stream is attached to
     *   option = stream options; `TcpStreamOptions.Create()` is used when null
     *   family = address family of the socket to create
     */
    this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
        _isClient = true;
        _isConnected = false;

        if (option is null)
            _tcpOption = TcpStreamOptions.Create();
        else
            _tcpOption = option;
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        // int value = 1;
        // if (-1 == .setsockopt(socket.handle, cast(int) SocketOptionLevel.SOCKET, cast(int) SO_NOSIGPIPE, &value, cast(uint) value.sizeof))
        //     throw new SocketOSException("Unable to set socket option");

        super(loop, family, _tcpOption.bufferSize);
        version(GEAR_IO_DEBUG) log.trace("buffer size: %d bytes", _tcpOption.bufferSize);
    }

    /**
     * Creates a server-side stream around an already accepted socket.
     *
     * The stream is marked as connected immediately and the keepalive
     * options are applied to the socket.
     *
     * Params:
     *   loop   = selector the stream is attached to
     *   socket = the connected socket to wrap
     *   option = stream options; `TcpStreamOptions.Create()` is used when null
     */
    this(Selector loop, Socket socket, TcpStreamOptions option = null) {
        if (option is null)
            _tcpOption = TcpStreamOptions.Create();
        else
            _tcpOption = option;
        this.socket = socket;
        // int value = 1;
        // if (-1 == .setsockopt(socket.handle, cast(int) SocketOptionLevel.SOCKET, cast(int) SO_NOSIGPIPE, &value, cast(uint) value.sizeof))
        //     throw new SocketOSException("Unable to set socket option");

        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        SetKeepalive();
    }

    /// Replaces the stream options; `option` must not be null.
    void options(TcpStreamOptions option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the options currently used by this stream.
    TcpStreamOptions options() @property {
        return this._tcpOption;
    }

    /// Returns true while a write is in progress.
    override bool IsBusy() {
        return _isWritting;
    }

    /// Returns true when the stream was created with the client constructor.
    override bool IsClient() {
        return _isClient;
    }

    /**
     * Resolves `hostname` and starts connecting to one of its addresses.
     *
     * An IPv4 address is preferred; when the resolver returns none, the
     * first resolved address is used instead and a warning is logged.
     *
     * Params:
     *   hostname = host name or address literal to resolve
     *   port     = remote TCP port
     *
     * Throws: SocketException when no address could be resolved.
     */
    void Connect(string hostname, ushort port) {
        Address[] addresses = getAddress(hostname, port);
        // Check the length rather than `is null`: an empty, non-null array
        // would otherwise reach `addresses[0]` below.
        if (addresses.length == 0) {
            throw new SocketException("Can't resolve hostname: " ~ hostname);
        }

        Address selectedAddress;
        foreach (Address addr; addresses) {
            // Decide by address family instead of inspecting the textual form:
            // short IPv6 literals such as "fe80::1" look like IPv4 to a
            // length/prefix heuristic.
            if (addr.addressFamily == AddressFamily.INET) {
                selectedAddress = addr;
                break;
            }
        }

        if (selectedAddress is null) {
            log.warn("No IPV4 avaliable");
            selectedAddress = addresses[0];
        }

        version(GEAR_IO_DEBUG) {
            log.info("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
        }
        Connect(selectedAddress); // first IPv4 address, or the first address at all
    }

    /**
     * Starts connecting to `addr` on a `std.parallelism` task-pool thread.
     *
     * Does nothing when the stream is already connected. The outcome is
     * reported through the handler registered with `Connected`.
     */
    void Connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;
        import std.parallelism;

        auto connectionTask = task(&DoConnect, addr);
        taskPool.put(connectionTask);
        // DoConnect(addr);
    }

    /**
     * Creates a new socket and connects again to the last remote address.
     *
     * Does nothing when the stream is already connected or when
     * `retryTimes` attempts have been made.
     *
     * Throws: Exception when called on a server-side stream.
     */
    void ReConnect() {
        if (!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        retryCount++;
        _isConnected = false;
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (GEAR_DEBUG)
            log.trace("reconnecting %d...", retryCount);
        Connect(_remoteAddress);
    }

    /**
     * Performs the connection attempt and notifies the connection handler.
     *
     * Every `Throwable` is caught here because this method runs on a
     * task-pool thread (see `Connect(Address)`).
     *
     * Returns: always true; the actual outcome is stored in `_isConnected`
     *          and passed to the connection handler.
     */
    protected override bool DoConnect(Address addr) {
        try {
            version (GEAR_DEBUG)
                log.trace("Connecting to %s...", addr);
            // Address binded = CreateAddress(this.socket.addressFamily);
            // this.socket.Bind(binded);
            version (HAVE_IOCP) {
                // The stream is registered before the connect call here.
                this.socket.blocking = false;
                Start();
                if (super.DoConnect(addr)) {
                    this.socket.blocking = false;
                    SetKeepalive();
                    _localAddress = this.socket.localAddress();
                    _isConnected = true;
                } else {
                    ErrorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            } else {
                // Connect in blocking mode, then switch to non-blocking and register.
                this.socket.blocking = true;
                if (super.DoConnect(addr)) {
                    this.socket.blocking = false;
                    SetKeepalive();
                    _localAddress = this.socket.localAddress();
                    Start();
                    _isConnected = true;
                } else {
                    ErrorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            }
        } catch (Throwable ex) {
            // Must try the best to catch all the exceptions, because it will be executed in another thread.
            debug log.warn(ex.msg);
            version(GEAR_DEBUG) log.warn(ex);
            ErrorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
            _isConnected = false;
        }

        if (_connectionHandler !is null) {
            try {
                _connectionHandler(_isConnected);
            } catch (Throwable ex) {
                // A failing user handler must not escape the worker thread.
                debug log.warn(ex.msg);
                version(GEAR_DEBUG) log.warn(ex);
            }
        }
        return true;
    }

    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /**
     * Applies the keepalive settings from the stream options to the socket.
     *
     * Nothing is done unless `isKeepalive` is set. The probe count is only
     * configured in the HAVE_EPOLL build.
     */
    protected void SetKeepalive() {
        version(GEAR_DEBUG) {
            log.info("isKeepalive: %s, keepaliveTime: %d seconds, Interval: %d seconds",
                _tcpOption.isKeepalive, _tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
        }
        version (HAVE_EPOLL) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP,
                    cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
                // version (GEAR_DEBUG) CheckKeepAlive();
            }
        } else version (HAVE_IOCP) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                // version (GEAR_DEBUG) CheckKeepAlive();
            }
        }
    }

    /// Debug helper: reads the keepalive options back from the socket and logs them.
    version (GEAR_DEBUG) protected void CheckKeepAlive() {
        version (HAVE_EPOLL) {
            int time;
            int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
            log.trace("ret=%d, time=%d", ret1, time);

            int interval;
            int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
            log.trace("ret=%d, interval=%d", ret2, interval);

            int isKeep;
            int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
            log.trace("ret=%d, keepalive=%s", ret3, isKeep == 1);

            int probe;
            int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
            // Label the value as the probe count, not as a second "interval".
            log.trace("ret=%d, probe=%d", ret4, probe);
        }
    }

    /// Sets the handler that receives the result of a connection attempt. Returns `this` for chaining.
    TcpStream Connected(ConnectionHandler handler) {
        _connectionHandler = handler;
        return this;
    }

    /// Sets the data-received handler. Returns `this` for chaining.
    TcpStream Received(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    /// Sets the data-sent handler. Returns `this` for chaining.
    TcpStream Writed(DataSendedHandler handler) {
        dataSendedHandler = handler;
        return this;
    }

    /// Sets the handler invoked from `OnClose`. Returns `this` for chaining.
    TcpStream Closed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Sets the handler invoked from `OnDisconnected`. Returns `this` for chaining.
    TcpStream Disconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets the error handler. Returns `this` for chaining.
    TcpStream Error(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Returns true while the stream is connected.
    override bool IsConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector; repeated calls are no-ops.
    override void Start() {
        if (_isRegistered)
            return;
        _loop.Register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
        {
            // this.BeginRead();
        }
    }

    /**
     * Queues a chunk on the send buffer and triggers a write.
     *
     * Params:
     *   bytes = the chunk to send; must not be empty
     *
     * Throws: Exception when the stream is not connected.
     */
    void Write(NbuffChunk bytes) {
        assert(!bytes.empty());

        if (!_isConnected) {
            throw new Exception(format("The connection %s closed!",
                this.RemoteAddress.toString()));
        }

        version (GEAR_IO_DEBUG)
            log.info("data buffered (%s bytes): fd=%d", cast(string)bytes.data, this.handle);
        _isWritting = true;
        InitializeWriteQueue();
        _senddingBuffer.append(bytes);
        OnWrite();
    }

    /**
     * Sends raw bytes over the stream.
     *
     * Outside the HAVE_IOCP build, when nothing is buffered and no write is
     * in progress, the data is written to the socket directly; whatever
     * cannot be written is placed on the send buffer. In every other case
     * the data goes through `Write(NbuffChunk)`.
     *
     * Params:
     *   data = the bytes to send; a null slice is ignored
     *
     * Throws: Exception when the stream is not connected, when the write is
     *         being cancelled, or when the stream is closing or closed.
     */
    void Write(const(ubyte)[] data) {

        // NOTE(review): the chunk is built from a cast of the caller's slice;
        // whether NbuffChunk copies the bytes is not visible here - confirm
        // the lifetime the caller has to guarantee.
        NbuffChunk bytes = NbuffChunk(cast(string) data);

        version (GEAR_IO_DEBUG_MORE) {
            log.info("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
        } else version (GEAR_IO_DEBUG) {
            if (data.length <= 32)
                log.info("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
            else
                log.info("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
        }

        if (data is null) {
            version(GEAR_DEBUG) {
                log.warn("Writting a empty data on connection %s.", this.RemoteAddress.toString());
            }
            return;
        }

        if (!_isConnected) {
            string msg = format("The connection %s is closed!", this.RemoteAddress.toString());
            throw new Exception(msg);
        }

        version (HAVE_IOCP) {
            return Write(bytes);
        } else {

            if (_senddingBuffer.empty() && !_isWritting) {
                _isWritting = true;
                // `d` always refers to the part of `data` that is still unsent.
                const(ubyte)[] d = data;

                // while (!IsClosing() && !_isWriteCancelling && d.length > 0) {
                while (d !is null) {
                    if (isWriteCancelling()) {
                        _errorMessage = format("The connection %s is cancelled!", this.RemoteAddress.toString());
                        _error = true;
                        log.warn(_errorMessage);
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    if (IsClosing() || IsClosed()) {
                        _errorMessage= format("The connection %s is closing or closed!", this.RemoteAddress.toString());
                        _error = true;
                        log.warn("%s, %s", IsClosing(), IsClosed());
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    version (GEAR_IO_DEBUG)
                        log.info("to write directly %d bytes, fd=%d", d.length, this.handle);
                    size_t nBytes = TryWrite(d);
                    // call Writed handler?
                    // dataSendedHandler(nBytes);

                    if (nBytes == d.length) {
                        version (GEAR_IO_DEBUG)
                            log.trace("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        CheckAllWriteDone();
                        break;
                    } else if (nBytes > 0) {
                        version (GEAR_IO_DEBUG)
                            log.trace("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        d = d[nBytes .. $];
                    } else {
                        version (GEAR_IO_DEBUG)
                            log.warn("buffering data: %d bytes, fd=%d", d.length, this.handle);
                        InitializeWriteQueue();
                        // Queue only the unsent remainder. Queuing the whole
                        // payload here would send the prefix that earlier
                        // iterations already wrote a second time.
                        NbuffChunk remaining = NbuffChunk(cast(string) d);
                        _senddingBuffer.append(remaining);
                        break;
                    }
                }
            } else {
                Write(bytes);
            }
        }
    }

    /// Shuts down the receiving side of the socket.
    void ShutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending side of the socket.
    void ShutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

    /// Calls the disconnection handler, if one is set, and then closes the stream.
    override protected void OnDisconnected() {
        version(GEAR_DEBUG) {
            log.info("peer disconnected: fd=%d", this.handle);
        }
        if (disconnectionHandler !is null)
            disconnectionHandler();

        this.Close();
    }

protected:
    /// True for streams created with the client constructor.
    bool _isClient;

    /// Receives the result of each connection attempt; see `Connected`.
    ConnectionHandler _connectionHandler;

    /**
     * Reads from the socket.
     *
     * On Posix, `TryRead` is repeated until it returns true or the stream
     * is closed; elsewhere a single `DoRead` is issued.
     */
    override void OnRead() {
        version (GEAR_IO_DEBUG)
            Trace("start to read");

        version (Posix) {
            // todo: new buffer
            while (!_isClosed && !TryRead()) {
                version (GEAR_IO_DEBUG)
                    Trace("continue reading...");
            }
            // onDataReceived
        } else {
            if (!_isClosed)
            {
                DoRead();
            }

        }

        //if (this.isError) {
        //    string msg = format("Socket Error on read: fd=%d, code=%d, message: %s",
        //        this.handle, errno, this.errorMessage);
        //    debug errorf(msg);
        //    if (!IsClosed())
        //        ErrorOccurred(msg);
        //}
    }

    /**
     * Closes the stream. When it was connected beforehand, the write status
     * is reset, the stream is marked as disconnected and `closeHandler` is
     * invoked.
     */
    override void OnClose() {
        // Remember the state first: the handler below must only fire for a
        // stream that was connected before the base class closed it.
        bool lastConnectStatus = _isConnected;
        super.OnClose();
        if (lastConnectStatus) {
            version (GEAR_IO_DEBUG) {
                if (!_senddingBuffer.empty()) {
                    log.warn("Some data has not been sent yet: fd=%d", this.handle);
                }
            }
            version(GEAR_DEBUG) {
                log.info("Closing a connection with: %s, fd=%d", this.RemoteAddress, this.handle);
            }

            ResetWriteStatus();
            _isConnected = false;
            version (GEAR_IO_DEBUG) {
                log.info("Raising a event on a TCP stream [%s] is down: fd=%d",
                    this.RemoteAddress.toString(), this.handle);
            }

            if (closeHandler !is null)
                closeHandler();
        }
    }

}