module geario.util.queue.SimpleQueue;

import geario.logging;
import geario.util.queue.Queue;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.time;
import core.thread;

import std.container.dlist;
// import ikod.containers.unrolledlist: UnrolledList;

/**
 * A FIFO queue whose operations are all serialized by a single mutex.
 *
 * Consumers calling `Pop` block on a condition variable until an item is
 * pushed or the timeout given at construction elapses. `TryDequeue` never
 * blocks.
 */
class SimpleQueue(T) : Queue!(T) {
    private DList!T _list;
    // private UnrolledList!T _list;
    private Mutex _headLock;
    private Duration _timeout;

    // Number of consumers currently blocked inside Pop(). A counter is used
    // rather than a single flag so that one consumer waking up cannot hide
    // the fact that others are still waiting. Only touched under _headLock.
    private int _waiters = 0;

    shared int _incomings = 0;
    shared int _outgoings = 0;

    /** Condition on which blocked `Pop` callers wait; bound to `_headLock`. */
    private Condition _notEmpty;

    /**
     * Creates an empty queue.
     *
     * Params:
     *   timeout = maximum total time a single `Pop` call blocks while the
     *             queue is empty.
     */
    this(Duration timeout = 10.seconds) {
        _timeout = timeout;
        _headLock = new Mutex();
        _notEmpty = new Condition(_headLock);
    }

    /**
     * Returns: `true` if the queue held no items at the moment it was
     * inspected under the lock.
     */
    override bool IsEmpty() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        return _list.empty;
    }

    /** Removes every item from the queue. */
    override void Clear() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.clear();
    }

    /**
     * Removes and returns the oldest item, blocking while the queue is empty.
     *
     * Returns: the front item, or `T.init` if no item became available
     * within the configured timeout.
     */
    override T Pop() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        if (_list.empty) {
            MonoTime started = MonoTime.currTime;

            // Loop on the predicate: a wakeup may be spurious, or another
            // consumer may have taken the item before this thread re-acquired
            // the lock. The timeout budget is shared across all iterations.
            while (_list.empty) {
                Duration remaining = _timeout - (MonoTime.currTime - started);
                if (remaining <= Duration.zero) {
                    version (GEAR_IO_DEBUG) {
                        log.trace("Timeout in %s.", _timeout);
                    }
                    return T.init;
                }

                _waiters++;
                scope (exit)
                    _waiters--;

                // The result is deliberately ignored: whether signalled or
                // timed out, the loop re-checks both the list and the budget.
                _notEmpty.wait(remaining);
            }
        }

        return takeFront();
    }

    /**
     * Appends an item to the back of the queue and wakes one blocked
     * consumer, if any.
     *
     * Params:
     *   item = the value to enqueue.
     */
    override void Push(T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.insertBack(item);

        if (_waiters > 0) {
            _notEmpty.notify();
        }
    }

    /**
     * Removes the oldest item without blocking.
     *
     * Params:
     *   item = receives the dequeued value; left at `T.init` when the queue
     *          is empty.
     *
     * Returns: `true` if an item was dequeued, `false` if the queue was empty.
     */
    bool TryDequeue(out T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        if (_list.empty) {
            return false;
        }

        item = takeFront();
        return true;
    }

    /// Detaches and returns the front item. Caller must hold `_headLock` and
    /// have verified that the list is not empty.
    private T takeFront() {
        T head = _list.front;
        _list.removeFront();
        return head;
    }
}