1 /*
2  * Hunt - A cross-platform abstraction library with asynchronous I/O.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.concurrency.FuturePromise;
13 
14 import geario.concurrency.Future;
15 import geario.concurrency.Promise;
16 
17 import geario.Exceptions;
18 import geario.logging;
19 
20 import core.atomic;
21 import core.thread;
22 import core.sync.mutex;
23 import core.sync.condition;
24 
25 import std.format;
26 import std.datetime;
27 
28 alias ThenHandler(T) = void delegate(T);
29 
30 /**
31  * 
32  */
33 class FuturePromise(T) : Future!T, Promise!T {
34     alias VoidHandler = void delegate();
35     
36     private shared bool _isCompleting = false;
37     private bool _isCompleted = false;
38     private Exception _cause;
39     private string _id;
40     private Mutex _waiterLocker;
41     private Condition _waiterCondition;
42 
43     this() {
44         _waiterLocker = new Mutex(this);
45         _waiterCondition = new Condition(_waiterLocker);
46     }
47 
48     string id() {
49         return _id;
50     }
51 
52     void id(string id) {
53         _id = id;
54     }
55 
56     ThenHandler!(Exception) _thenFailedHandler;
57 
58 static if(is(T == void)) {
59     VoidHandler _thenSucceededHandler;
60 
61     FuturePromise!R then(R)(R delegate() handler) {
62         FuturePromise!R result = new FuturePromise!(R);
63         _thenSucceededHandler = () {
64             try {
65                 R r = handler();
66                 result.Succeeded(r);
67             } catch(Exception ex) {
68                 Exception e = new Exception("then exception", ex);
69                 result.Failed(e);
70             }
71         };
72 
73         _thenFailedHandler = (Exception ex) {
74             Exception e = new Exception("then exception", ex);
75             result.Failed(e);
76         };
77 
78         return result;
79     }
80 
81     /**
82      * TODO: 
83      *     1) keep this operation atomic
84      *     2) return a flag to indicate whether this option is successful.
85      */
86     void Succeeded() {
87         if (cas(&_isCompleting, false, true)) {
88             OnCompleted();
89         } else {
90             log.warn("This promise has been done, and can't be set again. cause: %s", 
91                 typeid(_cause));
92         }
93     }
94 
95 } else {
96     ThenHandler!(T) _thenSucceededHandler;
97 
98     FuturePromise!R then(R)(R delegate(T) handler) {
99         FuturePromise!R result = new FuturePromise!(R);
100         _thenSucceededHandler = (T t) {
101             try {
102                 static if(is(R == void)) {
103                     handler(t);
104                     result.Succeeded();
105                 } else {
106                     R r = handler(t);
107                     result.Succeeded(r);
108                 }
109             } catch(Exception ex) {
110                 Exception e = new Exception("then exception", ex);
111                 result.Failed(e);
112             }
113         };
114 
115         _thenFailedHandler = (Exception ex) {
116             Exception e = new Exception("then exception", ex);
117             result.Failed(e);
118         };
119 
120         return result;
121     }
122 
123     /**
124      * TODO: 
125      *     1) keep this operation atomic
126      *     2) return a flag to indicate whether this option is successful.
127      */
128     void Succeeded(T result) {
129         if (cas(&_isCompleting, false, true)) {
130             _result = result;
131             OnCompleted();
132         } else {
133             log.warn("This promise has been done, and can't be set again.");
134         }
135     }
136     private T _result;
137 }
138 
139     /**
140      * TODO: 
141      *     1) keep this operation atomic
142      *     2) return a flag to indicate whether this option is successful.
143      */
144     void Failed(Exception cause) {
145         if (cas(&_isCompleting, false, true)) {
146             _cause = cause;    
147             OnCompleted();        
148         } else {
149             log.warn("This promise has been done, and can't be set again. cause: %s", 
150                 typeid(_cause));
151         }
152     }
153 
154     bool Cancel(bool mayInterruptIfRunning) {
155         if (cas(&_isCompleting, false, true)) {
156             static if(!is(T == void)) {
157                 _result = T.init;
158             }
159             _cause = new CancellationException("");
160             OnCompleted();
161             return true;
162         }
163         return false;
164     }
165 
166     private void OnCompleted() {
167         _waiterLocker.lock();
168         _isCompleted = true;
169         scope(exit) {
170             _waiterLocker.unlock();
171         }
172         
173         _waiterCondition.notifyAll();
174 
175         if(_cause is null) {
176             if(_thenSucceededHandler !is null) {
177                 static if(is(T == void)) {
178                     _thenSucceededHandler();
179                 } else {
180                     _thenSucceededHandler(_result);
181                 }
182             }
183         } else {
184             if(_thenFailedHandler !is null) {
185                 _thenFailedHandler(_cause);
186             }
187         }
188     }
189 
190     bool IsCancelled() {
191         if (_isCompleted) {
192             try {
193                 // _latch.await();
194                 // TODO: Tasks pending completion -@zhangxueping at 2019-12-26T15:18:42+08:00
195                 // 
196             } catch (InterruptedException e) {
197                 throw new RuntimeException(e.msg);
198             }
199             return typeid(_cause) == typeid(CancellationException);
200         }
201         return false;
202     }
203 
204     bool IsDone() {
205         return _isCompleted;
206     }
207 
208     T Get() {
209         return Get(-1.msecs);
210     }
211 
212     T Get(Duration timeout) {
213         // waitting for the completion
214         if(!_isCompleted) {
215             _waiterLocker.lock();
216             scope(exit) {
217                 _waiterLocker.unlock();
218             }
219 
220             if(timeout.isNegative()) {
221                 version (GEAR_DEBUG) Info("Waiting for a promise...");
222                 _waiterCondition.wait();
223             } else {
224                 version (GEAR_DEBUG) {
225                     log.info("Waiting for a promise in %s...", timeout);
226                 }
227                 bool r = _waiterCondition.wait(timeout);
228                 if(!r) {
229                     debug log.warn("Timeout for a promise in %s...", timeout);
230                     if (cas(&_isCompleting, false, true)) {
231                         _isCompleted = true;
232                         _cause = new TimeoutException("Timeout in " ~ timeout.toString());
233                     }
234                 }
235             }
236             
237             if(_cause is null) {
238                 version (GEAR_DEBUG) log.info("Got a succeeded promise.");
239             } else {
240                 version (GEAR_DEBUG) log.warn("Got a failed promise: %s", typeid(_cause));
241             }
242         } 
243 
244         // succeeded
245         if (_cause is null) {
246             static if(is(T == void)) {
247                 return;
248             } else {
249                 return _result;
250             }
251         }
252 
253         CancellationException c = cast(CancellationException) _cause;
254         if (c !is null) {
255             version(GEAR_DEBUG) Info("A promise cancelled.");
256             throw c;
257         }
258         
259         debug log.warn("Get a exception in a promise: ", _cause.msg);
260         version (GEAR_DEBUG) log.warn(_cause);
261         throw new ExecutionException(_cause);
262     }    
263 
264     override string toString() {
265         static if(is(T == void)) {
266             return format("FutureCallback@%x{%b, %b, void}", toHash(), _isCompleted, _cause is null);
267         } else {
268             return format("FutureCallback@%x{%b, %b, %s}", toHash(), _isCompleted, _cause is null, _result);
269         }
270     }
271 }