1 /*
2  * Geario - A cross-platform abstraction library with asynchronous I/O.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.event.selector.Kqueue;
13 
14 
15 // dfmt off
16 version(HAVE_KQUEUE):
17 // dfmt on
18 import geario.event.selector.Selector;
19 import geario.event.timer.Kqueue;
20 import geario.Exceptions;
21 import geario.net.channel;
22 import geario.logging;
23 import geario.util.CompilerHelper;
24 
25 import std.exception;
26 import std.socket;
27 import std.string;
28 
29 import core.time;
30 import core.stdc.string;
31 import core.stdc.errno;
32 import core.sys.posix.sys.types; // for ssize_t, size_t
33 import core.sys.posix.signal;
34 import core.sys.posix.netinet.tcp;
35 import core.sys.posix.netinet.in_;
36 import core.sys.posix.unistd;
37 import core.sys.posix.time;
38 import geario.util.worker;
39 
/**
 * Selector implementation backed by a kqueue descriptor.
 *
 * Channels are attached to the kqueue with the channel object stored in the
 * event's `udata` field. `DoSelect` waits for events with `kevent` and
 * forwards each one to the owning channel's `OnRead` / `OnWrite`.
 */
class AbstractSelector : Selector {
    // kevent array size
    enum int NUM_KEVENTS = 128;
    private bool isDisposed = false;
    // Buffer that kevent() fills with triggered events on every DoSelect call.
    private Kevent[NUM_KEVENTS] events;
    private int _kqueueFD;
    // Internal channel triggered by OnStop().
    private EventChannel _eventChannel;

    /**
     * Creates the kqueue descriptor and registers the internal event channel.
     *
     * Params:
     *   number      = forwarded unchanged to the base `Selector` constructor
     *   divider     = forwarded unchanged to the base `Selector` constructor
     *   worker      = forwarded unchanged to the base `Selector` constructor
     *   maxChannels = forwarded unchanged to the base `Selector` constructor
     *
     * Throws: `ErrnoException` when `kqueue()` does not return a valid descriptor.
     */
    this(size_t number, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(number, divider, worker, maxChannels);
        _kqueueFD = kqueue();
        // Without a valid descriptor every later kevent() call would fail,
        // so report the failure right here instead of limping on.
        errnoEnforce(_kqueueFD >= 0, "kqueue() failed");
        _eventChannel = new KqueueEventChannel(this);
        Register(_eventChannel);
    }

    ~this() @nogc {
        // Dispose();
    }

    /**
     * Closes the internal event channel and the kqueue descriptor, then
     * disposes the base selector. Calls after the first one are no-ops.
     */
    override void Dispose() {
        if (isDisposed)
            return;

        version (GEAR_IO_DEBUG)
            log.trace("disposing selector[fd=%d]...", _kqueueFD);
        isDisposed = true;
        _eventChannel.Close();
        int r = core.sys.posix.unistd.close(_kqueueFD);
        if(r != 0) {
            // The return value only signals failure; errno carries the reason.
            version(GEAR_DEBUG) log.warn("Error: %d", errno);
        }

        super.Dispose();
    }

    /**
     * Triggers the internal event channel, unless it is already closed.
     */
    override void OnStop() {
        version (GEAR_IO_DEBUG)
            log.info("Selector stopping. fd=%d", _kqueueFD);

        if(!_eventChannel.IsClosed()) {
            _eventChannel.trigger();
            // _eventChannel.OnWrite();
        }
    }

