/*
 * Geario - A cross-platform abstraction library with asynchronous I/O.
 *
 * Copyright (C) 2021-2022 Kerisy.com
 *
 * Website: https://www.kerisy.com
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module geario.event.selector.Epoll;

// dfmt off
version(HAVE_EPOLL):

// dfmt on

import std.exception;
import std.socket;
import std.string;

import core.sys.posix.sys.types;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.stdc.string;
import core.stdc.errno;
import core.time;
import core.thread;

import core.sys.posix.sys.resource;
import core.sys.posix.sys.time;
import core.sys.linux.epoll;

import geario.event.selector.Selector;
import geario.Exceptions;
import geario.net.channel;
import geario.logging;
import geario.event.timer;
import geario.system.Error;
import geario.util.worker;

/* Max. theoretical number of file descriptors on system. */
__gshared size_t fdLimit = 0;

/**
 * Initializes `fdLimit` from the hard RLIMIT_NOFILE limit of the process.
 * If `getrlimit` fails, `fdLimit` keeps its initial value of 0.
 */
shared static this() {
    rlimit fileLimit;
    if (getrlimit(RLIMIT_NOFILE, &fileLimit) == 0) {
        fdLimit = fileLimit.rlim_max;
    }
}


/**
 * Selector implementation backed by a Linux epoll instance.
 *
 * Channels are registered with `epoll_ctl`, carrying the channel reference in
 * `epoll_event.data.ptr`; `DoSelect` waits with `epoll_wait` and dispatches
 * each ready event to the owning channel.
 */
class AbstractSelector : Selector {
    /// Maximum number of events fetched by a single `epoll_wait` call.
    enum int NUM_KEVENTS = 1024;
    private int _epollFD;
    private bool isDisposed = false;
    private epoll_event[NUM_KEVENTS] events;
    private EventChannel _eventChannel;

    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Params:
     *   id          = forwarded to the base `Selector` constructor
     *   divider     = forwarded to the base `Selector` constructor
     *   worker      = forwarded to the base `Selector` constructor
     *   maxChannels = forwarded to the base `Selector` constructor
     *
     * Throws: IOException if `epoll_create1` fails.
     */
    this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(id, divider, worker, maxChannels);

        // http://man7.org/linux/man-pages/man2/epoll_create.2.html
        /*
         * Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor.
         * See the description of the O_CLOEXEC flag in open(2) for reasons why
         * this may be useful.
         */
        _epollFD = epoll_create1(EPOLL_CLOEXEC);
        if (_epollFD < 0)
            throw new IOException("epoll_create failed");

