1 /*
2  * Geario - A cross-platform abstraction library with asynchronous I/O.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.event.selector.Epoll;
13 
14 // dfmt off
15 version(HAVE_EPOLL):
16 
17 // dfmt on
18 
19 import std.exception;
20 import std.socket;
21 import std.string;
22 
23 import core.sys.posix.sys.types;
24 import core.sys.posix.netinet.tcp;
25 import core.sys.posix.netinet.in_;
26 import core.sys.posix.unistd;
27 import core.stdc.string;
28 import core.stdc.errno;
29 import core.time;
30 import core.thread;
31 
32 import core.sys.posix.sys.resource;
33 import core.sys.posix.sys.time;
34 import core.sys.linux.epoll;
35 
36 import geario.event.selector.Selector;
37 import geario.Exceptions;
38 import geario.net.channel;
39 import geario.logging;
40 import geario.event.timer;
41 import geario.system.Error;
42 import geario.util.worker;
43 
/**
 * Hard limit (rlim_max) of RLIMIT_NOFILE for this process, i.e. the highest
 * value the per-process open-file-descriptor limit may be raised to.
 * Stays 0 when the limit could not be queried.
 */
__gshared size_t fdLimit = 0;

/// Queries RLIMIT_NOFILE once at program start-up and caches its hard limit in fdLimit.
shared static this() {
    rlimit fileLimit;
    // getrlimit returns 0 on success; on failure leave fdLimit at its default of 0
    // instead of publishing whatever happens to be in the struct.
    if (getrlimit(RLIMIT_NOFILE, &fileLimit) == 0) {
        fdLimit = fileLimit.rlim_max;
    }
}
52 
53 
/**
 * Selector implementation backed by Linux epoll(7).
 *
 * The selector owns one epoll instance. Every registered channel is added to
 * that instance with a pointer to the channel object stored in the event's
 * user data, so ready events can be routed straight back to the channel's
 * OnRead / OnWrite / Close handlers.
 *
 * An internal EpollEventChannel is registered at construction time and is
 * triggered from OnStop(); presumably this wakes a thread blocked in
 * epoll_wait (NOTE(review): confirm against EpollEventChannel).
 */
class AbstractSelector : Selector {
    /// Maximum number of ready events fetched by a single epoll_wait call.
    enum int NUM_KEVENTS = 1024;
    private int _epollFD;
    private bool isDisposed = false;
    private epoll_event[NUM_KEVENTS] events;
    private EventChannel _eventChannel;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * All parameters are forwarded unchanged to the Selector base constructor.
     *
     * Throws: IOException when the epoll instance cannot be created.
     */
    this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(id, divider, worker, maxChannels);

        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        // EPOLL_CLOEXEC sets the close-on-exec (FD_CLOEXEC) flag on the new
        // descriptor; see the description of O_CLOEXEC in open(2) for why
        // this is useful.
        _epollFD = epoll_create1(EPOLL_CLOEXEC);
        if (_epollFD < 0)
            throw new IOException("epoll_create failed");

