module geario.net.channel.ChannelTask;

import geario.event.selector.Selector;
import geario.Functions;
import geario.logging;
import geario.net.channel.AbstractSocketChannel;
import geario.net.channel.Types;
import geario.net.IoError;
import geario.system.Error;
import geario.util.queue;
import geario.util.worker;

import nbuff;

import std.format;
import std.socket;

import core.atomic;

/**
 * A Task that accumulates received chunks in an internal Nbuff and, when
 * executed, feeds them one at a time to `dataReceivedHandler`.
 *
 * The `_isFinishing` flag is `shared`; it is only touched through
 * core.atomic so that `IsFinishing()` can be polled safely.
 * NOTE(review): `_buffers` itself is not synchronized here; presumably the
 * caller guarantees `put()` and `DoExecute()` do not race -- confirm.
 */
class ChannelTask : Task {
    /// Handler invoked once per buffered chunk. `DoExecute` calls it without
    /// a null check, so it has to be assigned before any chunk is processed.
    DataReceivedHandler dataReceivedHandler;

    /// Set by `DoExecute` once it decides to stop draining the buffer.
    private shared bool _isFinishing = false;

    /// Chunks queued by `put()` and drained by `DoExecute()`.
    private Nbuff _buffers;

    this() {
    }

    /**
     * Appends a chunk to the pending buffer.
     *
     * Params:
     *   bytes = the chunk to queue for the next execution
     */
    void put(NbuffChunk bytes) {
        _buffers.append(bytes);
    }

    /**
     * Returns: the finishing flag as last published by `DoExecute`.
     */
    bool IsFinishing() {
        // The field is `shared`, so read it atomically rather than directly.
        return atomicLoad(_isFinishing);
    }

    /**
     * Drains the pending buffer: pops the front chunk, passes it to
     * `dataReceivedHandler`, and repeats until an empty chunk is popped,
     * the task reports `IsTerminated()`, or the buffer is empty.
     */
    override protected void DoExecute() {

        NbuffChunk bytes;

        do {
            // NOTE(review): this relies on frontChunk()/popChunk() being
            // callable on an empty Nbuff (the empty-chunk check below
            // suggests so) -- confirm against the nbuff API.
            bytes = _buffers.frontChunk();
            _buffers.popChunk();

            if (bytes.empty()) {
                version(GEAR_IO_DEBUG) {
                    log.warn("A null buffer poped");
                }
                break;
            }

            version(GEAR_IO_DEBUG) {
                log.trace("buffer: %s", cast(string)bytes.data);
            }

            dataReceivedHandler(bytes);

            version(GEAR_IO_DEBUG) {
                log.trace("bytes: %s", cast(string)bytes.data);
            }

            // Decide in a local and publish once, so a concurrent reader
            // never sees an intermediate value of the flag.
            immutable bool finishing = IsTerminated() || _buffers.empty();
            atomicStore(_isFinishing, finishing);

            if (finishing) {
                version(GEAR_DEBUG) {
                    // NOTE(review): `bytes` was checked non-empty above; unless
                    // the handler can consume it in place, `!bytes.empty()` is
                    // always true here and this warning fires on every
                    // finish -- confirm the intended condition.
                    if (!bytes.empty() || !_buffers.empty()) {
                        log.warn("The buffered data lost");
                    }
                }
                break;
            }
        } while (true);
    }
}