aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsQueue.cs600
1 files changed, 600 insertions, 0 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
new file mode 100644
index 0000000..af5af07
--- /dev/null
+++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
@@ -0,0 +1,600 @@
1// Ami Bar
2// amibar@gmail.com
3
4using System;
5using System.Threading;
6
7namespace Amib.Threading.Internal
8{
9 #region WorkItemsQueue class
10
11 /// <summary>
12 /// WorkItemsQueue class.
13 /// </summary>
14 public class WorkItemsQueue : IDisposable
15 {
16 #region Member variables
17
18 /// <summary>
19 /// Waiters queue (implemented as stack).
20 /// </summary>
21 private WaiterEntry _headWaiterEntry = new WaiterEntry();
22
23 /// <summary>
24 /// Waiters count
25 /// </summary>
26 private int _waitersCount = 0;
27
28 /// <summary>
29 /// Work items queue
30 /// </summary>
31 private PriorityQueue _workItems = new PriorityQueue();
32
33 /// <summary>
34 /// Indicate that work items are allowed to be queued
35 /// </summary>
36 private bool _isWorkItemsQueueActive = true;
37
38 /// <summary>
39 /// Each thread in the thread pool keeps its own waiter entry.
40 /// </summary>
41 [ThreadStatic]
42 private static WaiterEntry _waiterEntry;
43
44 /// <summary>
45 /// A flag that indicates if the WorkItemsQueue has been disposed.
46 /// </summary>
47 private bool _isDisposed = false;
48
49 #endregion
50
51 #region Public properties
52
53 /// <summary>
54 /// Returns the current number of work items in the queue
55 /// </summary>
56 public int Count
57 {
58 get
59 {
60 lock(this)
61 {
62 ValidateNotDisposed();
63 return _workItems.Count;
64 }
65 }
66 }
67
68 /// <summary>
69 /// Returns the current number of waiters
70 /// </summary>
71 public int WaitersCount
72 {
73 get
74 {
75 lock(this)
76 {
77 ValidateNotDisposed();
78 return _waitersCount;
79 }
80 }
81 }
82
83
84 #endregion
85
86 #region Public methods
87
88 /// <summary>
89 /// Enqueue a work item to the queue.
90 /// </summary>
91 public bool EnqueueWorkItem(WorkItem workItem)
92 {
93 // A work item cannot be null, since null is used in the
94 // WaitForWorkItem() method to indicate timeout or cancel
95 if (null == workItem)
96 {
97 throw new ArgumentNullException("workItem" , "workItem cannot be null");
98 }
99
100 bool enqueue = true;
101
102 // First check if there is a waiter waiting for work item. During
103 // the check, timed out waiters are ignored. If there is no
104 // waiter then the work item is queued.
105 lock(this)
106 {
107 ValidateNotDisposed();
108
109 if (!_isWorkItemsQueueActive)
110 {
111 return false;
112 }
113
114 while(_waitersCount > 0)
115 {
116 // Dequeue a waiter.
117 WaiterEntry waiterEntry = PopWaiter();
118
119 // Signal the waiter. On success break the loop
120 if (waiterEntry.Signal(workItem))
121 {
122 enqueue = false;
123 break;
124 }
125 }
126
127 if (enqueue)
128 {
129 // Enqueue the work item
130 _workItems.Enqueue(workItem);
131 }
132 }
133 return true;
134 }
135
136
137 /// <summary>
138 /// Waits for a work item or exits on timeout or cancel
139 /// </summary>
140 /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
141 /// <param name="cancelEvent">Cancel wait handle</param>
142 /// <returns>Returns true if the resource was granted</returns>
143 public WorkItem DequeueWorkItem(
144 int millisecondsTimeout,
145 WaitHandle cancelEvent)
146 {
147 /// This method cause the caller to wait for a work item.
148 /// If there is at least one waiting work item then the
149 /// method returns immidiately with true.
150 ///
151 /// If there are no waiting work items then the caller
152 /// is queued between other waiters for a work item to arrive.
153 ///
154 /// If a work item didn't come within millisecondsTimeout or
155 /// the user canceled the wait by signaling the cancelEvent
156 /// then the method returns false to indicate that the caller
157 /// didn't get a work item.
158
159 WaiterEntry waiterEntry = null;
160 WorkItem workItem = null;
161
162 lock(this)
163 {
164 ValidateNotDisposed();
165
166 // If there are waiting work items then take one and return.
167 if (_workItems.Count > 0)
168 {
169 workItem = _workItems.Dequeue() as WorkItem;
170 return workItem;
171 }
172 // No waiting work items ...
173 else
174 {
175 // Get the wait entry for the waiters queue
176 waiterEntry = GetThreadWaiterEntry();
177
178 // Put the waiter with the other waiters
179 PushWaiter(waiterEntry);
180 }
181 }
182
183 // Prepare array of wait handle for the WaitHandle.WaitAny()
184 WaitHandle [] waitHandles = new WaitHandle [] {
185 waiterEntry.WaitHandle,
186 cancelEvent };
187
188 // Wait for an available resource, cancel event, or timeout.
189
190 // During the wait we are supposes to exit the synchronization
191 // domain. (Placing true as the third argument of the WaitAny())
192 // It just doesn't work, I don't know why, so I have lock(this)
193 // statments insted of one.
194
195 int index = WaitHandle.WaitAny(
196 waitHandles,
197 millisecondsTimeout,
198 true);
199
200 lock(this)
201 {
202 // success is true if it got a work item.
203 bool success = (0 == index);
204
205 // The timeout variable is used only for readability.
206 // (We treat cancel as timeout)
207 bool timeout = !success;
208
209 // On timeout update the waiterEntry that it is timed out
210 if (timeout)
211 {
212 // The Timeout() fails if the waiter has already been signaled
213 timeout = waiterEntry.Timeout();
214
215 // On timeout remove the waiter from the queue.
216 // Note that the complexity is O(1).
217 if(timeout)
218 {
219 RemoveWaiter(waiterEntry, false);
220 }
221
222 // Again readability
223 success = !timeout;
224 }
225
226 // On success return the work item
227 if (success)
228 {
229 workItem = waiterEntry.WorkItem;
230
231 if (null == workItem)
232 {
233 workItem = _workItems.Dequeue() as WorkItem;
234 }
235 }
236 }
237 // On failure return null.
238 return workItem;
239 }
240
241 /// <summary>
242 /// Cleanup the work items queue, hence no more work
243 /// items are allowed to be queue
244 /// </summary>
245 protected virtual void Cleanup()
246 {
247 lock(this)
248 {
249 // Deactivate only once
250 if (!_isWorkItemsQueueActive)
251 {
252 return;
253 }
254
255 // Don't queue more work items
256 _isWorkItemsQueueActive = false;
257
258 foreach(WorkItem workItem in _workItems)
259 {
260 workItem.DisposeOfState();
261 }
262
263 // Clear the work items that are already queued
264 _workItems.Clear();
265
266 // Note:
267 // I don't iterate over the queue and dispose of work items's states,
268 // since if a work item has a state object that is still in use in the
269 // application then I must not dispose it.
270
271 // Tell the waiters that they were timed out.
272 // It won't signal them to exit, but to ignore their
273 // next work item.
274 while(_waitersCount > 0)
275 {
276 WaiterEntry waiterEntry = PopWaiter();
277 waiterEntry.Timeout();
278 }
279 }
280 }
281
282 #endregion
283
284 #region Private methods
285
286 /// <summary>
287 /// Returns the WaiterEntry of the current thread
288 /// </summary>
289 /// <returns></returns>
290 /// In order to avoid creation and destuction of WaiterEntry
291 /// objects each thread has its own WaiterEntry object.
292 private WaiterEntry GetThreadWaiterEntry()
293 {
294 if (null == _waiterEntry)
295 {
296 _waiterEntry = new WaiterEntry();
297 }
298 _waiterEntry.Reset();
299 return _waiterEntry;
300 }
301
302 #region Waiters stack methods
303
304 /// <summary>
305 /// Push a new waiter into the waiter's stack
306 /// </summary>
307 /// <param name="newWaiterEntry">A waiter to put in the stack</param>
308 public void PushWaiter(WaiterEntry newWaiterEntry)
309 {
310 // Remove the waiter if it is already in the stack and
311 // update waiter's count as needed
312 RemoveWaiter(newWaiterEntry, false);
313
314 // If the stack is empty then newWaiterEntry is the new head of the stack
315 if (null == _headWaiterEntry._nextWaiterEntry)
316 {
317 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
318 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
319
320 }
321 // If the stack is not empty then put newWaiterEntry as the new head
322 // of the stack.
323 else
324 {
325 // Save the old first waiter entry
326 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
327
328 // Update the links
329 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
330 newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
331 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
332 oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
333 }
334
335 // Increment the number of waiters
336 ++_waitersCount;
337 }
338
339 /// <summary>
340 /// Pop a waiter from the waiter's stack
341 /// </summary>
342 /// <returns>Returns the first waiter in the stack</returns>
343 private WaiterEntry PopWaiter()
344 {
345 // Store the current stack head
346 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
347
348 // Store the new stack head
349 WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
350
351 // Update the old stack head list links and decrement the number
352 // waiters.
353 RemoveWaiter(oldFirstWaiterEntry, true);
354
355 // Update the new stack head
356 _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
357 if (null != newHeadWaiterEntry)
358 {
359 newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
360 }
361
362 // Return the old stack head
363 return oldFirstWaiterEntry;
364 }
365
366 /// <summary>
367 /// Remove a waiter from the stack
368 /// </summary>
369 /// <param name="waiterEntry">A waiter entry to remove</param>
370 /// <param name="popDecrement">If true the waiter count is always decremented</param>
371 private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
372 {
373 // Store the prev entry in the list
374 WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
375
376 // Store the next entry in the list
377 WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
378
379 // A flag to indicate if we need to decrement the waiters count.
380 // If we got here from PopWaiter then we must decrement.
381 // If we got here from PushWaiter then we decrement only if
382 // the waiter was already in the stack.
383 bool decrementCounter = popDecrement;
384
385 // Null the waiter's entry links
386 waiterEntry._prevWaiterEntry = null;
387 waiterEntry._nextWaiterEntry = null;
388
389 // If the waiter entry had a prev link then update it.
390 // It also means that the waiter is already in the list and we
391 // need to decrement the waiters count.
392 if (null != prevWaiterEntry)
393 {
394 prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
395 decrementCounter = true;
396 }
397
398 // If the waiter entry had a next link then update it.
399 // It also means that the waiter is already in the list and we
400 // need to decrement the waiters count.
401 if (null != nextWaiterEntry)
402 {
403 nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
404 decrementCounter = true;
405 }
406
407 // Decrement the waiters count if needed
408 if (decrementCounter)
409 {
410 --_waitersCount;
411 }
412 }
413
414 #endregion
415
416 #endregion
417
418 #region WaiterEntry class
419
420 // A waiter entry in the _waiters queue.
421 public class WaiterEntry : IDisposable
422 {
423 #region Member variables
424
425 /// <summary>
426 /// Event to signal the waiter that it got the work item.
427 /// </summary>
428 private AutoResetEvent _waitHandle = new AutoResetEvent(false);
429
430 /// <summary>
431 /// Flag to know if this waiter already quited from the queue
432 /// because of a timeout.
433 /// </summary>
434 private bool _isTimedout = false;
435
436 /// <summary>
437 /// Flag to know if the waiter was signaled and got a work item.
438 /// </summary>
439 private bool _isSignaled = false;
440
441 /// <summary>
442 /// A work item that passed directly to the waiter withou going
443 /// through the queue
444 /// </summary>
445 private WorkItem _workItem = null;
446
447 private bool _isDisposed = false;
448
449 // Linked list members
450 internal WaiterEntry _nextWaiterEntry = null;
451 internal WaiterEntry _prevWaiterEntry = null;
452
453 #endregion
454
455 #region Construction
456
457 public WaiterEntry()
458 {
459 Reset();
460 }
461
462 #endregion
463
464 #region Public methods
465
466 public WaitHandle WaitHandle
467 {
468 get { return _waitHandle; }
469 }
470
471 public WorkItem WorkItem
472 {
473 get
474 {
475 lock(this)
476 {
477 return _workItem;
478 }
479 }
480 }
481
482 /// <summary>
483 /// Signal the waiter that it got a work item.
484 /// </summary>
485 /// <returns>Return true on success</returns>
486 /// The method fails if Timeout() preceded its call
487 public bool Signal(WorkItem workItem)
488 {
489 lock(this)
490 {
491 if (!_isTimedout)
492 {
493 _workItem = workItem;
494 _isSignaled = true;
495 _waitHandle.Set();
496 return true;
497 }
498 }
499 return false;
500 }
501
502 /// <summary>
503 /// Mark the wait entry that it has been timed out
504 /// </summary>
505 /// <returns>Return true on success</returns>
506 /// The method fails if Signal() preceded its call
507 public bool Timeout()
508 {
509 lock(this)
510 {
511 // Time out can happen only if the waiter wasn't marked as
512 // signaled
513 if (!_isSignaled)
514 {
515 // We don't remove the waiter from the queue, the DequeueWorkItem
516 // method skips _waiters that were timed out.
517 _isTimedout = true;
518 return true;
519 }
520 }
521 return false;
522 }
523
524 /// <summary>
525 /// Reset the wait entry so it can be used again
526 /// </summary>
527 public void Reset()
528 {
529 _workItem = null;
530 _isTimedout = false;
531 _isSignaled = false;
532 _waitHandle.Reset();
533 }
534
535 /// <summary>
536 /// Free resources
537 /// </summary>
538 public void Close()
539 {
540 if (null != _waitHandle)
541 {
542 _waitHandle.Close();
543 _waitHandle = null;
544 }
545 }
546
547 #endregion
548
549 #region IDisposable Members
550
551 public void Dispose()
552 {
553 if (!_isDisposed)
554 {
555 Close();
556 _isDisposed = true;
557 }
558 }
559
560 ~WaiterEntry()
561 {
562 Dispose();
563 }
564
565 #endregion
566 }
567
568 #endregion
569
570 #region IDisposable Members
571
572 public void Dispose()
573 {
574 if (!_isDisposed)
575 {
576 Cleanup();
577 _isDisposed = true;
578 GC.SuppressFinalize(this);
579 }
580 }
581
582 ~WorkItemsQueue()
583 {
584 Cleanup();
585 }
586
587 private void ValidateNotDisposed()
588 {
589 if(_isDisposed)
590 {
591 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
592 }
593 }
594
595 #endregion
596 }
597
598 #endregion
599}
600