1 /*
2  * Geario - A cross-platform abstraction library with asynchronous I/O.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.net.TcpListener;
13 
14 public import geario.net.TcpStream;
15 public import geario.net.TcpStreamOptions;
16 public import geario.net.IoError;
17 
18 import geario.net.channel;
19 
20 import geario.system.Memory : totalCPUs;
21 
22 import geario.event.EventLoop;
23 import geario.event.EventLoopThreadPool;
24 import geario.util.ThreadPool;
25 import geario.Exceptions;
26 import geario.Functions;
27 import geario.logging;
28 import geario.util.CompilerHelper;
29 
30 import std.socket;
31 import std.exception;
32 import core.thread;
33 import core.time;
34 
35 alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
36 alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
37 alias EventErrorHandler = void delegate(IoError Error);
38 
39 /**
40  * 
41  */
42 class TcpListener : AbstractListener
43 {
44     protected bool _isSslEnabled = false;
45     protected bool _isBlocking = false;
46     protected bool _isBinded = false;
47 
48     protected EventLoopThreadPool _loopThreadPool;
49     protected size_t _ioThreads;
50     protected TcpStreamOptions _tcpStreamoption;
51     protected EventHandler _shutdownHandler;
52 
53     /// event handlers
54     AcceptEventHandler acceptHandler;
55     SimpleEventHandler closeHandler;
56     PeerCreateHandler peerCreateHandler;
57     EventErrorHandler errorHandler;
58 
59     private int _backlog = 1024;
60 
61     this(EventLoop loop = null, AddressFamily family = AddressFamily.INET, size_t bufferSize = 1024)
62     {
63         _ioThreads = 1;
64         _tcpStreamoption = TcpStreamOptions.Create();
65         _tcpStreamoption.bufferSize = bufferSize;
66 
67         if (loop is null)
68             loop = new EventLoop;
69         
70         version (HAVE_IOCP)
71             super(loop, family, bufferSize);
72         else
73             super(loop, family);
74     }
75 
76     TcpListener Threads(size_t ioThreads = totalCPUs)
77     {
78         _ioThreads = ioThreads > 1 ? ioThreads : 1;
79         return this;
80     }
81 
82     TcpListener Accepted(AcceptEventHandler handler) {
83         acceptHandler = handler;
84         return this;
85     }
86 
87     TcpListener Error(EventErrorHandler handler)
88     {
89         errorHandler = handler;
90         return this;
91     }
92 
93     TcpListener OnPeerCreating(PeerCreateHandler handler) {
94         peerCreateHandler = handler;
95         return this;
96     }
97 
98     TcpListener OnShutdown(EventHandler handler) {
99         _shutdownHandler = handler;
100         return this;
101     }
102 
103     TcpListener Bind(string ip, ushort port)
104     {
105         return Bind(parseAddress(ip, port));
106     }
107 
108     TcpListener Bind(ushort port)
109     {
110         return Bind(CreateAddress(this.socket.addressFamily, port));
111     }
112 
113     TcpListener Bind(Address addr)
114     {
115         try
116         {
117             this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
118             this.socket.bind(addr);
119             this.socket.blocking = _isBlocking;
120             _localAddress = _socket.localAddress();
121             _isBinded = true;
122         }
123         catch (SocketOSException e)
124         {
125             if (errorHandler !is null)
126             {
127                 this.errorHandler(new IoError(ErrorCode.ADDRINUSE , e.msg));
128             }
129         }
130 
131         return this;
132     }
133 
134     Address BindingAddress()
135     {
136         return _localAddress;
137     }
138 
139     void Blocking(bool flag)
140     {
141         _isBlocking = flag;
142         // if(_isBinded)
143         this.socket.blocking = flag;
144     }
145 
146     bool Blocking()
147     {
148         return _isBlocking;
149     }
150 
151     /**
152      * https://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t
153      * https://www.cnblogs.com/xybaby/p/7341579.html
154      * https://rextester.com/BUAFK86204
155      */
156     TcpListener ReusePort(bool flag)
157     {
158         if(_isBinded) {
159             throw new IOException("Must be set before binding.");
160         }
161 
162         version (Posix) {
163             import core.sys.posix.sys.socket;
164 
165             this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, flag);
166             this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, flag);
167         } else version (Windows) {
168             // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
169             // https://docs.microsoft.com/zh-cn/windows/win32/winsock/so-exclusiveaddruse
170             // TODO: Tasks pending completion -@Administrator at 2020-05-25T15:04:42+08:00
171             // More tests needed            
172             import core.sys.windows.winsock2;
173             this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
174         }
175 
176         return this;
177     }
178 
179     TcpListener Listen(int backlog)
180     {
181         _backlog = backlog;
182         return this;
183     }
184 
185     override void Start()
186     {
187         if (_ioThreads > 1)
188             _loopThreadPool = new EventLoopThreadPool(_ioThreads);
189 
190         this.socket.listen(_backlog);
191         _loop.Register(this);
192         _isRegistered = true;
193         version (HAVE_IOCP)
194             this.DoAccept();
195     }
196 
197     override void Close() {
198         if (closeHandler !is null)
199             closeHandler();
200         else if (_shutdownHandler !is null)
201             _shutdownHandler(this, null);
202         this.OnClose();
203     }
204 
205     protected override void OnRead() {
206         bool canRead = true;
207         version (GEAR_DEBUG)
208             Trace("start to listen");
209         // while(canRead && this.isRegistered) // why??
210         {
211             version (GEAR_DEBUG)
212                 Trace("listening...");
213 
214             try
215             {
216                 canRead = OnAccept((Socket socket) {
217 
218                     version (GEAR_DEBUG) {
219                         log.info("new connection from %s, fd=%d",
220                         socket.remoteAddress.toString(), socket.handle());
221                     }
222 
223                     if (acceptHandler !is null) {
224                         TcpStream stream;
225                         if (peerCreateHandler is null) {
226                             if (_ioThreads > 1)
227                                 stream = new TcpStream(_loopThreadPool.GetNextLoop(), socket, _tcpStreamoption);
228                             else
229                                 stream = new TcpStream(_loop, socket, _tcpStreamoption);
230                         }
231                         else
232                             stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);
233 
234                         acceptHandler(this, stream);
235                         stream.Start();
236                     }
237                 });
238 
239                 if (this.IsError) {
240                     canRead = false;
241                     log.error("listener Error: ", this.ErrorMessage);
242                     this.Close();
243                 }
244             }
245             catch (SocketOSException e)
246             {
247                 if (errorHandler !is null)
248                 {
249                     errorHandler(new IoError(ErrorCode.OTHER , e.msg));
250                 }
251             }
252         }
253     }
254 }
255 
256 // dfmt off
257 version(linux):
258 // dfmt on
259 static if (CompilerHelper.IsLessThan(2078)) {
260     version (X86) {
261         enum SO_REUSEPORT = 15;
262     } else version (X86_64) {
263         enum SO_REUSEPORT = 15;
264     } else version (MIPS32) {
265         enum SO_REUSEPORT = 0x0200;
266     } else version (MIPS64) {
267         enum SO_REUSEPORT = 0x0200;
268     } else version (PPC) {
269         enum SO_REUSEPORT = 15;
270     } else version (PPC64) {
271         enum SO_REUSEPORT = 15;
272     } else version (ARM) {
273         enum SO_REUSEPORT = 15;
274     }
275 }
276 
277 version (AArch64) {
278     enum SO_REUSEPORT = 15;
279 }
280 
281 version(CRuntime_Musl) {
282     enum SO_REUSEPORT = 15;
283 }