module geario.event.selector.Selector;

import geario.Exceptions;
import geario.Functions;
import geario.net.channel.AbstractChannel;
import geario.net.channel.Types;
import geario.logging;
import geario.util.worker;

import core.atomic;
import core.memory;
import core.thread;


/**
 * Base class of an I/O event selector: it owns the run/stop state of an
 * event loop and delegates the actual polling to the abstract DoSelect().
 *
 * The API is modelled on Java NIO selectors, see
 * http://tutorials.jenkov.com/java-nio/selectors.html
 */
abstract class Selector {

    // Run/stop flags are touched from several threads (RunAsync, Stop, the
    // loop itself), so they are only accessed through core.atomic.
    private shared bool _running = false;
    private shared bool _isStopping = false;
    private bool _isReady;
    protected size_t _id;
    protected size_t divider;
    private Worker _taskWorker;
    // protected AbstractChannel[] channels;
    protected long idleTime = -1; // in millisecond
    protected int fd;

    protected long timeout = -1; // in millisecond
    private Thread _thread;

    private SimpleEventHandler _startedHandler;
    private SimpleEventHandler _stoppeddHandler;

    /**
     * Params:
     *   id          = identifier reported by GetId() and used in log output
     *   divider     = stored in the protected `divider` field for subclasses
     *   worker      = task worker assigned to every registered channel; may be null
     *   maxChannels = currently unused; kept for interface compatibility
     */
    this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        _id = id;
        _taskWorker = worker;
        this.divider = divider;
        // channels = new AbstractChannel[maxChannels];
    }

    /// Returns: the identifier passed to the constructor.
    size_t GetId() {
        return _id;
    }

    /// Returns: the task worker passed to the constructor (may be null).
    Worker worker() {
        return _taskWorker;
    }

    /// Returns: true while OnLoop() is executing its select loop.
    bool IsReady() {
        return _isReady;
    }

    /**
     * Tells whether or not this selector is running.
     *
     * Returns: true if, and only if, this selector is running
     */
    bool IsRuning() {
        return atomicLoad(_running);
    }

    alias isOpen = IsRuning;

    /// Returns: true once Stop() has been requested.
    bool IsStopping() {
        return atomicLoad(_isStopping);
    }

    /**
     * Attaches a channel to this selector.
     *
     * Assigns the selector's task worker to the channel, then adds the
     * channel object as a GC root and marks it NO_MOVE (presumably because
     * subclasses hand the raw pointer to the OS event API - confirm there).
     *
     * Returns: always true in this base implementation.
     */
    bool Register(AbstractChannel channel) {
        assert(channel !is null);
        channel.taskWorker = _taskWorker;
        void* context = cast(void*) channel;
        GC.addRoot(context);
        GC.setAttr(context, GC.BlkAttr.NO_MOVE);
        version (GEAR_IO_DEBUG) {
            int infd = cast(int) channel.handle;
            log.trace("Register channel@%s: fd=%d, selector: %d", context, infd, GetId());
        }
        return true;
    }

    /**
     * Detaches a channel from this selector, undoing what Register() did:
     * clears the channel's task worker, removes the GC root and clears the
     * NO_MOVE attribute.
     *
     * Returns: always true in this base implementation.
     */
    bool Deregister(AbstractChannel channel) {
        assert(channel !is null); // same precondition as Register()
        channel.taskWorker = null;
        void* context = cast(void*) channel;
        GC.removeRoot(context);
        GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
        version (GEAR_IO_DEBUG) {
            size_t fd = cast(size_t) channel.handle;
            log.info("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, GetId());
        }
        return true;
    }

    /**
     * Performs one selection pass; implemented by the platform-specific
     * subclass.
     *
     * Params:
     *   timeout = in millisecond
     */
    protected abstract int DoSelect(long timeout);

    /**
     * Runs the select loop on the calling thread; returns when the loop
     * has ended.
     *
     * Params:
     *   timeout = in millisecond
     */
    void Run(long timeout = -1) {
        this.timeout = timeout;
        DoRun();
    }

    /**
     * Runs the select loop on a newly created thread.
     *
     * Does nothing if the selector is already running. Anything thrown by
     * the loop is caught on the new thread and logged.
     *
     * Params:
     *   timeout = in millisecond
     *   handler = optional callback invoked on the new thread right before
     *             the loop starts
     */
    void RunAsync(long timeout = -1, SimpleEventHandler handler = null) {
        if (atomicLoad(_running)) {
            version (GEAR_IO_DEBUG) log.warn("The current selector %d has being running already!", _id);
            return;
        }
        this.timeout = timeout;
        version (GEAR_IO_DEBUG) Trace("runAsync ...");
        Thread th = new Thread(() {
            try {
                DoRun(handler);
            } catch (Throwable t) {
                log.warn(t.msg);
                version (GEAR_DEBUG) log.warn(t.toString());
            }
        });
        // th.IsDaemon = true; // unstable
        th.start();
    }

    /**
     * Claims the running flag with a CAS and enters the loop on the current
     * thread. If another caller already owns the flag, nothing is run.
     */
    private void DoRun(SimpleEventHandler handler = null) {
        if (cas(&_running, false, true)) {
            // If the start handler (or the loop) throws, release the flag so
            // the selector does not stay "running" forever.
            scope (failure) atomicStore(_running, false);

            version (GEAR_IO_DEBUG) Trace("running selector...");
            _thread = Thread.getThis();
            if (handler !is null) {
                handler();
            }
            OnLoop(timeout);
        } else {
            version (GEAR_DEBUG) log.warn("The current selector %d has being running already!", _id);
        }
    }

    /**
     * Requests the loop to stop. Only the first call has an effect; it sets
     * the stopping flag and invokes OnStop(). Anything OnStop() throws is
     * logged and not propagated.
     */
    void Stop() {
        version (GEAR_IO_DEBUG)
            log.trace("Stopping selector %d. _running=%s, _isStopping=%s", _id,
                    atomicLoad(_running), atomicLoad(_isStopping));
        if (cas(&_isStopping, false, true)) {
            try {
                OnStop();
            } catch (Throwable t) {
                log.warn(t.msg);
                version (GEAR_DEBUG) log.warn(t);
            }
        }
    }

    /// Hook invoked once by Stop(); the base implementation only logs.
    protected void OnStop() {
        version (GEAR_IO_DEBUG)
            log.trace("stopping.");
    }

    /**
     * The select loop. Calls DoSelect() repeatedly until stopping has been
     * requested (once only when built with HAVE_IOCP), then resets the
     * ready/running state and calls Dispose().
     *
     * Params:
     *   timeout = in millisecond
     */
    protected void OnLoop(long timeout = -1) {
        _isReady = true;
        idleTime = timeout;

        // Run the teardown on every exit path, including when DoSelect()
        // throws; otherwise the selector would be left marked as running.
        scope (exit) {
            _isReady = false;
            atomicStore(_running, false);
            version (GEAR_IO_DEBUG) log.info("Selector %d exited.", _id);
            Dispose();
        }

        version (HAVE_IOCP) {
            DoSelect(timeout);
        } else {
            do {
                // version(GEAR_THREAD_DEBUG) log.warn("Threads: %d", Thread.getAll().length);
                DoSelect(timeout);
                // log.info("Selector rolled once. isRuning: %s", isRuning);
            } while (!atomicLoad(_isStopping));
        }
    }

    /**
     * Performs one selection pass with a timeout.
     *
     * Params:
     *   timeout = in millisecond; 0 is forwarded to DoSelect() as -1
     *
     * Throws: IllegalArgumentException if timeout is negative.
     */
    int Select(long timeout) {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return DoSelect((timeout == 0) ? -1 : timeout);
    }

    /**
     * Performs one selection pass with a timeout of 0.
     *
     * NOTE(review): this is identical to SelectNow(), whereas Select(long)
     * maps a 0 timeout to -1. If -1 means "block" for DoSelect(), this was
     * probably meant to be DoSelect(-1) - confirm before changing, callers
     * may rely on the current behaviour.
     */
    int Select() {
        return DoSelect(0);
    }

    /// Performs one selection pass with a timeout of 0.
    int SelectNow() {
        return DoSelect(0);
    }

    /// Drops the references to the loop thread and the event handlers.
    void Dispose() {
        _thread = null;
        _startedHandler = null;
        _stoppeddHandler = null;
    }

    /// Returns: true if the caller is the thread that entered the loop in DoRun().
    bool IsSelfThread() {
        return _thread is Thread.getThis();
    }
}