From 1a47ff8094ee414a47aebd310826906d89428a09 Mon Sep 17 00:00:00 2001
From: Teravus Ovares
Date: Fri, 30 May 2008 12:27:06 +0000
Subject: * This is Melanie's XEngine script engine. I've not tested this real
well, however, it's confirmed to compile and OpenSimulator to run
successfully without this script engine active.
---
ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 512 +++++++++++++++++++++++++++
1 file changed, 512 insertions(+)
create mode 100644 ThirdParty/SmartThreadPool/WorkItemsGroup.cs
(limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
new file mode 100644
index 0000000..01ac8dd
--- /dev/null
+++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
@@ -0,0 +1,512 @@
+// Ami Bar
+// amibar@gmail.com
+
+using System;
+using System.Threading;
+using System.Runtime.CompilerServices;
+using System.Diagnostics;
+
+namespace Amib.Threading.Internal
+{
+ #region WorkItemsGroup class
+
+ ///
+ /// Summary description for WorkItemsGroup.
+ ///
+ public class WorkItemsGroup : IWorkItemsGroup
+ {
+ #region Private members
+
+ private object _lock = new object();
+ ///
+ /// Contains the name of this instance of SmartThreadPool.
+ /// Can be changed by the user.
+ ///
+ private string _name = "WorkItemsGroup";
+
+ ///
+ /// A reference to the SmartThreadPool instance that created this
+ /// WorkItemsGroup.
+ ///
+ private SmartThreadPool _stp;
+
+ ///
+ /// The OnIdle event
+ ///
+ private event WorkItemsGroupIdleHandler _onIdle;
+
+ ///
+ /// Defines how many work items of this WorkItemsGroup can run at once.
+ ///
+ private int _concurrency;
+
+ ///
+ /// Priority queue to hold work items before they are passed
+ /// to the SmartThreadPool.
+ ///
+ private PriorityQueue _workItemsQueue;
+
+ ///
+ /// Indicate how many work items are waiting in the SmartThreadPool
+ /// queue.
+ /// This value is used to apply the concurrency.
+ ///
+ private int _workItemsInStpQueue;
+
+ ///
+ /// Indicate how many work items are currently running in the SmartThreadPool.
+ /// This value is used with the Cancel, to calculate if we can send new
+ /// work items to the STP.
+ ///
+ private int _workItemsExecutingInStp = 0;
+
+ ///
+ /// WorkItemsGroup start information
+ ///
+ private WIGStartInfo _workItemsGroupStartInfo;
+
+ ///
+ /// Signaled when all of the WorkItemsGroup's work item completed.
+ ///
+ private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
+
+ ///
+ /// A common object for all the work items that this work items group
+ /// generate so we can mark them to cancel in O(1)
+ ///
+ private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
+
+ #endregion
+
+ #region Construction
+
+ public WorkItemsGroup(
+ SmartThreadPool stp,
+ int concurrency,
+ WIGStartInfo wigStartInfo)
+ {
+ if (concurrency <= 0)
+ {
+ throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero");
+ }
+ _stp = stp;
+ _concurrency = concurrency;
+ _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
+ _workItemsQueue = new PriorityQueue();
+
+ // The _workItemsInStpQueue gets the number of currently executing work items,
+ // because once a work item is executing, it cannot be cancelled.
+ _workItemsInStpQueue = _workItemsExecutingInStp;
+ }
+
+ #endregion
+
+ #region IWorkItemsGroup implementation
+
+ ///
+ /// Get/Set the name of the SmartThreadPool instance
+ ///
+ public string Name
+ {
+ get
+ {
+ return _name;
+ }
+
+ set
+ {
+ _name = value;
+ }
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ /// The priority of the work item
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// Work item info
+ /// A callback to execute
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ /// The work item priority
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// Work item information
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// The work item priority
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback,
+ WorkItemPriority workItemPriority)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// Indicates on which cases to call to the post execute callback
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback,
+ CallToPostExecute callToPostExecute)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// Indicates on which cases to call to the post execute callback
+ /// The work item priority
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback,
+ CallToPostExecute callToPostExecute,
+ WorkItemPriority workItemPriority)
+ {
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority);
+ EnqueueToSTPNextWorkItem(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Wait for the thread pool to be idle
+ ///
+ public void WaitForIdle()
+ {
+ WaitForIdle(Timeout.Infinite);
+ }
+
+ ///
+ /// Wait for the thread pool to be idle
+ ///
+ public bool WaitForIdle(TimeSpan timeout)
+ {
+ return WaitForIdle((int)timeout.TotalMilliseconds);
+ }
+
+ ///
+ /// Wait for the thread pool to be idle
+ ///
+ public bool WaitForIdle(int millisecondsTimeout)
+ {
+ _stp.ValidateWorkItemsGroupWaitForIdle(this);
+ return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
+ }
+
+ public int WaitingCallbacks
+ {
+ get
+ {
+ return _workItemsQueue.Count;
+ }
+ }
+
+ public event WorkItemsGroupIdleHandler OnIdle
+ {
+ add
+ {
+ _onIdle += value;
+ }
+ remove
+ {
+ _onIdle -= value;
+ }
+ }
+
+ public void Cancel()
+ {
+ lock(_lock)
+ {
+ _canceledWorkItemsGroup.IsCanceled = true;
+ _workItemsQueue.Clear();
+ _workItemsInStpQueue = 0;
+ _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
+ }
+ }
+
+ public void Start()
+ {
+ lock (this)
+ {
+ if (!_workItemsGroupStartInfo.StartSuspended)
+ {
+ return;
+ }
+ _workItemsGroupStartInfo.StartSuspended = false;
+ }
+
+ for(int i = 0; i < _concurrency; ++i)
+ {
+ EnqueueToSTPNextWorkItem(null, false);
+ }
+ }
+
+ #endregion
+
+ #region Private methods
+
+ private void RegisterToWorkItemCompletion(IWorkItemResult wir)
+ {
+ IInternalWorkItemResult iwir = wir as IInternalWorkItemResult;
+ iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
+ iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
+ }
+
+ public void OnSTPIsStarting()
+ {
+ lock (this)
+ {
+ if (_workItemsGroupStartInfo.StartSuspended)
+ {
+ return;
+ }
+ }
+
+ for(int i = 0; i < _concurrency; ++i)
+ {
+ EnqueueToSTPNextWorkItem(null, false);
+ }
+ }
+
+ private object FireOnIdle(object state)
+ {
+ FireOnIdleImpl(_onIdle);
+ return null;
+ }
+
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
+ {
+ if(null == onIdle)
+ {
+ return;
+ }
+
+ Delegate[] delegates = onIdle.GetInvocationList();
+ foreach(WorkItemsGroupIdleHandler eh in delegates)
+ {
+ try
+ {
+ eh(this);
+ }
+ // Ignore exceptions
+ catch{}
+ }
+ }
+
+ private void OnWorkItemStartedCallback(WorkItem workItem)
+ {
+ lock(_lock)
+ {
+ ++_workItemsExecutingInStp;
+ }
+ }
+
+ private void OnWorkItemCompletedCallback(WorkItem workItem)
+ {
+ EnqueueToSTPNextWorkItem(null, true);
+ }
+
+ private void EnqueueToSTPNextWorkItem(WorkItem workItem)
+ {
+ EnqueueToSTPNextWorkItem(workItem, false);
+ }
+
+ private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
+ {
+ lock(_lock)
+ {
+ // Got here from OnWorkItemCompletedCallback()
+ if (decrementWorkItemsInStpQueue)
+ {
+ --_workItemsInStpQueue;
+
+ if(_workItemsInStpQueue < 0)
+ {
+ _workItemsInStpQueue = 0;
+ }
+
+ --_workItemsExecutingInStp;
+
+ if(_workItemsExecutingInStp < 0)
+ {
+ _workItemsExecutingInStp = 0;
+ }
+ }
+
+ // If the work item is not null then enqueue it
+ if (null != workItem)
+ {
+ workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
+
+ RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
+ _workItemsQueue.Enqueue(workItem);
+ //_stp.IncrementWorkItemsCount();
+
+ if ((1 == _workItemsQueue.Count) &&
+ (0 == _workItemsInStpQueue))
+ {
+ _stp.RegisterWorkItemsGroup(this);
+ Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
+ _isIdleWaitHandle.Reset();
+ }
+ }
+
+ // If the work items queue of the group is empty than quit
+ if (0 == _workItemsQueue.Count)
+ {
+ if (0 == _workItemsInStpQueue)
+ {
+ _stp.UnregisterWorkItemsGroup(this);
+ Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
+ _isIdleWaitHandle.Set();
+ _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
+ }
+ return;
+ }
+
+ if (!_workItemsGroupStartInfo.StartSuspended)
+ {
+ if (_workItemsInStpQueue < _concurrency)
+ {
+ WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
+ _stp.Enqueue(nextWorkItem, true);
+ ++_workItemsInStpQueue;
+ }
+ }
+ }
+ }
+
+ #endregion
+ }
+
+ #endregion
+}
--
cgit v1.1