    /**
     * Attaches a channel to the kqueue.
     *
     * Timer channels are added with an EVFILT_TIMER filter (period of at
     * least 20 ms). Every other channel is added with EVFILT_READ and/or
     * EVFILT_WRITE according to its `ChannelFlag.Read` / `ChannelFlag.Write`
     * flags; `ChannelFlag.ETMode` additionally sets EV_CLEAR.
     *
     * Returns: `true` when `kevent` accepted the change; `false` when the
     *          channel is not usable (bad cast, negative fd, neither Read nor
     *          Write flag) or `kevent` reported an error.
     */
    override bool Register(AbstractChannel channel) {
        super.Register(channel);

        const int fd = channel.handle;
        version (GEAR_IO_DEBUG)
            log.trace("register channel: fd=%d, type=%s", fd, channel.Type);

        // Stays -1 when no kevent() call is issued at all.
        int err = -1;
        if (channel.Type == ChannelType.Timer)
        {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            // Period in milliseconds, never shorter than 20 ms.
            size_t intervalMs = timerChannel.time < 20 ? 20 : timerChannel.time;
            EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
                    EV_ADD | EV_ENABLE | EV_CLEAR, 0, intervalMs, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else
        {
            if (fd < 0)
                return false;

            Kevent[2] ev = void;
            // Kevent.flags is a ushort, so keep the flag words unsigned too.
            ushort readFlags = EV_ADD | EV_ENABLE;
            ushort writeFlags = EV_ADD | EV_ENABLE;

            if (channel.HasFlag(ChannelFlag.ETMode))
            {
                readFlags |= EV_CLEAR;
                writeFlags |= EV_CLEAR;
            }

            EV_SET(&(ev[0]), fd, EVFILT_READ, readFlags, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, writeFlags, 0, 0, cast(void*) channel);

            const bool wantsRead = channel.HasFlag(ChannelFlag.Read);
            const bool wantsWrite = channel.HasFlag(ChannelFlag.Write);

            if (wantsRead && wantsWrite)
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (wantsRead)
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (wantsWrite)
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }

        return err >= 0;
    }

    /**
     * Detaches a channel from the kqueue.
     *
     * The base class `Deregister` is always invoked on exit, whatever the
     * outcome. `channel.Clear()` is only called when `kevent` accepted the
     * EV_DELETE change.
     *
     * Returns: `true` on success, `false` when the fd is negative, the timer
     *          cast fails, no filter applies, or `kevent` reported an error.
     */
    override bool Deregister(AbstractChannel channel)
    {
        scope(exit) {
            super.Deregister(channel);
            version (GEAR_IO_DEBUG)
                log.trace("deregister, channel(fd=%d, type=%s)", channel.handle, channel.Type);
        }

        const fd = channel.handle;
        if (fd < 0)
            return false;

        // Stays -1 when no kevent() call is issued at all.
        int err = -1;

        if (channel.Type == ChannelType.Timer)
        {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else
        {
            Kevent[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel);

            const bool hadRead = channel.HasFlag(ChannelFlag.Read);
            const bool hadWrite = channel.HasFlag(ChannelFlag.Write);

            if (hadRead && hadWrite)
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (hadRead)
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (hadWrite)
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }

        if (err < 0)
            return false;

        // channel.currtLoop = null;
        channel.Clear();

        return true;
    }

    /**
     * Waits for events and dispatches them to their channels.
     *
     * Params:
     *   timeout = wait time in milliseconds; a negative value passes a null
     *             timespec to `kevent`, 0 passes a zero timespec.
     *
     * Returns: the value returned by `kevent` (number of events placed in the
     *          buffer, or a negative value when the call failed).
     */
    protected override int DoSelect(long timeout) {
        timespec ts;
        timespec *tsp;
        // timeout is in milliseconds. Convert to struct timespec.
        // timeout == -1 : wait forever : timespec timeout of NULL
        // timeout == 0  : return immediately : timespec timeout of zero
        if (timeout >= 0) {
            // For some indeterminate reason kevent(2) has been found to fail with
            // an EINVAL Error for timeout values greater than or equal to
            // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
            // to the maximum value of a 32-bit signed integer which is
            // approximately 25 days in milliseconds.
            const int timeoutMax = int.max;
            if (timeout > timeoutMax) {
                timeout = timeoutMax;
            }
            ts.tv_sec = timeout / 1000;
            ts.tv_nsec = (timeout % 1000) * 1000000; // 1 millisecond = 1,000,000 nanoseconds
            tsp = &ts;
        } else {
            tsp = null;
        }

        int result = kevent(_kqueueFD, null, 0, events.ptr, NUM_KEVENTS, tsp);

        // A failed call delivers no events; hand the error code to the caller.
        if (result < 0)
            return result;

        foreach (i; 0 .. result)
        {
            AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
            // Register() always stores the channel in udata; guard anyway so a
            // stray event cannot cause a null dereference below.
            if (channel is null) {
                log.warn("kevent event without channel: ident=%d, filter=%d",
                        events[i].ident, events[i].filter);
                continue;
            }

            ushort eventFlags = events[i].flags;

            version (GEAR_IO_DEBUG)
            log.info("handling event: events=%d, fd=%d", eventFlags, channel.handle);

            if (eventFlags & EV_ERROR) {
                log.warn("channel[fd=%d] has a Error.", channel.handle);
                channel.Close();
                continue;
            }

            if (eventFlags & EV_EOF) {
                version (GEAR_IO_DEBUG) log.info("channel[fd=%d] closed", channel.handle);
                channel.Close();
                continue;
            }

            ChannelEventHandle(channel, events[i].filter);
        }

        return result;
    }

    /**
     * Routes one event to the channel: timer and read filters call `OnRead`,
     * the write filter calls `OnWrite`, anything else is logged.
     *
     * Params:
     *   channel = channel the event belongs to
     *   filter  = the event's EVFILT_* value (signed, as stored in `Kevent.filter`)
     */
    private void ChannelEventHandle(AbstractChannel channel, short filter) {
        version (GEAR_IO_DEBUG)
        log.info("handling event: events=%d, fd=%d", filter, channel.handle);

        switch (filter)
        {
        case EVFILT_TIMER:
        case EVFILT_READ:
            channel.OnRead();
            break;

        case EVFILT_WRITE:
            channel.OnWrite();
            break;

        default:
            log.warn("Unhandled channel filter: %d", filter);
            break;
        }
    }
}
266 
/// kqueue filter identifiers, typed as `short` to match `Kevent.filter`.
enum : short {
    /// read filter
    EVFILT_READ = -1,
    /// write filter
    EVFILT_WRITE = -2,
    /// attached to aio requests
    EVFILT_AIO = -3,
    /// attached to vnodes
    EVFILT_VNODE = -4,
    /// attached to struct proc
    EVFILT_PROC = -5,
    /// attached to struct proc
    EVFILT_SIGNAL = -6,
    /// timers
    EVFILT_TIMER = -7,
    /// Mach portsets
    EVFILT_MACHPORT = -8,
    /// filesystem events
    EVFILT_FS = -9,
    /// User events
    EVFILT_USER = -10,
    /// virtual memory events
    EVFILT_VM = -12,
    // NOTE(review): 11 is smaller than the magnitude of EVFILT_VM (-12);
    // confirm this count against the target platform's <sys/event.h>.
    EVFILT_SYSCOUNT = 11
}
281 
/**
 * Fills every field of `*kevp` from `args`.
 *
 * Params:
 *   kevp = event record to overwrite
 *   args = one value per `Kevent` field, in declaration order
 */
extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
    // Unrolled at compile time: one assignment per struct field.
    foreach (idx, value; args)
        (*kevp).tupleof[idx] = value;
}
285 
/**
 * Event record exchanged with `kevent`, both as a change request and as a
 * returned event. Field order and types must stay as declared, since the
 * struct is passed by pointer to the extern (C) `kevent` function.
 */
struct Kevent {
    /// identifier for this event
    uintptr_t ident;
    /// filter for event (one of the EVFILT_* values)
    short filter;
    /// action and status bits (EV_* values)
    ushort flags;
    /// filter-specific flags (NOTE_* values)
    uint fflags;
    /// filter-specific data
    intptr_t data;
    /// opaque user data identifier
    void* udata;
}
294 
/// Action, behaviour and status bits stored in `Kevent.flags`.
enum {
    // ---- actions ----
    /// add event to kq (implies enable)
    EV_ADD = 0x0001,
    /// delete event from kq
    EV_DELETE = 0x0002,
    /// enable event
    EV_ENABLE = 0x0004,
    /// disable event (not reported)
    EV_DISABLE = 0x0008,

