From 134f86e8d5c414409631b25b8c6f0ee45fbd8631 Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Thu, 3 Nov 2016 21:44:39 +1000 Subject: Initial update to OpenSim 0.8.2.1 source code. --- ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 759 +++++++++++---------------- 1 file changed, 304 insertions(+), 455 deletions(-) (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs') diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs index 01ac8dd..d9d34ac 100644 --- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs +++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs @@ -1,6 +1,3 @@ -// Ami Bar -// amibar@gmail.com - using System; using System.Threading; using System.Runtime.CompilerServices; @@ -8,505 +5,357 @@ using System.Diagnostics; namespace Amib.Threading.Internal { - #region WorkItemsGroup class - - /// - /// Summary description for WorkItemsGroup. - /// - public class WorkItemsGroup : IWorkItemsGroup - { - #region Private members - - private object _lock = new object(); - /// - /// Contains the name of this instance of SmartThreadPool. - /// Can be changed by the user. - /// - private string _name = "WorkItemsGroup"; - - /// - /// A reference to the SmartThreadPool instance that created this - /// WorkItemsGroup. - /// - private SmartThreadPool _stp; - - /// - /// The OnIdle event - /// - private event WorkItemsGroupIdleHandler _onIdle; - /// - /// Defines how many work items of this WorkItemsGroup can run at once. - /// - private int _concurrency; + #region WorkItemsGroup class - /// - /// Priority queue to hold work items before they are passed - /// to the SmartThreadPool. - /// - private PriorityQueue _workItemsQueue; + /// + /// Summary description for WorkItemsGroup. + /// + public class WorkItemsGroup : WorkItemsGroupBase + { + #region Private members - /// - /// Indicate how many work items are waiting in the SmartThreadPool - /// queue. - /// This value is used to apply the concurrency. - /// - private int _workItemsInStpQueue; + private readonly object _lock = new object(); - /// - /// Indicate how many work items are currently running in the SmartThreadPool. - /// This value is used with the Cancel, to calculate if we can send new - /// work items to the STP. - /// - private int _workItemsExecutingInStp = 0; + /// + /// A reference to the SmartThreadPool instance that created this + /// WorkItemsGroup. + /// + private readonly SmartThreadPool _stp; - /// - /// WorkItemsGroup start information - /// - private WIGStartInfo _workItemsGroupStartInfo; + /// + /// The OnIdle event + /// + private event WorkItemsGroupIdleHandler _onIdle; /// - /// Signaled when all of the WorkItemsGroup's work item completed. + /// A flag to indicate if the Work Items Group is now suspended. /// - private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); - - /// - /// A common object for all the work items that this work items group - /// generate so we can mark them to cancel in O(1) - /// - private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); - - #endregion - - #region Construction - - public WorkItemsGroup( - SmartThreadPool stp, - int concurrency, - WIGStartInfo wigStartInfo) + private bool _isSuspended; + + /// + /// Defines how many work items of this WorkItemsGroup can run at once. + /// + private int _concurrency; + + /// + /// Priority queue to hold work items before they are passed + /// to the SmartThreadPool. + /// + private readonly PriorityQueue _workItemsQueue; + + /// + /// Indicate how many work items are waiting in the SmartThreadPool + /// queue. + /// This value is used to apply the concurrency. + /// + private int _workItemsInStpQueue; + + /// + /// Indicate how many work items are currently running in the SmartThreadPool. + /// This value is used with the Cancel, to calculate if we can send new + /// work items to the STP. + /// + private int _workItemsExecutingInStp = 0; + + /// + /// WorkItemsGroup start information + /// + private readonly WIGStartInfo _workItemsGroupStartInfo; + + /// + /// Signaled when all of the WorkItemsGroup's work item completed. + /// + //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); + private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); + + /// + /// A common object for all the work items that this work items group + /// generate so we can mark them to cancel in O(1) + /// + private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); + + #endregion + + #region Construction + + public WorkItemsGroup( + SmartThreadPool stp, + int concurrency, + WIGStartInfo wigStartInfo) + { + if (concurrency <= 0) + { + throw new ArgumentOutOfRangeException( + "concurrency", +#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) + concurrency, +#endif + "concurrency must be greater than zero"); + } + _stp = stp; + _concurrency = concurrency; + _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); + _workItemsQueue = new PriorityQueue(); + Name = "WorkItemsGroup"; + + // The _workItemsInStpQueue gets the number of currently executing work items, + // because once a work item is executing, it cannot be cancelled. + _workItemsInStpQueue = _workItemsExecutingInStp; + + _isSuspended = _workItemsGroupStartInfo.StartSuspended; + } + + #endregion + + #region WorkItemsGroupBase Overrides + + public override int Concurrency { - if (concurrency <= 0) - { - throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero"); - } - _stp = stp; - _concurrency = concurrency; - _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo); - _workItemsQueue = new PriorityQueue(); - - // The _workItemsInStpQueue gets the number of currently executing work items, - // because once a work item is executing, it cannot be cancelled. - _workItemsInStpQueue = _workItemsExecutingInStp; - } - - #endregion - - #region IWorkItemsGroup implementation - - /// - /// Get/Set the name of the SmartThreadPool instance - /// - public string Name - { - get - { - return _name; - } - + get { return _concurrency; } set { - _name = value; - } - } - - /// - /// Queue a work item - /// - /// A callback to execute - /// Returns a work item result - public IWorkItemResult QueueWorkItem(WorkItemCallback callback) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } + Debug.Assert(value > 0); - /// - /// Queue a work item - /// - /// A callback to execute - /// The priority of the work item - /// Returns a work item result - public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } - - /// - /// Queue a work item - /// - /// Work item info - /// A callback to execute - /// Returns a work item result - public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } - - /// - /// Queue a work item - /// - /// A callback to execute - /// - /// The context object of the work item. Used for passing arguments to the work item. - /// - /// Returns a work item result - public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } - - /// - /// Queue a work item - /// - /// A callback to execute - /// - /// The context object of the work item. Used for passing arguments to the work item. - /// - /// The work item priority - /// Returns a work item result - public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } - - /// - /// Queue a work item - /// - /// Work item information - /// A callback to execute - /// - /// The context object of the work item. Used for passing arguments to the work item. - /// - /// Returns a work item result - public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } - - /// - /// Queue a work item - /// - /// A callback to execute - /// - /// The context object of the work item. Used for passing arguments to the work item. - /// - /// - /// A delegate to call after the callback completion - /// - /// Returns a work item result - public IWorkItemResult QueueWorkItem( - WorkItemCallback callback, - object state, - PostExecuteWorkItemCallback postExecuteWorkItemCallback) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } - - /// - /// Queue a work item - /// - /// A callback to execute - /// - /// The context object of the work item. Used for passing arguments to the work item. - /// - /// - /// A delegate to call after the callback completion - /// - /// The work item priority - /// Returns a work item result - public IWorkItemResult QueueWorkItem( - WorkItemCallback callback, - object state, - PostExecuteWorkItemCallback postExecuteWorkItemCallback, - WorkItemPriority workItemPriority) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); - } - - /// - /// Queue a work item - /// - /// A callback to execute - /// - /// The context object of the work item. Used for passing arguments to the work item. - /// - /// - /// A delegate to call after the callback completion - /// - /// Indicates on which cases to call to the post execute callback - /// Returns a work item result - public IWorkItemResult QueueWorkItem( - WorkItemCallback callback, - object state, - PostExecuteWorkItemCallback postExecuteWorkItemCallback, - CallToPostExecute callToPostExecute) - { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); + int diff = value - _concurrency; + _concurrency = value; + if (diff > 0) + { + EnqueueToSTPNextNWorkItem(diff); + } + } } - /// - /// Queue a work item - /// - /// A callback to execute - /// - /// The context object of the work item. Used for passing arguments to the work item. - /// - /// - /// A delegate to call after the callback completion - /// - /// Indicates on which cases to call to the post execute callback - /// The work item priority - /// Returns a work item result - public IWorkItemResult QueueWorkItem( - WorkItemCallback callback, - object state, - PostExecuteWorkItemCallback postExecuteWorkItemCallback, - CallToPostExecute callToPostExecute, - WorkItemPriority workItemPriority) + public override int WaitingCallbacks { - WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); - EnqueueToSTPNextWorkItem(workItem); - return workItem.GetWorkItemResult(); + get { return _workItemsQueue.Count; } } - /// - /// Wait for the thread pool to be idle - /// - public void WaitForIdle() + public override object[] GetStates() { - WaitForIdle(Timeout.Infinite); + lock (_lock) + { + object[] states = new object[_workItemsQueue.Count]; + int i = 0; + foreach (WorkItem workItem in _workItemsQueue) + { + states[i] = workItem.GetWorkItemResult().State; + ++i; + } + return states; + } } - /// - /// Wait for the thread pool to be idle + /// + /// WorkItemsGroup start information /// - public bool WaitForIdle(TimeSpan timeout) + public override WIGStartInfo WIGStartInfo { - return WaitForIdle((int)timeout.TotalMilliseconds); + get { return _workItemsGroupStartInfo; } } - /// + /// + /// Start the Work Items Group if it was started suspended + /// + public override void Start() + { + // If the Work Items Group already started then quit + if (!_isSuspended) + { + return; + } + _isSuspended = false; + + EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); + } + + public override void Cancel(bool abortExecution) + { + lock (_lock) + { + _canceledWorkItemsGroup.IsCanceled = true; + _workItemsQueue.Clear(); + _workItemsInStpQueue = 0; + _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); + } + + if (abortExecution) + { + _stp.CancelAbortWorkItemsGroup(this); + } + } + + /// /// Wait for the thread pool to be idle /// - public bool WaitForIdle(int millisecondsTimeout) + public override bool WaitForIdle(int millisecondsTimeout) { - _stp.ValidateWorkItemsGroupWaitForIdle(this); - return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); + SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); + return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); } - public int WaitingCallbacks - { - get - { - return _workItemsQueue.Count; - } - } + public override event WorkItemsGroupIdleHandler OnIdle + { + add { _onIdle += value; } + remove { _onIdle -= value; } + } - public event WorkItemsGroupIdleHandler OnIdle - { - add - { - _onIdle += value; - } - remove - { - _onIdle -= value; - } - } + #endregion - public void Cancel() - { - lock(_lock) - { - _canceledWorkItemsGroup.IsCanceled = true; - _workItemsQueue.Clear(); - _workItemsInStpQueue = 0; - _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); - } - } + #region Private methods - public void Start() - { - lock (this) - { - if (!_workItemsGroupStartInfo.StartSuspended) - { - return; - } - _workItemsGroupStartInfo.StartSuspended = false; - } - - for(int i = 0; i < _concurrency; ++i) - { - EnqueueToSTPNextWorkItem(null, false); - } - } - - #endregion + private void RegisterToWorkItemCompletion(IWorkItemResult wir) + { + IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; + iwir.OnWorkItemStarted += OnWorkItemStartedCallback; + iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; + } - #region Private methods - - private void RegisterToWorkItemCompletion(IWorkItemResult wir) - { - IInternalWorkItemResult iwir = wir as IInternalWorkItemResult; - iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback); - iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback); - } - - public void OnSTPIsStarting() - { - lock (this) - { - if (_workItemsGroupStartInfo.StartSuspended) - { - return; - } - } - - for(int i = 0; i < _concurrency; ++i) - { - EnqueueToSTPNextWorkItem(null, false); - } - } - - private object FireOnIdle(object state) - { - FireOnIdleImpl(_onIdle); - return null; - } - - [MethodImpl(MethodImplOptions.NoInlining)] - private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) - { - if(null == onIdle) + public void OnSTPIsStarting() + { + if (_isSuspended) { return; } + + EnqueueToSTPNextNWorkItem(_concurrency); + } - Delegate[] delegates = onIdle.GetInvocationList(); - foreach(WorkItemsGroupIdleHandler eh in delegates) - { - try - { - eh(this); - } - // Ignore exceptions - catch{} - } - } - - private void OnWorkItemStartedCallback(WorkItem workItem) + public void EnqueueToSTPNextNWorkItem(int count) { - lock(_lock) + for (int i = 0; i < count; ++i) { - ++_workItemsExecutingInStp; + EnqueueToSTPNextWorkItem(null, false); } } - private void OnWorkItemCompletedCallback(WorkItem workItem) - { - EnqueueToSTPNextWorkItem(null, true); - } - - private void EnqueueToSTPNextWorkItem(WorkItem workItem) + private object FireOnIdle(object state) + { + FireOnIdleImpl(_onIdle); + return null; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) + { + if(null == onIdle) + { + return; + } + + Delegate[] delegates = onIdle.GetInvocationList(); + foreach(WorkItemsGroupIdleHandler eh in delegates) + { + try + { + eh(this); + } + catch { } // Suppress exceptions + } + } + + private void OnWorkItemStartedCallback(WorkItem workItem) + { + lock(_lock) + { + ++_workItemsExecutingInStp; + } + } + + private void OnWorkItemCompletedCallback(WorkItem workItem) + { + EnqueueToSTPNextWorkItem(null, true); + } + + internal override void Enqueue(WorkItem workItem) { - EnqueueToSTPNextWorkItem(workItem, false); + EnqueueToSTPNextWorkItem(workItem); } - private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) - { - lock(_lock) - { - // Got here from OnWorkItemCompletedCallback() - if (decrementWorkItemsInStpQueue) - { - --_workItemsInStpQueue; - - if(_workItemsInStpQueue < 0) - { - _workItemsInStpQueue = 0; - } - - --_workItemsExecutingInStp; - - if(_workItemsExecutingInStp < 0) - { - _workItemsExecutingInStp = 0; - } - } - - // If the work item is not null then enqueue it - if (null != workItem) - { - workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; - - RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); - _workItemsQueue.Enqueue(workItem); - //_stp.IncrementWorkItemsCount(); - - if ((1 == _workItemsQueue.Count) && - (0 == _workItemsInStpQueue)) - { - _stp.RegisterWorkItemsGroup(this); - Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle"); + private void EnqueueToSTPNextWorkItem(WorkItem workItem) + { + EnqueueToSTPNextWorkItem(workItem, false); + } + + private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) + { + lock(_lock) + { + // Got here from OnWorkItemCompletedCallback() + if (decrementWorkItemsInStpQueue) + { + --_workItemsInStpQueue; + + if(_workItemsInStpQueue < 0) + { + _workItemsInStpQueue = 0; + } + + --_workItemsExecutingInStp; + + if(_workItemsExecutingInStp < 0) + { + _workItemsExecutingInStp = 0; + } + } + + // If the work item is not null then enqueue it + if (null != workItem) + { + workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; + + RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); + _workItemsQueue.Enqueue(workItem); + //_stp.IncrementWorkItemsCount(); + + if ((1 == _workItemsQueue.Count) && + (0 == _workItemsInStpQueue)) + { + _stp.RegisterWorkItemsGroup(this); + IsIdle = false; _isIdleWaitHandle.Reset(); - } - } - - // If the work items queue of the group is empty than quit - if (0 == _workItemsQueue.Count) - { - if (0 == _workItemsInStpQueue) - { - _stp.UnregisterWorkItemsGroup(this); - Trace.WriteLine("WorkItemsGroup " + Name + " is idle"); + } + } + + // If the work items queue of the group is empty than quit + if (0 == _workItemsQueue.Count) + { + if (0 == _workItemsInStpQueue) + { + _stp.UnregisterWorkItemsGroup(this); + IsIdle = true; _isIdleWaitHandle.Set(); - _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle)); - } - return; - } - - if (!_workItemsGroupStartInfo.StartSuspended) - { - if (_workItemsInStpQueue < _concurrency) - { - WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; - _stp.Enqueue(nextWorkItem, true); - ++_workItemsInStpQueue; - } - } - } - } - - #endregion + if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) + { + _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); + } + } + return; + } + + if (!_isSuspended) + { + if (_workItemsInStpQueue < _concurrency) + { + WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; + try + { + _stp.Enqueue(nextWorkItem); + } + catch (ObjectDisposedException e) + { + e.GetHashCode(); + // The STP has been shutdown + } + + ++_workItemsInStpQueue; + } + } + } + } + + #endregion } - #endregion + #endregion } -- cgit v1.1