        _eventChannel = new EpollEventChannel(this);
        Register(_eventChannel);
    }

    /// Intentionally empty: resources are released by an explicit Dispose() call.
    ~this() @nogc {
        // Dispose();
    }

    /**
     * Closes the internal event channel and the epoll descriptor, then
     * disposes the base selector. Calling it more than once is a no-op.
     */
    override void Dispose() {
        if (isDisposed)
            return;

        version (GEAR_IO_DEBUG)
            log.trace("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;
        _eventChannel.Close();

        if (core.sys.posix.unistd.close(_epollFD) != 0) {
            // close() only ever returns -1 on failure; errno carries the actual reason.
            version (GEAR_IO_DEBUG) {
                const int err = errno;
                log.warn("failed to close epoll fd=%d, errno=%d, message=%s",
                        _epollFD, err, GetErrorMessage(err));
            }
        }

        super.Dispose();
    }

    /// Triggers the internal event channel (if it is still open) when the selector is stopping.
    override void OnStop() {
        version (GEAR_IO_DEBUG)
            log.info("Selector stopping. fd=%d, id: %d", _epollFD, GetId());

        if (!_eventChannel.IsClosed()) {
            _eventChannel.trigger();
            // _eventChannel.OnWrite();
        }
    }

    /**
     * Registers the channel with the base selector and adds it to the epoll set.
     *
     * Returns: true when epoll_ctl(EPOLL_CTL_ADD) succeeded (or failed with an
     *          error that EpollCtl deliberately ignores), false otherwise.
     */
    override bool Register(AbstractChannel channel) {
        super.Register(channel);

        version (GEAR_IO_DEBUG)
            log.trace("register, channel(fd=%d, type=%s)", channel.handle, channel.Type);

        if (EpollCtl(channel, EPOLL_CTL_ADD))
            return true;

        debug log.warn("failed to register channel: fd=%d", channel.handle);
        return false;
    }

    /**
     * Removes the channel from the epoll set, then deregisters it from the
     * base selector (the latter always runs, even when epoll_ctl fails).
     *
     * Returns: true when epoll_ctl(EPOLL_CTL_DEL) succeeded (or failed with an
     *          error that EpollCtl deliberately ignores), false otherwise.
     */
    override bool Deregister(AbstractChannel channel) {
        scope(exit) {
            super.Deregister(channel);
            version (GEAR_IO_DEBUG)
                log.trace("deregister, channel(fd=%d, type=%s)", channel.handle, channel.Type);
        }

        if (EpollCtl(channel, EPOLL_CTL_DEL))
            return true;

        // The original referenced an undeclared `fd` here; the descriptor lives on the channel.
        log.warn("deregister channel failed: fd=%d", channel.handle);
        return false;
    }

    /**
     * Waits for ready events and dispatches each one to its channel.
     *
     * Params:
     *   timeout = wait time in milliseconds; a negative value blocks
     *             indefinitely, zero polls without waiting.
     *
     * Returns: the number of events reported by epoll_wait, or a negative
     *          value when epoll_wait failed with an error other than EINTR.
     */
    protected override int DoSelect(long timeout) {
        const int capacity = cast(int) events.length;
        int len = 0;

        if (timeout <= 0) { /* Indefinite or no wait */
            // Normalise instead of truncating: a bare cast(int) of a large negative
            // long can yield 0 and silently turn "block forever" into "do not wait".
            const int waitMs = timeout < 0 ? -1 : 0;
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391
                len = epoll_wait(_epollFD, events.ptr, capacity, waitMs);
            } while ((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            // Clamp so that timeouts above int.max ms do not wrap around.
            const int waitMs = timeout > int.max ? int.max : cast(int) timeout;
            len = iEpoll(_epollFD, events.ptr, capacity, waitMs);
        }

        if (len < 0) {
            version (GEAR_IO_DEBUG) {
                const int err = errno;
                log.warn("epoll_wait failed: fd=%d, errno=%d, message=%s",
                        _epollFD, err, GetErrorMessage(err));
            }
            return len;
        }

        foreach (i; 0 .. len) {
            // The channel reference was stored in data.ptr by BuildEpollEvent.
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                debug log.warn("channel is null");
            } else {
                ChannelEventHandle(channel, events[i].events);
            }
        }

        return len;
    }

    /**
     * Routes one epoll event mask to the channel's handlers.
     *
     * Close-type events (see IsClosed) deliver a final OnRead when data is
     * still readable and then close the channel. Otherwise OnWrite and/or
     * OnRead are invoked according to the EPOLLOUT / EPOLLIN bits. Exceptions
     * thrown by the handlers are caught and logged (debug builds only).
     */
    private void ChannelEventHandle(AbstractChannel channel, uint event) {
        version (GEAR_IO_DEBUG) {
            log.warn("thread: %s", Thread.getThis().name());

            // Thread.sleep(300.msecs);
            log.info("handling event: selector=%d, channel=%d, events=%d, isReadable: %s, isWritable: %s, isClosed: %s", 
                this._epollFD, channel.handle, event, IsReadable(event), IsWritable(event), IsClosed(event));
        }

        try {
            if (IsClosed(event)) { // && errno != EINTR
                /* An error has occurred on this fd, or the socket is not
                    ready for reading (why were we notified then?) */
                version (GEAR_IO_DEBUG) {
                    log.warn("event=%d, isReadable: %s, isWritable: %s", 
                        event, IsReadable(event), IsWritable(event));

                    if (IsError(event)) {
                        log.warn("channel Error: fd=%s, event=%d, errno=%d, message=%s",
                                channel.handle, event, errno, GetErrorMessage(errno));
                    } else {
                        log.info("channel closed: fd=%d, errno=%d, message=%s",
                                    channel.handle, errno, GetErrorMessage(errno));
                    }
                }

                // The remote connection broken abnormally, so the channel should be notified.
                if (IsReadable(event)) {
                    channel.OnRead();
                }

                // if(IsWritable(event)) {
                //     channel.OnWrite();
                // }

                channel.Close();
                return;
            }

            // IsClosed() returned false, so at least one of EPOLLIN / EPOLLOUT is set.
            // Test the bits rather than comparing the whole mask so that an additional
            // flag in the mask cannot cause the event to be dropped.
            const bool writable = IsWritable(event);
            const bool readable = IsReadable(event);

            version (GEAR_IO_DEBUG) {
                if (readable && writable)
                    log.trace("channel read and write: fd=%d", channel.handle);
                else if (readable)
                    log.trace("channel read event: fd=%d", channel.handle);
                else
                    log.trace("channel write event: fd=%d", channel.handle);
            }

            // Write is handled before read, matching the original ordering.
            if (writable)
                channel.OnWrite();
            if (readable)
                channel.OnRead();
        } catch (Exception e) {
            debug {
                log.error("Error while handing channel: fd=%s, exception=%s, message=%s",
                        channel.handle, typeid(e), e.msg);
            }
            version(GEAR_DEBUG) log.warn(e);
        }
    }

    /**
     * epoll_wait wrapper that restarts after EINTR without extending the
     * overall wait beyond the requested timeout.
     *
     * Params:
     *   epfd    = epoll descriptor
     *   events  = output buffer for ready events
     *   numfds  = capacity of the buffer
     *   timeout = wait time in milliseconds (negative: wait indefinitely)
     *
     * Returns: the epoll_wait result, or 0 when the timeout elapsed while
     *          restarting after interruptions.
     */
    private int iEpoll(int epfd, epoll_event* events, int numfds, int timeout) {
        // A monotonic deadline is immune to wall-clock adjustments and does not
        // lose sub-millisecond remainders across repeated interruptions.
        const MonoTime deadline = MonoTime.currTime + dur!"msecs"(timeout);
        int remaining = timeout;

        for (;;) {
            const int res = epoll_wait(epfd, events, numfds, remaining);
            if (res >= 0 || errno != EINTR)
                return res;

            if (timeout < 0)
                continue; // unbounded wait: simply restart

            const long left = (deadline - MonoTime.currTime).total!"msecs";
            if (left <= 0)
                return 0;

            // left never exceeds the original int timeout, so the narrowing is safe.
            remaining = cast(int) left;
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    /// True when the mask contains EPOLLERR.
    private static bool IsError(uint events) nothrow {
        return (events & EPOLLERR) != 0;
    }

    /**
     * True when the mask signals an error or hang-up (EPOLLERR, EPOLLHUP,
     * EPOLLRDHUP), or when it contains neither EPOLLIN nor EPOLLOUT.
     */
    private static bool IsClosed(uint e) nothrow {
        const bool hangupOrError = (e & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0;
        const bool noIo = (e & (EPOLLIN | EPOLLOUT)) == 0;
        return hangupOrError || noIo;
    }

    /// True when the mask contains EPOLLIN.
    private static bool IsReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True when the mask contains EPOLLOUT.
    private static bool IsWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills ev for the given channel: stores the channel reference in
     * data.ptr and derives the interest mask from the channel's flags.
     * EPOLLRDHUP, EPOLLERR and EPOLLHUP are always requested.
     *
     * Returns: a copy of the populated event.
     */
    private static epoll_event BuildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.HasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.HasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        // if (channel.HasFlag(ChannelFlag.OneShot))
        //     ev.events |= EPOLLONESHOT;
        if (channel.HasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

    /**
     * Issues epoll_ctl for the channel with the given opcode, retrying on EINTR.
     *
     * Returns: false only for failures other than EBADF, ENOENT and EPERM;
     *          those three are treated as success (see the comment below).
     */
    private bool EpollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        BuildEpollEvent(channel, ev);
        int res = 0;

        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        }
        while ((res == -1) && (errno == EINTR));

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to Remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
            log.warn("epoll_ctl failed");
            return false;
        }

        return true;
    }
}