1 module geario.event.selector.Selector;
2 
3 import geario.Exceptions;
4 import geario.Functions;
5 import geario.net.channel.AbstractChannel;
6 import geario.net.channel.Types;
7 import geario.logging;
8 import geario.util.worker;
9 
10 import core.atomic;
11 import core.memory;
12 import core.thread;
13 
14 
15 /**
16 http://tutorials.jenkov.com/java-nio/selectors.html
17 */
/**
 * Base class of an I/O event selector.
 *
 * A concrete subclass implements DoSelect() on top of a platform specific
 * mechanism. This class provides the surrounding life cycle: channel
 * registration, running the select loop on the calling thread (Run) or on a
 * new thread (RunAsync), and stopping it (Stop).
 */
abstract class Selector {

    // Set by DoRun() via cas() and cleared when the loop in OnLoop() exits.
    private shared bool _running = false;
    // Set once by Stop(); it is never cleared by this class.
    private shared bool _isStopping = false;
    // True while OnLoop() is executing.
    private bool _isReady;
    protected size_t _id;
    protected size_t divider;
    private Worker _taskWorker;
    // protected AbstractChannel[] channels;
    protected long idleTime = -1; // in millisecond
    protected int fd;

    protected long timeout = -1; // in millisecond
    // The thread that executes DoRun(); null when the loop is not running.
    private Thread _thread;

    private SimpleEventHandler _startedHandler;
    private SimpleEventHandler _stoppeddHandler;

