From 1a47ff8094ee414a47aebd310826906d89428a09 Mon Sep 17 00:00:00 2001 From: Teravus Ovares Date: Fri, 30 May 2008 12:27:06 +0000 Subject: * This is Melanie's XEngine script engine. I've not tested this real well, however, it's confirmed to compile and OpenSimulator to run successfully without this script engine active. --- ThirdParty/SmartThreadPool/WorkItemsQueue.cs | 600 +++++++++++++++++++++++++++ 1 file changed, 600 insertions(+) create mode 100644 ThirdParty/SmartThreadPool/WorkItemsQueue.cs (limited to 'ThirdParty/SmartThreadPool/WorkItemsQueue.cs') diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs new file mode 100644 index 0000000..af5af07 --- /dev/null +++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs @@ -0,0 +1,600 @@ +// Ami Bar +// amibar@gmail.com + +using System; +using System.Threading; + +namespace Amib.Threading.Internal +{ + #region WorkItemsQueue class + + /// + /// WorkItemsQueue class. + /// + public class WorkItemsQueue : IDisposable + { + #region Member variables + + /// + /// Waiters queue (implemented as stack). + /// + private WaiterEntry _headWaiterEntry = new WaiterEntry(); + + /// + /// Waiters count + /// + private int _waitersCount = 0; + + /// + /// Work items queue + /// + private PriorityQueue _workItems = new PriorityQueue(); + + /// + /// Indicate that work items are allowed to be queued + /// + private bool _isWorkItemsQueueActive = true; + + /// + /// Each thread in the thread pool keeps its own waiter entry. + /// + [ThreadStatic] + private static WaiterEntry _waiterEntry; + + /// + /// A flag that indicates if the WorkItemsQueue has been disposed. + /// + private bool _isDisposed = false; + + #endregion + + #region Public properties + + /// + /// Returns the current number of work items in the queue + /// + public int Count + { + get + { + lock(this) + { + ValidateNotDisposed(); + return _workItems.Count; + } + } + } + + /// + /// Returns the current number of waiters + /// + public int WaitersCount + { + get + { + lock(this) + { + ValidateNotDisposed(); + return _waitersCount; + } + } + } + + + #endregion + + #region Public methods + + /// + /// Enqueue a work item to the queue. + /// + public bool EnqueueWorkItem(WorkItem workItem) + { + // A work item cannot be null, since null is used in the + // WaitForWorkItem() method to indicate timeout or cancel + if (null == workItem) + { + throw new ArgumentNullException("workItem" , "workItem cannot be null"); + } + + bool enqueue = true; + + // First check if there is a waiter waiting for work item. During + // the check, timed out waiters are ignored. If there is no + // waiter then the work item is queued. + lock(this) + { + ValidateNotDisposed(); + + if (!_isWorkItemsQueueActive) + { + return false; + } + + while(_waitersCount > 0) + { + // Dequeue a waiter. + WaiterEntry waiterEntry = PopWaiter(); + + // Signal the waiter. On success break the loop + if (waiterEntry.Signal(workItem)) + { + enqueue = false; + break; + } + } + + if (enqueue) + { + // Enqueue the work item + _workItems.Enqueue(workItem); + } + } + return true; + } + + + /// + /// Waits for a work item or exits on timeout or cancel + /// + /// Timeout in milliseconds + /// Cancel wait handle + /// Returns true if the resource was granted + public WorkItem DequeueWorkItem( + int millisecondsTimeout, + WaitHandle cancelEvent) + { + /// This method cause the caller to wait for a work item. + /// If there is at least one waiting work item then the + /// method returns immidiately with true. + /// + /// If there are no waiting work items then the caller + /// is queued between other waiters for a work item to arrive. + /// + /// If a work item didn't come within millisecondsTimeout or + /// the user canceled the wait by signaling the cancelEvent + /// then the method returns false to indicate that the caller + /// didn't get a work item. + + WaiterEntry waiterEntry = null; + WorkItem workItem = null; + + lock(this) + { + ValidateNotDisposed(); + + // If there are waiting work items then take one and return. + if (_workItems.Count > 0) + { + workItem = _workItems.Dequeue() as WorkItem; + return workItem; + } + // No waiting work items ... + else + { + // Get the wait entry for the waiters queue + waiterEntry = GetThreadWaiterEntry(); + + // Put the waiter with the other waiters + PushWaiter(waiterEntry); + } + } + + // Prepare array of wait handle for the WaitHandle.WaitAny() + WaitHandle [] waitHandles = new WaitHandle [] { + waiterEntry.WaitHandle, + cancelEvent }; + + // Wait for an available resource, cancel event, or timeout. + + // During the wait we are supposes to exit the synchronization + // domain. (Placing true as the third argument of the WaitAny()) + // It just doesn't work, I don't know why, so I have lock(this) + // statments insted of one. + + int index = WaitHandle.WaitAny( + waitHandles, + millisecondsTimeout, + true); + + lock(this) + { + // success is true if it got a work item. + bool success = (0 == index); + + // The timeout variable is used only for readability. + // (We treat cancel as timeout) + bool timeout = !success; + + // On timeout update the waiterEntry that it is timed out + if (timeout) + { + // The Timeout() fails if the waiter has already been signaled + timeout = waiterEntry.Timeout(); + + // On timeout remove the waiter from the queue. + // Note that the complexity is O(1). + if(timeout) + { + RemoveWaiter(waiterEntry, false); + } + + // Again readability + success = !timeout; + } + + // On success return the work item + if (success) + { + workItem = waiterEntry.WorkItem; + + if (null == workItem) + { + workItem = _workItems.Dequeue() as WorkItem; + } + } + } + // On failure return null. + return workItem; + } + + /// + /// Cleanup the work items queue, hence no more work + /// items are allowed to be queue + /// + protected virtual void Cleanup() + { + lock(this) + { + // Deactivate only once + if (!_isWorkItemsQueueActive) + { + return; + } + + // Don't queue more work items + _isWorkItemsQueueActive = false; + + foreach(WorkItem workItem in _workItems) + { + workItem.DisposeOfState(); + } + + // Clear the work items that are already queued + _workItems.Clear(); + + // Note: + // I don't iterate over the queue and dispose of work items's states, + // since if a work item has a state object that is still in use in the + // application then I must not dispose it. + + // Tell the waiters that they were timed out. + // It won't signal them to exit, but to ignore their + // next work item. + while(_waitersCount > 0) + { + WaiterEntry waiterEntry = PopWaiter(); + waiterEntry.Timeout(); + } + } + } + + #endregion + + #region Private methods + + /// + /// Returns the WaiterEntry of the current thread + /// + /// + /// In order to avoid creation and destuction of WaiterEntry + /// objects each thread has its own WaiterEntry object. + private WaiterEntry GetThreadWaiterEntry() + { + if (null == _waiterEntry) + { + _waiterEntry = new WaiterEntry(); + } + _waiterEntry.Reset(); + return _waiterEntry; + } + + #region Waiters stack methods + + /// + /// Push a new waiter into the waiter's stack + /// + /// A waiter to put in the stack + public void PushWaiter(WaiterEntry newWaiterEntry) + { + // Remove the waiter if it is already in the stack and + // update waiter's count as needed + RemoveWaiter(newWaiterEntry, false); + + // If the stack is empty then newWaiterEntry is the new head of the stack + if (null == _headWaiterEntry._nextWaiterEntry) + { + _headWaiterEntry._nextWaiterEntry = newWaiterEntry; + newWaiterEntry._prevWaiterEntry = _headWaiterEntry; + + } + // If the stack is not empty then put newWaiterEntry as the new head + // of the stack. + else + { + // Save the old first waiter entry + WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; + + // Update the links + _headWaiterEntry._nextWaiterEntry = newWaiterEntry; + newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry; + newWaiterEntry._prevWaiterEntry = _headWaiterEntry; + oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry; + } + + // Increment the number of waiters + ++_waitersCount; + } + + /// + /// Pop a waiter from the waiter's stack + /// + /// Returns the first waiter in the stack + private WaiterEntry PopWaiter() + { + // Store the current stack head + WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; + + // Store the new stack head + WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry; + + // Update the old stack head list links and decrement the number + // waiters. + RemoveWaiter(oldFirstWaiterEntry, true); + + // Update the new stack head + _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry; + if (null != newHeadWaiterEntry) + { + newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry; + } + + // Return the old stack head + return oldFirstWaiterEntry; + } + + /// + /// Remove a waiter from the stack + /// + /// A waiter entry to remove + /// If true the waiter count is always decremented + private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement) + { + // Store the prev entry in the list + WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry; + + // Store the next entry in the list + WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry; + + // A flag to indicate if we need to decrement the waiters count. + // If we got here from PopWaiter then we must decrement. + // If we got here from PushWaiter then we decrement only if + // the waiter was already in the stack. + bool decrementCounter = popDecrement; + + // Null the waiter's entry links + waiterEntry._prevWaiterEntry = null; + waiterEntry._nextWaiterEntry = null; + + // If the waiter entry had a prev link then update it. + // It also means that the waiter is already in the list and we + // need to decrement the waiters count. + if (null != prevWaiterEntry) + { + prevWaiterEntry._nextWaiterEntry = nextWaiterEntry; + decrementCounter = true; + } + + // If the waiter entry had a next link then update it. + // It also means that the waiter is already in the list and we + // need to decrement the waiters count. + if (null != nextWaiterEntry) + { + nextWaiterEntry._prevWaiterEntry = prevWaiterEntry; + decrementCounter = true; + } + + // Decrement the waiters count if needed + if (decrementCounter) + { + --_waitersCount; + } + } + + #endregion + + #endregion + + #region WaiterEntry class + + // A waiter entry in the _waiters queue. + public class WaiterEntry : IDisposable + { + #region Member variables + + /// + /// Event to signal the waiter that it got the work item. + /// + private AutoResetEvent _waitHandle = new AutoResetEvent(false); + + /// + /// Flag to know if this waiter already quited from the queue + /// because of a timeout. + /// + private bool _isTimedout = false; + + /// + /// Flag to know if the waiter was signaled and got a work item. + /// + private bool _isSignaled = false; + + /// + /// A work item that passed directly to the waiter withou going + /// through the queue + /// + private WorkItem _workItem = null; + + private bool _isDisposed = false; + + // Linked list members + internal WaiterEntry _nextWaiterEntry = null; + internal WaiterEntry _prevWaiterEntry = null; + + #endregion + + #region Construction + + public WaiterEntry() + { + Reset(); + } + + #endregion + + #region Public methods + + public WaitHandle WaitHandle + { + get { return _waitHandle; } + } + + public WorkItem WorkItem + { + get + { + lock(this) + { + return _workItem; + } + } + } + + /// + /// Signal the waiter that it got a work item. + /// + /// Return true on success + /// The method fails if Timeout() preceded its call + public bool Signal(WorkItem workItem) + { + lock(this) + { + if (!_isTimedout) + { + _workItem = workItem; + _isSignaled = true; + _waitHandle.Set(); + return true; + } + } + return false; + } + + /// + /// Mark the wait entry that it has been timed out + /// + /// Return true on success + /// The method fails if Signal() preceded its call + public bool Timeout() + { + lock(this) + { + // Time out can happen only if the waiter wasn't marked as + // signaled + if (!_isSignaled) + { + // We don't remove the waiter from the queue, the DequeueWorkItem + // method skips _waiters that were timed out. + _isTimedout = true; + return true; + } + } + return false; + } + + /// + /// Reset the wait entry so it can be used again + /// + public void Reset() + { + _workItem = null; + _isTimedout = false; + _isSignaled = false; + _waitHandle.Reset(); + } + + /// + /// Free resources + /// + public void Close() + { + if (null != _waitHandle) + { + _waitHandle.Close(); + _waitHandle = null; + } + } + + #endregion + + #region IDisposable Members + + public void Dispose() + { + if (!_isDisposed) + { + Close(); + _isDisposed = true; + } + } + + ~WaiterEntry() + { + Dispose(); + } + + #endregion + } + + #endregion + + #region IDisposable Members + + public void Dispose() + { + if (!_isDisposed) + { + Cleanup(); + _isDisposed = true; + GC.SuppressFinalize(this); + } + } + + ~WorkItemsQueue() + { + Cleanup(); + } + + private void ValidateNotDisposed() + { + if(_isDisposed) + { + throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); + } + } + + #endregion + } + + #endregion +} + -- cgit v1.1