1 module geario.util.queue.SimpleQueue;
2 
3 import geario.logging;
4 import geario.util.queue.Queue;
5 
6 import core.atomic;
7 import core.sync.condition;
8 import core.sync.mutex;
9 import core.time;
10 import core.thread;
11 
12 import std.container.dlist;
13 // import ikod.containers.unrolledlist: UnrolledList;
14 
15 /**
16  * It's a thread-safe queue
17  */
class SimpleQueue(T) : Queue!(T) {
    /// Backing storage; items are appended at the back and taken from the front.
    private DList!T _list;
    // private UnrolledList!T _list;

    /// Guards `_list` and `_waiters`; also the mutex associated with `_notEmpty`.
    private Mutex _headLock;

    /// Maximum total time a single `Pop` call blocks waiting for an item.
    private Duration _timeout;

    /// Number of threads currently blocked inside `Pop`. Only touched while
    /// `_headLock` is held. A counter (rather than a single flag) keeps the
    /// bookkeeping correct when several consumers wait at the same time.
    private int _waiters = 0;

    shared int _incomings = 0;
    shared int _outgoings = 0;

    /** Wait queue for waiting takes */
    private Condition _notEmpty;

    /**
     * Creates an empty queue.
     *
     * Params:
     *   timeout = upper bound on how long `Pop` blocks when the queue is empty.
     */
    this(Duration timeout = 10.seconds) {
        _timeout = timeout;
        _headLock = new Mutex();
        _notEmpty = new Condition(_headLock);
    }

    /**
     * Returns: `true` when the queue currently holds no items.
     */
    override bool IsEmpty() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        return _list.empty();
    }

    /**
     * Removes every item from the queue.
     */
    override void Clear() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();
        _list.clear();
    }

    /**
     * Removes and returns the item at the front of the queue, blocking for at
     * most the configured timeout while the queue is empty.
     *
     * Returns: the front item, or `T.init` if no item became available before
     *          the timeout elapsed.
     */
    override T Pop() {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        if (_list.empty()) {
            immutable MonoTime started = MonoTime.currTime;

            _waiters++;
            // Runs before the unlock above (inner scope exits first), so the
            // counter is always updated under the lock, even if wait() throws.
            scope (exit)
                _waiters--;

            // Re-check the predicate after every wakeup: the wakeup may be
            // spurious, or another consumer may already have taken the item.
            while (_list.empty()) {
                // Budget is tracked as "timeout minus elapsed" so that all
                // wakeups share a single overall deadline.
                Duration remaining = _timeout - (MonoTime.currTime - started);
                if (remaining <= Duration.zero) {
                    version (GEAR_IO_DEBUG) {
                        log.trace("Timeout in %s.", _timeout);
                    }
                    return T.init;
                }

                // The return value is intentionally ignored; the loop decides
                // based on the queue state and the remaining time.
                _notEmpty.wait(remaining);
            }
        }

        T item = _list.front();
        _list.removeFront();

        return item;
    }

    /**
     * Appends an item to the back of the queue and wakes one blocked `Pop`
     * caller, if any.
     *
     * Params:
     *   item = the value to enqueue.
     */
    override void Push(T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        _list.insertBack(item);

        // Skip the notify when nobody is waiting.
        if (_waiters > 0) {
            _notEmpty.notify();
        }
    }

    /**
     * Non-blocking take.
     *
     * Params:
     *   item = receives the front item on success; left at `T.init` otherwise.
     *
     * Returns: `true` if an item was removed, `false` if the queue was empty.
     */
    bool TryDequeue(out T item) {
        _headLock.lock();
        scope (exit)
            _headLock.unlock();

        if (_list.empty()) {
            return false;
        }

        item = _list.front();
        _list.removeFront();

        return true;
    }
}