1 /*
2  * Geario - A cross-platform abstraction library with asynchronous I/O.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.util.ThreadPool;
13 
14 import core.thread;
15 
16 import geario.logging;
17 
18 import core.sync.mutex;
19 import core.sync.condition;
20 
21 import std.container.dlist;
22 
/// A unit of work for the pool: a delegate taking no arguments and returning nothing.
alias ThreadTask = void delegate();
24 
/**
 * A fixed-size pool of worker threads that execute queued `ThreadTask`
 * delegates in FIFO order.
 *
 * All workers are started by the constructor and block on a condition
 * variable until a task is queued with `Emplace` or the pool is shut down
 * with `Stop`. The task queue and the stop flag are only accessed while
 * `_mutex` is held.
 */
class ThreadPool
{
public:
    /**
     * Creates the pool and immediately starts `capacity` worker threads.
     *
     * Params:
     *     capacity = number of worker threads to spawn (defaults to 8)
     */
    this(size_t capacity = 8)
    {
        _capacity  = capacity;
        _mutex     = new Mutex;
        _condition = new Condition( _mutex );
        _group     = new ThreadGroup;

        Init();
    }

    /**
     * Stops the pool if `Stop` has not been called yet.
     *
     * NOTE(review): when this destructor is run by the garbage collector,
     * the D specification does not guarantee that GC-managed members
     * (`_mutex`, `_condition`, `_group`) are still valid. Prefer calling
     * `Stop` explicitly instead of relying on finalization.
     */
    ~this()
    {
        if (!_stopped)
            Stop();
    }

    /**
     * Signals every worker to exit and waits for all of them to finish.
     *
     * A task that is already running is allowed to complete; tasks still
     * waiting in the queue are not executed. Must not be called from one
     * of the pool's own worker threads, since it joins them.
     */
    void Stop()
    {
        // The flag is changed and the workers are notified while holding the
        // mutex, so a worker cannot test `_stopped`, miss the notification
        // and then block in wait() forever (lost wake-up).
        synchronized( _mutex )
        {
            _stopped = true;
            _condition.notifyAll();
        }

        // Join outside the lock: the workers need the mutex to wake up.
        _group.joinAll();
        _threads = null;
    }

    /**
     * Appends a task to the queue and wakes one waiting worker.
     *
     * Tasks queued after `Stop` are stored but never executed, because no
     * worker is left to pick them up.
     *
     * Params:
     *     task = the delegate to run on a worker thread; a null delegate
     *            is dequeued and silently skipped
     */
    void Emplace(ThreadTask task)
    {
        synchronized( _mutex )
        {
            _tasks.insertBack(task);
            _condition.notify();
        }

        // Yield only after the lock is released, so the worker that was
        // just woken can actually acquire the mutex and take the task.
        Thread.yield();
    }

private:
    /// Spawns `_capacity` worker threads, each running `Work`.
    void Init()
    {
        foreach ( i; 0 .. _capacity )
        {
            _threads ~= _group.create(&Work);
        }
    }

    /**
     * Worker loop: waits for a task, removes it from the queue under the
     * lock, and runs it with the lock released. Returns once the pool has
     * been stopped.
     */
    void Work()
    {
        while (true)
        {
            ThreadTask task;
            synchronized( _mutex )
            {
                // Predicate loop: guards against spurious wake-ups and
                // re-checks the stop flag under the lock.
                while (!_stopped && _tasks.empty())
                {
                    _condition.wait();
                }

                if (_stopped)
                    return;

                task = _tasks.front();
                _tasks.removeFront();
            }

            // Run the task outside the lock so other workers can proceed.
            if (task !is null)
            {
                task();
            }
        }
    }

    DList!ThreadTask _tasks;      // pending tasks, FIFO; guarded by _mutex
    size_t           _capacity;   // number of worker threads
    bool             _stopped;    // set by Stop(); read by workers under _mutex
    Mutex            _mutex;      // protects _tasks and _stopped
    Condition        _condition;  // signalled on new task and on stop
    ThreadGroup      _group;      // owns the worker threads; used for joinAll
    Thread[]         _threads;    // references to the created workers
}