/*
 * Geario - A cross-platform abstraction library with asynchronous I/O.
 *
 * Copyright (C) 2021-2022 Kerisy.com
 *
 * Website: https://www.kerisy.com
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module geario.net.TcpListener;

public import geario.net.TcpStream;
public import geario.net.TcpStreamOptions;
public import geario.net.IoError;

import geario.net.channel;

import geario.system.Memory : totalCPUs;

import geario.event.EventLoop;
import geario.event.EventLoopThreadPool;
import geario.util.ThreadPool;
import geario.Exceptions;
import geario.Functions;
import geario.logging;
import geario.util.CompilerHelper;

import std.socket;
import std.exception;
import core.thread;
import core.time;

/// Invoked for every accepted connection, before the stream is started.
alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
/// Optional factory used instead of the default `TcpStream` construction.
alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
/// Invoked when binding or accepting raises a `SocketOSException`.
alias EventErrorHandler = void delegate(IoError Error);

/**
 * Listening TCP socket that accepts connections and hands each one to the
 * registered accept handler wrapped in a `TcpStream`.
 *
 * The configuration methods return `this` so that calls can be chained.
 */
class TcpListener : AbstractListener
{
    protected bool _isSslEnabled = false;
    protected bool _isBlocking = false;
    protected bool _isBinded = false;

    protected EventLoopThreadPool _loopThreadPool;
    protected size_t _ioThreads;
    protected TcpStreamOptions _tcpStreamoption;
    protected EventHandler _shutdownHandler;

    /// event handlers
    AcceptEventHandler acceptHandler;
    SimpleEventHandler closeHandler;
    PeerCreateHandler peerCreateHandler;
    EventErrorHandler errorHandler;

    private int _backlog = 1024;

    /**
     * Params:
     *   loop       = event loop the listener registers with; a new
     *                `EventLoop` is created when `null`
     *   family     = address family of the listening socket
     *   bufferSize = buffer size stored in the stream options used for
     *                accepted connections
     */
    this(EventLoop loop = null, AddressFamily family = AddressFamily.INET, size_t bufferSize = 1024)
    {
        _ioThreads = 1;
        _tcpStreamoption = TcpStreamOptions.Create();
        _tcpStreamoption.bufferSize = bufferSize;

        if (loop is null)
            loop = new EventLoop;

        version (HAVE_IOCP)
            super(loop, family, bufferSize);
        else
            super(loop, family);
    }

    /// Sets the number of I/O threads; values below 1 are clamped to 1.
    TcpListener Threads(size_t ioThreads = totalCPUs)
    {
        _ioThreads = ioThreads > 1 ? ioThreads : 1;
        return this;
    }

    /// Registers the handler that receives every accepted stream.
    TcpListener Accepted(AcceptEventHandler handler)
    {
        acceptHandler = handler;
        return this;
    }

    /// Registers the handler that receives bind/accept errors.
    TcpListener Error(EventErrorHandler handler)
    {
        errorHandler = handler;
        return this;
    }

    /// Registers a factory that builds the stream for an accepted socket.
    TcpListener OnPeerCreating(PeerCreateHandler handler)
    {
        peerCreateHandler = handler;
        return this;
    }

    /// Registers the handler used by `Close` when no `closeHandler` is set.
    TcpListener OnShutdown(EventHandler handler)
    {
        _shutdownHandler = handler;
        return this;
    }

    /// Binds to the given textual address and port.
    TcpListener Bind(string ip, ushort port)
    {
        return Bind(parseAddress(ip, port));
    }

    /// Binds to `port` on an address built for the socket's address family.
    TcpListener Bind(ushort port)
    {
        return Bind(CreateAddress(this.socket.addressFamily, port));
    }

    /**
     * Enables `REUSEADDR`, binds the socket to `addr` and applies the
     * configured blocking mode.
     *
     * A `SocketOSException` is not propagated: it is passed to
     * `errorHandler` when one is installed and logged otherwise, and the
     * listener stays unbound.
     */
    TcpListener Bind(Address addr)
    {
        try
        {
            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
            this.socket.bind(addr);
            this.socket.blocking = _isBlocking;
            _localAddress = _socket.localAddress();
            _isBinded = true;
        }
        catch (SocketOSException e)
        {
            if (errorHandler !is null)
            {
                // NOTE: every failure is reported as ADDRINUSE; the actual
                // OS reason is only available through the message text.
                this.errorHandler(new IoError(ErrorCode.ADDRINUSE , e.msg));
            }
            else
            {
                // Without a handler the failure used to vanish silently.
                log.error("TcpListener bind failed: " ~ e.msg);
            }
        }

        return this;
    }

    /// Returns the local address recorded by the last successful `Bind`.
    Address BindingAddress()
    {
        return _localAddress;
    }