    // ---- flags ----
    /// only report one occurrence
    EV_ONESHOT = 0x0010,
    /// clear event state after reporting
    EV_CLEAR = 0x0020,
    /// force EV_ERROR on success, data=0
    EV_RECEIPT = 0x0040,
    /// disable event after reporting
    EV_DISPATCH = 0x0080,

    /// reserved by system
    EV_SYSFLAGS = 0xF000,
    /// filter-specific flag
    EV_FLAG1 = 0x2000,

    // ---- returned values ----
    /// EOF detected
    EV_EOF = 0x8000,
    /// Error, data contains errno
    EV_ERROR = 0x4000,
}
316 
/// Filter-specific values for `Kevent.fflags`, grouped by the filter they apply to.
enum {
    // ---- EVFILT_USER (shared with userspace) ----
    //
    // On input, the top two bits of fflags specify how the lower twenty four
    // bits should be applied to the stored value of fflags.
    //
    // On output, the top two bits will always be set to NOTE_FFNOP and the
    // remaining twenty four bits will contain the stored fflags value.

    /// ignore input fflags
    NOTE_FFNOP = 0x00000000,
    /// AND fflags
    NOTE_FFAND = 0x40000000,
    /// OR fflags
    NOTE_FFOR = 0x80000000,
    /// copy fflags
    NOTE_FFCOPY = 0xc0000000,
    /// mask for operations
    NOTE_FFCTRLMASK = 0xc0000000,
    /// mask for the stored fflags value
    NOTE_FFLAGSMASK = 0x00ffffff,

