1 module geario.net.channel.posix.AbstractStream; 2 3 // dfmt off 4 version(Posix): 5 // dfmt on 6 7 import geario.event.selector.Selector; 8 import geario.Functions; 9 import geario.net.channel.AbstractSocketChannel; 10 import geario.net.channel.ChannelTask; 11 import geario.net.channel.Types; 12 import geario.net.IoError; 13 import geario.logging; 14 import geario.system.Error; 15 import geario.util.worker; 16 17 import nbuff; 18 19 import std.format; 20 import std.socket; 21 22 import core.atomic; 23 import core.stdc.errno; 24 import core.stdc.string; 25 26 import core.sys.posix.sys.socket : accept; 27 import core.sys.posix.unistd; 28 29 /** 30 TCP Peer 31 */ 32 abstract class AbstractStream : AbstractSocketChannel { 33 protected size_t _bufferSize = 4096; 34 private NbuffChunk _writeBytes; 35 private ChannelTask _task = null; 36 size_t _receivedLen = 0; 37 38 /** 39 * Warning: The received data is stored a inner buffer. For a data safe, 40 * you would make a copy of it. 41 */ 42 protected DataReceivedHandler dataReceivedHandler; 43 protected DataSendedHandler dataSendedHandler; 44 protected SimpleEventHandler disconnectionHandler; 45 protected SimpleActionHandler dataWriteDoneHandler; 46 47 protected AddressFamily _family; 48 // protected Buffer _bufferForRead; 49 // protected WritingBufferQueue _writeQueue; 50 protected Nbuff _senddingBuffer; 51 protected bool _isWriteCancelling = false; 52 53 this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) { 54 this._family = family; 55 _bufferSize = bufferSize; 56 super(loop, ChannelType.TCP); 57 setFlag(ChannelFlag.Read, true); 58 setFlag(ChannelFlag.Write, true); 59 setFlag(ChannelFlag.ETMode, true); 60 } 61 62 abstract bool IsClient(); 63 abstract bool IsConnected() nothrow; 64 abstract protected void OnDisconnected(); 65 66 private void onDataReceived(NbuffChunk bytes) { 67 if(taskWorker is null) { 68 // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00 69 // Using memory pool 70 if (dataReceivedHandler !is null) { 71 dataReceivedHandler(bytes); 72 } 73 } else { 74 ChannelTask task = _task; 75 76 // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00 77 // More tests needed 78 if(task is null || task.IsFinishing()) { 79 task = CreateChannelTask(); 80 _task = task; 81 82 } else { 83 version(GEAR_METRIC) { 84 log.warn("Request peeding... Task status: %s", task.Status); 85 } 86 } 87 88 task.put(bytes); 89 } 90 } 91 92 private ChannelTask CreateChannelTask() { 93 ChannelTask task = new ChannelTask(); 94 task.dataReceivedHandler = dataReceivedHandler; 95 taskWorker.put(task); 96 return task; 97 } 98 99 /** 100 * 101 */ 102 protected bool TryRead() { 103 bool isDone = true; 104 this.ClearError(); 105 106 // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00 107 // Using memory pool 108 109 // auto readBuffer = Buffer.Get(_bufferSize); 110 // auto readBufferSpace = readBuffer.data(); 111 // NbuffChunk buffer = Nbuff.get(_bufferSize); 112 auto buffer = Nbuff.get(_bufferSize); 113 // ubyte[] readData = buffer.AsArray(); 114 115 // TODO : loop read data 116 ptrdiff_t len = read(this.handle, cast(void*)buffer.ptr, _bufferSize); 117 118 // ubyte[] rb = new ubyte[BufferSize]; 119 // ptrdiff_t len = read(this.handle, cast(void*) rb.ptr, rb.length); 120 version (GEAR_IO_DEBUG) log.trace("reading[fd=%d]: %d bytes", this.handle, len); 121 122 if (len > 0) 123 { 124 version(GEAR_IO_DEBUG) 125 { 126 if (len <= 32) 127 log.info("fd: %d, %d bytes: %(%02X %)", this.handle, len, buffer[0 .. len]); 128 else 129 log.info("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, buffer[0 .. 32]); 130 } 131 132 // buffer.ReaderIndex(0); 133 // buffer.WriterIndex(len); 134 onDataReceived(NbuffChunk(buffer, len)); 135 136 // It's prossible that there are more data waitting for read in the read I/O space. 137 if (len == _bufferSize) { 138 version (GEAR_IO_DEBUG) log.info("Read buffer is full read %d bytes. Need to read again.", len); 139 isDone = false; 140 } 141 } 142 else if (len == Socket.ERROR) 143 { 144 // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call 145 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13 146 // check more Error status 147 this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK; 148 if (_error) 149 { 150 this._errorMessage = GetErrorMessage(errno); 151 152 if(errno == ECONNRESET) 153 { 154 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 155 OnDisconnected(); 156 // ErrorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 157 } 158 else 159 { 160 ErrorOccurred(ErrorCode.INTERRUPTED , "Error occurred on read"); 161 } 162 } 163 } 164 else 165 { 166 version (GEAR_DEBUG) log.info("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle); 167 168 OnDisconnected(); 169 } 170 171 return isDone; 172 } 173 174 override protected void DoClose() 175 { 176 version (GEAR_IO_DEBUG) log.info("peer socket %s closing: fd=%d", this.RemoteAddress.toString(), this.handle); 177 178 if(this.socket is null) 179 { 180 import core.sys.posix.unistd; 181 core.sys.posix.unistd.close(this.handle); 182 } 183 else 184 { 185 this.socket.shutdown(SocketShutdown.BOTH); 186 this.socket.close(); 187 } 188 189 version (GEAR_IO_DEBUG) log.info("peer socket %s closed: fd=%d", this.RemoteAddress.toString, this.handle); 190 191 Task task = _task; 192 if(task !is null) 193 { 194 task.Stop(); 195 } 196 } 197 198 /** 199 * Try to write a block of data. 200 */ 201 protected ptrdiff_t TryWrite(const(ubyte)[] data) 202 { 203 ClearError(); 204 // const nBytes = this.socket.send(data); 205 version (GEAR_IO_DEBUG) log.trace("try to write: %d bytes, fd=%d", data.length, this.handle); 206 207 const nBytes = write(this.handle, data.ptr, data.length); 208 209 version (GEAR_IO_DEBUG) 210 log.trace("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle); 211 212 if (nBytes > 0) { 213 return nBytes; 214 } 215 216 if (nBytes == Socket.ERROR) { 217 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38 218 // check more Error status 219 // EPIPE/Broken pipe: 220 // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll 221 222 if(errno == EAGAIN) { 223 version (GEAR_IO_DEBUG) { 224 log.warn("Warning on write: fd=%d, errno=%d, message=%s", this.handle, 225 errno, GetErrorMessage(errno)); 226 } 227 } else if(errno == EINTR || errno == EWOULDBLOCK) { 228 // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait 229 debug log.warn("Warning on write: fd=%d, errno=%d, message=%s", this.handle, 230 errno, GetErrorMessage(errno)); 231 // eventLoop.update(this); 232 } else { 233 this._error = true; 234 this._errorMessage = GetErrorMessage(errno); 235 if(errno == ECONNRESET) { 236 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 237 OnDisconnected(); 238 // ErrorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 239 } else if(errno == EPIPE) { 240 // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe 241 // Handle SIGPIPE signal 242 OnDisconnected(); 243 ErrorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!"); 244 } 245 246 } 247 } else { 248 version (GEAR_DEBUG) { 249 log.warn("nBytes=%d, message: %s", nBytes, lastSocketError()); 250 assert(false, "Undefined behavior!"); 251 } else { 252 this._error = true; 253 } 254 } 255 256 return 0; 257 } 258 259 private bool TryNextWrite(NbuffChunk buffer) { 260 const(ubyte)[] data = cast(const(ubyte)[])buffer.data; 261 version (GEAR_IO_DEBUG) { 262 log.trace("writting from a buffer [fd=%d], %d bytes, buffer: %s", 263 this.handle, data.length, buffer.data.ptr); 264 } 265 266 ptrdiff_t remaining = data.length; 267 if(data.length == 0) 268 return true; 269 270 while(remaining > 0 && !_error && !IsClosing() && !_isWriteCancelling) { 271 ptrdiff_t nBytes = TryWrite(data); 272 version (GEAR_IO_DEBUG) 273 { 274 log.trace("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s", 275 this.handle, nBytes, data.length, remaining, buffer.AsArray.ptr); 276 } 277 278 if (nBytes > 0) { 279 remaining -= nBytes; 280 data = data[nBytes .. $]; 281 } 282 } 283 284 version (GEAR_IO_DEBUG) { 285 if(remaining == 0) { 286 log.trace("A buffer is written out. fd=%d", this.handle); 287 return true; 288 } else { 289 log.warn("Writing cancelled or an Error ocurred. fd=%d", this.handle); 290 return false; 291 } 292 } else { 293 return remaining == 0; 294 } 295 } 296 297 void ResetWriteStatus() { 298 // if(_writeQueue !is null) 299 // _writeQueue.Clear(); 300 if(!_senddingBuffer.empty()) 301 _senddingBuffer.clear(); 302 atomicStore(_isWritting, false); 303 _isWriteCancelling = false; 304 } 305 306 /** 307 * Should be thread-safe. 308 */ 309 override void OnWrite() { 310 version (GEAR_IO_DEBUG) 311 { 312 log.trace("checking status, isWritting: %s, writeBytes: %s", 313 _isWritting, _writeBytes.empty() ? "null" : cast(string)_writeBytes.data()); 314 } 315 316 if(!_isWritting) { 317 version (GEAR_IO_DEBUG) 318 log.info("No data needs to be written out. fd=%d", this.handle); 319 return; 320 } 321 322 if(IsClosing() && _isWriteCancelling) { 323 version (GEAR_DEBUG) log.info("Write cancelled or closed, fd=%d", this.handle); 324 ResetWriteStatus(); 325 return; 326 } 327 328 // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00 329 // More tests are needed 330 // keep thread-safe here 331 if(!cas(&_isBusyWritting, false, true)) { 332 // version (GEAR_IO_DEBUG) 333 version(GEAR_DEBUG) log.warn("busy writing. fd=%d", this.handle); 334 return; 335 } 336 337 scope(exit) { 338 _isBusyWritting = false; 339 } 340 341 if(!_writeBytes.empty()) { 342 if(TryNextWrite(_writeBytes)) { 343 _writeBytes.popBackN(_writeBytes.length); 344 } else { 345 version (GEAR_IO_DEBUG) 346 { 347 log.info("waiting to try again... fd=%d, writeBytes: %s", 348 this.handle, cast(string)_writeBytes.AsArray); 349 } 350 // eventLoop.update(this); 351 return; 352 } 353 version (GEAR_IO_DEBUG) 354 log.trace("running here, fd=%d", this.handle); 355 } 356 357 if(CheckAllWriteDone()) { 358 return; 359 } 360 361 version (GEAR_IO_DEBUG) { 362 log.trace("start to write [fd=%d], writeBytes %s empty", this.handle, _writeBytes.empty() ? "is" : "is not"); 363 } 364 365 _writeBytes = _senddingBuffer.frontChunk(); 366 _senddingBuffer.popChunk(); 367 if(!_writeBytes.empty()) { 368 if(TryNextWrite(_writeBytes)) { 369 _writeBytes.popBackN(_writeBytes.length); 370 CheckAllWriteDone(); 371 } else { 372 version (GEAR_IO_DEBUG) 373 log.info("waiting to try again: fd=%d, writeBytes: %s", this.handle, cast(string)_writeBytes.AsArray); 374 375 // eventLoop.update(this); 376 } 377 version (GEAR_IO_DEBUG) { 378 log.warn("running here, fd=%d", this.handle); 379 } 380 } 381 } 382 private shared bool _isBusyWritting = false; 383 384 protected bool CheckAllWriteDone() { 385 version (GEAR_IO_DEBUG) { 386 import std.conv; 387 log.trace("checking remaining: fd=%d, writeQueue empty: %s", this.handle, 388 _senddingBuffer.empty() || _senddingBuffer.empty().to!string()); 389 } 390 391 if(_senddingBuffer.empty()) { 392 ResetWriteStatus(); 393 version (GEAR_IO_DEBUG) 394 log.info("All data are written out: fd=%d", this.handle); 395 if(dataWriteDoneHandler !is null) 396 dataWriteDoneHandler(this); 397 return true; 398 } 399 400 return false; 401 } 402 403 protected void InitializeWriteQueue() { 404 // if (_writeQueue is null) { 405 // _writeQueue = new WritingBufferQueue(); 406 // } 407 } 408 409 protected bool DoConnect(Address addr) { 410 try { 411 this.socket.connect(addr); 412 } catch (SocketOSException e) { 413 log.error(e.msg); 414 version(GEAR_DEBUG) error(e); 415 return false; 416 } 417 return true; 418 } 419 420 void CancelWrite() { 421 _isWriteCancelling = true; 422 } 423 424 bool isWriteCancelling() { 425 return _isWriteCancelling; 426 } 427 428 DataReceivedHandler GetDataReceivedHandler() { 429 return dataReceivedHandler; 430 } 431 432 }