    /// Stores the blocking mode and applies it to the socket immediately.
    void Blocking(bool flag)
    {
        _isBlocking = flag;
        // if(_isBinded)
        this.socket.blocking = flag;
    }

    /// Returns the stored blocking mode.
    bool Blocking()
    {
        return _isBlocking;
    }

    /**
     * Configures port reuse; must be called before `Bind`.
     *
     * On Posix both `REUSEADDR` and `SO_REUSEPORT` are set to `flag`; on
     * Windows `SO_EXCLUSIVEADDRUSE` is set to `!flag`.
     *
     * Throws: `IOException` when the listener is already bound.
     *
     * https://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t
     * https://www.cnblogs.com/xybaby/p/7341579.html
     * https://rextester.com/BUAFK86204
     */
    TcpListener ReusePort(bool flag)
    {
        if (_isBinded)
        {
            throw new IOException("Must be set before binding.");
        }

        version (Posix)
        {
            import core.sys.posix.sys.socket;

            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, flag);
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, flag);
        }
        else version (Windows)
        {
            // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
            // https://docs.microsoft.com/zh-cn/windows/win32/winsock/so-exclusiveaddruse
            // TODO: Tasks pending completion -@Administrator at 2020-05-25T15:04:42+08:00
            // More tests needed
            import core.sys.windows.winsock2;

            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
        }

        return this;
    }

    /// Stores the backlog that `Start` passes to `listen`.
    TcpListener Listen(int backlog)
    {
        _backlog = backlog;
        return this;
    }

    /**
     * Creates the I/O thread pool when more than one thread was requested,
     * starts listening and registers the listener with its event loop.
     */
    override void Start()
    {
        if (_ioThreads > 1)
            _loopThreadPool = new EventLoopThreadPool(_ioThreads);

        this.socket.listen(_backlog);
        _loop.Register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
            this.DoAccept();
    }

    /**
     * Invokes `closeHandler` if set, otherwise the shutdown handler if set,
     * and then runs `OnClose`.
     */
    override void Close()
    {
        if (closeHandler !is null)
            closeHandler();
        else if (_shutdownHandler !is null)
            _shutdownHandler(this, null);
        this.OnClose();
    }

    /**
     * Performs a single `OnAccept` call and dispatches the accepted socket.
     *
     * The stream is built by `peerCreateHandler` when one is installed,
     * otherwise on the next pool loop (multi-threaded) or on the listener's
     * own loop. Sockets that cannot be handed to anyone are closed.
     */
    protected override void OnRead()
    {
        version (GEAR_DEBUG)
            Trace("start to listen");

        version (GEAR_DEBUG)
            Trace("listening...");

        try
        {
            OnAccept((Socket socket) {

                version (GEAR_DEBUG) {
                    log.info("new connection from %s, fd=%d",
                        socket.remoteAddress.toString(), socket.handle());
                }

                if (acceptHandler is null)
                {
                    // Nobody will ever own this connection; release the
                    // descriptor now instead of leaving it open.
                    socket.close();
                    return;
                }

                TcpStream stream;
                if (peerCreateHandler !is null)
                    stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);
                else if (_ioThreads > 1)
                    stream = new TcpStream(_loopThreadPool.GetNextLoop(), socket, _tcpStreamoption);
                else
                    stream = new TcpStream(_loop, socket, _tcpStreamoption);

                if (stream is null)
                {
                    // A factory that yields no stream would otherwise cause
                    // a null dereference on stream.Start() below.
                    log.error("peerCreateHandler returned null, dropping the connection");
                    socket.close();
                    return;
                }

                acceptHandler(this, stream);
                stream.Start();
            });

            if (this.IsError)
            {
                log.error("listener Error: ", this.ErrorMessage);
                this.Close();
            }
        }
        catch (SocketOSException e)
        {
            if (errorHandler !is null)
            {
                errorHandler(new IoError(ErrorCode.OTHER , e.msg));
            }
        }
    }
}

// dfmt off
version(linux):
// dfmt on

// Fallback definitions of SO_REUSEPORT. The branches form one exclusive
// chain so that a target matching several conditions (for example AArch64
// with musl) declares the constant only once.
version (AArch64) {
    enum SO_REUSEPORT = 15;
} else version (CRuntime_Musl) {
    enum SO_REUSEPORT = 15;
} else static if (CompilerHelper.IsLessThan(2078)) {
    version (X86) {
        enum SO_REUSEPORT = 15;
    } else version (X86_64) {
        enum SO_REUSEPORT = 15;
    } else version (MIPS32) {
        enum SO_REUSEPORT = 0x0200;
    } else version (MIPS64) {
        enum SO_REUSEPORT = 0x0200;
    } else version (PPC) {
        enum SO_REUSEPORT = 15;
    } else version (PPC64) {
        enum SO_REUSEPORT = 15;
    } else version (ARM) {
        enum SO_REUSEPORT = 15;
    }
}