From 1a47ff8094ee414a47aebd310826906d89428a09 Mon Sep 17 00:00:00 2001 From: Teravus Ovares Date: Fri, 30 May 2008 12:27:06 +0000 Subject: * This is Melanie's XEngine script engine. I've not tested this real well, however, it's confirmed to compile and OpenSimulator to run successfully without this script engine active. --- ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 512 +++++++++++++++++++++++++++ 1 file changed, 512 insertions(+) create mode 100644 ThirdParty/SmartThreadPool/WorkItemsGroup.cs (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs') diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs new file mode 100644 index 0000000..01ac8dd --- /dev/null +++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs @@ -0,0 +1,512 @@ +// Ami Bar +// amibar@gmail.com + +using System; +using System.Threading; +using System.Runtime.CompilerServices; +using System.Diagnostics; + +namespace Amib.Threading.Internal +{ + #region WorkItemsGroup class + + /// + /// Summary description for WorkItemsGroup. + /// + public class WorkItemsGroup : IWorkItemsGroup + { + #region Private members + + private object _lock = new object(); + /// + /// Contains the name of this instance of SmartThreadPool. + /// Can be changed by the user. + /// + private string _name = "WorkItemsGroup"; + + /// + /// A reference to the SmartThreadPool instance that created this + /// WorkItemsGroup. + /// + private SmartThreadPool _stp; + + /// + /// The OnIdle event + /// + private event WorkItemsGroupIdleHandler _onIdle; + + /// + /// Defines how many work items of this WorkItemsGroup can run at once. + /// + private int _concurrency; + + /// + /// Priority queue to hold work items before they are passed + /// to the SmartThreadPool. + /// + private PriorityQueue _workItemsQueue; + + /// + /// Indicate how many work items are waiting in the SmartThreadPool + /// queue. + /// This value is used to apply the concurrency. + /// + private int _workItemsInStpQueue; + + /// + /// Indicate how many work items are currently running in the SmartThreadPool. + /// This value is used with the Cancel, to calculate if we can send new + /// work items to the STP. + /// + private int _workItemsExecutingInStp = 0; + + /// + /// WorkItemsGroup start information + /// + private WIGStartInfo _workItemsGroupStartInfo; + + /// + /// Signaled when all of the WorkItemsGroup's work item completed. + /// + private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); + + /// + /// A common object for all the work items that this work items group + /// generate so we can mark them to cancel in O(1) + /// + private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); + + #endregion + + #region Construction + + public WorkItemsGroup( + SmartThreadPool stp, + int concurrency, + WIGStartInfo wigStartInfo) + { + if (concurrency <= 0) + { + throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero"); + } + _stp = stp; + _concurrency = concurrency; + _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo); + _workItemsQueue = new PriorityQueue(); + + // The _workItemsInStpQueue gets the number of currently executing work items, + // because once a work item is executing, it cannot be cancelled. + _workItemsInStpQueue = _workItemsExecutingInStp; + } + + #endregion + + #region IWorkItemsGroup implementation + + /// + /// Get/Set the name of the SmartThreadPool instance + /// + public string Name + { + get + { + return _name; + } + + set + { + _name = value; + } + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// Returns a work item result + public IWorkItemResult QueueWorkItem(WorkItemCallback callback) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// The priority of the work item + /// Returns a work item result + public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// Work item info + /// A callback to execute + /// Returns a work item result + public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// + /// The context object of the work item. Used for passing arguments to the work item. + /// + /// Returns a work item result + public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// + /// The context object of the work item. Used for passing arguments to the work item. + /// + /// The work item priority + /// Returns a work item result + public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// Work item information + /// A callback to execute + /// + /// The context object of the work item. Used for passing arguments to the work item. + /// + /// Returns a work item result + public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// + /// The context object of the work item. Used for passing arguments to the work item. + /// + /// + /// A delegate to call after the callback completion + /// + /// Returns a work item result + public IWorkItemResult QueueWorkItem( + WorkItemCallback callback, + object state, + PostExecuteWorkItemCallback postExecuteWorkItemCallback) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// + /// The context object of the work item. Used for passing arguments to the work item. + /// + /// + /// A delegate to call after the callback completion + /// + /// The work item priority + /// Returns a work item result + public IWorkItemResult QueueWorkItem( + WorkItemCallback callback, + object state, + PostExecuteWorkItemCallback postExecuteWorkItemCallback, + WorkItemPriority workItemPriority) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// + /// The context object of the work item. Used for passing arguments to the work item. + /// + /// + /// A delegate to call after the callback completion + /// + /// Indicates on which cases to call to the post execute callback + /// Returns a work item result + public IWorkItemResult QueueWorkItem( + WorkItemCallback callback, + object state, + PostExecuteWorkItemCallback postExecuteWorkItemCallback, + CallToPostExecute callToPostExecute) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Queue a work item + /// + /// A callback to execute + /// + /// The context object of the work item. Used for passing arguments to the work item. + /// + /// + /// A delegate to call after the callback completion + /// + /// Indicates on which cases to call to the post execute callback + /// The work item priority + /// Returns a work item result + public IWorkItemResult QueueWorkItem( + WorkItemCallback callback, + object state, + PostExecuteWorkItemCallback postExecuteWorkItemCallback, + CallToPostExecute callToPostExecute, + WorkItemPriority workItemPriority) + { + WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); + EnqueueToSTPNextWorkItem(workItem); + return workItem.GetWorkItemResult(); + } + + /// + /// Wait for the thread pool to be idle + /// + public void WaitForIdle() + { + WaitForIdle(Timeout.Infinite); + } + + /// + /// Wait for the thread pool to be idle + /// + public bool WaitForIdle(TimeSpan timeout) + { + return WaitForIdle((int)timeout.TotalMilliseconds); + } + + /// + /// Wait for the thread pool to be idle + /// + public bool WaitForIdle(int millisecondsTimeout) + { + _stp.ValidateWorkItemsGroupWaitForIdle(this); + return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); + } + + public int WaitingCallbacks + { + get + { + return _workItemsQueue.Count; + } + } + + public event WorkItemsGroupIdleHandler OnIdle + { + add + { + _onIdle += value; + } + remove + { + _onIdle -= value; + } + } + + public void Cancel() + { + lock(_lock) + { + _canceledWorkItemsGroup.IsCanceled = true; + _workItemsQueue.Clear(); + _workItemsInStpQueue = 0; + _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); + } + } + + public void Start() + { + lock (this) + { + if (!_workItemsGroupStartInfo.StartSuspended) + { + return; + } + _workItemsGroupStartInfo.StartSuspended = false; + } + + for(int i = 0; i < _concurrency; ++i) + { + EnqueueToSTPNextWorkItem(null, false); + } + } + + #endregion + + #region Private methods + + private void RegisterToWorkItemCompletion(IWorkItemResult wir) + { + IInternalWorkItemResult iwir = wir as IInternalWorkItemResult; + iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback); + iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback); + } + + public void OnSTPIsStarting() + { + lock (this) + { + if (_workItemsGroupStartInfo.StartSuspended) + { + return; + } + } + + for(int i = 0; i < _concurrency; ++i) + { + EnqueueToSTPNextWorkItem(null, false); + } + } + + private object FireOnIdle(object state) + { + FireOnIdleImpl(_onIdle); + return null; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) + { + if(null == onIdle) + { + return; + } + + Delegate[] delegates = onIdle.GetInvocationList(); + foreach(WorkItemsGroupIdleHandler eh in delegates) + { + try + { + eh(this); + } + // Ignore exceptions + catch{} + } + } + + private void OnWorkItemStartedCallback(WorkItem workItem) + { + lock(_lock) + { + ++_workItemsExecutingInStp; + } + } + + private void OnWorkItemCompletedCallback(WorkItem workItem) + { + EnqueueToSTPNextWorkItem(null, true); + } + + private void EnqueueToSTPNextWorkItem(WorkItem workItem) + { + EnqueueToSTPNextWorkItem(workItem, false); + } + + private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) + { + lock(_lock) + { + // Got here from OnWorkItemCompletedCallback() + if (decrementWorkItemsInStpQueue) + { + --_workItemsInStpQueue; + + if(_workItemsInStpQueue < 0) + { + _workItemsInStpQueue = 0; + } + + --_workItemsExecutingInStp; + + if(_workItemsExecutingInStp < 0) + { + _workItemsExecutingInStp = 0; + } + } + + // If the work item is not null then enqueue it + if (null != workItem) + { + workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; + + RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); + _workItemsQueue.Enqueue(workItem); + //_stp.IncrementWorkItemsCount(); + + if ((1 == _workItemsQueue.Count) && + (0 == _workItemsInStpQueue)) + { + _stp.RegisterWorkItemsGroup(this); + Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle"); + _isIdleWaitHandle.Reset(); + } + } + + // If the work items queue of the group is empty than quit + if (0 == _workItemsQueue.Count) + { + if (0 == _workItemsInStpQueue) + { + _stp.UnregisterWorkItemsGroup(this); + Trace.WriteLine("WorkItemsGroup " + Name + " is idle"); + _isIdleWaitHandle.Set(); + _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle)); + } + return; + } + + if (!_workItemsGroupStartInfo.StartSuspended) + { + if (_workItemsInStpQueue < _concurrency) + { + WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; + _stp.Enqueue(nextWorkItem, true); + ++_workItemsInStpQueue; + } + } + } + } + + #endregion + } + + #endregion +} -- cgit v1.1