1 /*
2  * Geario - A cross-platform abstraction library with asynchronous I/O.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.event.EventLoopGroup;
13 
14 import geario.event.EventLoop;
15 import geario.logging;
16 import geario.system.Memory;
17 import geario.util.Lifecycle;
18 import geario.util.worker;
19 
20 import core.atomic;
21 
/**
 * A fixed-size group of `EventLoop` instances that are started and stopped
 * together, optionally sharing a single `Worker`.
 *
 * The group always holds at least one loop. Loops are selected either by
 * index (`group[i]`, `OpIndex`) or by an arbitrary factor (`nextLoop`); in
 * both cases the value is reduced modulo the number of loops, so any
 * `size_t` is a valid argument.
 */
class EventLoopGroup : Lifecycle {
    private TaskQueue _pool;
    private Worker _worker;

    /**
     * Creates the event loops and, when requested, the shared worker.
     *
     * Params:
     *   ioThreadSize     = number of event loops to create; 0 is treated as 1.
     *   workerThreadSize = thread count handed to the `Worker`. When 0, no
     *                      worker is created and every loop receives `null`.
     */
    this(size_t ioThreadSize = (totalCPUs - 1), size_t workerThreadSize = 0) {
        // Never create an empty group: the modulo-based lookups below rely on
        // a non-zero length.
        size_t _size = ioThreadSize > 0 ? ioThreadSize : 1;

        version(GEAR_DEBUG) log.info("ioThreadSize: %d, workerThreadSize: %d", ioThreadSize, workerThreadSize);

        _eventLoops = new EventLoop[_size];

        // The worker is created (and started) before the loops because each
        // loop is handed a reference to it at construction time.
        if (workerThreadSize > 0) {
            _pool = new MemoryTaskQueue();
            _worker = new Worker(_pool, workerThreadSize);
            _worker.Run();
        }

        foreach (i; 0 .. _size) {
            _eventLoops[i] = new EventLoop(i, _size, _worker);
        }
    }

    /**
     * Starts every loop with a timeout of -1.
     *
     * NOTE(review): -1 presumably means "no timeout" — confirm against
     * `EventLoop.RunAsync`.
     */
    void Start() {
        Start(-1);
    }

    /// Returns the shared worker, or `null` when the group was built without one.
    Worker worker() {
        return _worker;
    }

    /**
     * Starts every loop asynchronously. Has no effect if the group is
     * already running.
     *
     * Params:
     *   timeout = value forwarded to `EventLoop.RunAsync`, in milliseconds.
     */
    void Start(long timeout) {
        // The compare-and-swap guarantees that only one caller performs the
        // stopped -> running transition.
        if (cas(&_isRunning, false, true)) {
            foreach (EventLoop loop; _eventLoops) {
                loop.RunAsync(timeout);
            }
        }
    }

    /**
     * Stops the worker (if any) and then every loop. Has no effect if the
     * group is not running.
     */
    void Stop() {
        // Only the caller that flips running -> stopped proceeds.
        if (!cas(&_isRunning, true, false))
            return;

        if (_worker !is null) {
            _worker.Stop();
        }

        version (GEAR_IO_DEBUG)
            Trace("stopping EventLoopGroup...");
        foreach (EventLoop loop; _eventLoops) {
            loop.Stop();
        }

        version (GEAR_IO_DEBUG)
            Trace("EventLoopGroup stopped.");
    }

    /// True after a successful `Start` and until the next successful `Stop`.
    bool IsRunning() {
        // `_isRunning` is shared and is written with `cas`; read it atomically
        // as well instead of accessing the shared field directly.
        return atomicLoad(_isRunning);
    }

    /// True only when every loop in the group reports `IsReady()`.
    bool IsReady() {
        foreach (EventLoop loop; _eventLoops) {
            if (!loop.IsReady())
                return false;
        }

        return true;
    }

    /// Number of event loops in the group (always at least 1).
    @property size_t size() {
        return _eventLoops.length;
    }

    /**
     * Picks a loop from an arbitrary number.
     *
     * Params:
     *   factor = any value; it is reduced modulo the number of loops.
     */
    EventLoop nextLoop(size_t factor) {
        return _eventLoops[factor % _eventLoops.length];
    }

    /**
     * Returns the loop at `index`, wrapping around modulo the number of loops.
     *
     * Kept under its existing name for current callers; `opIndex` below
     * provides the actual `group[index]` operator.
     */
    EventLoop OpIndex(size_t index) {
        auto i = index % _eventLoops.length;
        return _eventLoops[i];
    }

    /**
     * Index operator: `group[index]` is equivalent to `group.OpIndex(index)`.
     *
     * D only recognises the lowercase spelling `opIndex` as the operator
     * overload, so `OpIndex` alone does not enable the `[]` syntax.
     */
    EventLoop opIndex(size_t index) {
        return OpIndex(index);
    }

    /// Returns the internal loop array itself (not a copy).
    EventLoop[] Loops()
    {
        return _eventLoops;
    }

    /**
     * Enables `foreach (EventLoop loop; group)`.
     *
     * Iteration stops as soon as the delegate returns a non-zero value, and
     * that value is returned to the caller.
     */
    int opApply(scope int delegate(EventLoop) dg) {
        int ret = 0;
        foreach (loop; _eventLoops) {
            ret = dg(loop);
            if (ret)
                break;
        }
        return ret;
    }

private:
    // NOTE(review): not referenced anywhere in this class — confirm whether
    // it is still needed.
    shared int _loopIndex;
    // Running flag; modified with `cas` in Start/Stop.
    shared bool _isRunning;
    // The loops owned by this group; length is fixed at construction.
    EventLoop[] _eventLoops;
}