        _eventChannel = new EpollEventChannel(this);
        Register(_eventChannel);
    }

    ~this() @nogc {
        // Dispose();
    }

    /**
     * Closes the internal event channel and the epoll file descriptor, then
     * disposes the base selector. Calling it more than once is a no-op.
     */
    override void Dispose() {
        if (isDisposed)
            return;

        version (GEAR_IO_DEBUG)
            log.trace("disposing selector[fd=%d]...", _epollFD);
        isDisposed = true;
        _eventChannel.Close();
        int r = core.sys.posix.unistd.close(_epollFD);
        if (r != 0) {
            // close() only ever returns -1 on failure; errno carries the actual cause.
            version (GEAR_IO_DEBUG) log.warn("Error: %d", errno);
        }

        super.Dispose();
    }

    /**
     * Triggers the internal event channel (unless it is already closed) when
     * the selector is asked to stop.
     */
    override void OnStop() {
        version (GEAR_IO_DEBUG)
            log.info("Selector stopping. fd=%d, id: %d", _epollFD, GetId());

        if (!_eventChannel.IsClosed()) {
            _eventChannel.trigger();
            // _eventChannel.OnWrite();
        }
    }

    /**
     * Registers the channel with the base selector and adds it to the epoll set.
     *
     * Returns: true if `EpollCtl` reported success for EPOLL_CTL_ADD, false otherwise.
     */
    override bool Register(AbstractChannel channel) {
        super.Register(channel);

        version (GEAR_IO_DEBUG)
            log.trace("register, channel(fd=%d, type=%s)", channel.handle, channel.Type);

        if (EpollCtl(channel, EPOLL_CTL_ADD)) {
            return true;
        } else {
            debug log.warn("failed to register channel: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Removes the channel from the epoll set; the base selector's
     * `Deregister` always runs on exit, whatever the epoll result.
     *
     * Returns: true if `EpollCtl` reported success for EPOLL_CTL_DEL, false otherwise.
     */
    override bool Deregister(AbstractChannel channel) {
        scope(exit) {
            super.Deregister(channel);
            version (GEAR_IO_DEBUG)
                log.trace("deregister, channel(fd=%d, type=%s)", channel.handle, channel.Type);
        }

        if (EpollCtl(channel, EPOLL_CTL_DEL)) {
            return true;
        } else {
            // Report the descriptor of the channel being removed, matching Register().
            log.warn("deregister channel failed: fd=%d", channel.handle);
            return false;
        }
    }

    /**
     * Waits for events and dispatches them to their channels.
     *
     * Params:
     *   timeout = wait time in milliseconds; 0 polls without blocking and a
     *             negative value blocks until an event arrives. Values above
     *             `int.max` are clamped to `int.max`.
     *
     * Returns: the number of events reported by `epoll_wait`, or its negative
     *          result when the wait failed for a reason other than EINTR.
     */
    protected override int DoSelect(long timeout) {
        int len = 0;

        if (timeout <= 0) { /* Indefinite or no wait */
            // Normalize instead of truncating: a plain cast could turn a large
            // negative long into 0 (no wait) rather than an indefinite wait.
            const int wait = timeout < 0 ? -1 : 0;
            do {
                // http://man7.org/linux/man-pages/man2/epoll_wait.2.html
                // https://stackoverflow.com/questions/6870158/epoll-wait-fails-due-to-eintr-how-to-remedy-this/6870391#6870391
                len = epoll_wait(_epollFD, events.ptr, events.length, wait);
            } while ((len == -1) && (errno == EINTR));
        } else { /* Bounded wait; bounded restarts */
            // Clamp so that an oversized timeout cannot wrap around to a negative value.
            const int wait = timeout > int.max ? int.max : cast(int) timeout;
            len = iEpoll(_epollFD, events.ptr, events.length, wait);
        }

        // A negative len yields an empty range, so nothing is dispatched on error.
        foreach (i; 0 .. len) {
            AbstractChannel channel = cast(AbstractChannel)(events[i].data.ptr);
            if (channel is null) {
                debug log.warn("channel is null");
            } else {
                ChannelEventHandle(channel, events[i].events);
            }
        }

        return len;
    }

    /**
     * Dispatches one epoll event mask to the channel.
     *
     * A mask classified by `IsClosed` drains pending input (if readable) and
     * then closes the channel. Otherwise `OnWrite` and/or `OnRead` are invoked
     * according to the EPOLLOUT / EPOLLIN bits, write first. Exceptions thrown
     * by the channel are caught and only logged.
     */
    private void ChannelEventHandle(AbstractChannel channel, uint event) {
        version (GEAR_IO_DEBUG) {
            log.warn("thread: %s", Thread.getThis().name());

            // Thread.sleep(300.msecs);
            log.info("handling event: selector=%d, channel=%d, events=%d, isReadable: %s, isWritable: %s, isClosed: %s",
                    this._epollFD, channel.handle, event, IsReadable(event), IsWritable(event), IsClosed(event));
        }

        try {
            if (IsClosed(event)) { // && errno != EINTR
                /* An error has occurred on this fd, or the socket is not
                   ready for reading (why were we notified then?) */
                version (GEAR_IO_DEBUG) {
                    log.warn("event=%d, isReadable: %s, isWritable: %s",
                            event, IsReadable(event), IsWritable(event));

                    if (IsError(event)) {
                        log.warn("channel Error: fd=%s, event=%d, errno=%d, message=%s",
                                channel.handle, event, errno, GetErrorMessage(errno));
                    } else {
                        log.info("channel closed: fd=%d, errno=%d, message=%s",
                                channel.handle, errno, GetErrorMessage(errno));
                    }
                }

                // The remote connection broke abnormally, so the channel should be notified.
                if (IsReadable(event)) {
                    channel.OnRead();
                }

                // if(IsWritable(event)) {
                //     channel.OnWrite();
                // }

                channel.Close();
            } else {
                // IsClosed() returned false, so at least one of EPOLLIN / EPOLLOUT
                // is set. Test the bits individually rather than comparing the
                // whole mask, so an unrelated extra bit cannot suppress dispatch.
                if (IsWritable(event)) {
                    version (GEAR_IO_DEBUG)
                        log.trace("channel write event: fd=%d", channel.handle);
                    channel.OnWrite();
                }

                if (IsReadable(event)) {
                    version (GEAR_IO_DEBUG)
                        log.trace("channel read event: fd=%d", channel.handle);
                    channel.OnRead();
                }
            }
        } catch (Exception e) {
            debug {
                log.error("Error while handing channel: fd=%s, exception=%s, message=%s",
                        channel.handle, typeid(e), e.msg);
            }
            version(GEAR_DEBUG) log.warn(e);
        }
    }

    /**
     * `epoll_wait` wrapper for bounded waits that restarts after EINTR with
     * the remaining time.
     *
     * Params:
     *   epfd    = epoll file descriptor
     *   events  = buffer receiving the ready events
     *   numfds  = capacity of `events`
     *   timeout = wait time in milliseconds
     *
     * Returns: the `epoll_wait` result, or 0 once the timeout has been used up
     *          across restarts (or the clock is observed to go backwards).
     */
    private int iEpoll(int epfd, epoll_event* events, int numfds, int timeout) {
        long start, now;
        int remaining = timeout;
        timeval t;
        long diff;

        gettimeofday(&t, null);
        start = t.tv_sec * 1000 + t.tv_usec / 1000;

        for (;;) {
            int res = epoll_wait(epfd, events, numfds, remaining);
            if (res < 0 && errno == EINTR) {
                if (remaining >= 0) {
                    gettimeofday(&t, null);
                    now = t.tv_sec * 1000 + t.tv_usec / 1000;
                    diff = now - start;
                    remaining -= diff;
                    // diff < 0 means the wall clock moved backwards; give up
                    // instead of extending the wait.
                    if (diff < 0 || remaining <= 0) {
                        return 0;
                    }
                    start = now;
                }
            } else {
                return res;
            }
        }
    }

    // https://blog.csdn.net/ljx0305/article/details/4065058
    /// True when the mask contains EPOLLERR.
    private static bool IsError(uint events) nothrow {
        return (events & EPOLLERR) != 0;
    }

    /// True when the mask contains EPOLLERR, EPOLLHUP or EPOLLRDHUP, or
    /// contains neither EPOLLIN nor EPOLLOUT.
    private static bool IsClosed(uint e) nothrow {
        return (e & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) != 0
            || (e & (EPOLLIN | EPOLLOUT)) == 0;
    }

    /// True when the mask contains EPOLLIN.
    private static bool IsReadable(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True when the mask contains EPOLLOUT.
    private static bool IsWritable(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Fills `ev` for the channel: the channel reference goes into `data.ptr`
     * and the event mask always contains EPOLLRDHUP | EPOLLERR | EPOLLHUP,
     * plus EPOLLIN / EPOLLOUT / EPOLLET according to the channel's flags.
     *
     * Returns: a copy of the populated event.
     */
    private static epoll_event BuildEpollEvent(AbstractChannel channel, ref epoll_event ev) {
        ev.data.ptr = cast(void*) channel;
        // ev.data.fd = channel.handle;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (channel.HasFlag(ChannelFlag.Read))
            ev.events |= EPOLLIN;
        if (channel.HasFlag(ChannelFlag.Write))
            ev.events |= EPOLLOUT;
        // if (channel.HasFlag(ChannelFlag.OneShot))
        //     ev.events |= EPOLLONESHOT;
        if (channel.HasFlag(ChannelFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

    /**
     * Issues `epoll_ctl` for the channel, retrying while it fails with EINTR.
     *
     * Params:
     *   channel = channel whose handle is passed to epoll; must not be null
     *   opcode  = one of the EPOLL_CTL_* operations
     *
     * Returns: false only when `epoll_ctl` failed with an errno other than
     *          EBADF, ENOENT or EPERM; true otherwise.
     */
    private bool EpollCtl(AbstractChannel channel, int opcode) {
        assert(channel !is null);
        const fd = channel.handle;
        assert(fd >= 0, "The channel.handle is not initialized!");

        epoll_event ev;
        BuildEpollEvent(channel, ev);
        int res = 0;

        do {
            res = epoll_ctl(_epollFD, opcode, fd, &ev);
        }
        while ((res == -1) && (errno == EINTR));

        /*
         * A channel may be registered with several Selectors. When each Selector
         * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
         * list to Remove the file descriptor from epoll. The "last" Selector will
         * close the file descriptor which automatically unregisters it from each
         * epoll descriptor. To avoid costly synchronization between Selectors we
         * allow pending updates to be processed, ignoring errors. The errors are
         * harmless as the last update for the file descriptor is guaranteed to
         * be EPOLL_CTL_DEL.
         */
        // NOTE(review): the tolerated errno set also applies to EPOLL_CTL_ADD, so
        // Register() can report success after a failed add - confirm this is intended.
        if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
            log.warn("epoll_ctl failed");
            return false;
        } else
            return true;
    }
}