/*
 * Geario - A cross-platform abstraction library with asynchronous I/O.
 *
 * Copyright (C) 2021-2022 Kerisy.com
 *
 * Website: https://www.kerisy.com
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module geario.event.selector.Kqueue;


// dfmt off
version(HAVE_KQUEUE):
// dfmt on
import geario.event.selector.Selector;
import geario.event.timer.Kqueue;
import geario.Exceptions;
import geario.net.channel;
import geario.logging;
import geario.util.CompilerHelper;

import std.exception;
import std.socket;
import std.string;

import core.time;
import core.stdc.string;
import core.stdc.errno;
import core.sys.posix.sys.types; // for ssize_t, size_t
import core.sys.posix.signal;
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.posix.time;
import geario.util.worker;

/**
 * Selector implementation backed by a kqueue descriptor.
 *
 * Channels are attached to the kqueue with their object reference stored in
 * the kevent `udata` field; DoSelect() reads that reference back to dispatch
 * each reported event to the owning channel.
 */
class AbstractSelector : Selector {
    /// Maximum number of kevents fetched by a single kevent() call.
    enum int NUM_KEVENTS = 128;
    private bool isDisposed = false;
    /// Buffer that kevent() fills with ready events.
    private Kevent[NUM_KEVENTS] events;
    private int _kqueueFD;
    /// Internal channel used by OnStop() to wake up a blocked DoSelect().
    private EventChannel _eventChannel;

    /**
     * Creates the kqueue descriptor and registers the internal wake-up channel.
     *
     * All parameters are forwarded unchanged to the Selector base constructor.
     *
     * Throws: ErrnoException when kqueue() fails; without a valid descriptor
     *         every later kevent() call would fail as well.
     */
    this(size_t number, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        super(number, divider, worker, maxChannels);
        _kqueueFD = kqueue();
        // kqueue() returns -1 on failure; fail loudly instead of keeping a
        // selector that can never register or poll anything.
        errnoEnforce(_kqueueFD >= 0, "kqueue() failed");
        _eventChannel = new KqueueEventChannel(this);
        Register(_eventChannel);
    }

    ~this() @nogc {
        // Dispose();
    }

    /**
     * Closes the wake-up channel and the kqueue descriptor, then disposes the
     * base selector. Calls after the first one are no-ops.
     */
    override void Dispose() {
        if (isDisposed)
            return;

        version (GEAR_IO_DEBUG)
            log.trace("disposing selector[fd=%d]...", _kqueueFD);
        isDisposed = true;
        _eventChannel.Close();
        if (core.sys.posix.unistd.close(_kqueueFD) != 0) {
            // close() only ever reports -1 on failure; errno holds the cause.
            version (GEAR_DEBUG) log.warn("Error: %d", errno);
        }
        // Forget the descriptor so that later calls cannot hit an unrelated
        // file that happens to reuse the same number.
        _kqueueFD = -1;

        super.Dispose();
    }

    /**
     * Triggers the internal event channel, unless it is already closed, so
     * that a pending DoSelect() returns.
     */
    override void OnStop() {
        version (GEAR_IO_DEBUG)
            log.info("Selector stopping. fd=%d", _kqueueFD);

        if (!_eventChannel.IsClosed()) {
            _eventChannel.trigger();
            // _eventChannel.OnWrite();
        }
    }

