module geario.util.pool.ObjectPool;

import geario.concurrency.Future;
import geario.concurrency.Promise;
import geario.concurrency.FuturePromise;
import geario.logging;

import core.sync.mutex;

import std.container.dlist;
import core.time;
import std.format;
import std.range : walkLength;

import geario.util.pool.ObjectFactory;
import geario.util.pool.PooledObject;

/**
 * Requested strategy for when pooled objects are created.
 *
 * NOTE(review): the pool class in this module never reads this value; slots are
 * always filled on demand inside DoBorrow. Confirm whether Eager is meant to be
 * honoured before relying on it.
 */
enum CreationMode {
    Lazy,
    Eager
}

/**
 * Configuration handed to an ObjectPool at construction time.
 */
class PoolOptions {
    /// Number of slots allocated by the pool constructor.
    size_t size = 5;
    /// Requested creation strategy (see the note on CreationMode).
    CreationMode creationMode = CreationMode.Lazy;
}


/**
 * A fixed-capacity pool of T instances.
 *
 * Slots start out empty and are filled through the ObjectFactory the first time
 * they are needed. When no slot can be handed out, BorrowAsync queues a
 * FuturePromise which is completed by a later ReturnObject call.
 *
 * Borrow, BorrowAsync, ReturnObject, Clear and Close hold the internal mutex
 * while they touch the slots and the waiter queue. The counting getters and
 * toString do not lock and are meant for monitoring only.
 *
 * T is compared against null throughout, so it is expected to be a
 * reference-like type.
 */
class ObjectPool(T) {
    private ObjectFactory!(T) _factory;
    private PooledObject!(T)[] _pooledObjects;
    private Mutex _locker;
    private DList!(FuturePromise!T) _waiters;
    private PoolOptions _poolOptions;

    /**
     * Creates a pool backed by a DefaultObjectFactory.
     *
     * Params:
     *   options = pool configuration; options.size fixes the number of slots
     */
    this(PoolOptions options) {
        this(new DefaultObjectFactory!(T)(), options);
    }

    /**
     * Creates a pool that uses the given factory to make, validate and destroy objects.
     *
     * Params:
     *   factory = factory used for MakeObject / IsValid / DestroyObject
     *   options = pool configuration; options.size fixes the number of slots
     */
    this(ObjectFactory!(T) factory, PoolOptions options) {
        _factory = factory;
        _poolOptions = options;
        _pooledObjects = new PooledObject!(T)[options.size];
        _locker = new Mutex();
    }

    /// Returns the configured pool size (read from the options object).
    size_t size() {
        return _poolOptions.size;
    }

    /**
     * Obtains an instance from this pool.
     *
     * Clients must hand the borrowed instance back with ReturnObject.
     *
     * Params:
     *   timeout = Duration.zero: try once without waiting;
     *             negative: wait on the future without a time limit;
     *             otherwise: wait on the future for at most this long
     *   isQuiet = only consulted when timeout is Duration.zero; when false, an
     *             Exception is thrown instead of returning null
     *
     * Returns: an instance from this pool, or null when timeout is zero, nothing
     *          is available and isQuiet is true.
     *
     * Throws: Exception when timeout is zero, nothing is available and isQuiet is
     *         false. Whatever Future.Get raises (presumably on timeout - confirm
     *         against FuturePromise) propagates unchanged.
     */
    T Borrow(Duration timeout = 10.seconds, bool isQuiet = true) {
        if(timeout == Duration.zero) {
            _locker.lock();
            scope(exit) {
                _locker.unlock();
            }

            T r = DoBorrow();
            if(r is null && !isQuiet) {
                throw new Exception("No idle object avaliable.");
            }
            return r;
        }

        Future!T future = BorrowAsync();
        if(timeout.isNegative()) {
            return future.Get();
        }
        return future.Get(timeout);
    }


    /**
     * Requests an instance without blocking.
     *
     * A direct borrow is attempted only when no waiter is queued; otherwise, or
     * when the direct borrow yields nothing, the new promise is appended to the
     * waiter queue and completed by a later ReturnObject.
     *
     * Returns: a future that is either already completed with an instance or
     *          queued as a waiter.
     */
    Future!T BorrowAsync() {
        _locker.lock();
        scope(exit) {
            _locker.unlock();
        }

        FuturePromise!T promise = new FuturePromise!T();

        T r;
        if(_waiters.empty()) {
            r = DoBorrow();
        }

        if(r is null) {
            _waiters.stableInsert(promise);
            version(GEAR_DEBUG) {
                log.warn("New waiter...%d", GetNumWaiters());
            }
        } else {
            promise.Succeeded(r);
        }

        return promise;
    }

