aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
diff options
context:
space:
mode:
authorTeravus Ovares2008-05-30 12:27:06 +0000
committerTeravus Ovares2008-05-30 12:27:06 +0000
commit1a47ff8094ee414a47aebd310826906d89428a09 (patch)
tree0e90b3a33f43ff8617a077bb57b86d6b28e63e71 /ThirdParty/SmartThreadPool/WorkItemsQueue.cs
parent* Fixed a dangling event hook that I added. (diff)
downloadopensim-SC-1a47ff8094ee414a47aebd310826906d89428a09.zip
opensim-SC-1a47ff8094ee414a47aebd310826906d89428a09.tar.gz
opensim-SC-1a47ff8094ee414a47aebd310826906d89428a09.tar.bz2
opensim-SC-1a47ff8094ee414a47aebd310826906d89428a09.tar.xz
* This is Melanie's XEngine script engine. I've not tested this real well, however, it's confirmed to compile and OpenSimulator to run successfully without this script engine active.
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsQueue.cs')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsQueue.cs600
1 files changed, 600 insertions, 0 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
new file mode 100644
index 0000000..af5af07
--- /dev/null
+++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
@@ -0,0 +1,600 @@
1// Ami Bar
2// amibar@gmail.com
3
4using System;
5using System.Threading;
6
7namespace Amib.Threading.Internal
8{
9 #region WorkItemsQueue class
10
11 /// <summary>
12 /// WorkItemsQueue class.
13 /// </summary>
14 public class WorkItemsQueue : IDisposable
15 {
16 #region Member variables
17
18 /// <summary>
19 /// Waiters queue (implemented as stack).
20 /// </summary>
21 private WaiterEntry _headWaiterEntry = new WaiterEntry();
22
23 /// <summary>
24 /// Waiters count
25 /// </summary>
26 private int _waitersCount = 0;
27
28 /// <summary>
29 /// Work items queue
30 /// </summary>
31 private PriorityQueue _workItems = new PriorityQueue();
32
33 /// <summary>
34 /// Indicate that work items are allowed to be queued
35 /// </summary>
36 private bool _isWorkItemsQueueActive = true;
37
38 /// <summary>
39 /// Each thread in the thread pool keeps its own waiter entry.
40 /// </summary>
41 [ThreadStatic]
42 private static WaiterEntry _waiterEntry;
43
44 /// <summary>
45 /// A flag that indicates if the WorkItemsQueue has been disposed.
46 /// </summary>
47 private bool _isDisposed = false;
48
49 #endregion
50
51 #region Public properties
52
53 /// <summary>
54 /// Returns the current number of work items in the queue
55 /// </summary>
56 public int Count
57 {
58 get
59 {
60 lock(this)
61 {
62 ValidateNotDisposed();
63 return _workItems.Count;
64 }
65 }
66 }
67
68 /// <summary>
69 /// Returns the current number of waiters
70 /// </summary>
71 public int WaitersCount
72 {
73 get
74 {
75 lock(this)
76 {
77 ValidateNotDisposed();
78 return _waitersCount;
79 }
80 }
81 }
82
83
84 #endregion
85
86 #region Public methods
87
88 /// <summary>
89 /// Enqueue a work item to the queue.
90 /// </summary>
91 public bool EnqueueWorkItem(WorkItem workItem)
92 {
93 // A work item cannot be null, since null is used in the
94 // WaitForWorkItem() method to indicate timeout or cancel
95 if (null == workItem)
96 {
97 throw new ArgumentNullException("workItem" , "workItem cannot be null");
98 }
99
100 bool enqueue = true;
101
102 // First check if there is a waiter waiting for work item. During
103 // the check, timed out waiters are ignored. If there is no
104 // waiter then the work item is queued.
105 lock(this)
106 {
107 ValidateNotDisposed();
108
109 if (!_isWorkItemsQueueActive)
110 {
111 return false;
112 }
113
114 while(_waitersCount > 0)
115 {
116 // Dequeue a waiter.
117 WaiterEntry waiterEntry = PopWaiter();
118
119 // Signal the waiter. On success break the loop
120 if (waiterEntry.Signal(workItem))
121 {
122 enqueue = false;
123 break;
124 }
125 }
126
127 if (enqueue)
128 {
129 // Enqueue the work item
130 _workItems.Enqueue(workItem);
131 }
132 }
133 return true;
134 }
135
136
137 /// <summary>
138 /// Waits for a work item or exits on timeout or cancel
139 /// </summary>
140 /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
141 /// <param name="cancelEvent">Cancel wait handle</param>
142 /// <returns>Returns true if the resource was granted</returns>
143 public WorkItem DequeueWorkItem(
144 int millisecondsTimeout,
145 WaitHandle cancelEvent)
146 {
147 /// This method cause the caller to wait for a work item.
148 /// If there is at least one waiting work item then the
149 /// method returns immidiately with true.
150 ///
151 /// If there are no waiting work items then the caller
152 /// is queued between other waiters for a work item to arrive.
153 ///
154 /// If a work item didn't come within millisecondsTimeout or
155 /// the user canceled the wait by signaling the cancelEvent
156 /// then the method returns false to indicate that the caller
157 /// didn't get a work item.
158
159 WaiterEntry waiterEntry = null;
160 WorkItem workItem = null;
161
162 lock(this)
163 {
164 ValidateNotDisposed();
165
166 // If there are waiting work items then take one and return.
167 if (_workItems.Count > 0)
168 {
169 workItem = _workItems.Dequeue() as WorkItem;
170 return workItem;
171 }
172 // No waiting work items ...
173 else
174 {
175 // Get the wait entry for the waiters queue
176 waiterEntry = GetThreadWaiterEntry();
177
178 // Put the waiter with the other waiters
179 PushWaiter(waiterEntry);
180 }
181 }
182
183 // Prepare array of wait handle for the WaitHandle.WaitAny()
184 WaitHandle [] waitHandles = new WaitHandle [] {
185 waiterEntry.WaitHandle,
186 cancelEvent };
187
188 // Wait for an available resource, cancel event, or timeout.
189
190 // During the wait we are supposes to exit the synchronization
191 // domain. (Placing true as the third argument of the WaitAny())
192 // It just doesn't work, I don't know why, so I have lock(this)
193 // statments insted of one.
194
195 int index = WaitHandle.WaitAny(
196 waitHandles,
197 millisecondsTimeout,
198 true);
199
200 lock(this)
201 {
202 // success is true if it got a work item.
203 bool success = (0 == index);
204
205 // The timeout variable is used only for readability.
206 // (We treat cancel as timeout)
207 bool timeout = !success;
208
209 // On timeout update the waiterEntry that it is timed out
210 if (timeout)
211 {
212 // The Timeout() fails if the waiter has already been signaled
213 timeout = waiterEntry.Timeout();
214
215 // On timeout remove the waiter from the queue.
216 // Note that the complexity is O(1).
217 if(timeout)
218 {
219 RemoveWaiter(waiterEntry, false);
220 }
221
222 // Again readability
223 success = !timeout;
224 }
225
226 // On success return the work item
227 if (success)
228 {
229 workItem = waiterEntry.WorkItem;
230
231 if (null == workItem)
232 {
233 workItem = _workItems.Dequeue() as WorkItem;
234 }
235 }
236 }
237 // On failure return null.
238 return workItem;
239 }
240
241 /// <summary>
242 /// Cleanup the work items queue, hence no more work
243 /// items are allowed to be queue
244 /// </summary>
245 protected virtual void Cleanup()
246 {
247 lock(this)
248 {
249 // Deactivate only once
250 if (!_isWorkItemsQueueActive)
251 {
252 return;
253 }
254
255 // Don't queue more work items
256 _isWorkItemsQueueActive = false;
257
258 foreach(WorkItem workItem in _workItems)
259 {
260 workItem.DisposeOfState();
261 }
262
263 // Clear the work items that are already queued
264 _workItems.Clear();
265
266 // Note:
267 // I don't iterate over the queue and dispose of work items's states,
268 // since if a work item has a state object that is still in use in the
269 // application then I must not dispose it.
270
271 // Tell the waiters that they were timed out.
272 // It won't signal them to exit, but to ignore their
273 // next work item.
274 while(_waitersCount > 0)
275 {
276 WaiterEntry waiterEntry = PopWaiter();
277 waiterEntry.Timeout();
278 }
279 }
280 }
281
282 #endregion
283
284 #region Private methods
285
286 /// <summary>
287 /// Returns the WaiterEntry of the current thread
288 /// </summary>
289 /// <returns></returns>
290 /// In order to avoid creation and destuction of WaiterEntry
291 /// objects each thread has its own WaiterEntry object.
292 private WaiterEntry GetThreadWaiterEntry()
293 {
294 if (null == _waiterEntry)
295 {
296 _waiterEntry = new WaiterEntry();
297 }
298 _waiterEntry.Reset();
299 return _waiterEntry;
300 }
301
302 #region Waiters stack methods
303
304 /// <summary>
305 /// Push a new waiter into the waiter's stack
306 /// </summary>
307 /// <param name="newWaiterEntry">A waiter to put in the stack</param>
308 public void PushWaiter(WaiterEntry newWaiterEntry)
309 {
310 // Remove the waiter if it is already in the stack and
311 // update waiter's count as needed
312 RemoveWaiter(newWaiterEntry, false);
313
314 // If the stack is empty then newWaiterEntry is the new head of the stack
315 if (null == _headWaiterEntry._nextWaiterEntry)
316 {
317 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
318 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
319
320 }
321 // If the stack is not empty then put newWaiterEntry as the new head
322 // of the stack.
323 else
324 {
325 // Save the old first waiter entry
326 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
327
328 // Update the links
329 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
330 newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
331 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
332 oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
333 }
334
335 // Increment the number of waiters
336 ++_waitersCount;
337 }
338
339 /// <summary>
340 /// Pop a waiter from the waiter's stack
341 /// </summary>
342 /// <returns>Returns the first waiter in the stack</returns>
343 private WaiterEntry PopWaiter()
344 {
345 // Store the current stack head
346 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
347
348 // Store the new stack head
349 WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
350
351 // Update the old stack head list links and decrement the number
352 // waiters.
353 RemoveWaiter(oldFirstWaiterEntry, true);
354
355 // Update the new stack head
356 _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
357 if (null != newHeadWaiterEntry)
358 {
359 newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
360 }
361
362 // Return the old stack head
363 return oldFirstWaiterEntry;
364 }
365
366 /// <summary>
367 /// Remove a waiter from the stack
368 /// </summary>
369 /// <param name="waiterEntry">A waiter entry to remove</param>
370 /// <param name="popDecrement">If true the waiter count is always decremented</param>
371 private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
372 {
373 // Store the prev entry in the list
374 WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
375
376 // Store the next entry in the list
377 WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
378
379 // A flag to indicate if we need to decrement the waiters count.
380 // If we got here from PopWaiter then we must decrement.
381 // If we got here from PushWaiter then we decrement only if
382 // the waiter was already in the stack.
383 bool decrementCounter = popDecrement;
384
385 // Null the waiter's entry links
386 waiterEntry._prevWaiterEntry = null;
387 waiterEntry._nextWaiterEntry = null;
388
389 // If the waiter entry had a prev link then update it.
390 // It also means that the waiter is already in the list and we
391 // need to decrement the waiters count.
392 if (null != prevWaiterEntry)
393 {
394 prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
395 decrementCounter = true;
396 }
397
398 // If the waiter entry had a next link then update it.
399 // It also means that the waiter is already in the list and we
400 // need to decrement the waiters count.
401 if (null != nextWaiterEntry)
402 {
403 nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
404 decrementCounter = true;
405 }
406
407 // Decrement the waiters count if needed
408 if (decrementCounter)
409 {
410 --_waitersCount;
411 }
412 }
413
414 #endregion
415
416 #endregion
417
418 #region WaiterEntry class
419
420 // A waiter entry in the _waiters queue.
421 public class WaiterEntry : IDisposable
422 {
423 #region Member variables
424
425 /// <summary>
426 /// Event to signal the waiter that it got the work item.
427 /// </summary>
428 private AutoResetEvent _waitHandle = new AutoResetEvent(false);
429
430 /// <summary>
431 /// Flag to know if this waiter already quited from the queue
432 /// because of a timeout.
433 /// </summary>
434 private bool _isTimedout = false;
435
436 /// <summary>
437 /// Flag to know if the waiter was signaled and got a work item.
438 /// </summary>
439 private bool _isSignaled = false;
440
441 /// <summary>
442 /// A work item that passed directly to the waiter withou going
443 /// through the queue
444 /// </summary>
445 private WorkItem _workItem = null;
446
447 private bool _isDisposed = false;
448
449 // Linked list members
450 internal WaiterEntry _nextWaiterEntry = null;
451 internal WaiterEntry _prevWaiterEntry = null;
452
453 #endregion
454
455 #region Construction
456
457 public WaiterEntry()
458 {
459 Reset();
460 }
461
462 #endregion
463
464 #region Public methods
465
466 public WaitHandle WaitHandle
467 {
468 get { return _waitHandle; }
469 }
470
471 public WorkItem WorkItem
472 {
473 get
474 {
475 lock(this)
476 {
477 return _workItem;
478 }
479 }
480 }
481
482 /// <summary>
483 /// Signal the waiter that it got a work item.
484 /// </summary>
485 /// <returns>Return true on success</returns>
486 /// The method fails if Timeout() preceded its call
487 public bool Signal(WorkItem workItem)
488 {
489 lock(this)
490 {
491 if (!_isTimedout)
492 {
493 _workItem = workItem;
494 _isSignaled = true;
495 _waitHandle.Set();
496 return true;
497 }
498 }
499 return false;
500 }
501
502 /// <summary>
503 /// Mark the wait entry that it has been timed out
504 /// </summary>
505 /// <returns>Return true on success</returns>
506 /// The method fails if Signal() preceded its call
507 public bool Timeout()
508 {
509 lock(this)
510 {
511 // Time out can happen only if the waiter wasn't marked as
512 // signaled
513 if (!_isSignaled)
514 {
515 // We don't remove the waiter from the queue, the DequeueWorkItem
516 // method skips _waiters that were timed out.
517 _isTimedout = true;
518 return true;
519 }
520 }
521 return false;
522 }
523
524 /// <summary>
525 /// Reset the wait entry so it can be used again
526 /// </summary>
527 public void Reset()
528 {
529 _workItem = null;
530 _isTimedout = false;
531 _isSignaled = false;
532 _waitHandle.Reset();
533 }
534
535 /// <summary>
536 /// Free resources
537 /// </summary>
538 public void Close()
539 {
540 if (null != _waitHandle)
541 {
542 _waitHandle.Close();
543 _waitHandle = null;
544 }
545 }
546
547 #endregion
548
549 #region IDisposable Members
550
551 public void Dispose()
552 {
553 if (!_isDisposed)
554 {
555 Close();
556 _isDisposed = true;
557 }
558 }
559
560 ~WaiterEntry()
561 {
562 Dispose();
563 }
564
565 #endregion
566 }
567
568 #endregion
569
570 #region IDisposable Members
571
572 public void Dispose()
573 {
574 if (!_isDisposed)
575 {
576 Cleanup();
577 _isDisposed = true;
578 GC.SuppressFinalize(this);
579 }
580 }
581
582 ~WorkItemsQueue()
583 {
584 Cleanup();
585 }
586
587 private void ValidateNotDisposed()
588 {
589 if(_isDisposed)
590 {
591 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
592 }
593 }
594
595 #endregion
596 }
597
598 #endregion
599}
600