From 134f86e8d5c414409631b25b8c6f0ee45fbd8631 Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Thu, 3 Nov 2016 21:44:39 +1000 Subject: Initial update to OpenSim 0.8.2.1 source code. --- ThirdParty/SmartThreadPool/WorkItemsQueue.cs | 973 ++++++++++++++------------- 1 file changed, 509 insertions(+), 464 deletions(-) (limited to 'ThirdParty/SmartThreadPool/WorkItemsQueue.cs') diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs index af5af07..e0bc916 100644 --- a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs +++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs @@ -1,109 +1,151 @@ -// Ami Bar -// amibar@gmail.com - using System; +using System.Collections.Generic; using System.Threading; namespace Amib.Threading.Internal { - #region WorkItemsQueue class - - /// - /// WorkItemsQueue class. - /// - public class WorkItemsQueue : IDisposable - { - #region Member variables - - /// - /// Waiters queue (implemented as stack). - /// - private WaiterEntry _headWaiterEntry = new WaiterEntry(); - - /// - /// Waiters count - /// - private int _waitersCount = 0; - - /// - /// Work items queue - /// - private PriorityQueue _workItems = new PriorityQueue(); + #region WorkItemsQueue class + + /// + /// WorkItemsQueue class. + /// + public class WorkItemsQueue : IDisposable + { + #region Member variables + + /// + /// Waiters queue (implemented as stack). + /// + private readonly WaiterEntry _headWaiterEntry = new WaiterEntry(); + + /// + /// Waiters count + /// + private int _waitersCount = 0; + + /// + /// Work items queue + /// + private readonly PriorityQueue _workItems = new PriorityQueue(); /// /// Indicate that work items are allowed to be queued /// private bool _isWorkItemsQueueActive = true; - /// - /// Each thread in the thread pool keeps its own waiter entry. - /// - [ThreadStatic] - private static WaiterEntry _waiterEntry; - /// - /// A flag that indicates if the WorkItemsQueue has been disposed. - /// - private bool _isDisposed = false; +#if (WINDOWS_PHONE) + private static readonly Dictionary _waiterEntries = new Dictionary(); +#elif (_WINDOWS_CE) + private static LocalDataStoreSlot _waiterEntrySlot = Thread.AllocateDataSlot(); +#else - #endregion + [ThreadStatic] + private static WaiterEntry _waiterEntry; +#endif - #region Public properties /// - /// Returns the current number of work items in the queue + /// Each thread in the thread pool keeps its own waiter entry. /// - public int Count + private static WaiterEntry CurrentWaiterEntry { +#if (WINDOWS_PHONE) get { - lock(this) + lock (_waiterEntries) { - ValidateNotDisposed(); - return _workItems.Count; + WaiterEntry waiterEntry; + if (_waiterEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out waiterEntry)) + { + return waiterEntry; + } } + return null; } - } - - /// - /// Returns the current number of waiters - /// - public int WaitersCount - { - get + set { - lock(this) + lock (_waiterEntries) { - ValidateNotDisposed(); - return _waitersCount; + _waiterEntries[Thread.CurrentThread.ManagedThreadId] = value; } } +#elif (_WINDOWS_CE) + get + { + return Thread.GetData(_waiterEntrySlot) as WaiterEntry; + } + set + { + Thread.SetData(_waiterEntrySlot, value); + } +#else + get + { + return _waiterEntry; + } + set + { + _waiterEntry = value; + } +#endif } + /// + /// A flag that indicates if the WorkItemsQueue has been disposed. + /// + private bool _isDisposed = false; - #endregion + #endregion - #region Public methods + #region Public properties /// - /// Enqueue a work item to the queue. + /// Returns the current number of work items in the queue /// - public bool EnqueueWorkItem(WorkItem workItem) - { - // A work item cannot be null, since null is used in the - // WaitForWorkItem() method to indicate timeout or cancel - if (null == workItem) - { - throw new ArgumentNullException("workItem" , "workItem cannot be null"); - } + public int Count + { + get + { + return _workItems.Count; + } + } + + /// + /// Returns the current number of waiters + /// + public int WaitersCount + { + get + { + return _waitersCount; + } + } - bool enqueue = true; - // First check if there is a waiter waiting for work item. During - // the check, timed out waiters are ignored. If there is no - // waiter then the work item is queued. - lock(this) - { + #endregion + + #region Public methods + + /// + /// Enqueue a work item to the queue. + /// + public bool EnqueueWorkItem(WorkItem workItem) + { + // A work item cannot be null, since null is used in the + // WaitForWorkItem() method to indicate timeout or cancel + if (null == workItem) + { + throw new ArgumentNullException("workItem" , "workItem cannot be null"); + } + + bool enqueue = true; + + // First check if there is a waiter waiting for work item. During + // the check, timed out waiters are ignored. If there is no + // waiter then the work item is queued. + lock(this) + { ValidateNotDisposed(); if (!_isWorkItemsQueueActive) @@ -111,56 +153,55 @@ namespace Amib.Threading.Internal return false; } - while(_waitersCount > 0) - { - // Dequeue a waiter. - WaiterEntry waiterEntry = PopWaiter(); - - // Signal the waiter. On success break the loop - if (waiterEntry.Signal(workItem)) - { - enqueue = false; - break; - } - } - - if (enqueue) - { - // Enqueue the work item - _workItems.Enqueue(workItem); - } - } + while(_waitersCount > 0) + { + // Dequeue a waiter. + WaiterEntry waiterEntry = PopWaiter(); + + // Signal the waiter. On success break the loop + if (waiterEntry.Signal(workItem)) + { + enqueue = false; + break; + } + } + + if (enqueue) + { + // Enqueue the work item + _workItems.Enqueue(workItem); + } + } return true; - } - - - /// - /// Waits for a work item or exits on timeout or cancel - /// - /// Timeout in milliseconds - /// Cancel wait handle - /// Returns true if the resource was granted - public WorkItem DequeueWorkItem( - int millisecondsTimeout, - WaitHandle cancelEvent) - { - /// This method cause the caller to wait for a work item. - /// If there is at least one waiting work item then the - /// method returns immidiately with true. - /// - /// If there are no waiting work items then the caller - /// is queued between other waiters for a work item to arrive. - /// - /// If a work item didn't come within millisecondsTimeout or - /// the user canceled the wait by signaling the cancelEvent - /// then the method returns false to indicate that the caller - /// didn't get a work item. - - WaiterEntry waiterEntry = null; - WorkItem workItem = null; - - lock(this) - { + } + + + /// + /// Waits for a work item or exits on timeout or cancel + /// + /// Timeout in milliseconds + /// Cancel wait handle + /// Returns true if the resource was granted + public WorkItem DequeueWorkItem( + int millisecondsTimeout, + WaitHandle cancelEvent) + { + // This method cause the caller to wait for a work item. + // If there is at least one waiting work item then the + // method returns immidiately with it. + // + // If there are no waiting work items then the caller + // is queued between other waiters for a work item to arrive. + // + // If a work item didn't come within millisecondsTimeout or + // the user canceled the wait by signaling the cancelEvent + // then the method returns null to indicate that the caller + // didn't get a work item. + + WaiterEntry waiterEntry; + WorkItem workItem = null; + lock (this) + { ValidateNotDisposed(); // If there are waiting work items then take one and return. @@ -169,80 +210,79 @@ namespace Amib.Threading.Internal workItem = _workItems.Dequeue() as WorkItem; return workItem; } - // No waiting work items ... - else - { - // Get the wait entry for the waiters queue - waiterEntry = GetThreadWaiterEntry(); - - // Put the waiter with the other waiters - PushWaiter(waiterEntry); - } - } - - // Prepare array of wait handle for the WaitHandle.WaitAny() - WaitHandle [] waitHandles = new WaitHandle [] { - waiterEntry.WaitHandle, - cancelEvent }; - - // Wait for an available resource, cancel event, or timeout. - // During the wait we are supposes to exit the synchronization - // domain. (Placing true as the third argument of the WaitAny()) - // It just doesn't work, I don't know why, so I have lock(this) - // statments insted of one. - - int index = WaitHandle.WaitAny( - waitHandles, - millisecondsTimeout, - true); - - lock(this) - { - // success is true if it got a work item. - bool success = (0 == index); - - // The timeout variable is used only for readability. - // (We treat cancel as timeout) - bool timeout = !success; - - // On timeout update the waiterEntry that it is timed out - if (timeout) - { - // The Timeout() fails if the waiter has already been signaled - timeout = waiterEntry.Timeout(); - - // On timeout remove the waiter from the queue. - // Note that the complexity is O(1). - if(timeout) - { - RemoveWaiter(waiterEntry, false); - } - - // Again readability - success = !timeout; - } - - // On success return the work item - if (success) - { - workItem = waiterEntry.WorkItem; - - if (null == workItem) - { - workItem = _workItems.Dequeue() as WorkItem; - } - } - } - // On failure return null. - return workItem; - } + // No waiting work items ... + + // Get the waiter entry for the waiters queue + waiterEntry = GetThreadWaiterEntry(); + + // Put the waiter with the other waiters + PushWaiter(waiterEntry); + } + + // Prepare array of wait handle for the WaitHandle.WaitAny() + WaitHandle [] waitHandles = new WaitHandle[] { + waiterEntry.WaitHandle, + cancelEvent }; + + // Wait for an available resource, cancel event, or timeout. + + // During the wait we are supposes to exit the synchronization + // domain. (Placing true as the third argument of the WaitAny()) + // It just doesn't work, I don't know why, so I have two lock(this) + // statments instead of one. + + int index = STPEventWaitHandle.WaitAny( + waitHandles, + millisecondsTimeout, + true); + + lock(this) + { + // success is true if it got a work item. + bool success = (0 == index); + + // The timeout variable is used only for readability. + // (We treat cancel as timeout) + bool timeout = !success; + + // On timeout update the waiterEntry that it is timed out + if (timeout) + { + // The Timeout() fails if the waiter has already been signaled + timeout = waiterEntry.Timeout(); + + // On timeout remove the waiter from the queue. + // Note that the complexity is O(1). + if(timeout) + { + RemoveWaiter(waiterEntry, false); + } + + // Again readability + success = !timeout; + } + + // On success return the work item + if (success) + { + workItem = waiterEntry.WorkItem; + + if (null == workItem) + { + workItem = _workItems.Dequeue() as WorkItem; + } + } + } + // On failure return null. + return workItem; + } /// /// Cleanup the work items queue, hence no more work /// items are allowed to be queue /// - protected virtual void Cleanup() + private void Cleanup() { lock(this) { @@ -271,301 +311,312 @@ namespace Amib.Threading.Internal // Tell the waiters that they were timed out. // It won't signal them to exit, but to ignore their // next work item. - while(_waitersCount > 0) - { - WaiterEntry waiterEntry = PopWaiter(); - waiterEntry.Timeout(); - } + while(_waitersCount > 0) + { + WaiterEntry waiterEntry = PopWaiter(); + waiterEntry.Timeout(); + } } } - #endregion - - #region Private methods - - /// - /// Returns the WaiterEntry of the current thread - /// - /// - /// In order to avoid creation and destuction of WaiterEntry - /// objects each thread has its own WaiterEntry object. - private WaiterEntry GetThreadWaiterEntry() + public object[] GetStates() { - if (null == _waiterEntry) + lock (this) { - _waiterEntry = new WaiterEntry(); + object[] states = new object[_workItems.Count]; + int i = 0; + foreach (WorkItem workItem in _workItems) + { + states[i] = workItem.GetWorkItemResult().State; + ++i; + } + return states; } - _waiterEntry.Reset(); - return _waiterEntry; } - #region Waiters stack methods - - /// - /// Push a new waiter into the waiter's stack - /// - /// A waiter to put in the stack - public void PushWaiter(WaiterEntry newWaiterEntry) - { - // Remove the waiter if it is already in the stack and - // update waiter's count as needed - RemoveWaiter(newWaiterEntry, false); - - // If the stack is empty then newWaiterEntry is the new head of the stack - if (null == _headWaiterEntry._nextWaiterEntry) - { - _headWaiterEntry._nextWaiterEntry = newWaiterEntry; - newWaiterEntry._prevWaiterEntry = _headWaiterEntry; - - } - // If the stack is not empty then put newWaiterEntry as the new head - // of the stack. - else - { - // Save the old first waiter entry - WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; + #endregion + + #region Private methods + + /// + /// Returns the WaiterEntry of the current thread + /// + /// + /// In order to avoid creation and destuction of WaiterEntry + /// objects each thread has its own WaiterEntry object. + private static WaiterEntry GetThreadWaiterEntry() + { + if (null == CurrentWaiterEntry) + { + CurrentWaiterEntry = new WaiterEntry(); + } + CurrentWaiterEntry.Reset(); + return CurrentWaiterEntry; + } + + #region Waiters stack methods + + /// + /// Push a new waiter into the waiter's stack + /// + /// A waiter to put in the stack + public void PushWaiter(WaiterEntry newWaiterEntry) + { + // Remove the waiter if it is already in the stack and + // update waiter's count as needed + RemoveWaiter(newWaiterEntry, false); + + // If the stack is empty then newWaiterEntry is the new head of the stack + if (null == _headWaiterEntry._nextWaiterEntry) + { + _headWaiterEntry._nextWaiterEntry = newWaiterEntry; + newWaiterEntry._prevWaiterEntry = _headWaiterEntry; + + } + // If the stack is not empty then put newWaiterEntry as the new head + // of the stack. + else + { + // Save the old first waiter entry + WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; // Update the links - _headWaiterEntry._nextWaiterEntry = newWaiterEntry; - newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry; - newWaiterEntry._prevWaiterEntry = _headWaiterEntry; - oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry; - } - - // Increment the number of waiters - ++_waitersCount; - } - - /// - /// Pop a waiter from the waiter's stack - /// - /// Returns the first waiter in the stack - private WaiterEntry PopWaiter() - { - // Store the current stack head - WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; - - // Store the new stack head - WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry; - - // Update the old stack head list links and decrement the number - // waiters. - RemoveWaiter(oldFirstWaiterEntry, true); - - // Update the new stack head - _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry; - if (null != newHeadWaiterEntry) - { - newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry; - } - - // Return the old stack head - return oldFirstWaiterEntry; - } - - /// - /// Remove a waiter from the stack - /// - /// A waiter entry to remove - /// If true the waiter count is always decremented - private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement) - { - // Store the prev entry in the list - WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry; - - // Store the next entry in the list - WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry; - - // A flag to indicate if we need to decrement the waiters count. - // If we got here from PopWaiter then we must decrement. - // If we got here from PushWaiter then we decrement only if - // the waiter was already in the stack. - bool decrementCounter = popDecrement; - - // Null the waiter's entry links - waiterEntry._prevWaiterEntry = null; - waiterEntry._nextWaiterEntry = null; - - // If the waiter entry had a prev link then update it. - // It also means that the waiter is already in the list and we - // need to decrement the waiters count. - if (null != prevWaiterEntry) - { - prevWaiterEntry._nextWaiterEntry = nextWaiterEntry; - decrementCounter = true; - } - - // If the waiter entry had a next link then update it. - // It also means that the waiter is already in the list and we - // need to decrement the waiters count. - if (null != nextWaiterEntry) - { - nextWaiterEntry._prevWaiterEntry = prevWaiterEntry; - decrementCounter = true; - } - - // Decrement the waiters count if needed - if (decrementCounter) - { - --_waitersCount; - } - } - - #endregion - - #endregion - - #region WaiterEntry class - - // A waiter entry in the _waiters queue. - public class WaiterEntry : IDisposable - { - #region Member variables - - /// - /// Event to signal the waiter that it got the work item. - /// - private AutoResetEvent _waitHandle = new AutoResetEvent(false); - - /// - /// Flag to know if this waiter already quited from the queue - /// because of a timeout. - /// - private bool _isTimedout = false; - - /// - /// Flag to know if the waiter was signaled and got a work item. - /// - private bool _isSignaled = false; + _headWaiterEntry._nextWaiterEntry = newWaiterEntry; + newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry; + newWaiterEntry._prevWaiterEntry = _headWaiterEntry; + oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry; + } + + // Increment the number of waiters + ++_waitersCount; + } + + /// + /// Pop a waiter from the waiter's stack + /// + /// Returns the first waiter in the stack + private WaiterEntry PopWaiter() + { + // Store the current stack head + WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; + + // Store the new stack head + WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry; + + // Update the old stack head list links and decrement the number + // waiters. + RemoveWaiter(oldFirstWaiterEntry, true); + + // Update the new stack head + _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry; + if (null != newHeadWaiterEntry) + { + newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry; + } + + // Return the old stack head + return oldFirstWaiterEntry; + } + + /// + /// Remove a waiter from the stack + /// + /// A waiter entry to remove + /// If true the waiter count is always decremented + private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement) + { + // Store the prev entry in the list + WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry; + + // Store the next entry in the list + WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry; + + // A flag to indicate if we need to decrement the waiters count. + // If we got here from PopWaiter then we must decrement. + // If we got here from PushWaiter then we decrement only if + // the waiter was already in the stack. + bool decrementCounter = popDecrement; + + // Null the waiter's entry links + waiterEntry._prevWaiterEntry = null; + waiterEntry._nextWaiterEntry = null; + + // If the waiter entry had a prev link then update it. + // It also means that the waiter is already in the list and we + // need to decrement the waiters count. + if (null != prevWaiterEntry) + { + prevWaiterEntry._nextWaiterEntry = nextWaiterEntry; + decrementCounter = true; + } + + // If the waiter entry had a next link then update it. + // It also means that the waiter is already in the list and we + // need to decrement the waiters count. + if (null != nextWaiterEntry) + { + nextWaiterEntry._prevWaiterEntry = prevWaiterEntry; + decrementCounter = true; + } + + // Decrement the waiters count if needed + if (decrementCounter) + { + --_waitersCount; + } + } + + #endregion + + #endregion + + #region WaiterEntry class + + // A waiter entry in the _waiters queue. + public sealed class WaiterEntry : IDisposable + { + #region Member variables + + /// + /// Event to signal the waiter that it got the work item. + /// + //private AutoResetEvent _waitHandle = new AutoResetEvent(false); + private AutoResetEvent _waitHandle = EventWaitHandleFactory.CreateAutoResetEvent(); + + /// + /// Flag to know if this waiter already quited from the queue + /// because of a timeout. + /// + private bool _isTimedout = false; + + /// + /// Flag to know if the waiter was signaled and got a work item. + /// + private bool _isSignaled = false; /// /// A work item that passed directly to the waiter withou going /// through the queue /// - private WorkItem _workItem = null; + private WorkItem _workItem = null; private bool _isDisposed = false; - // Linked list members - internal WaiterEntry _nextWaiterEntry = null; - internal WaiterEntry _prevWaiterEntry = null; - - #endregion - - #region Construction - - public WaiterEntry() - { - Reset(); - } - - #endregion - - #region Public methods - - public WaitHandle WaitHandle - { - get { return _waitHandle; } - } - - public WorkItem WorkItem - { - get - { - lock(this) - { - return _workItem; - } - } - } - - /// - /// Signal the waiter that it got a work item. - /// - /// Return true on success - /// The method fails if Timeout() preceded its call - public bool Signal(WorkItem workItem) - { - lock(this) - { - if (!_isTimedout) - { - _workItem = workItem; - _isSignaled = true; - _waitHandle.Set(); - return true; - } - } - return false; - } - - /// - /// Mark the wait entry that it has been timed out - /// - /// Return true on success - /// The method fails if Signal() preceded its call - public bool Timeout() - { - lock(this) + // Linked list members + internal WaiterEntry _nextWaiterEntry = null; + internal WaiterEntry _prevWaiterEntry = null; + + #endregion + + #region Construction + + public WaiterEntry() + { + Reset(); + } + + #endregion + + #region Public methods + + public WaitHandle WaitHandle + { + get { return _waitHandle; } + } + + public WorkItem WorkItem + { + get + { + return _workItem; + } + } + + /// + /// Signal the waiter that it got a work item. + /// + /// Return true on success + /// The method fails if Timeout() preceded its call + public bool Signal(WorkItem workItem) + { + lock(this) + { + if (!_isTimedout) + { + _workItem = workItem; + _isSignaled = true; + _waitHandle.Set(); + return true; + } + } + return false; + } + + /// + /// Mark the wait entry that it has been timed out + /// + /// Return true on success + /// The method fails if Signal() preceded its call + public bool Timeout() + { + lock(this) + { + // Time out can happen only if the waiter wasn't marked as + // signaled + if (!_isSignaled) + { + // We don't remove the waiter from the queue, the DequeueWorkItem + // method skips _waiters that were timed out. + _isTimedout = true; + return true; + } + } + return false; + } + + /// + /// Reset the wait entry so it can be used again + /// + public void Reset() + { + _workItem = null; + _isTimedout = false; + _isSignaled = false; + _waitHandle.Reset(); + } + + /// + /// Free resources + /// + public void Close() + { + if (null != _waitHandle) + { + _waitHandle.Close(); + _waitHandle = null; + } + } + + #endregion + + #region IDisposable Members + + public void Dispose() + { + lock (this) { - // Time out can happen only if the waiter wasn't marked as - // signaled - if (!_isSignaled) + if (!_isDisposed) { - // We don't remove the waiter from the queue, the DequeueWorkItem - // method skips _waiters that were timed out. - _isTimedout = true; - return true; + Close(); } - } - return false; - } - - /// - /// Reset the wait entry so it can be used again - /// - public void Reset() - { - _workItem = null; - _isTimedout = false; - _isSignaled = false; - _waitHandle.Reset(); - } - - /// - /// Free resources - /// - public void Close() - { - if (null != _waitHandle) - { - _waitHandle.Close(); - _waitHandle = null; - } - } - - #endregion - - #region IDisposable Members - - public void Dispose() - { - if (!_isDisposed) - { - Close(); _isDisposed = true; } } - ~WaiterEntry() - { - Dispose(); - } + #endregion + } - #endregion - } - - #endregion + #endregion #region IDisposable Members @@ -574,14 +625,8 @@ namespace Amib.Threading.Internal if (!_isDisposed) { Cleanup(); - _isDisposed = true; - GC.SuppressFinalize(this); } - } - - ~WorkItemsQueue() - { - Cleanup(); + _isDisposed = true; } private void ValidateNotDisposed() @@ -595,6 +640,6 @@ namespace Amib.Threading.Internal #endregion } - #endregion + #endregion } -- cgit v1.1