From 1a47ff8094ee414a47aebd310826906d89428a09 Mon Sep 17 00:00:00 2001
From: Teravus Ovares
Date: Fri, 30 May 2008 12:27:06 +0000
Subject: * This is Melanie's XEngine script engine. I've not tested this real
well, however, it's confirmed to compile and OpenSimulator to run
successfully without this script engine active.
---
ThirdParty/SmartThreadPool/WorkItemsQueue.cs | 600 +++++++++++++++++++++++++++
1 file changed, 600 insertions(+)
create mode 100644 ThirdParty/SmartThreadPool/WorkItemsQueue.cs
(limited to 'ThirdParty/SmartThreadPool/WorkItemsQueue.cs')
diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
new file mode 100644
index 0000000..af5af07
--- /dev/null
+++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
@@ -0,0 +1,600 @@
+// Ami Bar
+// amibar@gmail.com
+
+using System;
+using System.Threading;
+
+namespace Amib.Threading.Internal
+{
+ #region WorkItemsQueue class
+
+ ///
+ /// WorkItemsQueue class.
+ ///
+ public class WorkItemsQueue : IDisposable
+ {
+ #region Member variables
+
+ ///
+ /// Waiters queue (implemented as stack).
+ ///
+ private WaiterEntry _headWaiterEntry = new WaiterEntry();
+
+ ///
+ /// Waiters count
+ ///
+ private int _waitersCount = 0;
+
+ ///
+ /// Work items queue
+ ///
+ private PriorityQueue _workItems = new PriorityQueue();
+
+ ///
+ /// Indicate that work items are allowed to be queued
+ ///
+ private bool _isWorkItemsQueueActive = true;
+
+ ///
+ /// Each thread in the thread pool keeps its own waiter entry.
+ ///
+ [ThreadStatic]
+ private static WaiterEntry _waiterEntry;
+
+ ///
+ /// A flag that indicates if the WorkItemsQueue has been disposed.
+ ///
+ private bool _isDisposed = false;
+
+ #endregion
+
+ #region Public properties
+
+ ///
+ /// Returns the current number of work items in the queue
+ ///
+ public int Count
+ {
+ get
+ {
+ lock(this)
+ {
+ ValidateNotDisposed();
+ return _workItems.Count;
+ }
+ }
+ }
+
+ ///
+ /// Returns the current number of waiters
+ ///
+ public int WaitersCount
+ {
+ get
+ {
+ lock(this)
+ {
+ ValidateNotDisposed();
+ return _waitersCount;
+ }
+ }
+ }
+
+
+ #endregion
+
+ #region Public methods
+
+ ///
+ /// Enqueue a work item to the queue.
+ ///
+ public bool EnqueueWorkItem(WorkItem workItem)
+ {
+ // A work item cannot be null, since null is used in the
+ // WaitForWorkItem() method to indicate timeout or cancel
+ if (null == workItem)
+ {
+ throw new ArgumentNullException("workItem" , "workItem cannot be null");
+ }
+
+ bool enqueue = true;
+
+ // First check if there is a waiter waiting for work item. During
+ // the check, timed out waiters are ignored. If there is no
+ // waiter then the work item is queued.
+ lock(this)
+ {
+ ValidateNotDisposed();
+
+ if (!_isWorkItemsQueueActive)
+ {
+ return false;
+ }
+
+ while(_waitersCount > 0)
+ {
+ // Dequeue a waiter.
+ WaiterEntry waiterEntry = PopWaiter();
+
+ // Signal the waiter. On success break the loop
+ if (waiterEntry.Signal(workItem))
+ {
+ enqueue = false;
+ break;
+ }
+ }
+
+ if (enqueue)
+ {
+ // Enqueue the work item
+ _workItems.Enqueue(workItem);
+ }
+ }
+ return true;
+ }
+
+
+ ///
+ /// Waits for a work item or exits on timeout or cancel
+ ///
+ /// Timeout in milliseconds
+ /// Cancel wait handle
+ /// Returns true if the resource was granted
+ public WorkItem DequeueWorkItem(
+ int millisecondsTimeout,
+ WaitHandle cancelEvent)
+ {
+ /// This method cause the caller to wait for a work item.
+ /// If there is at least one waiting work item then the
+ /// method returns immidiately with true.
+ ///
+ /// If there are no waiting work items then the caller
+ /// is queued between other waiters for a work item to arrive.
+ ///
+ /// If a work item didn't come within millisecondsTimeout or
+ /// the user canceled the wait by signaling the cancelEvent
+ /// then the method returns false to indicate that the caller
+ /// didn't get a work item.
+
+ WaiterEntry waiterEntry = null;
+ WorkItem workItem = null;
+
+ lock(this)
+ {
+ ValidateNotDisposed();
+
+ // If there are waiting work items then take one and return.
+ if (_workItems.Count > 0)
+ {
+ workItem = _workItems.Dequeue() as WorkItem;
+ return workItem;
+ }
+ // No waiting work items ...
+ else
+ {
+ // Get the wait entry for the waiters queue
+ waiterEntry = GetThreadWaiterEntry();
+
+ // Put the waiter with the other waiters
+ PushWaiter(waiterEntry);
+ }
+ }
+
+ // Prepare array of wait handle for the WaitHandle.WaitAny()
+ WaitHandle [] waitHandles = new WaitHandle [] {
+ waiterEntry.WaitHandle,
+ cancelEvent };
+
+ // Wait for an available resource, cancel event, or timeout.
+
+ // During the wait we are supposes to exit the synchronization
+ // domain. (Placing true as the third argument of the WaitAny())
+ // It just doesn't work, I don't know why, so I have lock(this)
+ // statments insted of one.
+
+ int index = WaitHandle.WaitAny(
+ waitHandles,
+ millisecondsTimeout,
+ true);
+
+ lock(this)
+ {
+ // success is true if it got a work item.
+ bool success = (0 == index);
+
+ // The timeout variable is used only for readability.
+ // (We treat cancel as timeout)
+ bool timeout = !success;
+
+ // On timeout update the waiterEntry that it is timed out
+ if (timeout)
+ {
+ // The Timeout() fails if the waiter has already been signaled
+ timeout = waiterEntry.Timeout();
+
+ // On timeout remove the waiter from the queue.
+ // Note that the complexity is O(1).
+ if(timeout)
+ {
+ RemoveWaiter(waiterEntry, false);
+ }
+
+ // Again readability
+ success = !timeout;
+ }
+
+ // On success return the work item
+ if (success)
+ {
+ workItem = waiterEntry.WorkItem;
+
+ if (null == workItem)
+ {
+ workItem = _workItems.Dequeue() as WorkItem;
+ }
+ }
+ }
+ // On failure return null.
+ return workItem;
+ }
+
+ ///
+ /// Cleanup the work items queue, hence no more work
+ /// items are allowed to be queue
+ ///
+ protected virtual void Cleanup()
+ {
+ lock(this)
+ {
+ // Deactivate only once
+ if (!_isWorkItemsQueueActive)
+ {
+ return;
+ }
+
+ // Don't queue more work items
+ _isWorkItemsQueueActive = false;
+
+ foreach(WorkItem workItem in _workItems)
+ {
+ workItem.DisposeOfState();
+ }
+
+ // Clear the work items that are already queued
+ _workItems.Clear();
+
+ // Note:
+ // I don't iterate over the queue and dispose of work items's states,
+ // since if a work item has a state object that is still in use in the
+ // application then I must not dispose it.
+
+ // Tell the waiters that they were timed out.
+ // It won't signal them to exit, but to ignore their
+ // next work item.
+ while(_waitersCount > 0)
+ {
+ WaiterEntry waiterEntry = PopWaiter();
+ waiterEntry.Timeout();
+ }
+ }
+ }
+
+ #endregion
+
+ #region Private methods
+
+ ///
+ /// Returns the WaiterEntry of the current thread
+ ///
+ ///
+ /// In order to avoid creation and destuction of WaiterEntry
+ /// objects each thread has its own WaiterEntry object.
+ private WaiterEntry GetThreadWaiterEntry()
+ {
+ if (null == _waiterEntry)
+ {
+ _waiterEntry = new WaiterEntry();
+ }
+ _waiterEntry.Reset();
+ return _waiterEntry;
+ }
+
+ #region Waiters stack methods
+
+ ///
+ /// Push a new waiter into the waiter's stack
+ ///
+ /// A waiter to put in the stack
+ public void PushWaiter(WaiterEntry newWaiterEntry)
+ {
+ // Remove the waiter if it is already in the stack and
+ // update waiter's count as needed
+ RemoveWaiter(newWaiterEntry, false);
+
+ // If the stack is empty then newWaiterEntry is the new head of the stack
+ if (null == _headWaiterEntry._nextWaiterEntry)
+ {
+ _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
+ newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
+
+ }
+ // If the stack is not empty then put newWaiterEntry as the new head
+ // of the stack.
+ else
+ {
+ // Save the old first waiter entry
+ WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
+
+ // Update the links
+ _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
+ newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
+ newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
+ oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
+ }
+
+ // Increment the number of waiters
+ ++_waitersCount;
+ }
+
+ ///
+ /// Pop a waiter from the waiter's stack
+ ///
+ /// Returns the first waiter in the stack
+ private WaiterEntry PopWaiter()
+ {
+ // Store the current stack head
+ WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
+
+ // Store the new stack head
+ WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
+
+ // Update the old stack head list links and decrement the number
+ // waiters.
+ RemoveWaiter(oldFirstWaiterEntry, true);
+
+ // Update the new stack head
+ _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
+ if (null != newHeadWaiterEntry)
+ {
+ newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
+ }
+
+ // Return the old stack head
+ return oldFirstWaiterEntry;
+ }
+
+ ///
+ /// Remove a waiter from the stack
+ ///
+ /// A waiter entry to remove
+ /// If true the waiter count is always decremented
+ private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
+ {
+ // Store the prev entry in the list
+ WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
+
+ // Store the next entry in the list
+ WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
+
+ // A flag to indicate if we need to decrement the waiters count.
+ // If we got here from PopWaiter then we must decrement.
+ // If we got here from PushWaiter then we decrement only if
+ // the waiter was already in the stack.
+ bool decrementCounter = popDecrement;
+
+ // Null the waiter's entry links
+ waiterEntry._prevWaiterEntry = null;
+ waiterEntry._nextWaiterEntry = null;
+
+ // If the waiter entry had a prev link then update it.
+ // It also means that the waiter is already in the list and we
+ // need to decrement the waiters count.
+ if (null != prevWaiterEntry)
+ {
+ prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
+ decrementCounter = true;
+ }
+
+ // If the waiter entry had a next link then update it.
+ // It also means that the waiter is already in the list and we
+ // need to decrement the waiters count.
+ if (null != nextWaiterEntry)
+ {
+ nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
+ decrementCounter = true;
+ }
+
+ // Decrement the waiters count if needed
+ if (decrementCounter)
+ {
+ --_waitersCount;
+ }
+ }
+
+ #endregion
+
+ #endregion
+
+ #region WaiterEntry class
+
+ // A waiter entry in the _waiters queue.
+ public class WaiterEntry : IDisposable
+ {
+ #region Member variables
+
+ ///
+ /// Event to signal the waiter that it got the work item.
+ ///
+ private AutoResetEvent _waitHandle = new AutoResetEvent(false);
+
+ ///
+ /// Flag to know if this waiter already quited from the queue
+ /// because of a timeout.
+ ///
+ private bool _isTimedout = false;
+
+ ///
+ /// Flag to know if the waiter was signaled and got a work item.
+ ///
+ private bool _isSignaled = false;
+
+ ///
+ /// A work item that passed directly to the waiter withou going
+ /// through the queue
+ ///
+ private WorkItem _workItem = null;
+
+ private bool _isDisposed = false;
+
+ // Linked list members
+ internal WaiterEntry _nextWaiterEntry = null;
+ internal WaiterEntry _prevWaiterEntry = null;
+
+ #endregion
+
+ #region Construction
+
+ public WaiterEntry()
+ {
+ Reset();
+ }
+
+ #endregion
+
+ #region Public methods
+
+ public WaitHandle WaitHandle
+ {
+ get { return _waitHandle; }
+ }
+
+ public WorkItem WorkItem
+ {
+ get
+ {
+ lock(this)
+ {
+ return _workItem;
+ }
+ }
+ }
+
+ ///
+ /// Signal the waiter that it got a work item.
+ ///
+ /// Return true on success
+ /// The method fails if Timeout() preceded its call
+ public bool Signal(WorkItem workItem)
+ {
+ lock(this)
+ {
+ if (!_isTimedout)
+ {
+ _workItem = workItem;
+ _isSignaled = true;
+ _waitHandle.Set();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ ///
+ /// Mark the wait entry that it has been timed out
+ ///
+ /// Return true on success
+ /// The method fails if Signal() preceded its call
+ public bool Timeout()
+ {
+ lock(this)
+ {
+ // Time out can happen only if the waiter wasn't marked as
+ // signaled
+ if (!_isSignaled)
+ {
+ // We don't remove the waiter from the queue, the DequeueWorkItem
+ // method skips _waiters that were timed out.
+ _isTimedout = true;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ ///
+ /// Reset the wait entry so it can be used again
+ ///
+ public void Reset()
+ {
+ _workItem = null;
+ _isTimedout = false;
+ _isSignaled = false;
+ _waitHandle.Reset();
+ }
+
+ ///
+ /// Free resources
+ ///
+ public void Close()
+ {
+ if (null != _waitHandle)
+ {
+ _waitHandle.Close();
+ _waitHandle = null;
+ }
+ }
+
+ #endregion
+
+ #region IDisposable Members
+
+ public void Dispose()
+ {
+ if (!_isDisposed)
+ {
+ Close();
+ _isDisposed = true;
+ }
+ }
+
+ ~WaiterEntry()
+ {
+ Dispose();
+ }
+
+ #endregion
+ }
+
+ #endregion
+
+ #region IDisposable Members
+
+ public void Dispose()
+ {
+ if (!_isDisposed)
+ {
+ Cleanup();
+ _isDisposed = true;
+ GC.SuppressFinalize(this);
+ }
+ }
+
+ ~WorkItemsQueue()
+ {
+ Cleanup();
+ }
+
+ private void ValidateNotDisposed()
+ {
+ if(_isDisposed)
+ {
+ throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
+ }
+ }
+
+ #endregion
+ }
+
+ #endregion
+}
+
--
cgit v1.1