    /// Cause the event to be triggered for output.
    NOTE_TRIGGER = 0x01000000,

    // ---- EVFILT_{READ|WRITE} (shared with userspace) ----

    /// low water mark
    NOTE_LOWAT = 0x0001,

    // ---- EVFILT_VNODE (shared with userspace) ----

    /// vnode was removed
    NOTE_DELETE = 0x0001,
    /// data contents changed
    NOTE_WRITE = 0x0002,
    /// size increased
    NOTE_EXTEND = 0x0004,
    /// attributes changed
    NOTE_ATTRIB = 0x0008,
    /// link count changed
    NOTE_LINK = 0x0010,
    /// vnode was renamed
    NOTE_RENAME = 0x0020,
    /// vnode access was revoked
    NOTE_REVOKE = 0x0040,

    // ---- EVFILT_PROC (shared with userspace) ----

    /// process exited
    NOTE_EXIT = 0x80000000,
    /// process forked
    NOTE_FORK = 0x40000000,
    /// process exec'd
    NOTE_EXEC = 0x20000000,
    /// mask for hint bits
    NOTE_PCTRLMASK = 0xf0000000,
    /// mask for pid
    NOTE_PDATAMASK = 0x000fffff,

    // ---- additional flags for EVFILT_PROC ----

    /// follow across forks
    NOTE_TRACK = 0x00000001,
    /// could not track child
    NOTE_TRACKERR = 0x00000002,
    /// am a child process
    NOTE_CHILD = 0x00000004,
}
368 
// C bindings for the kqueue system interface used by this module.
extern (C) @nogc nothrow {
    /// Returns a kqueue descriptor; this module treats a negative value as failure.
    int kqueue();

    /**
     * Submits `nchanges` entries from `changelist` and/or receives up to
     * `nevents` entries into `eventlist`.
     *
     * Params:
     *   kq         = descriptor obtained from `kqueue()`
     *   changelist = change records to apply, may be null when `nchanges` is 0
     *   nchanges   = number of entries in `changelist`
     *   eventlist  = buffer receiving triggered events, may be null when `nevents` is 0
     *   nevents    = capacity of `eventlist`
     *   timeout    = maximum wait; this module passes null to wait without a limit
     *
     * Returns: this module treats a negative value as failure and otherwise
     *          reads that many entries from `eventlist`.
     */
    int kevent(int kq, const Kevent* changelist, int nchanges,
            Kevent* eventlist, int nevents, const timespec* timeout);
}
374 
// Local fallback for SO_REUSEPORT, compiled in only when
// CompilerHelper.IsLessThan(2078) is true.
// NOTE(review): presumably older compiler releases ship without this
// constant — confirm against CompilerHelper.
static if (CompilerHelper.IsLessThan(2078))
    enum SO_REUSEPORT = 0x0200;