module geario.net.channel.iocp.AbstractStream;

// dfmt off
version (HAVE_IOCP) :
// dfmt on

import geario.event.selector.Selector;
import geario.net.channel.AbstractSocketChannel;
import geario.net.channel.ChannelTask;
import geario.net.channel.Types;
import geario.net.channel.iocp.Common;
import geario.logging;
import geario.Functions;
import geario.event.selector.IOCP;
import geario.system.Error;
import geario.util.ThreadHelper;
import geario.util.worker;
import nbuff;
import core.atomic;
import core.sys.windows.windows;
import core.sys.windows.winsock2;
import core.sys.windows.mswsock;
import std.format;
import std.socket;
import std.string;
import core.stdc.string;

/**
 * TCP peer built on overlapped Winsock I/O (WSARecv / WSASend / ConnectEx).
 *
 * The class issues the overlapped operations and reacts to their completion
 * through `OnRead`, `DoRead`, `OnWrite` and `OnWriteDone`. Subclasses supply
 * `IsClient`, `IsConnected` and `OnDisconnected`.
 */
abstract class AbstractStream : AbstractSocketChannel {

    // data event handlers

    /**
     * Warning: the received data is stored in an inner buffer. To keep the
     * data safe, make a copy of it.
     */
    protected DataReceivedHandler dataReceivedHandler;
    protected DataSendedHandler dataSendedHandler;
    protected SimpleActionHandler dataWriteDoneHandler;

    protected NbuffChunk _bufferForRead;
    protected AddressFamily _family;

    // Size of the buffer requested for every WSARecv call (see BeginRead).
    private size_t _bufferSize = 4096;
    // Task that currently receives incoming chunks when a task worker is set.
    private ChannelTask _task = null;

    /**
     * Creates the channel as a TCP channel on the given selector.
     *
     * Params:
     *   loop       = selector handed to the base class constructor.
     *   family     = address family; not used by this constructor.
     *   bufferSize = size in bytes of the buffer requested for each read.
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
        _bufferSize = bufferSize;
        super(loop, ChannelType.TCP);

        // presumably resolves the ConnectEx extension pointer used by
        // DoConnect -- confirm in geario.net.channel.iocp.Common
        loadWinsockExtension(this.handle);
    }

    mixin CheckIocpError;

    abstract bool IsClient();

    /// Forwards the read notification to the base class.
    override void OnRead() {
        version (GEAR_IO_DEBUG)
            log.trace("ready to read");
        super.OnRead();
    }

    /**
     * Starts writing the next queued chunk, unless the channel is closing and
     * a write cancellation was requested, in which case the write state is
     * reset instead.
     *
     * Should be thread-safe.
     */
    override void OnWrite() {
        version (GEAR_IO_DEBUG)
            log.trace("checking write status, isWritting: %s, writeBytes: %s", _isWritting, writeBytes.empty());

        if (IsClosing() && _isWriteCancelling) {
            version (GEAR_IO_DEBUG) log.info("Write cancelled, fd=%d", this.handle);
            ResetWriteStatus();
            return;
        }
        TryNextBufferWrite();
    }

    /**
     * Drops all pending write state and closes the underlying socket, then
     * lets the base class finish closing.
     */
    protected override void OnClose() {
        _isWritting = false;
        ResetWriteStatus();
        if (this._socket is null) {
            // No std.socket wrapper available: close the raw handle directly.
            .closesocket(this.handle);
        } else {
            // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 1:20:27 pm
            // (an earlier revision waited on _isSingleWriteBusy before closing)
            this._socket.shutdown(SocketShutdown.BOTH);
            this._socket.close();
        }
        super.OnClose();
    }

    /**
     * Posts one overlapped WSARecv on the socket. The channel is closed when
     * the call fails with anything other than ERROR_IO_PENDING.
     *
     * See_Also:
     *   https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsarecv
     */
    void BeginRead() {
        // NOTE(review): `b` is a local, while `_readBuffer` keeps a slice of its
        // memory for the pending overlapped read. If the chunk returned by
        // Nbuff.get releases its memory when it goes out of scope, that slice
        // dangles -- confirm against the nbuff ownership rules.
        auto b = Nbuff.get(_bufferSize);

        _readBuffer = b.data();

        WSABUF _dataReadBuffer;
        _dataReadBuffer.len = cast(uint) _readBuffer.length;
        _dataReadBuffer.buf = cast(char*) _readBuffer.ptr;
        memset(&_iocpread.overlapped, 0, _iocpread.overlapped.sizeof);
        _iocpread.channel = this;
        _iocpread.operation = IocpOperation.read;
        DWORD dwReceived = 0;
        DWORD dwFlags = 0;
        version (GEAR_IO_DEBUG)
            log.trace("start receiving [fd=%d] ", this.socket.handle);

        int nRet = WSARecv(cast(SOCKET) this.socket.handle, &_dataReadBuffer, 1u, &dwReceived,
                &dwFlags, &_iocpread.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);

        // ERROR_IO_PENDING only means the operation completes later.
        if (nRet == SOCKET_ERROR && (GetLastError() != ERROR_IO_PENDING)) {
            _isSingleWriteBusy = false;
            Close();
        }
    }

