diff options
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsQueue.cs')
-rw-r--r-- | ThirdParty/SmartThreadPool/WorkItemsQueue.cs | 600 |
1 files changed, 600 insertions, 0 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs new file mode 100644 index 0000000..af5af07 --- /dev/null +++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs | |||
@@ -0,0 +1,600 @@ | |||
1 | // Ami Bar | ||
2 | // amibar@gmail.com | ||
3 | |||
4 | using System; | ||
5 | using System.Threading; | ||
6 | |||
7 | namespace Amib.Threading.Internal | ||
8 | { | ||
9 | #region WorkItemsQueue class | ||
10 | |||
11 | /// <summary> | ||
12 | /// WorkItemsQueue class. | ||
13 | /// </summary> | ||
14 | public class WorkItemsQueue : IDisposable | ||
15 | { | ||
16 | #region Member variables | ||
17 | |||
18 | /// <summary> | ||
19 | /// Waiters queue (implemented as stack). | ||
20 | /// </summary> | ||
21 | private WaiterEntry _headWaiterEntry = new WaiterEntry(); | ||
22 | |||
23 | /// <summary> | ||
24 | /// Waiters count | ||
25 | /// </summary> | ||
26 | private int _waitersCount = 0; | ||
27 | |||
28 | /// <summary> | ||
29 | /// Work items queue | ||
30 | /// </summary> | ||
31 | private PriorityQueue _workItems = new PriorityQueue(); | ||
32 | |||
33 | /// <summary> | ||
34 | /// Indicate that work items are allowed to be queued | ||
35 | /// </summary> | ||
36 | private bool _isWorkItemsQueueActive = true; | ||
37 | |||
38 | /// <summary> | ||
39 | /// Each thread in the thread pool keeps its own waiter entry. | ||
40 | /// </summary> | ||
41 | [ThreadStatic] | ||
42 | private static WaiterEntry _waiterEntry; | ||
43 | |||
44 | /// <summary> | ||
45 | /// A flag that indicates if the WorkItemsQueue has been disposed. | ||
46 | /// </summary> | ||
47 | private bool _isDisposed = false; | ||
48 | |||
49 | #endregion | ||
50 | |||
51 | #region Public properties | ||
52 | |||
53 | /// <summary> | ||
54 | /// Returns the current number of work items in the queue | ||
55 | /// </summary> | ||
56 | public int Count | ||
57 | { | ||
58 | get | ||
59 | { | ||
60 | lock(this) | ||
61 | { | ||
62 | ValidateNotDisposed(); | ||
63 | return _workItems.Count; | ||
64 | } | ||
65 | } | ||
66 | } | ||
67 | |||
68 | /// <summary> | ||
69 | /// Returns the current number of waiters | ||
70 | /// </summary> | ||
71 | public int WaitersCount | ||
72 | { | ||
73 | get | ||
74 | { | ||
75 | lock(this) | ||
76 | { | ||
77 | ValidateNotDisposed(); | ||
78 | return _waitersCount; | ||
79 | } | ||
80 | } | ||
81 | } | ||
82 | |||
83 | |||
84 | #endregion | ||
85 | |||
86 | #region Public methods | ||
87 | |||
88 | /// <summary> | ||
89 | /// Enqueue a work item to the queue. | ||
90 | /// </summary> | ||
91 | public bool EnqueueWorkItem(WorkItem workItem) | ||
92 | { | ||
93 | // A work item cannot be null, since null is used in the | ||
94 | // WaitForWorkItem() method to indicate timeout or cancel | ||
95 | if (null == workItem) | ||
96 | { | ||
97 | throw new ArgumentNullException("workItem" , "workItem cannot be null"); | ||
98 | } | ||
99 | |||
100 | bool enqueue = true; | ||
101 | |||
102 | // First check if there is a waiter waiting for work item. During | ||
103 | // the check, timed out waiters are ignored. If there is no | ||
104 | // waiter then the work item is queued. | ||
105 | lock(this) | ||
106 | { | ||
107 | ValidateNotDisposed(); | ||
108 | |||
109 | if (!_isWorkItemsQueueActive) | ||
110 | { | ||
111 | return false; | ||
112 | } | ||
113 | |||
114 | while(_waitersCount > 0) | ||
115 | { | ||
116 | // Dequeue a waiter. | ||
117 | WaiterEntry waiterEntry = PopWaiter(); | ||
118 | |||
119 | // Signal the waiter. On success break the loop | ||
120 | if (waiterEntry.Signal(workItem)) | ||
121 | { | ||
122 | enqueue = false; | ||
123 | break; | ||
124 | } | ||
125 | } | ||
126 | |||
127 | if (enqueue) | ||
128 | { | ||
129 | // Enqueue the work item | ||
130 | _workItems.Enqueue(workItem); | ||
131 | } | ||
132 | } | ||
133 | return true; | ||
134 | } | ||
135 | |||
136 | |||
137 | /// <summary> | ||
138 | /// Waits for a work item or exits on timeout or cancel | ||
139 | /// </summary> | ||
140 | /// <param name="millisecondsTimeout">Timeout in milliseconds</param> | ||
141 | /// <param name="cancelEvent">Cancel wait handle</param> | ||
142 | /// <returns>Returns true if the resource was granted</returns> | ||
143 | public WorkItem DequeueWorkItem( | ||
144 | int millisecondsTimeout, | ||
145 | WaitHandle cancelEvent) | ||
146 | { | ||
147 | /// This method cause the caller to wait for a work item. | ||
148 | /// If there is at least one waiting work item then the | ||
149 | /// method returns immidiately with true. | ||
150 | /// | ||
151 | /// If there are no waiting work items then the caller | ||
152 | /// is queued between other waiters for a work item to arrive. | ||
153 | /// | ||
154 | /// If a work item didn't come within millisecondsTimeout or | ||
155 | /// the user canceled the wait by signaling the cancelEvent | ||
156 | /// then the method returns false to indicate that the caller | ||
157 | /// didn't get a work item. | ||
158 | |||
159 | WaiterEntry waiterEntry = null; | ||
160 | WorkItem workItem = null; | ||
161 | |||
162 | lock(this) | ||
163 | { | ||
164 | ValidateNotDisposed(); | ||
165 | |||
166 | // If there are waiting work items then take one and return. | ||
167 | if (_workItems.Count > 0) | ||
168 | { | ||
169 | workItem = _workItems.Dequeue() as WorkItem; | ||
170 | return workItem; | ||
171 | } | ||
172 | // No waiting work items ... | ||
173 | else | ||
174 | { | ||
175 | // Get the wait entry for the waiters queue | ||
176 | waiterEntry = GetThreadWaiterEntry(); | ||
177 | |||
178 | // Put the waiter with the other waiters | ||
179 | PushWaiter(waiterEntry); | ||
180 | } | ||
181 | } | ||
182 | |||
183 | // Prepare array of wait handle for the WaitHandle.WaitAny() | ||
184 | WaitHandle [] waitHandles = new WaitHandle [] { | ||
185 | waiterEntry.WaitHandle, | ||
186 | cancelEvent }; | ||
187 | |||
188 | // Wait for an available resource, cancel event, or timeout. | ||
189 | |||
190 | // During the wait we are supposes to exit the synchronization | ||
191 | // domain. (Placing true as the third argument of the WaitAny()) | ||
192 | // It just doesn't work, I don't know why, so I have lock(this) | ||
193 | // statments insted of one. | ||
194 | |||
195 | int index = WaitHandle.WaitAny( | ||
196 | waitHandles, | ||
197 | millisecondsTimeout, | ||
198 | true); | ||
199 | |||
200 | lock(this) | ||
201 | { | ||
202 | // success is true if it got a work item. | ||
203 | bool success = (0 == index); | ||
204 | |||
205 | // The timeout variable is used only for readability. | ||
206 | // (We treat cancel as timeout) | ||
207 | bool timeout = !success; | ||
208 | |||
209 | // On timeout update the waiterEntry that it is timed out | ||
210 | if (timeout) | ||
211 | { | ||
212 | // The Timeout() fails if the waiter has already been signaled | ||
213 | timeout = waiterEntry.Timeout(); | ||
214 | |||
215 | // On timeout remove the waiter from the queue. | ||
216 | // Note that the complexity is O(1). | ||
217 | if(timeout) | ||
218 | { | ||
219 | RemoveWaiter(waiterEntry, false); | ||
220 | } | ||
221 | |||
222 | // Again readability | ||
223 | success = !timeout; | ||
224 | } | ||
225 | |||
226 | // On success return the work item | ||
227 | if (success) | ||
228 | { | ||
229 | workItem = waiterEntry.WorkItem; | ||
230 | |||
231 | if (null == workItem) | ||
232 | { | ||
233 | workItem = _workItems.Dequeue() as WorkItem; | ||
234 | } | ||
235 | } | ||
236 | } | ||
237 | // On failure return null. | ||
238 | return workItem; | ||
239 | } | ||
240 | |||
241 | /// <summary> | ||
242 | /// Cleanup the work items queue, hence no more work | ||
243 | /// items are allowed to be queue | ||
244 | /// </summary> | ||
245 | protected virtual void Cleanup() | ||
246 | { | ||
247 | lock(this) | ||
248 | { | ||
249 | // Deactivate only once | ||
250 | if (!_isWorkItemsQueueActive) | ||
251 | { | ||
252 | return; | ||
253 | } | ||
254 | |||
255 | // Don't queue more work items | ||
256 | _isWorkItemsQueueActive = false; | ||
257 | |||
258 | foreach(WorkItem workItem in _workItems) | ||
259 | { | ||
260 | workItem.DisposeOfState(); | ||
261 | } | ||
262 | |||
263 | // Clear the work items that are already queued | ||
264 | _workItems.Clear(); | ||
265 | |||
266 | // Note: | ||
267 | // I don't iterate over the queue and dispose of work items's states, | ||
268 | // since if a work item has a state object that is still in use in the | ||
269 | // application then I must not dispose it. | ||
270 | |||
271 | // Tell the waiters that they were timed out. | ||
272 | // It won't signal them to exit, but to ignore their | ||
273 | // next work item. | ||
274 | while(_waitersCount > 0) | ||
275 | { | ||
276 | WaiterEntry waiterEntry = PopWaiter(); | ||
277 | waiterEntry.Timeout(); | ||
278 | } | ||
279 | } | ||
280 | } | ||
281 | |||
282 | #endregion | ||
283 | |||
284 | #region Private methods | ||
285 | |||
286 | /// <summary> | ||
287 | /// Returns the WaiterEntry of the current thread | ||
288 | /// </summary> | ||
289 | /// <returns></returns> | ||
290 | /// In order to avoid creation and destuction of WaiterEntry | ||
291 | /// objects each thread has its own WaiterEntry object. | ||
292 | private WaiterEntry GetThreadWaiterEntry() | ||
293 | { | ||
294 | if (null == _waiterEntry) | ||
295 | { | ||
296 | _waiterEntry = new WaiterEntry(); | ||
297 | } | ||
298 | _waiterEntry.Reset(); | ||
299 | return _waiterEntry; | ||
300 | } | ||
301 | |||
302 | #region Waiters stack methods | ||
303 | |||
304 | /// <summary> | ||
305 | /// Push a new waiter into the waiter's stack | ||
306 | /// </summary> | ||
307 | /// <param name="newWaiterEntry">A waiter to put in the stack</param> | ||
308 | public void PushWaiter(WaiterEntry newWaiterEntry) | ||
309 | { | ||
310 | // Remove the waiter if it is already in the stack and | ||
311 | // update waiter's count as needed | ||
312 | RemoveWaiter(newWaiterEntry, false); | ||
313 | |||
314 | // If the stack is empty then newWaiterEntry is the new head of the stack | ||
315 | if (null == _headWaiterEntry._nextWaiterEntry) | ||
316 | { | ||
317 | _headWaiterEntry._nextWaiterEntry = newWaiterEntry; | ||
318 | newWaiterEntry._prevWaiterEntry = _headWaiterEntry; | ||
319 | |||
320 | } | ||
321 | // If the stack is not empty then put newWaiterEntry as the new head | ||
322 | // of the stack. | ||
323 | else | ||
324 | { | ||
325 | // Save the old first waiter entry | ||
326 | WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; | ||
327 | |||
328 | // Update the links | ||
329 | _headWaiterEntry._nextWaiterEntry = newWaiterEntry; | ||
330 | newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry; | ||
331 | newWaiterEntry._prevWaiterEntry = _headWaiterEntry; | ||
332 | oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry; | ||
333 | } | ||
334 | |||
335 | // Increment the number of waiters | ||
336 | ++_waitersCount; | ||
337 | } | ||
338 | |||
339 | /// <summary> | ||
340 | /// Pop a waiter from the waiter's stack | ||
341 | /// </summary> | ||
342 | /// <returns>Returns the first waiter in the stack</returns> | ||
343 | private WaiterEntry PopWaiter() | ||
344 | { | ||
345 | // Store the current stack head | ||
346 | WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; | ||
347 | |||
348 | // Store the new stack head | ||
349 | WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry; | ||
350 | |||
351 | // Update the old stack head list links and decrement the number | ||
352 | // waiters. | ||
353 | RemoveWaiter(oldFirstWaiterEntry, true); | ||
354 | |||
355 | // Update the new stack head | ||
356 | _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry; | ||
357 | if (null != newHeadWaiterEntry) | ||
358 | { | ||
359 | newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry; | ||
360 | } | ||
361 | |||
362 | // Return the old stack head | ||
363 | return oldFirstWaiterEntry; | ||
364 | } | ||
365 | |||
366 | /// <summary> | ||
367 | /// Remove a waiter from the stack | ||
368 | /// </summary> | ||
369 | /// <param name="waiterEntry">A waiter entry to remove</param> | ||
370 | /// <param name="popDecrement">If true the waiter count is always decremented</param> | ||
371 | private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement) | ||
372 | { | ||
373 | // Store the prev entry in the list | ||
374 | WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry; | ||
375 | |||
376 | // Store the next entry in the list | ||
377 | WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry; | ||
378 | |||
379 | // A flag to indicate if we need to decrement the waiters count. | ||
380 | // If we got here from PopWaiter then we must decrement. | ||
381 | // If we got here from PushWaiter then we decrement only if | ||
382 | // the waiter was already in the stack. | ||
383 | bool decrementCounter = popDecrement; | ||
384 | |||
385 | // Null the waiter's entry links | ||
386 | waiterEntry._prevWaiterEntry = null; | ||
387 | waiterEntry._nextWaiterEntry = null; | ||
388 | |||
389 | // If the waiter entry had a prev link then update it. | ||
390 | // It also means that the waiter is already in the list and we | ||
391 | // need to decrement the waiters count. | ||
392 | if (null != prevWaiterEntry) | ||
393 | { | ||
394 | prevWaiterEntry._nextWaiterEntry = nextWaiterEntry; | ||
395 | decrementCounter = true; | ||
396 | } | ||
397 | |||
398 | // If the waiter entry had a next link then update it. | ||
399 | // It also means that the waiter is already in the list and we | ||
400 | // need to decrement the waiters count. | ||
401 | if (null != nextWaiterEntry) | ||
402 | { | ||
403 | nextWaiterEntry._prevWaiterEntry = prevWaiterEntry; | ||
404 | decrementCounter = true; | ||
405 | } | ||
406 | |||
407 | // Decrement the waiters count if needed | ||
408 | if (decrementCounter) | ||
409 | { | ||
410 | --_waitersCount; | ||
411 | } | ||
412 | } | ||
413 | |||
414 | #endregion | ||
415 | |||
416 | #endregion | ||
417 | |||
418 | #region WaiterEntry class | ||
419 | |||
420 | // A waiter entry in the _waiters queue. | ||
421 | public class WaiterEntry : IDisposable | ||
422 | { | ||
423 | #region Member variables | ||
424 | |||
425 | /// <summary> | ||
426 | /// Event to signal the waiter that it got the work item. | ||
427 | /// </summary> | ||
428 | private AutoResetEvent _waitHandle = new AutoResetEvent(false); | ||
429 | |||
430 | /// <summary> | ||
431 | /// Flag to know if this waiter already quited from the queue | ||
432 | /// because of a timeout. | ||
433 | /// </summary> | ||
434 | private bool _isTimedout = false; | ||
435 | |||
436 | /// <summary> | ||
437 | /// Flag to know if the waiter was signaled and got a work item. | ||
438 | /// </summary> | ||
439 | private bool _isSignaled = false; | ||
440 | |||
441 | /// <summary> | ||
442 | /// A work item that passed directly to the waiter withou going | ||
443 | /// through the queue | ||
444 | /// </summary> | ||
445 | private WorkItem _workItem = null; | ||
446 | |||
447 | private bool _isDisposed = false; | ||
448 | |||
449 | // Linked list members | ||
450 | internal WaiterEntry _nextWaiterEntry = null; | ||
451 | internal WaiterEntry _prevWaiterEntry = null; | ||
452 | |||
453 | #endregion | ||
454 | |||
455 | #region Construction | ||
456 | |||
457 | public WaiterEntry() | ||
458 | { | ||
459 | Reset(); | ||
460 | } | ||
461 | |||
462 | #endregion | ||
463 | |||
464 | #region Public methods | ||
465 | |||
466 | public WaitHandle WaitHandle | ||
467 | { | ||
468 | get { return _waitHandle; } | ||
469 | } | ||
470 | |||
471 | public WorkItem WorkItem | ||
472 | { | ||
473 | get | ||
474 | { | ||
475 | lock(this) | ||
476 | { | ||
477 | return _workItem; | ||
478 | } | ||
479 | } | ||
480 | } | ||
481 | |||
482 | /// <summary> | ||
483 | /// Signal the waiter that it got a work item. | ||
484 | /// </summary> | ||
485 | /// <returns>Return true on success</returns> | ||
486 | /// The method fails if Timeout() preceded its call | ||
487 | public bool Signal(WorkItem workItem) | ||
488 | { | ||
489 | lock(this) | ||
490 | { | ||
491 | if (!_isTimedout) | ||
492 | { | ||
493 | _workItem = workItem; | ||
494 | _isSignaled = true; | ||
495 | _waitHandle.Set(); | ||
496 | return true; | ||
497 | } | ||
498 | } | ||
499 | return false; | ||
500 | } | ||
501 | |||
502 | /// <summary> | ||
503 | /// Mark the wait entry that it has been timed out | ||
504 | /// </summary> | ||
505 | /// <returns>Return true on success</returns> | ||
506 | /// The method fails if Signal() preceded its call | ||
507 | public bool Timeout() | ||
508 | { | ||
509 | lock(this) | ||
510 | { | ||
511 | // Time out can happen only if the waiter wasn't marked as | ||
512 | // signaled | ||
513 | if (!_isSignaled) | ||
514 | { | ||
515 | // We don't remove the waiter from the queue, the DequeueWorkItem | ||
516 | // method skips _waiters that were timed out. | ||
517 | _isTimedout = true; | ||
518 | return true; | ||
519 | } | ||
520 | } | ||
521 | return false; | ||
522 | } | ||
523 | |||
524 | /// <summary> | ||
525 | /// Reset the wait entry so it can be used again | ||
526 | /// </summary> | ||
527 | public void Reset() | ||
528 | { | ||
529 | _workItem = null; | ||
530 | _isTimedout = false; | ||
531 | _isSignaled = false; | ||
532 | _waitHandle.Reset(); | ||
533 | } | ||
534 | |||
535 | /// <summary> | ||
536 | /// Free resources | ||
537 | /// </summary> | ||
538 | public void Close() | ||
539 | { | ||
540 | if (null != _waitHandle) | ||
541 | { | ||
542 | _waitHandle.Close(); | ||
543 | _waitHandle = null; | ||
544 | } | ||
545 | } | ||
546 | |||
547 | #endregion | ||
548 | |||
549 | #region IDisposable Members | ||
550 | |||
551 | public void Dispose() | ||
552 | { | ||
553 | if (!_isDisposed) | ||
554 | { | ||
555 | Close(); | ||
556 | _isDisposed = true; | ||
557 | } | ||
558 | } | ||
559 | |||
560 | ~WaiterEntry() | ||
561 | { | ||
562 | Dispose(); | ||
563 | } | ||
564 | |||
565 | #endregion | ||
566 | } | ||
567 | |||
568 | #endregion | ||
569 | |||
570 | #region IDisposable Members | ||
571 | |||
572 | public void Dispose() | ||
573 | { | ||
574 | if (!_isDisposed) | ||
575 | { | ||
576 | Cleanup(); | ||
577 | _isDisposed = true; | ||
578 | GC.SuppressFinalize(this); | ||
579 | } | ||
580 | } | ||
581 | |||
582 | ~WorkItemsQueue() | ||
583 | { | ||
584 | Cleanup(); | ||
585 | } | ||
586 | |||
587 | private void ValidateNotDisposed() | ||
588 | { | ||
589 | if(_isDisposed) | ||
590 | { | ||
591 | throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); | ||
592 | } | ||
593 | } | ||
594 | |||
595 | #endregion | ||
596 | } | ||
597 | |||
598 | #endregion | ||
599 | } | ||
600 | |||