    /**
     * Picks the first usable slot and marks its object as allocated.
     *
     * Slots are scanned in order; the first slot that is empty, idle or invalid
     * decides the outcome. Empty slots get a freshly made object, idle objects
     * are validated through the factory (and replaced when invalid), invalid
     * objects are replaced.
     *
     * Does not lock; callers in this class hold _locker.
     *
     * Returns: the borrowed instance, or null when no slot qualifies.
     */
    private T DoBorrow() {
        PooledObject!(T) pooledObj;

        foreach(size_t index, PooledObject!(T) candidate; _pooledObjects) {
            if(candidate is null) {
                // Empty slot: create the object on demand.
                pooledObj = new PooledObject!(T)(_factory.MakeObject());
                _pooledObjects[index] = pooledObj;
                break;
            }

            if(candidate.IsIdle()) {
                T underlyingObj = candidate.GetObject();
                if(_factory.IsValid(underlyingObj)) {
                    pooledObj = candidate;
                } else {
                    candidate.Invalidate();
                    pooledObj = ReplaceSlot(index, candidate, underlyingObj);
                }
                break;
            }

            if(candidate.IsInvalid()) {
                pooledObj = ReplaceSlot(index, candidate, candidate.GetObject());
                break;
            }
        }

        if(pooledObj is null) {
            version(GEAR_DEBUG) {
                log.warn("No idle object avaliable.");
            }
            return null;
        }

        pooledObj.Allocate();

        version(GEAR_DEBUG) {
            log.info("borrowed: id=%d, createTime=%s; pool status = { %s }",
                pooledObj.Id(), pooledObj.CreateTime(), toString());
        }
        return pooledObj.GetObject();
    }

    /**
     * Destroys the stale object of a slot and installs a freshly made one.
     *
     * Params:
     *   index    = slot to overwrite
     *   stale    = wrapper currently stored in the slot (used for logging)
     *   staleObj = underlying object passed to the factory's DestroyObject
     *
     * Returns: the new wrapper now stored at index.
     */
    private PooledObject!(T) ReplaceSlot(size_t index, PooledObject!(T) stale, T staleObj) {
        version(GEAR_DEBUG) {
            log.warn("An invalid object (id=%d) detected at slot %d.", stale.Id(), index);
        }
        _factory.DestroyObject(staleObj);
        PooledObject!(T) fresh = new PooledObject!(T)(_factory.MakeObject());
        _pooledObjects[index] = fresh;
        return fresh;
    }

    /**
     * Returns an instance to the pool and then tries to serve one queued waiter.
     *
     * A null argument is ignored. The slot scan and the waiter handling both run
     * under the pool mutex, like every other operation that touches the slots or
     * the waiter queue.
     *
     * Params:
     *   obj = an instance previously obtained from Borrow or BorrowAsync
     */
    void ReturnObject(T obj) {
        if(obj is null) {
            version(GEAR_DEBUG) log.warn("Do nothing for a null object");
            return;
        }

        _locker.lock();
        scope(exit) {
            _locker.unlock();
        }

        try {
            DoReturning(obj);
        } finally {
            // Waiters get a chance even when returning failed or threw.
            HandleWaiters();
        }
    }

    /**
     * Finds the slot wrapping obj (identity comparison) and deallocates it.
     *
     * Does not lock; the caller holds _locker.
     *
     * Returns: the result of Deallocate for the matching slot, or false when no
     *          slot wraps obj.
     */
    private bool DoReturning(T obj) {
        bool result = false;

        foreach(PooledObject!(T) pooledObj; _pooledObjects) {
            if(pooledObj is null) {
                continue;
            }

            if(pooledObj.GetObject() !is obj) {
                continue;
            }

            version(GEAR_DEBUG_MORE) {
                log.trace("returning: id=%d, state=%s, count=%s, createTime=%s",
                    pooledObj.Id(), pooledObj.State(), pooledObj.BorrowedCount(), pooledObj.CreateTime());
            }

            result = pooledObj.Deallocate();
            version(GEAR_DEBUG) {
                if(result) {
                    log.info("Returned: id=%d", pooledObj.Id());
                } else {
                    log.warn("Return failed: id=%d", pooledObj.Id());
                }
            }
            break;
        }

        version(GEAR_DEBUG) {
            Info(toString());
        }
        return result;
    }