    /**
     * Binds the socket to a local address, starts an overlapped ConnectEx to
     * `addr` and polls SO_CONNECT_TIME until the connection is reported as
     * established.
     *
     * Params:
     *   addr = remote address to connect to.
     *
     * Returns:
     *   `true` once the connection is established; `false` when ConnectEx
     *   left the channel in the error state or when querying SO_CONNECT_TIME
     *   failed.
     */
    protected bool DoConnect(Address addr) {
        Address binded = CreateAddress(this.socket.addressFamily);
        _isSingleWriteBusy = true;
        this.socket.bind(binded);
        _iocpread.channel = this;
        _iocpread.operation = IocpOperation.connect;

        import std.datetime.stopwatch;
        // AutoStart.yes already starts the watch, so no explicit start() here.
        auto sw = StopWatch(AutoStart.yes);

        // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex
        // NOTE(review): the ConnectEx result is checked against SOCKET_ERROR;
        // confirm that checkErro treats a failed ConnectEx correctly.
        int nRet = ConnectEx(cast(SOCKET) this.socket.handle(), cast(SOCKADDR*) addr.name(),
                addr.nameLen(), null, 0, null, &_iocpread.overlapped);
        checkErro(nRet, SOCKET_ERROR);

        if (this._error)
            return false;

        // https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt
        int seconds = 0;
        int bytes = seconds.sizeof;
        int iResult = 0;
        bool result = false;

        // NOTE(review): this poll has no delay and no upper bound; it only ends
        // when getsockopt fails or SO_CONNECT_TIME stops reporting 0xFFFFFFFF.
        while (true) {
            iResult = getsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, SO_CONNECT_TIME,
                    cast(void*)&seconds, cast(PINT)&bytes);

            if (iResult != NO_ERROR) {
                DWORD dwLastError = WSAGetLastError();
                log.warn("getsockopt(SO_CONNECT_TIME) failed with Error: code=%d, message=%s",
                        dwLastError, GetErrorMessage(dwLastError));
                break;
            }

            if (seconds == 0xFFFFFFFF) {
                version (GEAR_IO_DEBUG) log.warn("Connection not established yet (destination: %s).", addr);
                // so to check again
                continue;
            }

            result = true;
            version (GEAR_IO_DEBUG) {
                log.info("Connection has been established in %d msecs, destination: %s",
                        sw.peek.total!"msecs", addr);
            }

            // https://docs.microsoft.com/en-us/windows/win32/winsock/sol-socket-socket-options
            enum SO_UPDATE_CONNECT_CONTEXT = 0x7010;
            iResult = setsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET,
                    SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
            if (iResult != NO_ERROR) {
                // Report the failure instead of dropping it; the connection
                // itself is still considered established.
                DWORD dwUpdateError = WSAGetLastError();
                log.warn("setsockopt(SO_UPDATE_CONNECT_CONTEXT) failed with Error: code=%d, message=%s",
                        dwUpdateError, GetErrorMessage(dwUpdateError));
            }
            break;
        }

