// Ami Bar
// amibar@gmail.com
using System;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Diagnostics;
namespace Amib.Threading.Internal
{
#region WorkItemsGroup class
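/// <summary>
/// Groups work items behind a priority queue and forwards them to the owning
/// SmartThreadPool, keeping at most the configured concurrency of them in the
/// pool at any one time.
/// </summary>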
public class WorkItemsGroup : IWorkItemsGroup
{
#region Private members
/// <summary>
/// Monitor object that guards the work items queue and the counters below.
/// Readonly so the monitor can never be swapped while a thread holds it.
/// </summary>
private readonly object _lock = new object();

/// <summary>
/// Contains the name of this WorkItemsGroup instance.
/// Can be changed by the user.
/// </summary>
private string _name = "WorkItemsGroup";

/// <summary>
/// A reference to the SmartThreadPool instance that created this
/// WorkItemsGroup. Assigned once, in the constructor.
/// </summary>
private readonly SmartThreadPool _stp;

/// <summary>
/// The OnIdle event
/// </summary>
private event WorkItemsGroupIdleHandler _onIdle;

/// <summary>
/// Defines how many work items of this WorkItemsGroup can run at once.
/// Assigned once, in the constructor.
/// </summary>
private readonly int _concurrency;

/// <summary>
/// Priority queue to hold work items before they are passed
/// to the SmartThreadPool.
/// </summary>
private readonly PriorityQueue _workItemsQueue;

/// <summary>
/// Indicate how many work items are waiting in the SmartThreadPool
/// queue.
/// This value is used to apply the concurrency.
/// </summary>
private int _workItemsInStpQueue;

/// <summary>
/// Indicate how many work items are currently running in the SmartThreadPool.
/// This value is used with the Cancel, to calculate if we can send new
/// work items to the STP.
/// </summary>
private int _workItemsExecutingInStp = 0;

/// <summary>
/// WorkItemsGroup start information
/// </summary>
private WIGStartInfo _workItemsGroupStartInfo;

/// <summary>
/// Signaled when all of the WorkItemsGroup's work items completed.
/// Created in the signaled state.
/// </summary>
private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

/// <summary>
/// A common object for all the work items that this work items group
/// generates, so we can mark them to cancel in O(1).
/// Replaced by a fresh instance on every Cancel(), hence not readonly.
/// </summary>
private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
#endregion
#region Construction
/// <summary>
/// Creates a work items group bound to the given thread pool.
/// </summary>
/// <param name="stp">The SmartThreadPool that the group's work items are forwarded to.</param>
/// <param name="concurrency">
/// How many work items of this group may be in the pool at once. Must be greater than zero.
/// </param>
/// <param name="wigStartInfo">
/// Start information; it is passed to the WIGStartInfo(WIGStartInfo) constructor and the
/// resulting instance is kept by the group.
/// </param>
/// <exception cref="ArgumentOutOfRangeException">concurrency is zero or negative.</exception>
/// <exception cref="ArgumentNullException">stp is null.</exception>
public WorkItemsGroup(
    SmartThreadPool stp,
    int concurrency,
    WIGStartInfo wigStartInfo)
{
    if (concurrency <= 0)
    {
        throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero");
    }

    // Fail fast: nearly every member dereferences _stp, so a null pool would
    // otherwise only surface later as a NullReferenceException far from the cause.
    if (null == stp)
    {
        throw new ArgumentNullException("stp");
    }

    _stp = stp;
    _concurrency = concurrency;
    _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
    _workItemsQueue = new PriorityQueue();

    // The _workItemsInStpQueue gets the number of currently executing work items,
    // because once a work item is executing, it cannot be cancelled.
    _workItemsInStpQueue = _workItemsExecutingInStp;
}
#endregion
#region IWorkItemsGroup implementation
/// <summary>
/// Get/Set the name of this WorkItemsGroup instance
/// </summary>
public string Name
{
    get { return this._name; }
    set { this._name = value; }
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
{
    // Build the work item with the group's start info, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="workItemPriority">The priority of the work item</param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
{
    // Build the work item with the requested priority, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        workItemPriority);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="workItemInfo">Work item info</param>
/// <param name="callback">A callback to execute</param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
{
    // Build the work item from the supplied info, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        workItemInfo,
        callback);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
{
    // Build the work item carrying the caller's state, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="workItemPriority">The work item priority</param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
{
    // Build the work item with state and priority, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        workItemPriority);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="workItemInfo">Work item information</param>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
{
    // Build the work item from the supplied info and state, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        workItemInfo,
        callback,
        state);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback)
{
    // Build the work item with its post-execute delegate, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <param name="workItemPriority">The work item priority</param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback,
    WorkItemPriority workItemPriority)
{
    // Build the work item with post-execute delegate and priority, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback,
        workItemPriority);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback,
    CallToPostExecute callToPostExecute)
{
    // Build the work item with post-execute delegate and its call policy, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback,
        callToPostExecute);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Queue a work item
/// </summary>
/// <param name="callback">A callback to execute</param>
/// <param name="state">
/// The context object of the work item. Used for passing arguments to the work item.
/// </param>
/// <param name="postExecuteWorkItemCallback">
/// A delegate to call after the callback completion
/// </param>
/// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
/// <param name="workItemPriority">The work item priority</param>
/// <returns>Returns a work item result</returns>
public IWorkItemResult QueueWorkItem(
    WorkItemCallback callback,
    object state,
    PostExecuteWorkItemCallback postExecuteWorkItemCallback,
    CallToPostExecute callToPostExecute,
    WorkItemPriority workItemPriority)
{
    // Build the fully specified work item, then hand it to the group queue.
    WorkItem queuedItem = WorkItemFactory.CreateWorkItem(
        this,
        _workItemsGroupStartInfo,
        callback,
        state,
        postExecuteWorkItemCallback,
        callToPostExecute,
        workItemPriority);
    EnqueueToSTPNextWorkItem(queuedItem);

    IWorkItemResult result = queuedItem.GetWorkItemResult();
    return result;
}
/// <summary>
/// Wait for the work items group to be idle, with no timeout.
/// </summary>
public void WaitForIdle()
{
    const int noTimeout = Timeout.Infinite;

    // The boolean result is irrelevant for an infinite wait, so it is discarded.
    WaitForIdle(noTimeout);
}
/// <summary>
/// Wait for the work items group to be idle
/// </summary>
/// <param name="timeout">
/// Maximum time to wait. Fractional milliseconds are truncated; a value that
/// truncates to -1 milliseconds means wait indefinitely.
/// </param>
/// <returns>The value returned by the millisecond-based WaitForIdle overload.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// timeout, in whole milliseconds, is less than -1 or greater than Int32.MaxValue.
/// </exception>
public bool WaitForIdle(TimeSpan timeout)
{
    // Truncate into a long first: a direct (int) cast of a large TotalMilliseconds
    // overflows in an unchecked context and would silently wait for the wrong duration.
    long totalMilliseconds = (long)timeout.TotalMilliseconds;
    if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
    {
        throw new ArgumentOutOfRangeException(
            "timeout",
            timeout,
            "timeout must be -1 millisecond (infinite) or between 0 and Int32.MaxValue milliseconds");
    }

    return WaitForIdle((int)totalMilliseconds);
}
/// <summary>
/// Wait for the work items group to be idle
/// </summary>
/// <param name="millisecondsTimeout">Timeout, in milliseconds, passed to the idle wait handle.</param>
/// <returns>The result of waiting on the group's idle wait handle.</returns>
public bool WaitForIdle(int millisecondsTimeout)
{
    // Let the owning pool validate the wait before blocking.
    _stp.ValidateWorkItemsGroupWaitForIdle(this);

    bool signaled = _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
    return signaled;
}
/// <summary>
/// Number of work items currently held in the group's own queue.
/// </summary>
public int WaitingCallbacks
{
    get
    {
        int pending = _workItemsQueue.Count;
        return pending;
    }
}
/// <summary>
/// Public face of the private _onIdle event; subscriptions are forwarded to it.
/// </summary>
public event WorkItemsGroupIdleHandler OnIdle
{
    add { this._onIdle += value; }
    remove { this._onIdle -= value; }
}
/// <summary>
/// Cancels the work items of this group: flags the current shared cancel marker,
/// empties the group's queue, zeroes the in-pool counter and installs a fresh
/// marker for work items queued from now on.
/// </summary>
public void Cancel()
{
    lock (this._lock)
    {
        // Every work item queued so far references this marker object.
        this._canceledWorkItemsGroup.IsCanceled = true;

        // Drop whatever has not been handed to the pool yet.
        this._workItemsQueue.Clear();
        this._workItemsInStpQueue = 0;

        // Later work items get their own, not-yet-canceled marker.
        this._canceledWorkItemsGroup = new CanceledWorkItemsGroup();
    }
}
/// <summary>
/// Starts a group that was created suspended. Does nothing when the group is
/// not suspended; otherwise clears the suspended flag and asks the group,
/// once per concurrency slot, to hand its next queued work item to the pool.
/// </summary>
public void Start()
{
    // Use the private _lock rather than lock(this): EnqueueToSTPNextWorkItem reads
    // StartSuspended under _lock, so the flag must be written under the same monitor.
    // A private monitor also cannot be taken by code outside this class.
    lock (_lock)
    {
        if (!_workItemsGroupStartInfo.StartSuspended)
        {
            return;
        }
        _workItemsGroupStartInfo.StartSuspended = false;
    }

    // Done outside the lock; EnqueueToSTPNextWorkItem acquires _lock itself.
    for (int i = 0; i < _concurrency; ++i)
    {
        EnqueueToSTPNextWorkItem(null, false);
    }
}
#endregion
#region Private methods
/// <summary>
/// Subscribes this group to the started and completed notifications of a work item result.
/// </summary>
/// <param name="wir">The work item result; it must also implement IInternalWorkItemResult.</param>
/// <exception cref="ArgumentNullException">wir is null.</exception>
/// <exception cref="InvalidCastException">wir does not implement IInternalWorkItemResult.</exception>
private void RegisterToWorkItemCompletion(IWorkItemResult wir)
{
    if (null == wir)
    {
        throw new ArgumentNullException("wir");
    }

    // Direct cast instead of 'as': a result of the wrong type now fails here with a
    // descriptive InvalidCastException rather than a NullReferenceException below.
    IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
    iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
    iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
}
/// <summary>
/// Does nothing while the group is suspended; otherwise asks the group, once per
/// concurrency slot, to hand its next queued work item to the pool.
/// NOTE(review): the name suggests the pool calls this when it starts — confirm against the caller.
/// </summary>
public void OnSTPIsStarting()
{
    // Use the private _lock rather than lock(this) so the flag is read under the
    // same monitor that EnqueueToSTPNextWorkItem uses for StartSuspended.
    lock (_lock)
    {
        if (_workItemsGroupStartInfo.StartSuspended)
        {
            return;
        }
    }

    // Done outside the lock; EnqueueToSTPNextWorkItem acquires _lock itself.
    for (int i = 0; i < _concurrency; ++i)
    {
        EnqueueToSTPNextWorkItem(null, false);
    }
}
/// <summary>
/// Work item callback that raises the group's idle event.
/// </summary>
/// <param name="state">Not used.</param>
/// <returns>Always null.</returns>
private object FireOnIdle(object state)
{
    WorkItemsGroupIdleHandler subscribers = _onIdle;
    FireOnIdleImpl(subscribers);
    return null;
}
/// <summary>
/// Invokes every subscriber of the given idle handler, passing this group.
/// A subscriber that throws does not prevent the remaining subscribers from running.
/// </summary>
/// <param name="onIdle">The handler whose invocation list is called; may be null.</param>
[MethodImpl(MethodImplOptions.NoInlining)]
private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
{
    if (null == onIdle)
    {
        return;
    }

    Delegate[] delegates = onIdle.GetInvocationList();
    foreach (WorkItemsGroupIdleHandler eh in delegates)
    {
        try
        {
            eh(this);
        }
        catch (Exception ex)
        {
            // Still best-effort: a faulty subscriber must not break idle notification,
            // but the failure is traced instead of vanishing silently.
            Trace.WriteLine("WorkItemsGroup " + Name + " OnIdle handler threw an exception: " + ex);
        }
    }
}
/// <summary>
/// Work item "started" notification: counts one more executing work item.
/// </summary>
/// <param name="workItem">The work item that started; not used.</param>
private void OnWorkItemStartedCallback(WorkItem workItem)
{
    lock (_lock)
    {
        _workItemsExecutingInStp += 1;
    }
}
/// <summary>
/// Work item "completed" notification: decrements the group's counters and
/// tries to hand the next queued work item to the pool.
/// </summary>
/// <param name="workItem">The work item that completed; not used.</param>
private void OnWorkItemCompletedCallback(WorkItem workItem)
{
    const bool decrementCounters = true;
    EnqueueToSTPNextWorkItem(null, decrementCounters);
}
/// <summary>
/// Queues the given work item in the group without touching the in-pool counters.
/// </summary>
/// <param name="workItem">The work item to queue.</param>
private void EnqueueToSTPNextWorkItem(WorkItem workItem)
{
    const bool decrementCounters = false;
    EnqueueToSTPNextWorkItem(workItem, decrementCounters);
}
/// <summary>
/// Central scheduling step of the group, executed under _lock:
/// optionally accounts for a completed work item, optionally queues a new one,
/// signals idleness when nothing is left, and otherwise forwards the next queued
/// work item to the pool if the group is not suspended and has a free concurrency slot.
/// </summary>
/// <param name="workItem">A work item to add to the group's queue, or null to add nothing.</param>
/// <param name="decrementWorkItemsInStpQueue">
/// True when called from OnWorkItemCompletedCallback, in which case both in-pool
/// counters are decremented (never below zero).
/// </param>
private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
{
    lock (_lock)
    {
        // A work item finished: release its slot, clamping both counters at zero
        // (Cancel() may already have reset _workItemsInStpQueue).
        if (decrementWorkItemsInStpQueue)
        {
            _workItemsInStpQueue = Math.Max(0, _workItemsInStpQueue - 1);
            _workItemsExecutingInStp = Math.Max(0, _workItemsExecutingInStp - 1);
        }

        // A new work item was supplied: tag it with the current cancel marker and queue it.
        if (workItem != null)
        {
            workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
            RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
            _workItemsQueue.Enqueue(workItem);
            //_stp.IncrementWorkItemsCount();

            // First item in an otherwise empty group: the group stops being idle.
            bool leavingIdle = (_workItemsQueue.Count == 1) && (_workItemsInStpQueue == 0);
            if (leavingIdle)
            {
                _stp.RegisterWorkItemsGroup(this);
                Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
                _isIdleWaitHandle.Reset();
            }
        }

        // Nothing queued: report idleness if nothing is in the pool either, then stop.
        if (_workItemsQueue.Count == 0)
        {
            if (_workItemsInStpQueue == 0)
            {
                _stp.UnregisterWorkItemsGroup(this);
                Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
                _isIdleWaitHandle.Set();
                _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
            }
            return;
        }

        // A suspended group keeps its items queued until it is started.
        if (_workItemsGroupStartInfo.StartSuspended)
        {
            return;
        }

        // All concurrency slots are taken.
        if (_workItemsInStpQueue >= _concurrency)
        {
            return;
        }

        // Forward the next queued work item to the pool and occupy a slot.
        WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
        _stp.Enqueue(nextWorkItem, true);
        _workItemsInStpQueue++;
    }
}
#endregion
}
#endregion
}