1 /* 2 * Geario - A cross-platform abstraction library with asynchronous I/O. 3 * 4 * Copyright (C) 2021-2022 Kerisy.com 5 * 6 * Website: https://www.kerisy.com 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module geario.util.worker.WorkerThread; 13 14 import geario.util.Closeable; 15 import geario.util.ResoureManager; 16 import geario.util.worker.Task; 17 import geario.util.worker.Worker; 18 19 import geario.logging; 20 21 import core.atomic; 22 import core.memory; 23 import core.thread; 24 import core.sync.condition; 25 import core.sync.mutex; 26 import std.conv; 27 28 29 30 enum WorkerThreadState { 31 Idle, 32 Busy, // occupied 33 Stopped 34 } 35 36 bool InWorkerThread() { 37 WorkerThread th = cast(WorkerThread) Thread.getThis(); 38 return th !is null; 39 } 40 41 /** 42 * 43 */ 44 class WorkerThread : Thread { 45 46 private shared WorkerThreadState _state; 47 private size_t _index; 48 private Task _task; 49 private Duration _timeout; 50 51 private Condition _condition; 52 private Mutex _mutex; 53 54 this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) { 55 _index = index; 56 _timeout = timeout; 57 _state = WorkerThreadState.Idle; 58 _mutex = new Mutex(); 59 _condition = new Condition(_mutex); 60 this.name = "WorkerThread-" ~ _index.to!string(); 61 super(&Run, stackSize); 62 } 63 64 void Stop() { 65 _state = WorkerThreadState.Stopped; 66 } 67 68 bool IsBusy() { 69 return _state == WorkerThreadState.Busy; 70 } 71 72 bool IsIdle() { 73 return _state == WorkerThreadState.Idle; 74 } 75 76 WorkerThreadState State() { 77 return _state; 78 } 79 80 size_t Index() { 81 return _index; 82 } 83 84 Task task() { 85 return _task; 86 } 87 88 bool Attatch(Task task) { 89 assert(task !is null); 90 bool r = cas(&_state, WorkerThreadState.Idle, WorkerThreadState.Busy); 91 92 if (r) { 93 version(GEAR_IO_DEBUG) { 94 log.info("attatching task %d with thread %s", task.id, this.name); 95 } 96 97 _mutex.lock(); 98 scope (exit) { 99 _mutex.unlock(); 100 } 101 _task = task; 102 _condition.notify(); 103 104 } else { 105 log.warn("%s is unavailable. state: %s", this.name(), _state); 106 } 107 108 return r; 109 } 110 111 private void Run() { 112 while (_state != WorkerThreadState.Stopped) { 113 114 scope (exit) { 115 version (GEAR_IO_DEBUG) { 116 log.trace("%s Done. state: %s", this.name(), _state); 117 } 118 119 CollectResoure(); 120 _task = null; 121 bool r = cas(&_state, WorkerThreadState.Busy, WorkerThreadState.Idle); 122 if(!r) { 123 log.warn("Failed to set thread %s to Idle, its state is %s", this.name, _state); 124 } 125 } 126 127 try { 128 DoRun(); 129 } catch (Throwable ex) { 130 log.warn(ex); 131 } 132 } 133 134 version (GEAR_DEBUG) log.trace("%s Stopped. state: %s", this.name(), _state); 135 } 136 137 private bool _isWaiting = false; 138 139 private void DoRun() { 140 _mutex.lock(); 141 142 Task task = _task; 143 while(task is null && _state != WorkerThreadState.Stopped) { 144 bool r = _condition.wait(_timeout); 145 task = _task; 146 147 version(GEAR_IO_DEBUG) { 148 if(!r && _state == WorkerThreadState.Busy) { 149 if(task is null) { 150 log.warn("No task attatched on a busy thread %s in %s, task: %s", this.name, _timeout); 151 } else { 152 log.warn("more tests need for this status, thread %s in %s", this.name, _timeout); 153 } 154 } 155 } 156 } 157 158 _mutex.unlock(); 159 160 if(task !is null) { 161 version(GEAR_IO_DEBUG) { 162 log.trace("Try to exeucte task %d in thread %s, its status: %s", task.id, this.name, task.Status); 163 } 164 task.Execute(); 165 } 166 } 167 }