1 module geario.util.worker.Worker;
2 
3 import geario.util.worker.Task;
4 // import geario.util.worker.TaskQueue;
5 import geario.util.worker.WorkerThread;
6 import geario.logging;
7 
8 import core.atomic;
9 import core.sync.condition;
10 import core.sync.mutex;
11 import core.thread;
12 
13 import std.conv; 
14 import std.concurrency;
15 
16 
17 
/**
 * Dispatches tasks from a TaskQueue onto a fixed-size pool of WorkerThreads.
 *
 * The constructor creates and starts the worker threads. Run() launches one
 * additional dispatcher thread that pops tasks from the queue and attaches
 * each of them to an idle worker thread. Tasks submitted through put() are
 * also recorded in an id-indexed registry, guarded by a mutex, so they can be
 * looked up with get() and discarded with Remove() / Clear().
 */
class Worker {

    /// Number of worker threads in the pool.
    private size_t _size;
    /// The pool itself; filled by Initialize().
    private WorkerThread[] _workerThreads;
    /// Registry of submitted tasks, keyed by task id. Guarded by _taskLocker.
    private Task[size_t] _tasks;
    /// Protects every access to _tasks.
    private Mutex _taskLocker;

    /// Source of the tasks consumed by the dispatcher loop.
    private TaskQueue _taskQueue;
    /// True while the dispatcher loop should keep running; accessed atomically.
    private shared bool _isRunning = false;

    /**
     * Creates the worker and immediately starts its worker threads.
     *
     * Params:
     *   taskQueue = queue the dispatcher loop pops tasks from
     *   size      = number of worker threads to create
     */
    this(TaskQueue taskQueue, size_t size = 8) {
        _taskQueue = taskQueue;
        _size = size;

        version(GEAR_DEBUG) {
            log.info("Worker size: %d", size);
        }

        Initialize();
    }

    /// Allocates the registry mutex, then creates and starts every worker thread.
    private void Initialize() {
        _taskLocker = new Mutex();
        _workerThreads = new WorkerThread[_size];

        foreach(size_t index; 0 .. _size) {
            WorkerThread thread = new WorkerThread(index);
            thread.start();

            _workerThreads[index] = thread;
        }
    }

    /**
     * Logs the state of every worker thread.
     *
     * A thread that reports Busy without having a task is logged as a warning;
     * everything else is logged at trace level.
     */
    void Inspect() {

        foreach(WorkerThread th; _workerThreads) {

            Task task = th.task();
            // Read the state once so the branch taken and the logged value agree.
            auto state = th.State();

            if(state == WorkerThreadState.Busy) {
                if(task is null) {
                    log.warn("A dead worker thread detected: %s, %s", th.name, state);
                } else {
                    log.trace("Thread: %s,  state: %s, LifeTime: %s", th.name, state, task.LifeTime());
                }
            } else {
                if(task is null) {
                    log.trace("Thread: %s,  state: %s", th.name, state);
                } else {
                    // The format string previously had no specifier for the execution time.
                    log.trace("Thread: %s,  state: %s, ExecutionTime: %s", th.name, state, task.ExecutionTime);
                }
            }
        }
    }

    /**
     * Pushes a task onto the queue and records it in the registry.
     *
     * NOTE(review): the task is pushed before it is registered, so it may be
     * popped by the dispatcher before get() can find it - confirm whether any
     * caller depends on the registration being visible first.
     *
     * Params:
     *   task = the task to submit
     */
    void put(Task task) {
        _taskQueue.Push(task);

        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks[task.id] = task;
    }

    /**
     * Looks up a registered task.
     *
     * Params:
     *   id = id of the task to fetch
     *
     * Returns: the task registered under `id`.
     *
     * Throws: Exception if no task is registered under `id`.
     */
    Task get(size_t id) {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        auto itemPtr = id in _tasks;
        if(itemPtr is null) {
            throw new Exception("Task does NOT exist: " ~ id.to!string);
        }

        return *itemPtr;
    }

    /**
     * Removes the task registered under `id` from the registry, if any.
     *
     * Params:
     *   id = id of the task to forget
     */
    void Remove(size_t id) {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks.remove(id);
    }

    /// Empties the task registry.
    void Clear() {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks.clear();
    }

    /**
     * Starts the dispatcher loop in a new thread.
     *
     * Does nothing when the loop is already marked as running: only the call
     * that flips `_isRunning` from false to true launches the thread.
     */
    void Run() {
        if(cas(&_isRunning, false, true)) {
            // Selective import: only the `task` factory is needed here.
            import std.parallelism : task;

            auto t = task(&DoRun);
            t.executeInNewThread();
        }
    }

    /// Asks the dispatcher loop to exit and stops every worker thread.
    void Stop() {
        atomicStore(_isRunning, false);

        foreach(size_t index; 0 .. _size) {
            _workerThreads[index].Stop();
        }
    }

    /**
     * Scans the pool for a thread that reports itself idle.
     *
     * Returns: the first idle worker thread, or null when none is idle.
     */
    private WorkerThread findIdleThread() {
        foreach(WorkerThread thread; _workerThreads) {
            version(GEAR_IO_DEBUG) {
                log.trace("Thread: %s, state: %s", thread.name, thread.State);
            }

            if(thread.IsIdle())
                return thread;
        }

        return null;
    }

    /**
     * Dispatcher loop: pops tasks and hands each one to an idle worker thread.
     *
     * Runs until `_isRunning` is cleared. Anything thrown while handling a
     * task is logged and the loop carries on with the next task.
     */
    private void DoRun() {
        while(atomicLoad(_isRunning)) {
            try {
                version(GEAR_IO_DEBUG) log.info("running...");
                Task task = _taskQueue.Pop();
                if(task is null) {
                    version(GEAR_IO_DEBUG) {
                        log.warn("A null task popped!");
                        Inspect();
                    }
                    continue;
                }

                WorkerThread workerThread;
                bool isAttatched = false;

                do {
                    workerThread = findIdleThread();

                    if(workerThread is null) {
                        // All worker threads are busy: give up the time slice and retry.
                        Thread.yield();
                    } else {
                        // Attatch() may refuse, in which case another thread is tried.
                        isAttatched = workerThread.Attatch(task);
                    }
                } while(!isAttatched && atomicLoad(_isRunning));

                if(!isAttatched) {
                    // The loop above only exits unattached when the worker was stopped.
                    log.warn("Task %s was dropped: the worker stopped before it could be attached", task.id);
                }

            } catch(Throwable ex) {
                // NOTE(review): catching Throwable also swallows Errors - confirm this is intended.
                log.warn(ex);
            }
        }

        version(GEAR_IO_DEBUG) log.warn("Worker stopped!");

    }

}
192