module geario.util.worker.Worker;

import geario.util.worker.Task;
// import geario.util.worker.TaskQueue;
import geario.util.worker.WorkerThread;
import geario.logging;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;

import std.conv;
import std.concurrency;


/**
 * Dispatches tasks popped from a `TaskQueue` onto a fixed-size set of
 * `WorkerThread` instances, and keeps an id-indexed registry of every task
 * submitted through `put`.
 *
 * The registry (`_tasks`) is guarded by `_taskLocker`; the running flag is a
 * `shared bool` that is only touched through `core.atomic` operations.
 */
class Worker {

    private size_t _size;
    private WorkerThread[] _workerThreads;
    private Task[size_t] _tasks;
    private Mutex _taskLocker;


    private TaskQueue _taskQueue;
    private shared bool _isRunning = false;

    /**
     * Creates the worker and immediately creates and starts its threads.
     *
     * Params:
     *   taskQueue = queue the dispatch loop pops tasks from
     *   size      = number of worker threads to create
     */
    this(TaskQueue taskQueue, size_t size = 8) {
        _taskQueue = taskQueue;
        _size = size;

        version(GEAR_DEBUG) {
            log.info("Worker size: %d", size);
        }

        Initialize();
    }

    /// Allocates the registry mutex, then creates and starts `_size` worker threads.
    private void Initialize() {
        _taskLocker = new Mutex();
        _workerThreads = new WorkerThread[_size];

        foreach(size_t index; 0 .. _size) {
            WorkerThread thread = new WorkerThread(index);
            thread.start();

            _workerThreads[index] = thread;
        }
    }

    /**
     * Logs the state of every worker thread together with the timing of the
     * task it currently holds, if any. A thread that reports `Busy` without a
     * task is logged as a warning.
     */
    void Inspect() {

        foreach(WorkerThread th; _workerThreads) {

            Task task = th.task();
            // Read the state once so the branch taken and the value logged agree.
            auto state = th.State();

            if(state == WorkerThreadState.Busy) {
                if(task is null) {
                    log.warn("A dead worker thread detected: %s, %s", th.name, state);
                } else {
                    log.trace("Thread: %s, state: %s, LifeTime: %s", th.name, state, task.LifeTime());
                }
            } else {
                if(task is null) {
                    log.trace("Thread: %s, state: %s", th.name, state);
                } else {
                    // The execution time was passed before but had no format
                    // specifier, so it never reached the log output.
                    log.trace("Thread: %s, state: %s, ExecutionTime: %s", th.name, state, task.ExecutionTime);
                }
            }
        }
    }

    /**
     * Pushes a task onto the queue and records it in the registry under
     * `task.id`.
     *
     * Params:
     *   task = the task to enqueue and register
     */
    void put(Task task) {
        // NOTE(review): the task is visible to the dispatch loop before it is
        // registered, so a concurrent get()/Remove() for this id may briefly
        // miss it - confirm whether callers rely on this order.
        _taskQueue.Push(task);

        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks[task.id] = task;
    }

    /**
     * Looks up a registered task.
     *
     * Params:
     *   id = id of the task to fetch
     * Returns: the task registered under `id`
     * Throws: `Exception` if no task is registered under `id`
     */
    Task get(size_t id) {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        auto itemPtr = id in _tasks;
        if(itemPtr is null) {
            throw new Exception("Task does NOT exist: " ~ id.to!string);
        }

        return *itemPtr;
    }

    /**
     * Removes the task registered under `id` from the registry.
     *
     * Params:
     *   id = id of the task to drop
     */
    void Remove(size_t id) {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks.remove(id);
    }

    /// Removes every task from the registry.
    void Clear() {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks.clear();
    }

    /**
     * Starts the dispatch loop in a new thread. The compare-and-swap on the
     * running flag makes repeated calls no-ops while the loop is running.
     */
    void Run() {
        bool r = cas(&_isRunning, false, true);
        if(r) {
            import std.parallelism;
            auto t = task(&DoRun);
            t.executeInNewThread();
        }
    }

    /**
     * Clears the running flag and calls `Stop()` on every worker thread.
     */
    void Stop() {
        // NOTE(review): the dispatch loop only re-checks the flag between
        // iterations; if TaskQueue.Pop() blocks, it will not observe the stop
        // request until Pop() returns - confirm against TaskQueue.
        atomicStore(_isRunning, false);

        foreach(WorkerThread thread; _workerThreads) {
            thread.Stop();
        }
    }

    /// Returns the first worker thread reporting `IsIdle()`, or `null` if none does.
    private WorkerThread findIdleThread() {
        foreach(WorkerThread thread; _workerThreads) {
            version(GEAR_IO_DEBUG) {
                log.trace("Thread: %s, state: %s", thread.name, thread.State);
            }

            if(thread.IsIdle())
                return thread;
        }

        return null;
    }

    /**
     * Dispatch loop: pops tasks from the queue and hands each one to an idle
     * worker thread, yielding the CPU while all workers are busy.
     *
     * Anything thrown inside an iteration is logged and the loop continues.
     * If a stop is requested while a popped task is still waiting for an idle
     * worker, that task is not attached to any thread.
     */
    private void DoRun() {
        while(atomicLoad(_isRunning)) {
            try {
                version(GEAR_IO_DEBUG) log.info("running...");
                Task task = _taskQueue.Pop();
                if(task is null) {
                    version(GEAR_IO_DEBUG) {
                        log.warn("A null task popped!");
                        Inspect();
                    }
                    continue;
                }

                WorkerThread workerThread;
                bool isAttached = false;

                do {
                    workerThread = findIdleThread();

                    if(workerThread is null) {
                        // All worker threads are busy: give up the time slice
                        // and look again.
                        Thread.yield();
                    } else {
                        // Attatch() may still refuse the task (e.g. the thread
                        // was taken in the meantime); retry in that case.
                        isAttached = workerThread.Attatch(task);
                    }
                } while(!isAttached && atomicLoad(_isRunning));

            } catch(Throwable ex) {
                log.warn(ex);
            }
        }

        version(GEAR_IO_DEBUG) log.warn("Worker stopped!");

    }

}