        return result;
    }

    /**
     * Posts one overlapped WSASend for `data` and closes the channel when the
     * error check leaves the channel in the error state.
     *
     * Params:
     *   data = bytes to send; the slice is handed to WSASend without copying.
     *
     * Returns:
     *   The byte count WSASend stored in its "bytes sent" output argument.
     */
    private uint DoWrite(const(ubyte)[] data) {
        DWORD dwSent = 0;
        DWORD dwFlags = 0;

        memset(&_iocpwrite.overlapped, 0, _iocpwrite.overlapped.sizeof);
        _iocpwrite.channel = this;
        _iocpwrite.operation = IocpOperation.write;

        version (GEAR_IO_DEBUG) {
            size_t bufferLength = data.length;
            log.trace("To write %d bytes", bufferLength);
            if (bufferLength > 32)
                log.trace("%(%02X %) ...", data[0 .. 32]);
            else
                log.trace("%s", data);
        }

        WSABUF _dataWriteBuffer;
        _dataWriteBuffer.buf = cast(char*) data.ptr;
        _dataWriteBuffer.len = cast(uint) data.length;

        int nRet = WSASend(cast(SOCKET) this.socket.handle(), &_dataWriteBuffer, 1, &dwSent,
                dwFlags, &_iocpwrite.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);

        checkErro(nRet, SOCKET_ERROR);

        // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 12:18:20 pm
        // Keep this to prevent the buffer corrupted. Why?
        version (GEAR_IO_DEBUG) {
            log.trace("sent: %d / %d bytes, fd=%d", dwSent, bufferLength, this.handle);
        }

        if (this.IsError) {
            log.error("Socket Error on write: fd=%d, message=%s", this.handle, this.ErrorMessage);
            this.Close();
        }

        return dwSent;
    }

    /**
     * Handles a completed read according to `readLen`: a positive value
     * delivers the data and posts the next read, zero reports a
     * disconnection, and a negative value puts the channel in the error
     * state.
     */
    protected void DoRead() {
        this.ClearError();
        version (GEAR_IO_DEBUG)
            log.trace("start reading: %d nbytes", this.readLen);

        if (readLen > 0) {
            HandleReceivedData(readLen);

            // Go on reading
            this.BeginRead();

        } else if (readLen == 0) {
            version (GEAR_IO_DEBUG) {
                if (_remoteAddress !is null)
                    log.warn("connection broken: %s", _remoteAddress.toString());
            }
            OnDisconnected();
        } else {
            version (GEAR_IO_DEBUG) {
                log.warn("undefined behavior on thread %d", GetTid());
            }
            // Record the failure in every build flavour so that debug and
            // release builds leave the channel in the same state.
            this._error = true;
            this._errorMessage = "undefined behavior on thread";
        }
    }

    /**
     * Copies the first `len` bytes of the read buffer into a fresh chunk and
     * delivers it: directly to `dataReceivedHandler` when no task worker is
     * set, otherwise through a `ChannelTask`.
     *
     * Params:
     *   len = number of valid bytes at the start of the read buffer.
     */
    private void HandleReceivedData(ptrdiff_t len) {
        version (GEAR_IO_DEBUG)
            log.trace("reading done: %d nbytes", len);

        if (dataReceivedHandler is null)
            return;

        // The read buffer is reused by the next BeginRead, so hand out a copy.
        auto buffer = Nbuff.get(len);
        buffer.data()[0 .. len] = _readBuffer[0 .. len];

        NbuffChunk bytes = NbuffChunk(buffer, len);

        if (taskWorker is null) {
            dataReceivedHandler(bytes);
        } else {
            ChannelTask task = _task;

            // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
            // More tests needed
            if (task is null || task.IsFinishing()) {
                task = CreateChannelTask();
                _task = task;
            } else {
                version (GEAR_METRIC) {
                    log.warn("Request peeding... Task status: %s", task.status);
                }
            }

            task.put(bytes);
        }
    }

    /**
     * Creates a task bound to the current `dataReceivedHandler` and submits
     * it to the task worker.
     *
     * Returns:
     *   The newly created task.
     */
    private ChannelTask CreateChannelTask() {
        ChannelTask task = new ChannelTask();
        task.dataReceivedHandler = dataReceivedHandler;
        taskWorker.put(task);
        return task;
    }

    /**
     * Tries to write a block of data directly.
     *
     * Params:
     *   data = bytes to send.
     *
     * Returns:
     *   The value returned by `DoWrite`, or 0 when a single write is already
     *   marked busy and nothing was attempted.
     */
    protected size_t TryWrite(const ubyte[] data) {
        version (GEAR_IO_DEBUG)
            log.trace("start to write, total=%d bytes, fd=%d", data.length, this.handle);
        ClearError();
        size_t nBytes;
        if (!_isSingleWriteBusy) {
            nBytes = DoWrite(data);
        }

        return nBytes;
    }

    /**
     * Takes the next chunk from the sending buffer and starts writing it, or
     * clears the busy flag when everything has been written.
     */
    private void TryNextBufferWrite() {
        if (CheckAllWriteDone()) {
            _isSingleWriteBusy = false;
            return;
        }

        // NOTE(review): an earlier revision guarded this section with
        // cas(&_isSingleWriteBusy, false, true); no such guard is active now.
        ClearError();

        writeBytes = _senddingBuffer.frontChunk();
        _senddingBuffer.popChunk();
        WriteBufferRemaining();
    }

    /**
     * Writes what is left in `writeBytes` and drops from it the number of
     * bytes reported by `DoWrite`.
     */
    private void WriteBufferRemaining() {
        if (writeBytes.empty()) {
            return;
        }
        const(ubyte)[] data = cast(const(ubyte)[]) writeBytes.data();

        size_t nBytes = DoWrite(data);

        version (GEAR_IO_DEBUG)
            log.trace("written data: %d bytes, fd=%d", nBytes, this.handle);
        if (nBytes == data.length) {
            // Everything was accepted: empty the current chunk.
            writeBytes.popBackN(writeBytes.length);
        } else if (nBytes > 0) {
            // Partial write: keep the tail for the next completion.
            writeBytes.popFrontN(nBytes);
            version (GEAR_IO_DEBUG)
                log.warn("remaining data: %d / %d, fd=%d", data.length - nBytes, data.length, this.handle);
        } else {
            version (GEAR_IO_DEBUG)
                log.warn("I/O busy: writing. fd=%d", this.handle);
        }
    }

    /**
     * Checks whether both the sending buffer and the current chunk are
     * empty; if so, resets the write state and invokes
     * `dataWriteDoneHandler` when one is set.
     *
     * Returns:
     *   `true` when nothing is left to write, `false` otherwise.
     */
    protected bool CheckAllWriteDone() {
        if (_senddingBuffer.empty() && writeBytes.empty()) {
            ResetWriteStatus();

            version (GEAR_IO_DEBUG)
                log.trace("All data are written out. fd=%d", this.handle);

            if (dataWriteDoneHandler !is null)
                dataWriteDoneHandler(this);

            return true;
        }

        return false;
    }

    /**
     * Discards every pending outgoing byte and clears all write-related
     * flags.
     */
    void ResetWriteStatus() {
        if (!_senddingBuffer.empty())
            _senddingBuffer.clear();

        _isWritting = false;
        _isWriteCancelling = false;
        sendDataBuffer = null;
        sendDataBackupBuffer = null;
        if (!writeBytes.empty())
            writeBytes.popBackN(writeBytes.length);
        _isSingleWriteBusy = false;
    }

    /**
     * Called by selector after data sent.
     * Note: It's only for IOCP selector.
     *
     * Params:
     *   nBytes = byte count reported for the completed write; only used for
     *            tracing here.
     */
    void OnWriteDone(size_t nBytes) {
        version (GEAR_IO_DEBUG) {
            log.trace("write done once: %d bytes, isWritting: %s, writeBytes: %s, fd=%d",
                    nBytes, _isWritting, writeBytes.empty(), this.handle);
        }

        if (!writeBytes.empty()) {
            version (GEAR_IO_DEBUG) log.trace("try to write the remaining in buffer.");
            WriteBufferRemaining();
        } else {
            version (GEAR_IO_DEBUG) log.trace("try to write next buffer.");
            TryNextBufferWrite();
        }
    }

    /// Invokes `dataWriteDoneHandler` when it is set and the sending buffer is empty.
    private void NotifyDataWrittenDone() {
        if (dataWriteDoneHandler !is null && _senddingBuffer.empty) {
            dataWriteDoneHandler(this);
        }
    }

    /// Returns: the handler currently registered for received data.
    DataReceivedHandler GetDataReceivedHandler() {
        return dataReceivedHandler;
    }

    /// Requests cancellation of pending writes; honoured by `OnWrite` while the channel is closing.
    void CancelWrite() {
        _isWriteCancelling = true;
    }

    abstract bool IsConnected() nothrow;
    abstract protected void OnDisconnected();

    /// Currently a no-op; the write queue it used to create is no longer used.
    protected void InitializeWriteQueue() {
    }

    SimpleEventHandler disconnectionHandler;

    // Outgoing data waiting to be written.
    protected Nbuff _senddingBuffer;
    protected bool _isWriteCancelling = false;
    private bool _isSingleWriteBusy = false; // keep a single I/O write operation atomic
    private NbuffChunk _readBytes;
    // Slice WSARecv writes into; set by BeginRead.
    private ubyte[] _readBuffer;
    private const(ubyte)[] sendDataBuffer;
    private const(ubyte)[] sendDataBackupBuffer;
    // Chunk currently being written out.
    private NbuffChunk writeBytes;

    // Per-direction contexts carrying the OVERLAPPED structures.
    private IocpContext _iocpread;
    private IocpContext _iocpwrite;
}