1 /*
2  * Geario - A cross-platform abstraction library with asynchronous I/O.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.util.worker.WorkerThread;
13 
14 import geario.util.Closeable;
15 import geario.util.ResoureManager;
16 import geario.util.worker.Task;
17 import geario.util.worker.Worker;
18 
19 import geario.logging;
20 
21 import core.atomic;
22 import core.memory;
23 import core.thread;
24 import core.sync.condition;
25 import core.sync.mutex;
26 import std.conv;
27 
28 
29 
30 enum WorkerThreadState {
31     Idle,
32     Busy, // occupied
33     Stopped
34 }
35 
36 bool InWorkerThread() {
37     WorkerThread th = cast(WorkerThread) Thread.getThis();
38     return th !is null;
39 }
40 
41 /**
42  *
43  */
44 class WorkerThread : Thread {
45 
46     private shared WorkerThreadState _state;
47     private size_t _index;
48     private Task _task;
49     private Duration _timeout;
50 
51     private Condition _condition;
52     private Mutex _mutex;
53 
54     this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) {
55         _index = index;
56         _timeout = timeout;
57         _state = WorkerThreadState.Idle;
58         _mutex = new Mutex();
59         _condition = new Condition(_mutex);
60         this.name = "WorkerThread-" ~ _index.to!string();
61         super(&Run, stackSize);
62     }
63 
64     void Stop() {
65         _state = WorkerThreadState.Stopped;
66     }
67 
68     bool IsBusy() {
69         return _state == WorkerThreadState.Busy;
70     }
71     
72     bool IsIdle() {
73         return _state == WorkerThreadState.Idle;
74     }
75 
76     WorkerThreadState State() {
77         return _state;
78     }
79 
80     size_t Index() {
81         return _index;
82     }
83 
84     Task task() {
85         return _task;
86     }
87 
88     bool Attatch(Task task) {
89         assert(task !is null);
90         bool r = cas(&_state, WorkerThreadState.Idle, WorkerThreadState.Busy);
91 
92         if (r) {
93             version(GEAR_IO_DEBUG) {
94                 log.info("attatching task %d with thread %s", task.id, this.name);
95             }
96 
97             _mutex.lock();
98             scope (exit) {
99                 _mutex.unlock();
100             }
101             _task = task;
102             _condition.notify();
103             
104         } else {
105             log.warn("%s is unavailable. state: %s", this.name(), _state);
106         }
107 
108         return r;
109     }
110 
111     private void Run() {
112         while (_state != WorkerThreadState.Stopped) {
113 
114             scope (exit) {
115                 version (GEAR_IO_DEBUG) {
116                     log.trace("%s Done. state: %s", this.name(), _state);
117                 }
118 
119                 CollectResoure();
120                 _task = null;
121                 bool r = cas(&_state, WorkerThreadState.Busy, WorkerThreadState.Idle);
122                 if(!r) {
123                     log.warn("Failed to set thread %s to Idle, its state is %s", this.name, _state);
124                 }
125             } 
126 
127             try {
128                 DoRun();
129             } catch (Throwable ex) {
130                 log.warn(ex);
131             } 
132         }
133         
134         version (GEAR_DEBUG) log.trace("%s Stopped. state: %s", this.name(), _state);
135     }
136 
137     private bool _isWaiting = false;
138 
139     private void DoRun() {
140         _mutex.lock();
141         
142         Task task = _task;
143         while(task is null && _state != WorkerThreadState.Stopped) {
144             bool r = _condition.wait(_timeout);
145             task = _task;
146 
147             version(GEAR_IO_DEBUG) {
148                 if(!r && _state == WorkerThreadState.Busy) {
149                     if(task is null) {
150                         log.warn("No task attatched on a busy thread %s in %s, task: %s", this.name, _timeout);
151                     } else {
152                         log.warn("more tests need for this status, thread %s in %s", this.name, _timeout);
153                     }
154                 }
155             }
156         }
157 
158         _mutex.unlock();
159 
160         if(task !is null) {
161             version(GEAR_IO_DEBUG) {
162                 log.trace("Try to exeucte task %d in thread %s, its status: %s", task.id, this.name, task.Status);
163             }
164             task.Execute();
165         }
166     }
167 }