From 854dcd1abddc3eef33da953592deb61133e5e7ed Mon Sep 17 00:00:00 2001 From: Justin Clark-Casey (justincc) Date: Wed, 1 May 2013 23:00:46 +0100 Subject: Fix SmartThreadPool line endings in recent update from dos to unix --- ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 722 +++++++++++++-------------- 1 file changed, 361 insertions(+), 361 deletions(-) (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs') diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs index 67dcbdd..d9d34ac 100644 --- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs +++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs @@ -1,361 +1,361 @@ -using System; -using System.Threading; -using System.Runtime.CompilerServices; -using System.Diagnostics; - -namespace Amib.Threading.Internal -{ - - #region WorkItemsGroup class - - /// - /// Summary description for WorkItemsGroup. - /// - public class WorkItemsGroup : WorkItemsGroupBase - { - #region Private members - - private readonly object _lock = new object(); - - /// - /// A reference to the SmartThreadPool instance that created this - /// WorkItemsGroup. - /// - private readonly SmartThreadPool _stp; - - /// - /// The OnIdle event - /// - private event WorkItemsGroupIdleHandler _onIdle; - - /// - /// A flag to indicate if the Work Items Group is now suspended. - /// - private bool _isSuspended; - - /// - /// Defines how many work items of this WorkItemsGroup can run at once. - /// - private int _concurrency; - - /// - /// Priority queue to hold work items before they are passed - /// to the SmartThreadPool. - /// - private readonly PriorityQueue _workItemsQueue; - - /// - /// Indicate how many work items are waiting in the SmartThreadPool - /// queue. - /// This value is used to apply the concurrency. - /// - private int _workItemsInStpQueue; - - /// - /// Indicate how many work items are currently running in the SmartThreadPool. - /// This value is used with the Cancel, to calculate if we can send new - /// work items to the STP. - /// - private int _workItemsExecutingInStp = 0; - - /// - /// WorkItemsGroup start information - /// - private readonly WIGStartInfo _workItemsGroupStartInfo; - - /// - /// Signaled when all of the WorkItemsGroup's work item completed. - /// - //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); - private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); - - /// - /// A common object for all the work items that this work items group - /// generate so we can mark them to cancel in O(1) - /// - private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); - - #endregion - - #region Construction - - public WorkItemsGroup( - SmartThreadPool stp, - int concurrency, - WIGStartInfo wigStartInfo) - { - if (concurrency <= 0) - { - throw new ArgumentOutOfRangeException( - "concurrency", -#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) - concurrency, -#endif - "concurrency must be greater than zero"); - } - _stp = stp; - _concurrency = concurrency; - _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); - _workItemsQueue = new PriorityQueue(); - Name = "WorkItemsGroup"; - - // The _workItemsInStpQueue gets the number of currently executing work items, - // because once a work item is executing, it cannot be cancelled. - _workItemsInStpQueue = _workItemsExecutingInStp; - - _isSuspended = _workItemsGroupStartInfo.StartSuspended; - } - - #endregion - - #region WorkItemsGroupBase Overrides - - public override int Concurrency - { - get { return _concurrency; } - set - { - Debug.Assert(value > 0); - - int diff = value - _concurrency; - _concurrency = value; - if (diff > 0) - { - EnqueueToSTPNextNWorkItem(diff); - } - } - } - - public override int WaitingCallbacks - { - get { return _workItemsQueue.Count; } - } - - public override object[] GetStates() - { - lock (_lock) - { - object[] states = new object[_workItemsQueue.Count]; - int i = 0; - foreach (WorkItem workItem in _workItemsQueue) - { - states[i] = workItem.GetWorkItemResult().State; - ++i; - } - return states; - } - } - - /// - /// WorkItemsGroup start information - /// - public override WIGStartInfo WIGStartInfo - { - get { return _workItemsGroupStartInfo; } - } - - /// - /// Start the Work Items Group if it was started suspended - /// - public override void Start() - { - // If the Work Items Group already started then quit - if (!_isSuspended) - { - return; - } - _isSuspended = false; - - EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); - } - - public override void Cancel(bool abortExecution) - { - lock (_lock) - { - _canceledWorkItemsGroup.IsCanceled = true; - _workItemsQueue.Clear(); - _workItemsInStpQueue = 0; - _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); - } - - if (abortExecution) - { - _stp.CancelAbortWorkItemsGroup(this); - } - } - - /// - /// Wait for the thread pool to be idle - /// - public override bool WaitForIdle(int millisecondsTimeout) - { - SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); - return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); - } - - public override event WorkItemsGroupIdleHandler OnIdle - { - add { _onIdle += value; } - remove { _onIdle -= value; } - } - - #endregion - - #region Private methods - - private void RegisterToWorkItemCompletion(IWorkItemResult wir) - { - IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; - iwir.OnWorkItemStarted += OnWorkItemStartedCallback; - iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; - } - - public void OnSTPIsStarting() - { - if (_isSuspended) - { - return; - } - - EnqueueToSTPNextNWorkItem(_concurrency); - } - - public void EnqueueToSTPNextNWorkItem(int count) - { - for (int i = 0; i < count; ++i) - { - EnqueueToSTPNextWorkItem(null, false); - } - } - - private object FireOnIdle(object state) - { - FireOnIdleImpl(_onIdle); - return null; - } - - [MethodImpl(MethodImplOptions.NoInlining)] - private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) - { - if(null == onIdle) - { - return; - } - - Delegate[] delegates = onIdle.GetInvocationList(); - foreach(WorkItemsGroupIdleHandler eh in delegates) - { - try - { - eh(this); - } - catch { } // Suppress exceptions - } - } - - private void OnWorkItemStartedCallback(WorkItem workItem) - { - lock(_lock) - { - ++_workItemsExecutingInStp; - } - } - - private void OnWorkItemCompletedCallback(WorkItem workItem) - { - EnqueueToSTPNextWorkItem(null, true); - } - - internal override void Enqueue(WorkItem workItem) - { - EnqueueToSTPNextWorkItem(workItem); - } - - private void EnqueueToSTPNextWorkItem(WorkItem workItem) - { - EnqueueToSTPNextWorkItem(workItem, false); - } - - private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) - { - lock(_lock) - { - // Got here from OnWorkItemCompletedCallback() - if (decrementWorkItemsInStpQueue) - { - --_workItemsInStpQueue; - - if(_workItemsInStpQueue < 0) - { - _workItemsInStpQueue = 0; - } - - --_workItemsExecutingInStp; - - if(_workItemsExecutingInStp < 0) - { - _workItemsExecutingInStp = 0; - } - } - - // If the work item is not null then enqueue it - if (null != workItem) - { - workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; - - RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); - _workItemsQueue.Enqueue(workItem); - //_stp.IncrementWorkItemsCount(); - - if ((1 == _workItemsQueue.Count) && - (0 == _workItemsInStpQueue)) - { - _stp.RegisterWorkItemsGroup(this); - IsIdle = false; - _isIdleWaitHandle.Reset(); - } - } - - // If the work items queue of the group is empty than quit - if (0 == _workItemsQueue.Count) - { - if (0 == _workItemsInStpQueue) - { - _stp.UnregisterWorkItemsGroup(this); - IsIdle = true; - _isIdleWaitHandle.Set(); - if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) - { - _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); - } - } - return; - } - - if (!_isSuspended) - { - if (_workItemsInStpQueue < _concurrency) - { - WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; - try - { - _stp.Enqueue(nextWorkItem); - } - catch (ObjectDisposedException e) - { - e.GetHashCode(); - // The STP has been shutdown - } - - ++_workItemsInStpQueue; - } - } - } - } - - #endregion - } - - #endregion -} +using System; +using System.Threading; +using System.Runtime.CompilerServices; +using System.Diagnostics; + +namespace Amib.Threading.Internal +{ + + #region WorkItemsGroup class + + /// + /// Summary description for WorkItemsGroup. + /// + public class WorkItemsGroup : WorkItemsGroupBase + { + #region Private members + + private readonly object _lock = new object(); + + /// + /// A reference to the SmartThreadPool instance that created this + /// WorkItemsGroup. + /// + private readonly SmartThreadPool _stp; + + /// + /// The OnIdle event + /// + private event WorkItemsGroupIdleHandler _onIdle; + + /// + /// A flag to indicate if the Work Items Group is now suspended. + /// + private bool _isSuspended; + + /// + /// Defines how many work items of this WorkItemsGroup can run at once. + /// + private int _concurrency; + + /// + /// Priority queue to hold work items before they are passed + /// to the SmartThreadPool. + /// + private readonly PriorityQueue _workItemsQueue; + + /// + /// Indicate how many work items are waiting in the SmartThreadPool + /// queue. + /// This value is used to apply the concurrency. + /// + private int _workItemsInStpQueue; + + /// + /// Indicate how many work items are currently running in the SmartThreadPool. + /// This value is used with the Cancel, to calculate if we can send new + /// work items to the STP. + /// + private int _workItemsExecutingInStp = 0; + + /// + /// WorkItemsGroup start information + /// + private readonly WIGStartInfo _workItemsGroupStartInfo; + + /// + /// Signaled when all of the WorkItemsGroup's work item completed. + /// + //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); + private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); + + /// + /// A common object for all the work items that this work items group + /// generate so we can mark them to cancel in O(1) + /// + private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); + + #endregion + + #region Construction + + public WorkItemsGroup( + SmartThreadPool stp, + int concurrency, + WIGStartInfo wigStartInfo) + { + if (concurrency <= 0) + { + throw new ArgumentOutOfRangeException( + "concurrency", +#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) + concurrency, +#endif + "concurrency must be greater than zero"); + } + _stp = stp; + _concurrency = concurrency; + _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); + _workItemsQueue = new PriorityQueue(); + Name = "WorkItemsGroup"; + + // The _workItemsInStpQueue gets the number of currently executing work items, + // because once a work item is executing, it cannot be cancelled. + _workItemsInStpQueue = _workItemsExecutingInStp; + + _isSuspended = _workItemsGroupStartInfo.StartSuspended; + } + + #endregion + + #region WorkItemsGroupBase Overrides + + public override int Concurrency + { + get { return _concurrency; } + set + { + Debug.Assert(value > 0); + + int diff = value - _concurrency; + _concurrency = value; + if (diff > 0) + { + EnqueueToSTPNextNWorkItem(diff); + } + } + } + + public override int WaitingCallbacks + { + get { return _workItemsQueue.Count; } + } + + public override object[] GetStates() + { + lock (_lock) + { + object[] states = new object[_workItemsQueue.Count]; + int i = 0; + foreach (WorkItem workItem in _workItemsQueue) + { + states[i] = workItem.GetWorkItemResult().State; + ++i; + } + return states; + } + } + + /// + /// WorkItemsGroup start information + /// + public override WIGStartInfo WIGStartInfo + { + get { return _workItemsGroupStartInfo; } + } + + /// + /// Start the Work Items Group if it was started suspended + /// + public override void Start() + { + // If the Work Items Group already started then quit + if (!_isSuspended) + { + return; + } + _isSuspended = false; + + EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); + } + + public override void Cancel(bool abortExecution) + { + lock (_lock) + { + _canceledWorkItemsGroup.IsCanceled = true; + _workItemsQueue.Clear(); + _workItemsInStpQueue = 0; + _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); + } + + if (abortExecution) + { + _stp.CancelAbortWorkItemsGroup(this); + } + } + + /// + /// Wait for the thread pool to be idle + /// + public override bool WaitForIdle(int millisecondsTimeout) + { + SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); + return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); + } + + public override event WorkItemsGroupIdleHandler OnIdle + { + add { _onIdle += value; } + remove { _onIdle -= value; } + } + + #endregion + + #region Private methods + + private void RegisterToWorkItemCompletion(IWorkItemResult wir) + { + IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; + iwir.OnWorkItemStarted += OnWorkItemStartedCallback; + iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; + } + + public void OnSTPIsStarting() + { + if (_isSuspended) + { + return; + } + + EnqueueToSTPNextNWorkItem(_concurrency); + } + + public void EnqueueToSTPNextNWorkItem(int count) + { + for (int i = 0; i < count; ++i) + { + EnqueueToSTPNextWorkItem(null, false); + } + } + + private object FireOnIdle(object state) + { + FireOnIdleImpl(_onIdle); + return null; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) + { + if(null == onIdle) + { + return; + } + + Delegate[] delegates = onIdle.GetInvocationList(); + foreach(WorkItemsGroupIdleHandler eh in delegates) + { + try + { + eh(this); + } + catch { } // Suppress exceptions + } + } + + private void OnWorkItemStartedCallback(WorkItem workItem) + { + lock(_lock) + { + ++_workItemsExecutingInStp; + } + } + + private void OnWorkItemCompletedCallback(WorkItem workItem) + { + EnqueueToSTPNextWorkItem(null, true); + } + + internal override void Enqueue(WorkItem workItem) + { + EnqueueToSTPNextWorkItem(workItem); + } + + private void EnqueueToSTPNextWorkItem(WorkItem workItem) + { + EnqueueToSTPNextWorkItem(workItem, false); + } + + private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) + { + lock(_lock) + { + // Got here from OnWorkItemCompletedCallback() + if (decrementWorkItemsInStpQueue) + { + --_workItemsInStpQueue; + + if(_workItemsInStpQueue < 0) + { + _workItemsInStpQueue = 0; + } + + --_workItemsExecutingInStp; + + if(_workItemsExecutingInStp < 0) + { + _workItemsExecutingInStp = 0; + } + } + + // If the work item is not null then enqueue it + if (null != workItem) + { + workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; + + RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); + _workItemsQueue.Enqueue(workItem); + //_stp.IncrementWorkItemsCount(); + + if ((1 == _workItemsQueue.Count) && + (0 == _workItemsInStpQueue)) + { + _stp.RegisterWorkItemsGroup(this); + IsIdle = false; + _isIdleWaitHandle.Reset(); + } + } + + // If the work items queue of the group is empty than quit + if (0 == _workItemsQueue.Count) + { + if (0 == _workItemsInStpQueue) + { + _stp.UnregisterWorkItemsGroup(this); + IsIdle = true; + _isIdleWaitHandle.Set(); + if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) + { + _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); + } + } + return; + } + + if (!_isSuspended) + { + if (_workItemsInStpQueue < _concurrency) + { + WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; + try + { + _stp.Enqueue(nextWorkItem); + } + catch (ObjectDisposedException e) + { + e.GetHashCode(); + // The STP has been shutdown + } + + ++_workItemsInStpQueue; + } + } + } + } + + #endregion + } + + #endregion +} -- cgit v1.1