aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
diff options
context:
space:
mode:
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsQueue.cs')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsQueue.cs646
1 files changed, 646 insertions, 0 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
new file mode 100644
index 0000000..21403a0
--- /dev/null
+++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
@@ -0,0 +1,646 @@
1using System;
2using System.Collections.Generic;
3using System.Threading;
4
5namespace Amib.Threading.Internal
6{
7 #region WorkItemsQueue class
8
9 /// <summary>
10 /// WorkItemsQueue class.
11 /// </summary>
12 public class WorkItemsQueue : IDisposable
13 {
14 #region Member variables
15
16 /// <summary>
17 /// Waiters queue (implemented as stack).
18 /// </summary>
19 private readonly WaiterEntry _headWaiterEntry = new WaiterEntry();
20
21 /// <summary>
22 /// Waiters count
23 /// </summary>
24 private int _waitersCount = 0;
25
26 /// <summary>
27 /// Work items queue
28 /// </summary>
29 private readonly PriorityQueue _workItems = new PriorityQueue();
30
31 /// <summary>
32 /// Indicate that work items are allowed to be queued
33 /// </summary>
34 private bool _isWorkItemsQueueActive = true;
35
36
37#if (WINDOWS_PHONE)
38 private static readonly Dictionary<int, WaiterEntry> _waiterEntries = new Dictionary<int, WaiterEntry>();
39#elif (_WINDOWS_CE)
40 private static LocalDataStoreSlot _waiterEntrySlot = Thread.AllocateDataSlot();
41#else
42
43 [ThreadStatic]
44 private static WaiterEntry _waiterEntry;
45#endif
46
47
48 /// <summary>
49 /// Each thread in the thread pool keeps its own waiter entry.
50 /// </summary>
51 private static WaiterEntry CurrentWaiterEntry
52 {
53#if (WINDOWS_PHONE)
54 get
55 {
56 lock (_waiterEntries)
57 {
58 WaiterEntry waiterEntry;
59 if (_waiterEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out waiterEntry))
60 {
61 return waiterEntry;
62 }
63 }
64 return null;
65 }
66 set
67 {
68 lock (_waiterEntries)
69 {
70 _waiterEntries[Thread.CurrentThread.ManagedThreadId] = value;
71 }
72 }
73#elif (_WINDOWS_CE)
74 get
75 {
76 return Thread.GetData(_waiterEntrySlot) as WaiterEntry;
77 }
78 set
79 {
80 Thread.SetData(_waiterEntrySlot, value);
81 }
82#else
83 get
84 {
85 return _waiterEntry;
86 }
87 set
88 {
89 _waiterEntry = value;
90 }
91#endif
92 }
93
94 /// <summary>
95 /// A flag that indicates if the WorkItemsQueue has been disposed.
96 /// </summary>
97 private bool _isDisposed = false;
98
99 #endregion
100
101 #region Public properties
102
103 /// <summary>
104 /// Returns the current number of work items in the queue
105 /// </summary>
106 public int Count
107 {
108 get
109 {
110 return _workItems.Count;
111 }
112 }
113
114 /// <summary>
115 /// Returns the current number of waiters
116 /// </summary>
117 public int WaitersCount
118 {
119 get
120 {
121 return _waitersCount;
122 }
123 }
124
125
126 #endregion
127
128 #region Public methods
129
130 /// <summary>
131 /// Enqueue a work item to the queue.
132 /// </summary>
133 public bool EnqueueWorkItem(WorkItem workItem)
134 {
135 // A work item cannot be null, since null is used in the
136 // WaitForWorkItem() method to indicate timeout or cancel
137 if (null == workItem)
138 {
139 throw new ArgumentNullException("workItem" , "workItem cannot be null");
140 }
141
142 bool enqueue = true;
143
144 // First check if there is a waiter waiting for work item. During
145 // the check, timed out waiters are ignored. If there is no
146 // waiter then the work item is queued.
147 lock(this)
148 {
149 ValidateNotDisposed();
150
151 if (!_isWorkItemsQueueActive)
152 {
153 return false;
154 }
155
156 while(_waitersCount > 0)
157 {
158 // Dequeue a waiter.
159 WaiterEntry waiterEntry = PopWaiter();
160
161 // Signal the waiter. On success break the loop
162 if (waiterEntry.Signal(workItem))
163 {
164 enqueue = false;
165 break;
166 }
167 }
168
169 if (enqueue)
170 {
171 // Enqueue the work item
172 _workItems.Enqueue(workItem);
173 }
174 }
175 return true;
176 }
177
178
179 /// <summary>
180 /// Waits for a work item or exits on timeout or cancel
181 /// </summary>
182 /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
183 /// <param name="cancelEvent">Cancel wait handle</param>
184 /// <returns>Returns true if the resource was granted</returns>
185 public WorkItem DequeueWorkItem(
186 int millisecondsTimeout,
187 WaitHandle cancelEvent)
188 {
189 // This method cause the caller to wait for a work item.
190 // If there is at least one waiting work item then the
191 // method returns immidiately with it.
192 //
193 // If there are no waiting work items then the caller
194 // is queued between other waiters for a work item to arrive.
195 //
196 // If a work item didn't come within millisecondsTimeout or
197 // the user canceled the wait by signaling the cancelEvent
198 // then the method returns null to indicate that the caller
199 // didn't get a work item.
200
201 WaiterEntry waiterEntry;
202 WorkItem workItem = null;
203 lock (this)
204 {
205 ValidateNotDisposed();
206
207 // If there are waiting work items then take one and return.
208 if (_workItems.Count > 0)
209 {
210 workItem = _workItems.Dequeue() as WorkItem;
211 return workItem;
212 }
213
214 // No waiting work items ...
215
216 // Get the waiter entry for the waiters queue
217 waiterEntry = GetThreadWaiterEntry();
218
219 // Put the waiter with the other waiters
220 PushWaiter(waiterEntry);
221 }
222
223 // Prepare array of wait handle for the WaitHandle.WaitAny()
224 WaitHandle [] waitHandles = new WaitHandle[] {
225 waiterEntry.WaitHandle,
226 cancelEvent };
227
228 // Wait for an available resource, cancel event, or timeout.
229
230 // During the wait we are supposes to exit the synchronization
231 // domain. (Placing true as the third argument of the WaitAny())
232 // It just doesn't work, I don't know why, so I have two lock(this)
233 // statments instead of one.
234
235 int index = STPEventWaitHandle.WaitAny(
236 waitHandles,
237 millisecondsTimeout,
238 true);
239
240 lock(this)
241 {
242 // success is true if it got a work item.
243 bool success = (0 == index);
244
245 // The timeout variable is used only for readability.
246 // (We treat cancel as timeout)
247 bool timeout = !success;
248
249 // On timeout update the waiterEntry that it is timed out
250 if (timeout)
251 {
252 // The Timeout() fails if the waiter has already been signaled
253 timeout = waiterEntry.Timeout();
254
255 // On timeout remove the waiter from the queue.
256 // Note that the complexity is O(1).
257 if(timeout)
258 {
259 RemoveWaiter(waiterEntry, false);
260 }
261
262 // Again readability
263 success = !timeout;
264 }
265
266 // On success return the work item
267 if (success)
268 {
269 workItem = waiterEntry.WorkItem;
270
271 if (null == workItem)
272 {
273 workItem = _workItems.Dequeue() as WorkItem;
274 }
275 }
276 }
277 // On failure return null.
278 return workItem;
279 }
280
281 /// <summary>
282 /// Cleanup the work items queue, hence no more work
283 /// items are allowed to be queue
284 /// </summary>
285 private void Cleanup()
286 {
287 lock(this)
288 {
289 // Deactivate only once
290 if (!_isWorkItemsQueueActive)
291 {
292 return;
293 }
294
295 // Don't queue more work items
296 _isWorkItemsQueueActive = false;
297
298 foreach(WorkItem workItem in _workItems)
299 {
300 workItem.DisposeOfState();
301 }
302
303 // Clear the work items that are already queued
304 _workItems.Clear();
305
306 // Note:
307 // I don't iterate over the queue and dispose of work items's states,
308 // since if a work item has a state object that is still in use in the
309 // application then I must not dispose it.
310
311 // Tell the waiters that they were timed out.
312 // It won't signal them to exit, but to ignore their
313 // next work item.
314 while(_waitersCount > 0)
315 {
316 WaiterEntry waiterEntry = PopWaiter();
317 waiterEntry.Timeout();
318 }
319 }
320 }
321
322 public object[] GetStates()
323 {
324 lock (this)
325 {
326 object[] states = new object[_workItems.Count];
327 int i = 0;
328 foreach (WorkItem workItem in _workItems)
329 {
330 states[i] = workItem.GetWorkItemResult().State;
331 ++i;
332 }
333 return states;
334 }
335 }
336
337 #endregion
338
339 #region Private methods
340
341 /// <summary>
342 /// Returns the WaiterEntry of the current thread
343 /// </summary>
344 /// <returns></returns>
345 /// In order to avoid creation and destuction of WaiterEntry
346 /// objects each thread has its own WaiterEntry object.
347 private static WaiterEntry GetThreadWaiterEntry()
348 {
349 if (null == CurrentWaiterEntry)
350 {
351 CurrentWaiterEntry = new WaiterEntry();
352 }
353 CurrentWaiterEntry.Reset();
354 return CurrentWaiterEntry;
355 }
356
357 #region Waiters stack methods
358
359 /// <summary>
360 /// Push a new waiter into the waiter's stack
361 /// </summary>
362 /// <param name="newWaiterEntry">A waiter to put in the stack</param>
363 public void PushWaiter(WaiterEntry newWaiterEntry)
364 {
365 // Remove the waiter if it is already in the stack and
366 // update waiter's count as needed
367 RemoveWaiter(newWaiterEntry, false);
368
369 // If the stack is empty then newWaiterEntry is the new head of the stack
370 if (null == _headWaiterEntry._nextWaiterEntry)
371 {
372 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
373 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
374
375 }
376 // If the stack is not empty then put newWaiterEntry as the new head
377 // of the stack.
378 else
379 {
380 // Save the old first waiter entry
381 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
382
383 // Update the links
384 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
385 newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
386 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
387 oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
388 }
389
390 // Increment the number of waiters
391 ++_waitersCount;
392 }
393
394 /// <summary>
395 /// Pop a waiter from the waiter's stack
396 /// </summary>
397 /// <returns>Returns the first waiter in the stack</returns>
398 private WaiterEntry PopWaiter()
399 {
400 // Store the current stack head
401 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
402
403 // Store the new stack head
404 WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
405
406 // Update the old stack head list links and decrement the number
407 // waiters.
408 RemoveWaiter(oldFirstWaiterEntry, true);
409
410 // Update the new stack head
411 _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
412 if (null != newHeadWaiterEntry)
413 {
414 newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
415 }
416
417 // Return the old stack head
418 return oldFirstWaiterEntry;
419 }
420
421 /// <summary>
422 /// Remove a waiter from the stack
423 /// </summary>
424 /// <param name="waiterEntry">A waiter entry to remove</param>
425 /// <param name="popDecrement">If true the waiter count is always decremented</param>
426 private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
427 {
428 // Store the prev entry in the list
429 WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
430
431 // Store the next entry in the list
432 WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
433
434 // A flag to indicate if we need to decrement the waiters count.
435 // If we got here from PopWaiter then we must decrement.
436 // If we got here from PushWaiter then we decrement only if
437 // the waiter was already in the stack.
438 bool decrementCounter = popDecrement;
439
440 // Null the waiter's entry links
441 waiterEntry._prevWaiterEntry = null;
442 waiterEntry._nextWaiterEntry = null;
443
444 // If the waiter entry had a prev link then update it.
445 // It also means that the waiter is already in the list and we
446 // need to decrement the waiters count.
447 if (null != prevWaiterEntry)
448 {
449 prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
450 decrementCounter = true;
451 }
452
453 // If the waiter entry had a next link then update it.
454 // It also means that the waiter is already in the list and we
455 // need to decrement the waiters count.
456 if (null != nextWaiterEntry)
457 {
458 nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
459 decrementCounter = true;
460 }
461
462 // Decrement the waiters count if needed
463 if (decrementCounter)
464 {
465 --_waitersCount;
466 }
467 }
468
469 #endregion
470
471 #endregion
472
473 #region WaiterEntry class
474
475 // A waiter entry in the _waiters queue.
476 public sealed class WaiterEntry : IDisposable
477 {
478 #region Member variables
479
480 /// <summary>
481 /// Event to signal the waiter that it got the work item.
482 /// </summary>
483 //private AutoResetEvent _waitHandle = new AutoResetEvent(false);
484 private AutoResetEvent _waitHandle = EventWaitHandleFactory.CreateAutoResetEvent();
485
486 /// <summary>
487 /// Flag to know if this waiter already quited from the queue
488 /// because of a timeout.
489 /// </summary>
490 private bool _isTimedout = false;
491
492 /// <summary>
493 /// Flag to know if the waiter was signaled and got a work item.
494 /// </summary>
495 private bool _isSignaled = false;
496
497 /// <summary>
498 /// A work item that passed directly to the waiter withou going
499 /// through the queue
500 /// </summary>
501 private WorkItem _workItem = null;
502
503 private bool _isDisposed = false;
504
505 // Linked list members
506 internal WaiterEntry _nextWaiterEntry = null;
507 internal WaiterEntry _prevWaiterEntry = null;
508
509 #endregion
510
511 #region Construction
512
513 public WaiterEntry()
514 {
515 Reset();
516 }
517
518 #endregion
519
520 #region Public methods
521
522 public WaitHandle WaitHandle
523 {
524 get { return _waitHandle; }
525 }
526
527 public WorkItem WorkItem
528 {
529 get
530 {
531 return _workItem;
532 }
533 }
534
535 /// <summary>
536 /// Signal the waiter that it got a work item.
537 /// </summary>
538 /// <returns>Return true on success</returns>
539 /// The method fails if Timeout() preceded its call
540 public bool Signal(WorkItem workItem)
541 {
542 lock(this)
543 {
544 if (!_isTimedout)
545 {
546 _workItem = workItem;
547 _isSignaled = true;
548 _waitHandle.Set();
549 return true;
550 }
551 }
552 return false;
553 }
554
555 /// <summary>
556 /// Mark the wait entry that it has been timed out
557 /// </summary>
558 /// <returns>Return true on success</returns>
559 /// The method fails if Signal() preceded its call
560 public bool Timeout()
561 {
562 lock(this)
563 {
564 // Time out can happen only if the waiter wasn't marked as
565 // signaled
566 if (!_isSignaled)
567 {
568 // We don't remove the waiter from the queue, the DequeueWorkItem
569 // method skips _waiters that were timed out.
570 _isTimedout = true;
571 return true;
572 }
573 }
574 return false;
575 }
576
577 /// <summary>
578 /// Reset the wait entry so it can be used again
579 /// </summary>
580 public void Reset()
581 {
582 _workItem = null;
583 _isTimedout = false;
584 _isSignaled = false;
585 _waitHandle.Reset();
586 }
587
588 /// <summary>
589 /// Free resources
590 /// </summary>
591 public void Close()
592 {
593 if (null != _waitHandle)
594 {
595 _waitHandle.Close();
596 _waitHandle = null;
597 }
598 }
599
600 #endregion
601
602 #region IDisposable Members
603
604 public void Dispose()
605 {
606 lock (this)
607 {
608 if (!_isDisposed)
609 {
610 Close();
611 }
612 _isDisposed = true;
613 }
614 }
615
616 #endregion
617 }
618
619 #endregion
620
621 #region IDisposable Members
622
623 public void Dispose()
624 {
625 if (!_isDisposed)
626 {
627 Cleanup();
628 _headWaiterEntry.Close();
629 }
630 _isDisposed = true;
631 }
632
633 private void ValidateNotDisposed()
634 {
635 if(_isDisposed)
636 {
637 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
638 }
639 }
640
641 #endregion
642 }
643
644 #endregion
645}
646