From 134f86e8d5c414409631b25b8c6f0ee45fbd8631 Mon Sep 17 00:00:00 2001
From: David Walter Seikel
Date: Thu, 3 Nov 2016 21:44:39 +1000
Subject: Initial update to OpenSim 0.8.2.1 source code.
---
ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 759 +++++++++++----------------
1 file changed, 304 insertions(+), 455 deletions(-)
(limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
index 01ac8dd..d9d34ac 100644
--- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
+++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
@@ -1,6 +1,3 @@
-// Ami Bar
-// amibar@gmail.com
-
using System;
using System.Threading;
using System.Runtime.CompilerServices;
@@ -8,505 +5,357 @@ using System.Diagnostics;
namespace Amib.Threading.Internal
{
- #region WorkItemsGroup class
-
- ///
- /// Summary description for WorkItemsGroup.
- ///
- public class WorkItemsGroup : IWorkItemsGroup
- {
- #region Private members
-
- private object _lock = new object();
- ///
- /// Contains the name of this instance of SmartThreadPool.
- /// Can be changed by the user.
- ///
- private string _name = "WorkItemsGroup";
-
- ///
- /// A reference to the SmartThreadPool instance that created this
- /// WorkItemsGroup.
- ///
- private SmartThreadPool _stp;
-
- ///
- /// The OnIdle event
- ///
- private event WorkItemsGroupIdleHandler _onIdle;
- ///
- /// Defines how many work items of this WorkItemsGroup can run at once.
- ///
- private int _concurrency;
+ #region WorkItemsGroup class
- ///
- /// Priority queue to hold work items before they are passed
- /// to the SmartThreadPool.
- ///
- private PriorityQueue _workItemsQueue;
+ ///
+ /// Summary description for WorkItemsGroup.
+ ///
+ public class WorkItemsGroup : WorkItemsGroupBase
+ {
+ #region Private members
- ///
- /// Indicate how many work items are waiting in the SmartThreadPool
- /// queue.
- /// This value is used to apply the concurrency.
- ///
- private int _workItemsInStpQueue;
+ private readonly object _lock = new object();
- ///
- /// Indicate how many work items are currently running in the SmartThreadPool.
- /// This value is used with the Cancel, to calculate if we can send new
- /// work items to the STP.
- ///
- private int _workItemsExecutingInStp = 0;
+ ///
+ /// A reference to the SmartThreadPool instance that created this
+ /// WorkItemsGroup.
+ ///
+ private readonly SmartThreadPool _stp;
- ///
- /// WorkItemsGroup start information
- ///
- private WIGStartInfo _workItemsGroupStartInfo;
+ ///
+ /// The OnIdle event
+ ///
+ private event WorkItemsGroupIdleHandler _onIdle;
///
- /// Signaled when all of the WorkItemsGroup's work item completed.
+ /// A flag to indicate if the Work Items Group is now suspended.
///
- private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
-
- ///
- /// A common object for all the work items that this work items group
- /// generate so we can mark them to cancel in O(1)
- ///
- private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
-
- #endregion
-
- #region Construction
-
- public WorkItemsGroup(
- SmartThreadPool stp,
- int concurrency,
- WIGStartInfo wigStartInfo)
+ private bool _isSuspended;
+
+ ///
+ /// Defines how many work items of this WorkItemsGroup can run at once.
+ ///
+ private int _concurrency;
+
+ ///
+ /// Priority queue to hold work items before they are passed
+ /// to the SmartThreadPool.
+ ///
+ private readonly PriorityQueue _workItemsQueue;
+
+ ///
+ /// Indicate how many work items are waiting in the SmartThreadPool
+ /// queue.
+ /// This value is used to apply the concurrency.
+ ///
+ private int _workItemsInStpQueue;
+
+ ///
+ /// Indicate how many work items are currently running in the SmartThreadPool.
+ /// This value is used with the Cancel, to calculate if we can send new
+ /// work items to the STP.
+ ///
+ private int _workItemsExecutingInStp = 0;
+
+ ///
+ /// WorkItemsGroup start information
+ ///
+ private readonly WIGStartInfo _workItemsGroupStartInfo;
+
+ ///
+ /// Signaled when all of the WorkItemsGroup's work item completed.
+ ///
+ //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
+ private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
+
+ ///
+ /// A common object for all the work items that this work items group
+ /// generate so we can mark them to cancel in O(1)
+ ///
+ private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
+
+ #endregion
+
+ #region Construction
+
+ public WorkItemsGroup(
+ SmartThreadPool stp,
+ int concurrency,
+ WIGStartInfo wigStartInfo)
+ {
+ if (concurrency <= 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ "concurrency",
+#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
+ concurrency,
+#endif
+ "concurrency must be greater than zero");
+ }
+ _stp = stp;
+ _concurrency = concurrency;
+ _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
+ _workItemsQueue = new PriorityQueue();
+ Name = "WorkItemsGroup";
+
+ // The _workItemsInStpQueue gets the number of currently executing work items,
+ // because once a work item is executing, it cannot be cancelled.
+ _workItemsInStpQueue = _workItemsExecutingInStp;
+
+ _isSuspended = _workItemsGroupStartInfo.StartSuspended;
+ }
+
+ #endregion
+
+ #region WorkItemsGroupBase Overrides
+
+ public override int Concurrency
{
- if (concurrency <= 0)
- {
- throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero");
- }
- _stp = stp;
- _concurrency = concurrency;
- _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
- _workItemsQueue = new PriorityQueue();
-
- // The _workItemsInStpQueue gets the number of currently executing work items,
- // because once a work item is executing, it cannot be cancelled.
- _workItemsInStpQueue = _workItemsExecutingInStp;
- }
-
- #endregion
-
- #region IWorkItemsGroup implementation
-
- ///
- /// Get/Set the name of the SmartThreadPool instance
- ///
- public string Name
- {
- get
- {
- return _name;
- }
-
+ get { return _concurrency; }
set
{
- _name = value;
- }
- }
-
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
+ Debug.Assert(value > 0);
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- /// The priority of the work item
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
-
- ///
- /// Queue a work item
- ///
- /// Work item info
- /// A callback to execute
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
-
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- ///
- /// The context object of the work item. Used for passing arguments to the work item.
- ///
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
-
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- ///
- /// The context object of the work item. Used for passing arguments to the work item.
- ///
- /// The work item priority
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
-
- ///
- /// Queue a work item
- ///
- /// Work item information
- /// A callback to execute
- ///
- /// The context object of the work item. Used for passing arguments to the work item.
- ///
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
-
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- ///
- /// The context object of the work item. Used for passing arguments to the work item.
- ///
- ///
- /// A delegate to call after the callback completion
- ///
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(
- WorkItemCallback callback,
- object state,
- PostExecuteWorkItemCallback postExecuteWorkItemCallback)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
-
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- ///
- /// The context object of the work item. Used for passing arguments to the work item.
- ///
- ///
- /// A delegate to call after the callback completion
- ///
- /// The work item priority
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(
- WorkItemCallback callback,
- object state,
- PostExecuteWorkItemCallback postExecuteWorkItemCallback,
- WorkItemPriority workItemPriority)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
- }
-
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- ///
- /// The context object of the work item. Used for passing arguments to the work item.
- ///
- ///
- /// A delegate to call after the callback completion
- ///
- /// Indicates on which cases to call to the post execute callback
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(
- WorkItemCallback callback,
- object state,
- PostExecuteWorkItemCallback postExecuteWorkItemCallback,
- CallToPostExecute callToPostExecute)
- {
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
+ int diff = value - _concurrency;
+ _concurrency = value;
+ if (diff > 0)
+ {
+ EnqueueToSTPNextNWorkItem(diff);
+ }
+ }
}
- ///
- /// Queue a work item
- ///
- /// A callback to execute
- ///
- /// The context object of the work item. Used for passing arguments to the work item.
- ///
- ///
- /// A delegate to call after the callback completion
- ///
- /// Indicates on which cases to call to the post execute callback
- /// The work item priority
- /// Returns a work item result
- public IWorkItemResult QueueWorkItem(
- WorkItemCallback callback,
- object state,
- PostExecuteWorkItemCallback postExecuteWorkItemCallback,
- CallToPostExecute callToPostExecute,
- WorkItemPriority workItemPriority)
+ public override int WaitingCallbacks
{
- WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority);
- EnqueueToSTPNextWorkItem(workItem);
- return workItem.GetWorkItemResult();
+ get { return _workItemsQueue.Count; }
}
- ///
- /// Wait for the thread pool to be idle
- ///
- public void WaitForIdle()
+ public override object[] GetStates()
{
- WaitForIdle(Timeout.Infinite);
+ lock (_lock)
+ {
+ object[] states = new object[_workItemsQueue.Count];
+ int i = 0;
+ foreach (WorkItem workItem in _workItemsQueue)
+ {
+ states[i] = workItem.GetWorkItemResult().State;
+ ++i;
+ }
+ return states;
+ }
}
- ///
- /// Wait for the thread pool to be idle
+ ///
+ /// WorkItemsGroup start information
///
- public bool WaitForIdle(TimeSpan timeout)
+ public override WIGStartInfo WIGStartInfo
{
- return WaitForIdle((int)timeout.TotalMilliseconds);
+ get { return _workItemsGroupStartInfo; }
}
- ///
+ ///
+ /// Start the Work Items Group if it was started suspended
+ ///
+ public override void Start()
+ {
+ // If the Work Items Group already started then quit
+ if (!_isSuspended)
+ {
+ return;
+ }
+ _isSuspended = false;
+
+ EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
+ }
+
+ public override void Cancel(bool abortExecution)
+ {
+ lock (_lock)
+ {
+ _canceledWorkItemsGroup.IsCanceled = true;
+ _workItemsQueue.Clear();
+ _workItemsInStpQueue = 0;
+ _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
+ }
+
+ if (abortExecution)
+ {
+ _stp.CancelAbortWorkItemsGroup(this);
+ }
+ }
+
+ ///
/// Wait for the thread pool to be idle
///
- public bool WaitForIdle(int millisecondsTimeout)
+ public override bool WaitForIdle(int millisecondsTimeout)
{
- _stp.ValidateWorkItemsGroupWaitForIdle(this);
- return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
+ SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
+ return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
}
- public int WaitingCallbacks
- {
- get
- {
- return _workItemsQueue.Count;
- }
- }
+ public override event WorkItemsGroupIdleHandler OnIdle
+ {
+ add { _onIdle += value; }
+ remove { _onIdle -= value; }
+ }
- public event WorkItemsGroupIdleHandler OnIdle
- {
- add
- {
- _onIdle += value;
- }
- remove
- {
- _onIdle -= value;
- }
- }
+ #endregion
- public void Cancel()
- {
- lock(_lock)
- {
- _canceledWorkItemsGroup.IsCanceled = true;
- _workItemsQueue.Clear();
- _workItemsInStpQueue = 0;
- _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
- }
- }
+ #region Private methods
- public void Start()
- {
- lock (this)
- {
- if (!_workItemsGroupStartInfo.StartSuspended)
- {
- return;
- }
- _workItemsGroupStartInfo.StartSuspended = false;
- }
-
- for(int i = 0; i < _concurrency; ++i)
- {
- EnqueueToSTPNextWorkItem(null, false);
- }
- }
-
- #endregion
+ private void RegisterToWorkItemCompletion(IWorkItemResult wir)
+ {
+ IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
+ iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
+ iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
+ }
- #region Private methods
-
- private void RegisterToWorkItemCompletion(IWorkItemResult wir)
- {
- IInternalWorkItemResult iwir = wir as IInternalWorkItemResult;
- iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
- iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
- }
-
- public void OnSTPIsStarting()
- {
- lock (this)
- {
- if (_workItemsGroupStartInfo.StartSuspended)
- {
- return;
- }
- }
-
- for(int i = 0; i < _concurrency; ++i)
- {
- EnqueueToSTPNextWorkItem(null, false);
- }
- }
-
- private object FireOnIdle(object state)
- {
- FireOnIdleImpl(_onIdle);
- return null;
- }
-
- [MethodImpl(MethodImplOptions.NoInlining)]
- private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
- {
- if(null == onIdle)
+ public void OnSTPIsStarting()
+ {
+ if (_isSuspended)
{
return;
}
+
+ EnqueueToSTPNextNWorkItem(_concurrency);
+ }
- Delegate[] delegates = onIdle.GetInvocationList();
- foreach(WorkItemsGroupIdleHandler eh in delegates)
- {
- try
- {
- eh(this);
- }
- // Ignore exceptions
- catch{}
- }
- }
-
- private void OnWorkItemStartedCallback(WorkItem workItem)
+ public void EnqueueToSTPNextNWorkItem(int count)
{
- lock(_lock)
+ for (int i = 0; i < count; ++i)
{
- ++_workItemsExecutingInStp;
+ EnqueueToSTPNextWorkItem(null, false);
}
}
- private void OnWorkItemCompletedCallback(WorkItem workItem)
- {
- EnqueueToSTPNextWorkItem(null, true);
- }
-
- private void EnqueueToSTPNextWorkItem(WorkItem workItem)
+ private object FireOnIdle(object state)
+ {
+ FireOnIdleImpl(_onIdle);
+ return null;
+ }
+
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
+ {
+ if(null == onIdle)
+ {
+ return;
+ }
+
+ Delegate[] delegates = onIdle.GetInvocationList();
+ foreach(WorkItemsGroupIdleHandler eh in delegates)
+ {
+ try
+ {
+ eh(this);
+ }
+ catch { } // Suppress exceptions
+ }
+ }
+
+ private void OnWorkItemStartedCallback(WorkItem workItem)
+ {
+ lock(_lock)
+ {
+ ++_workItemsExecutingInStp;
+ }
+ }
+
+ private void OnWorkItemCompletedCallback(WorkItem workItem)
+ {
+ EnqueueToSTPNextWorkItem(null, true);
+ }
+
+ internal override void Enqueue(WorkItem workItem)
{
- EnqueueToSTPNextWorkItem(workItem, false);
+ EnqueueToSTPNextWorkItem(workItem);
}
- private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
- {
- lock(_lock)
- {
- // Got here from OnWorkItemCompletedCallback()
- if (decrementWorkItemsInStpQueue)
- {
- --_workItemsInStpQueue;
-
- if(_workItemsInStpQueue < 0)
- {
- _workItemsInStpQueue = 0;
- }
-
- --_workItemsExecutingInStp;
-
- if(_workItemsExecutingInStp < 0)
- {
- _workItemsExecutingInStp = 0;
- }
- }
-
- // If the work item is not null then enqueue it
- if (null != workItem)
- {
- workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
-
- RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
- _workItemsQueue.Enqueue(workItem);
- //_stp.IncrementWorkItemsCount();
-
- if ((1 == _workItemsQueue.Count) &&
- (0 == _workItemsInStpQueue))
- {
- _stp.RegisterWorkItemsGroup(this);
- Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
+ private void EnqueueToSTPNextWorkItem(WorkItem workItem)
+ {
+ EnqueueToSTPNextWorkItem(workItem, false);
+ }
+
+ private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
+ {
+ lock(_lock)
+ {
+ // Got here from OnWorkItemCompletedCallback()
+ if (decrementWorkItemsInStpQueue)
+ {
+ --_workItemsInStpQueue;
+
+ if(_workItemsInStpQueue < 0)
+ {
+ _workItemsInStpQueue = 0;
+ }
+
+ --_workItemsExecutingInStp;
+
+ if(_workItemsExecutingInStp < 0)
+ {
+ _workItemsExecutingInStp = 0;
+ }
+ }
+
+ // If the work item is not null then enqueue it
+ if (null != workItem)
+ {
+ workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
+
+ RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
+ _workItemsQueue.Enqueue(workItem);
+ //_stp.IncrementWorkItemsCount();
+
+ if ((1 == _workItemsQueue.Count) &&
+ (0 == _workItemsInStpQueue))
+ {
+ _stp.RegisterWorkItemsGroup(this);
+ IsIdle = false;
_isIdleWaitHandle.Reset();
- }
- }
-
- // If the work items queue of the group is empty than quit
- if (0 == _workItemsQueue.Count)
- {
- if (0 == _workItemsInStpQueue)
- {
- _stp.UnregisterWorkItemsGroup(this);
- Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
+ }
+ }
+
+ // If the work items queue of the group is empty than quit
+ if (0 == _workItemsQueue.Count)
+ {
+ if (0 == _workItemsInStpQueue)
+ {
+ _stp.UnregisterWorkItemsGroup(this);
+ IsIdle = true;
_isIdleWaitHandle.Set();
- _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
- }
- return;
- }
-
- if (!_workItemsGroupStartInfo.StartSuspended)
- {
- if (_workItemsInStpQueue < _concurrency)
- {
- WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
- _stp.Enqueue(nextWorkItem, true);
- ++_workItemsInStpQueue;
- }
- }
- }
- }
-
- #endregion
+ if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
+ {
+ _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
+ }
+ }
+ return;
+ }
+
+ if (!_isSuspended)
+ {
+ if (_workItemsInStpQueue < _concurrency)
+ {
+ WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
+ try
+ {
+ _stp.Enqueue(nextWorkItem);
+ }
+ catch (ObjectDisposedException e)
+ {
+ e.GetHashCode();
+ // The STP has been shutdown
+ }
+
+ ++_workItemsInStpQueue;
+ }
+ }
+ }
+ }
+
+ #endregion
}
- #endregion
+ #endregion
}
--
cgit v1.1