    /**
     * Serves the oldest pending waiter, if any.
     *
     * Waiters at the front of the queue that are already done are discarded
     * first. The first pending waiter is completed with a borrowed instance; if
     * nothing can be borrowed it stays queued.
     *
     * Does not lock; the caller holds _locker.
     */
    private void HandleWaiters() {
        // Discard finished waiters at the head of the queue.
        while(!_waiters.empty() && _waiters.front().IsDone()) {
            _waiters.removeFront();
        }

        if(_waiters.empty()) {
            return;
        }

        FuturePromise!T waiter = _waiters.front();

        T r = DoBorrow();
        if(r is null) {
            log.warn("No idle object avaliable for waiter");
            return;
        }

        _waiters.removeFront();
        try {
            waiter.Succeeded(r);
        } catch(Exception ex) {
            // NOTE(review): r stays allocated here; whether the waiter received it
            // depends on FuturePromise.Succeeded - confirm and release if it did not.
            log.warn(ex);
        }
    }

    /**
     * Returns the number of slots that are empty or hold an idle object.
     *
     * Empty slots are counted because they can still be filled on demand.
     * Not synchronized; intended for monitoring.
     */
    size_t GetNumIdle() {
        size_t count = 0;

        foreach(PooledObject!(T) obj; _pooledObjects) {
            if(obj is null || obj.IsIdle()) {
                count++;
            }
        }

        return count;
    }

    /**
     * Returns the number of slots whose object reports IsInUse.
     *
     * Not synchronized; intended for monitoring.
     */
    size_t GetNumActive() {
        size_t count = 0;

        foreach(PooledObject!(T) obj; _pooledObjects) {
            if(obj !is null && obj.IsInUse()) {
                count++;
            }
        }

        return count;
    }

    /**
     * Returns the current length of the waiter queue.
     *
     * The queue is walked without locking, so the value is an estimate intended
     * for monitoring only, not for synchronization control.
     */
    size_t GetNumWaiters() {
        return walkLength(_waiters[]);
    }

    /**
     * Empties every slot of the pool.
     *
     * Each stored object is marked abandoned and passed to the factory's
     * DestroyObject, regardless of whether it is idle or currently borrowed.
     * Queued waiters are not touched.
     *
     * An exception raised by the factory propagates and stops the sweep.
     */
    void Clear() {
        version(GEAR_DEBUG) {
            Info("Pool is clearing...");
        }

        DestroyAllObjects("clearing object: id=%d, slot=%d");
    }

    /**
     * Tears down all pooled objects.
     *
     * Performs the same sweep as Clear. No closed flag is recorded: a later
     * Borrow finds empty slots and creates fresh objects in them.
     */
    void Close() {
        version(GEAR_DEBUG) {
            Info("Pool is closing...");
        }

        DestroyAllObjects("destroying object: id=%d, slot=%d");
    }

    /**
     * Shared sweep behind Clear and Close: under the pool mutex, empties each
     * occupied slot, abandons the wrapper and destroys the underlying object.
     *
     * Params:
     *   traceFormat = debug trace format taking the object id and the slot index
     */
    private void DestroyAllObjects(string traceFormat) {
        _locker.lock();
        scope(exit) {
            _locker.unlock();
        }

        foreach(size_t index, PooledObject!(T) obj; _pooledObjects) {
            if(obj is null) {
                continue;
            }

            version(GEAR_DEBUG) {
                log.trace(traceFormat, obj.Id(), index);
            }

            // Free the slot first so it is never left pointing at a destroyed object.
            _pooledObjects[index] = null;
            obj.Abandoned();
            _factory.DestroyObject(obj.GetObject());
        }
    }

    /// Summarises the pool as "Total / Active / Idle / Waiters" counts (unsynchronized snapshot).
    override string toString() {
        string str = format("Total: %d, Active: %d, Idle: %d, Waiters: %d",
                size(), GetNumActive(), GetNumIdle(), GetNumWaiters());
        return str;
    }
}