module geario.util.worker.Task;

import geario.util.queue;
import geario.logging;

import core.atomic;
import std.datetime;
import std.format;

/**
 * States a Task moves through. The transitions performed by this module are:
 * Ready -> Processing (Execute), Ready/Processing -> Done (Finish) and
 * Ready/Processing -> Terminated (Stop).
 */
enum TaskStatus : ubyte {
    Ready,
    Processing,
    Terminated,
    Done
}

alias TaskQueue = Queue!Task;
alias MemoryTaskQueue = SimpleQueue!Task;

/**
 * Base class for a unit of work with a status and timing bookkeeping.
 *
 * Subclasses supply the actual work in DoExecute(); callers run it through
 * Execute(). Status changes go through compare-and-swap on a shared field,
 * so a given transition succeeds at most once.
 */
abstract class Task {
    protected shared TaskStatus _status;

    size_t id;

    private MonoTime _createTime;
    private MonoTime _startTime;
    private MonoTime _endTime;

    /// Marks the task Ready and records its creation time.
    this() {
        _status = TaskStatus.Ready;
        _createTime = MonoTime.currTime;
    }

    /**
     * Time between creation and the recorded end time.
     *
     * The end time is only recorded by a successful Finish(); before that the
     * result is computed from a default-initialized end time and is not
     * meaningful. Use LifeTime() for a value that is valid at any point.
     */
    Duration SurvivalTime() {
        return _endTime - _createTime;
    }

    /**
     * Time between the start recorded by Execute() and the end recorded by
     * Finish(). Not meaningful unless both have been recorded.
     */
    Duration ExecutionTime() {
        return _endTime - _startTime;
    }

    /**
     * Age of the task: creation-to-end once an end time has been recorded,
     * otherwise creation-to-now.
     */
    Duration LifeTime() {
        if(_endTime > _createTime) {
            return SurvivalTime();
        } else {
            return MonoTime.currTime - _createTime;
        }
    }

    /// Current status of the task.
    TaskStatus Status() {
        return _status;
    }

    /// True while the task has not been executed, finished or stopped.
    bool IsReady() {
        return _status == TaskStatus.Ready;
    }

    /// True while Execute() is running the task.
    bool IsProcessing() {
        return _status == TaskStatus.Processing;
    }

    /// True once Stop() has moved the task to Terminated.
    bool IsTerminated() {
        return _status == TaskStatus.Terminated;
    }

    /// True once Finish() has moved the task to Done.
    bool IsDone() {
        return _status == TaskStatus.Done;
    }

    /**
     * Moves a Processing or Ready task to Terminated.
     *
     * Has no effect on a task that is already Done or Terminated (only a
     * warning is logged in debug builds). This only changes the status; it
     * does not record an end time.
     */
    void Stop() {

        version(GEAR_IO_DEBUG) {
            log.trace("The task status: %s", _status);
        }

        // Try Processing first, then Ready; the second cas runs only if the first fails.
        if(!cas(&_status, TaskStatus.Processing, TaskStatus.Terminated) &&
            !cas(&_status, TaskStatus.Ready, TaskStatus.Terminated)) {
            version(GEAR_IO_DEBUG) {
                log.warn("The task status: %s", _status);
            }
        }
    }

    /**
     * Moves a Processing or Ready task to Done and records the end time.
     *
     * If the task is already Done or Terminated the status is left untouched
     * and no end time is recorded.
     */
    void Finish() {
        version(GEAR_IO_DEBUG) {
            log.trace("The task status: %s", _status);
        }

        if(cas(&_status, TaskStatus.Processing, TaskStatus.Done) ||
            cas(&_status, TaskStatus.Ready, TaskStatus.Done)) {

            _endTime = MonoTime.currTime;
            version(GEAR_IO_DEBUG) {
                log.info("The task done.");
            }
        } else {
            version(GEAR_IO_DEBUG) {
                log.warn("The task status: %s", _status);
                log.warn("Failed to set the task status to Done: %s", _status);
            }
        }
    }

    /// The work performed by the task; implemented by subclasses.
    protected void DoExecute();

    /**
     * Runs the task if, and only if, it is still Ready.
     *
     * On success the status becomes Processing, the start time is recorded,
     * DoExecute() is called, and Finish() is invoked when the scope is left,
     * whether DoExecute() returns normally or throws. If the task is not Ready
     * a warning is logged and nothing is executed.
     */
    void Execute() {
        if(cas(&_status, TaskStatus.Ready, TaskStatus.Processing)) {
            version(GEAR_IO_DEBUG) {
                log.trace("Task %d executing... status: %s", id, _status);
            }
            _startTime = MonoTime.currTime;
            scope(exit) {
                Finish();
                version(GEAR_IO_DEBUG) {
                    // Use the same logger as the rest of this module.
                    log.info("Task Done!");
                }
            }
            DoExecute();
        } else {
            log.warn("Failed to Execute task %d. Its status is: %s", id, _status);
        }
    }

    /// Short description containing the task id and current status.
    override string toString() {
        return format("id: %d, status: %s", id, _status);
    }

}