    /**
     * Attaches a channel to the kqueue.
     *
     * Timer channels are added with EVFILT_TIMER; any other channel is added
     * with EVFILT_READ and/or EVFILT_WRITE according to its Read/Write flags.
     * EV_CLEAR is added to both filters when the channel has the ETMode flag.
     *
     * Params:
     *   channel = the channel to watch; it is stored as the kevent `udata`.
     *
     * Returns: true when kevent() accepted the change; false when the channel
     *          is a Timer that is not an AbstractTimer, its handle is
     *          negative, it has neither the Read nor the Write flag, or
     *          kevent() failed.
     */
    override bool Register(AbstractChannel channel) {
        super.Register(channel);

        const int fd = channel.handle;
        version (GEAR_IO_DEBUG)
            log.trace("register channel: fd=%d, type=%s", fd, channel.Type);

        int err = -1;
        if (channel.Type == ChannelType.Timer)
        {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            // Periods shorter than 20 are raised to 20 (in milliseconds).
            size_t time = timerChannel.time < 20 ? 20 : timerChannel.time;
            EV_SET(&ev, timerChannel.handle, EVFILT_TIMER,
                    EV_ADD | EV_ENABLE | EV_CLEAR, 0, time, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else
        {
            if (fd < 0)
                return false;

            Kevent[2] ev = void;
            short readFlags = EV_ADD | EV_ENABLE;
            short writeFlags = EV_ADD | EV_ENABLE;

            if (channel.HasFlag(ChannelFlag.ETMode))
            {
                readFlags |= EV_CLEAR;
                writeFlags |= EV_CLEAR;
            }

            // ev[0] is the read change, ev[1] the write change; the calls
            // below submit whichever of them matches the channel flags.
            EV_SET(&(ev[0]), fd, EVFILT_READ, readFlags, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, writeFlags, 0, 0, cast(void*) channel);

            if (channel.HasFlag(ChannelFlag.Read) && channel.HasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (channel.HasFlag(ChannelFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (channel.HasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }

        return err >= 0;
    }

    /**
     * Detaches a channel from the kqueue.
     *
     * The base class Deregister() always runs when this method exits,
     * whatever the outcome. channel.Clear() is only called after kevent()
     * accepted the deletion.
     *
     * Params:
     *   channel = the channel to stop watching.
     *
     * Returns: true when the filters were deleted; false when the handle is
     *          negative, the channel is a Timer that is not an AbstractTimer,
     *          it has neither the Read nor the Write flag, or kevent() failed.
     */
    override bool Deregister(AbstractChannel channel)
    {
        scope(exit) {
            super.Deregister(channel);
            version (GEAR_IO_DEBUG)
                log.trace("deregister, channel(fd=%d, type=%s)", channel.handle, channel.Type);
        }

        const fd = channel.handle;
        if (fd < 0)
            return false;

        int err = -1;

        if (channel.Type == ChannelType.Timer)
        {
            Kevent ev;
            AbstractTimer timerChannel = cast(AbstractTimer) channel;
            if (timerChannel is null)
                return false;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) channel);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else
        {
            Kevent[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) channel);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) channel);

            if (channel.HasFlag(ChannelFlag.Read) && channel.HasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (channel.HasFlag(ChannelFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (channel.HasFlag(ChannelFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }

        if (err < 0)
        {
            return false;
        }

        // channel.currtLoop = null;
        channel.Clear();

        return true;
    }

    /**
     * Waits for events on the kqueue and dispatches them to their channels.
     *
     * Params:
     *   timeout = wait time in milliseconds; a negative value waits forever,
     *             0 returns immediately. Values above int.max are clamped.
     *
     * Returns: the value returned by kevent(): the number of events fetched,
     *          or a negative value on failure.
     */
    protected override int DoSelect(long timeout) {
        // void* [] tmp;
        // eventBuffer = tmp;
        timespec ts;
        timespec* tsp = null; // null timespec: wait forever
        if (timeout >= 0) {
            // For some indeterminate reason kevent(2) has been found to fail with
            // an EINVAL Error for timeout values greater than or equal to
            // 100000001000L. To avoid this problem, clamp the timeout arbitrarily
            // to the maximum value of a 32-bit signed integer which is
            // approximately 25 days in milliseconds.
            const int timeoutMax = int.max;
            if (timeout > timeoutMax) {
                timeout = timeoutMax;
            }
            ts.tv_sec = timeout / 1000;
            ts.tv_nsec = (timeout % 1000) * 1000000; // 1 millisecond = 1,000,000 nanoseconds
            tsp = &ts;
        }

        // auto tspec = timespec(1, 1000 * 10);
        int result = kevent(_kqueueFD, null, 0, events.ptr, events.length, tsp);

        // A negative result (kevent failure) makes the range empty.
        foreach (i; 0 .. result)
        {
            AbstractChannel channel = cast(AbstractChannel)(events[i].udata);
            if (channel is null) {
                // Nothing to dispatch to; dereferencing it below would crash.
                log.warn("Ignoring kevent without a channel: ident=%d, filter=%d",
                        events[i].ident, events[i].filter);
                continue;
            }

            ushort eventFlags = events[i].flags;

            version (GEAR_IO_DEBUG)
                log.info("handling event: events=%d, fd=%d", eventFlags, channel.handle);

            if (eventFlags & EV_ERROR) {
                log.warn("channel[fd=%d] has a Error.", channel.handle);
                channel.Close();
                continue;
            }

            if (eventFlags & EV_EOF) {
                version (GEAR_IO_DEBUG) log.info("channel[fd=%d] closed", channel.handle);
                channel.Close();
                continue;
            }

            short filter = events[i].filter;
            ChannelEventHandle(channel, filter);
        }

        return result;
    }

    /**
     * Routes one ready event to the matching channel callback.
     *
     * Timer and read filters call OnRead(), the write filter calls OnWrite();
     * any other filter is only logged.
     *
     * Params:
     *   channel = the channel the event belongs to.
     *   filter  = the kevent filter (one of the negative EVFILT_* values).
     */
    private void ChannelEventHandle(AbstractChannel channel, short filter) {
        version (GEAR_IO_DEBUG)
            log.info("handling event: events=%d, fd=%d", filter, channel.handle);

        if (filter == EVFILT_TIMER)
        {
            channel.OnRead();
        }
        else if (filter == EVFILT_WRITE)
        {
            channel.OnWrite();
        }
        else if (filter == EVFILT_READ)
        {
            channel.OnRead();
        }
        else
        {
            log.warn("Unhandled channel filter: %d", filter);
        }
    }
}

/// kevent filter identifiers (values of Kevent.filter).
enum : short {
    EVFILT_READ = -1,
    EVFILT_WRITE = -2,
    EVFILT_AIO = -3, /* attached to aio requests */
    EVFILT_VNODE = -4, /* attached to vnodes */
    EVFILT_PROC = -5, /* attached to struct proc */
    EVFILT_SIGNAL = -6, /* attached to struct proc */
    EVFILT_TIMER = -7, /* timers */
    EVFILT_MACHPORT = -8, /* Mach portsets */
    EVFILT_FS = -9, /* filesystem events */
    EVFILT_USER = -10, /* User events */
    EVFILT_VM = -12, /* virtual memory events */
    EVFILT_SYSCOUNT = 11
}

/**
 * Fills `*kevp` with a Kevent built from `args`, given in the field order of
 * the Kevent struct (ident, filter, flags, fflags, data, udata).
 */
extern (D) void EV_SET(Kevent* kevp, typeof(Kevent.tupleof) args) @nogc nothrow {
    *kevp = Kevent(args);
}

/**
 * Event record exchanged with kevent().
 *
 * NOTE(review): the field layout has to match the platform's C `struct kevent`;
 * confirm for every BSD flavour HAVE_KQUEUE is enabled on.
 */
struct Kevent {
    uintptr_t ident; /* identifier for this event */
    short filter; /* filter for event */
    ushort flags;
    uint fflags;
    intptr_t data;
    void* udata; /* opaque user data identifier */
}

/// Action and status bits for Kevent.flags.
enum {
    /* actions */
    EV_ADD = 0x0001, /* add event to kq (implies enable) */
    EV_DELETE = 0x0002, /* delete event from kq */
    EV_ENABLE = 0x0004, /* enable event */
    EV_DISABLE = 0x0008, /* disable event (not reported) */

    /* flags */
    EV_ONESHOT = 0x0010, /* only report one occurrence */
    EV_CLEAR = 0x0020, /* clear event state after reporting */
    EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
    EV_DISPATCH = 0x0080, /* disable event after reporting */

    EV_SYSFLAGS = 0xF000, /* reserved by system */
    EV_FLAG1 = 0x2000, /* filter-specific flag */

    /* returned values */
    EV_EOF = 0x8000, /* EOF detected */
    EV_ERROR = 0x4000, /* Error, data contains errno */

}

/// Filter-specific bits for Kevent.fflags.
enum {
    /*
     * data/hint flags/masks for EVFILT_USER, shared with userspace
     *
     * On input, the top two bits of fflags specifies how the lower twenty four
     * bits should be applied to the stored value of fflags.
     *
     * On output, the top two bits will always be set to NOTE_FFNOP and the
     * remaining twenty four bits will contain the stored fflags value.
     */
    NOTE_FFNOP = 0x00000000, /* ignore input fflags */
    NOTE_FFAND = 0x40000000, /* AND fflags */
    NOTE_FFOR = 0x80000000, /* OR fflags */
    NOTE_FFCOPY = 0xc0000000, /* copy fflags */
    NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */
    NOTE_FFLAGSMASK = 0x00ffffff,

    NOTE_TRIGGER = 0x01000000, /* Cause the event to be
                                  triggered for output. */

    /*
     * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
     */
    NOTE_LOWAT = 0x0001, /* low water mark */

    /*
     * data/hint flags for EVFILT_VNODE, shared with userspace
     */
    NOTE_DELETE = 0x0001, /* vnode was removed */
    NOTE_WRITE = 0x0002, /* data contents changed */
    NOTE_EXTEND = 0x0004, /* size increased */
    NOTE_ATTRIB = 0x0008, /* attributes changed */
    NOTE_LINK = 0x0010, /* link count changed */
    NOTE_RENAME = 0x0020, /* vnode was renamed */
    NOTE_REVOKE = 0x0040, /* vnode access was revoked */

    /*
     * data/hint flags for EVFILT_PROC, shared with userspace
     */
    NOTE_EXIT = 0x80000000, /* process exited */
    NOTE_FORK = 0x40000000, /* process forked */
    NOTE_EXEC = 0x20000000, /* process exec'd */
    NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */
    NOTE_PDATAMASK = 0x000fffff, /* mask for pid */

    /* additional flags for EVFILT_PROC */
    NOTE_TRACK = 0x00000001, /* follow across forks */
    NOTE_TRACKERR = 0x00000002, /* could not track child */
    NOTE_CHILD = 0x00000004, /* am a child process */

}

/// Bindings to the C kqueue API.
extern (C) {
    int kqueue() @nogc nothrow;
    int kevent(int kq, const Kevent* changelist, int nchanges,
            Kevent* eventlist, int nevents, const timespec* timeout) @nogc nothrow;
}

// NOTE(review): presumably a fallback for compiler releases whose std.socket
// does not yet provide SO_REUSEPORT - confirm against CompilerHelper.IsLessThan.
static if (CompilerHelper.IsLessThan(2078)) {
    enum SO_REUSEPORT = 0x0200;
}