    /**
     * Params:
     *   id          = identifier reported by GetId() and used in log messages
     *   divider     = stored in the protected `divider` field for subclasses
     *   worker      = task worker handed to every registered channel; may be null
     *   maxChannels = currently unused by this class
     */
    this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        _id = id;
        _taskWorker = worker;
        this.divider = divider;
        // channels = new AbstractChannel[maxChannels];
    }

    /// Returns the identifier passed to the constructor.
    size_t GetId() {
        return _id;
    }

    /// Returns the task worker passed to the constructor (may be null).
    Worker worker() {
        return _taskWorker;
    }

    /// Returns true while the select loop (OnLoop) is executing.
    bool IsReady() {
        return _isReady;
    }


    /**
     * Tells whether or not this selector is running.
     *
     * @return <tt>true</tt> if, and only if, this selector is running
     */
    bool IsRuning() {
        return atomicLoad(_running);
    }

    alias isOpen = IsRuning;

    /// Returns true once Stop() has been called.
    bool IsStopping() {
        return atomicLoad(_isStopping);
    }

    /**
     * Attaches the channel to this selector.
     *
     * The channel gets this selector's task worker, and its object is added
     * as a GC root and flagged NO_MOVE; Deregister() reverts both.
     *
     * Returns: always true.
     */
    bool Register(AbstractChannel channel) {
        assert(channel !is null);
        channel.taskWorker = _taskWorker;
        void* context = cast(void*)channel;
        GC.addRoot(context);
        GC.setAttr(context, GC.BlkAttr.NO_MOVE);
        version (GEAR_IO_DEBUG) {
            int infd = cast(int) channel.handle;
            log.trace("Register channel@%s: fd=%d, selector: %d", context, infd, GetId());
        }
        return true;
    }

    /**
     * Detaches the channel from this selector: clears its task worker and
     * undoes the GC root / NO_MOVE attribute set by Register().
     *
     * Returns: always true.
     */
    bool Deregister(AbstractChannel channel) {
        // Same precondition as Register(); fail loudly instead of dereferencing null.
        assert(channel !is null);
        channel.taskWorker = null;
        void* context = cast(void*)channel;
        GC.removeRoot(context);
        GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
        version (GEAR_IO_DEBUG) {
            // Named infd so that it does not shadow the `fd` member.
            size_t infd = cast(size_t) channel.handle;
            log.info("The channel@%s has been deregistered: fd=%d, selector: %d", context, infd, GetId());
        }
        return true;
    }

    /**
     * Performs one selection round; implemented by the concrete selector.
     *
     * timeout: in millisecond
     */
    protected abstract int DoSelect(long timeout);

    /**
     * Runs the select loop on the calling thread until Stop() is called.
     *
     * timeout: in millisecond
     */
    void Run(long timeout = -1) {
        this.timeout = timeout;
        DoRun();
    }

    /**
     * Runs the select loop on a newly created thread.
     *
     * Does nothing when the selector is already running. Any Throwable that
     * escapes the loop is logged and not propagated.
     *
     * timeout: in millisecond
     * handler: optional callback invoked on the new thread right before the
     *          loop starts
     */
    void RunAsync(long timeout = -1, SimpleEventHandler handler = null) {
        if (atomicLoad(_running)) {
            version (GEAR_IO_DEBUG) log.warn("The current selector %d has being running already!", _id);
            return;
        }
        this.timeout = timeout;
        version (GEAR_IO_DEBUG) Trace("runAsync ...");
        Thread th = new Thread(() {
            try {
                DoRun(handler);
            } catch (Throwable t) {
                log.warn(t.msg);
                version (GEAR_DEBUG) log.warn(t.toString());
            }
        });
        // th.IsDaemon = true; // unstable
        th.start();
    }

    /**
     * Claims the running flag and enters the loop on the current thread.
     * Only the caller that wins the cas() runs the loop; others just return.
     */
    private void DoRun(SimpleEventHandler handler = null) {
        if (cas(&_running, false, true)) {
            // If the handler (or an OnLoop override) throws, release the flag
            // so that the selector does not look "running" forever.
            scope (failure) atomicStore(_running, false);

            version (GEAR_IO_DEBUG) Trace("running selector...");
            _thread = Thread.getThis();
            if (handler !is null) {
                handler();
            }
            OnLoop(timeout);
        } else {
            version (GEAR_DEBUG) log.warn("The current selector %d has being running already!", _id);
        }
    }

    /**
     * Requests the select loop to stop.
     *
     * OnStop() is invoked at most once (first caller wins); anything it
     * throws is logged and swallowed.
     */
    void Stop() {
        version (GEAR_IO_DEBUG)
            log.trace("Stopping selector %d. _running=%s, _isStopping=%s", _id, _running, _isStopping);
        if (cas(&_isStopping, false, true)) {
            try {
                OnStop();
            } catch (Throwable t) {
                log.warn(t.msg);
                version (GEAR_DEBUG) log.warn(t.toString());
            }
        }
    }

    /// Hook called once by Stop(); the base implementation only logs.
    protected void OnStop() {
        version (GEAR_IO_DEBUG)
            log.trace("stopping.");
    }

    /**
     * The select loop. Calls DoSelect() repeatedly until Stop() has been
     * requested (a single DoSelect() call when built with HAVE_IOCP), then
     * resets the state flags and calls Dispose().
     *
     * timeout: in millisecond
     */
    protected void OnLoop(long timeout = -1) {
        _isReady = true;
        idleTime = timeout;

        // Run the cleanup on every exit path. Previously an exception thrown
        // by DoSelect() skipped it and left _running set to true, which made
        // IsRuning() lie and blocked any later Run()/RunAsync().
        scope (exit) {
            _isReady = false;
            atomicStore(_running, false);
            version (GEAR_IO_DEBUG) log.info("Selector %d exited.", _id);
            Dispose();
        }

        version (HAVE_IOCP) {
            DoSelect(timeout);
        } else {
            do {
                // version(GEAR_THREAD_DEBUG) log.warn("Threads: %d", Thread.getAll().length);
                DoSelect(timeout);
                // log.info("Selector rolled once. isRuning: %s", isRuning);
            } while (!atomicLoad(_isStopping));
        }
    }

    /**
     * Performs one selection round with the given timeout.
     *
     * A timeout of 0 is passed to DoSelect() as -1; a negative timeout is
     * rejected with IllegalArgumentException.
     *
     * timeout: in millisecond
     */
    int Select(long timeout) {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return DoSelect((timeout == 0) ? -1 : timeout);
    }

    /**
     * Performs one selection round via DoSelect(0).
     *
     * NOTE(review): this is identical to SelectNow(), while Select(0) maps to
     * DoSelect(-1). If this was meant to mirror Java NIO's blocking select(),
     * it should probably delegate to Select(0) - confirm with callers before
     * changing, since that would alter blocking behaviour.
     */
    int Select() {
        return DoSelect(0);
    }

    /// Performs one selection round via DoSelect(0).
    int SelectNow() {
        return DoSelect(0);
    }

    /// Drops the references to the loop thread and the event handlers.
    void Dispose() {
        _thread = null;
        _startedHandler = null;
        _stoppeddHandler = null;
    }

    /// Returns true when called from the thread that is running the loop.
    bool IsSelfThread() {
        return _thread is Thread.getThis();
    }
}