From 1a47ff8094ee414a47aebd310826906d89428a09 Mon Sep 17 00:00:00 2001
From: Teravus Ovares
Date: Fri, 30 May 2008 12:27:06 +0000
Subject: * This is Melanie's XEngine script engine. I've not tested this real
well, however, it's confirmed to compile and OpenSimulator to run
successfully without this script engine active.
---
ThirdParty/SmartThreadPool/SmartThreadPool.cs | 1438 +++++++++++++++++++++++++
1 file changed, 1438 insertions(+)
create mode 100644 ThirdParty/SmartThreadPool/SmartThreadPool.cs
(limited to 'ThirdParty/SmartThreadPool/SmartThreadPool.cs')
diff --git a/ThirdParty/SmartThreadPool/SmartThreadPool.cs b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
new file mode 100644
index 0000000..c21984e
--- /dev/null
+++ b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
@@ -0,0 +1,1438 @@
+// Ami Bar
+// amibar@gmail.com
+//
+// Smart thread pool in C#.
+// 7 Aug 2004 - Initial release
+// 14 Sep 2004 - Bug fixes
+// 15 Oct 2004 - Added new features
+// - Work items return result.
+// - Support waiting synchronization for multiple work items.
+// - Work items can be cancelled.
+// - Passage of the caller thread’s context to the thread in the pool.
+// - Minimal usage of WIN32 handles.
+// - Minor bug fixes.
+// 26 Dec 2004 - Changes:
+// - Removed static constructors.
+// - Added finalizers.
+// - Changed Exceptions so they are serializable.
+// - Fixed the bug in one of the SmartThreadPool constructors.
+// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
+// The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
+// - Added PostExecute with options on which cases to call it.
+// - Added option to dispose of the state objects.
+// - Added a WaitForIdle() method that waits until the work items queue is empty.
+// - Added an STPStartInfo class for the initialization of the thread pool.
+// - Changed exception handling so if a work item throws an exception it
+// is rethrown at GetResult(), rather then firing an UnhandledException event.
+// Note that PostExecute exception are always ignored.
+// 25 Mar 2005 - Changes:
+// - Fixed lost of work items bug
+// 3 Jul 2005: Changes.
+// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
+// 16 Aug 2005: Changes.
+// - Fixed bug where the InUseThreads becomes negative when canceling work items.
+//
+// 31 Jan 2006 - Changes:
+// - Added work items priority
+// - Removed support of chained delegates in callbacks and post executes (nobody really use this)
+// - Added work items groups
+// - Added work items groups idle event
+// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
+// it returns true rather then throwing an exception.
+// - Added option to start the STP and the WIG as suspended
+// - Exception behavior changed, the real exception is returned by an
+// inner exception
+// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
+// - Added performance counters
+// - Added priority to the threads in the pool
+//
+// 13 Feb 2006 - Changes:
+// - Added a call to the dispose of the Performance Counter so
+// their won't be a Performance Counter leak.
+// - Added exception catch in case the Performance Counters cannot
+// be created.
+
+using System;
+using System.Security;
+using System.Threading;
+using System.Collections;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+
+using Amib.Threading.Internal;
+
+namespace Amib.Threading
+{
+ #region SmartThreadPool class
+ ///
+ /// Smart thread pool class.
+ ///
+ public class SmartThreadPool : IWorkItemsGroup, IDisposable
+ {
+ #region Default Constants
+
+ ///
+ /// Default minimum number of threads the thread pool contains. (0)
+ ///
+ public const int DefaultMinWorkerThreads = 0;
+
+ ///
+ /// Default maximum number of threads the thread pool contains. (25)
+ ///
+ public const int DefaultMaxWorkerThreads = 25;
+
+ ///
+ /// Default idle timeout in milliseconds. (One minute)
+ ///
+ public const int DefaultIdleTimeout = 60*1000; // One minute
+
+ ///
+ /// Indicate to copy the security context of the caller and then use it in the call. (false)
+ ///
+ public const bool DefaultUseCallerCallContext = false;
+
+ ///
+ /// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
+ ///
+ public const bool DefaultUseCallerHttpContext = false;
+
+ ///
+ /// Indicate to dispose of the state objects if they support the IDispose interface. (false)
+ ///
+ public const bool DefaultDisposeOfStateObjects = false;
+
+ ///
+ /// The default option to run the post execute
+ ///
+ public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;
+
+ ///
+ /// The default post execute method to run.
+ /// When null it means not to call it.
+ ///
+ public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null;
+
+ ///
+ /// The default work item priority
+ ///
+ public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
+
+ ///
+ /// The default is to work on work items as soon as they arrive
+ /// and not to wait for the start.
+ ///
+ public const bool DefaultStartSuspended = false;
+
+ ///
+ /// The default is not to use the performance counters
+ ///
+ public static readonly string DefaultPerformanceCounterInstanceName = null;
+
+ public static readonly int DefaultStackSize = 0;
+
+ ///
+ /// The default thread priority
+ ///
+ public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;
+
+ #endregion
+
+ #region Member Variables
+
+ ///
+ /// Contains the name of this instance of SmartThreadPool.
+ /// Can be changed by the user.
+ ///
+ private string _name = "SmartThreadPool";
+
+ ///
+ /// Hashtable of all the threads in the thread pool.
+ ///
+ private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable());
+
+ ///
+ /// Queue of work items.
+ ///
+ private WorkItemsQueue _workItemsQueue = new WorkItemsQueue();
+
+ ///
+ /// Count the work items handled.
+ /// Used by the performance counter.
+ ///
+ private long _workItemsProcessed = 0;
+
+ ///
+ /// Number of threads that currently work (not idle).
+ ///
+ private int _inUseWorkerThreads = 0;
+
+ ///
+ /// Start information to use.
+ /// It is simpler than providing many constructors.
+ ///
+ private STPStartInfo _stpStartInfo = new STPStartInfo();
+
+ ///
+ /// Total number of work items that are stored in the work items queue
+ /// plus the work items that the threads in the pool are working on.
+ ///
+ private int _currentWorkItemsCount = 0;
+
+ ///
+ /// Signaled when the thread pool is idle, i.e. no thread is busy
+ /// and the work items queue is empty
+ ///
+ private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
+
+ ///
+ /// An event to signal all the threads to quit immediately.
+ ///
+ private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
+
+ ///
+ /// A flag to indicate the threads to quit.
+ ///
+ private bool _shutdown = false;
+
+ ///
+ /// Counts the threads created in the pool.
+ /// It is used to name the threads.
+ ///
+ private int _threadCounter = 0;
+
+ ///
+ /// Indicate that the SmartThreadPool has been disposed
+ ///
+ private bool _isDisposed = false;
+
+ ///
+ /// Event to send that the thread pool is idle
+ ///
+ private event EventHandler _stpIdle;
+
+ ///
+ /// On idle event
+ ///
+ //private event WorkItemsGroupIdleHandler _onIdle;
+
+ ///
+ /// Holds all the WorkItemsGroup instaces that have at least one
+ /// work item int the SmartThreadPool
+ /// This variable is used in case of Shutdown
+ ///
+ private Hashtable _workItemsGroups = Hashtable.Synchronized(new Hashtable());
+
+ ///
+ /// A reference from each thread in the thread pool to its SmartThreadPool
+ /// object container.
+ /// With this variable a thread can know whatever it belongs to a
+ /// SmartThreadPool.
+ ///
+ [ThreadStatic]
+ private static SmartThreadPool _smartThreadPool;
+
+ ///
+ /// A reference to the current work item a thread from the thread pool
+ /// is executing.
+ ///
+ [ThreadStatic]
+ private static WorkItem _currentWorkItem;
+
+ ///
+ /// STP performance counters
+ ///
+ private ISTPInstancePerformanceCounters _pcs = NullSTPInstancePerformanceCounters.Instance;
+
+ #endregion
+
+ #region Construction and Finalization
+
+ ///
+ /// Constructor
+ ///
+ public SmartThreadPool()
+ {
+ Initialize();
+ }
+
+ ///
+ /// Constructor
+ ///
+ /// Idle timeout in milliseconds
+ public SmartThreadPool(int idleTimeout)
+ {
+ _stpStartInfo.IdleTimeout = idleTimeout;
+ Initialize();
+ }
+
+ ///
+ /// Constructor
+ ///
+ /// Idle timeout in milliseconds
+ /// Upper limit of threads in the pool
+ public SmartThreadPool(
+ int idleTimeout,
+ int maxWorkerThreads)
+ {
+ _stpStartInfo.IdleTimeout = idleTimeout;
+ _stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
+ Initialize();
+ }
+
+ ///
+ /// Constructor
+ ///
+ /// Idle timeout in milliseconds
+ /// Upper limit of threads in the pool
+ /// Lower limit of threads in the pool
+ public SmartThreadPool(
+ int idleTimeout,
+ int maxWorkerThreads,
+ int minWorkerThreads)
+ {
+ _stpStartInfo.IdleTimeout = idleTimeout;
+ _stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
+ _stpStartInfo.MinWorkerThreads = minWorkerThreads;
+ Initialize();
+ }
+
+ ///
+ /// Constructor
+ ///
+ public SmartThreadPool(STPStartInfo stpStartInfo)
+ {
+ _stpStartInfo = new STPStartInfo(stpStartInfo);
+ Initialize();
+ }
+
+ private void Initialize()
+ {
+ ValidateSTPStartInfo();
+
+ if (null != _stpStartInfo.PerformanceCounterInstanceName)
+ {
+ try
+ {
+ _pcs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
+ }
+ catch(Exception e)
+ {
+ Debug.WriteLine("Unable to create Performance Counters: " + e.ToString());
+ _pcs = NullSTPInstancePerformanceCounters.Instance;
+ }
+ }
+
+ StartOptimalNumberOfThreads();
+ }
+
+ private void StartOptimalNumberOfThreads()
+ {
+ int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
+ threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
+ StartThreads(threadsCount);
+ }
+
+ private void ValidateSTPStartInfo()
+ {
+ if (_stpStartInfo.MinWorkerThreads < 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ "MinWorkerThreads", "MinWorkerThreads cannot be negative");
+ }
+
+ if (_stpStartInfo.MaxWorkerThreads <= 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
+ }
+
+ if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads)
+ {
+ throw new ArgumentOutOfRangeException(
+ "MinWorkerThreads, maxWorkerThreads",
+ "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
+ }
+ }
+
+ private void ValidateCallback(Delegate callback)
+ {
+ if(callback.GetInvocationList().Length > 1)
+ {
+ throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
+ }
+ }
+
+ #endregion
+
+ #region Thread Processing
+
+ ///
+ /// Waits on the queue for a work item, shutdown, or timeout.
+ ///
+ ///
+ /// Returns the WaitingCallback or null in case of timeout or shutdown.
+ ///
+ private WorkItem Dequeue()
+ {
+ WorkItem workItem =
+ _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
+
+ return workItem;
+ }
+
+ ///
+ /// Put a new work item in the queue
+ ///
+ /// A work item to queue
+ private void Enqueue(WorkItem workItem)
+ {
+ Enqueue(workItem, true);
+ }
+
+ ///
+ /// Put a new work item in the queue
+ ///
+ /// A work item to queue
+ internal void Enqueue(WorkItem workItem, bool incrementWorkItems)
+ {
+ // Make sure the workItem is not null
+ Debug.Assert(null != workItem);
+
+ if (incrementWorkItems)
+ {
+ IncrementWorkItemsCount();
+ }
+
+ _workItemsQueue.EnqueueWorkItem(workItem);
+ workItem.WorkItemIsQueued();
+
+ // If all the threads are busy then try to create a new one
+ if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count)
+ {
+ StartThreads(1);
+ }
+ }
+
+ private void IncrementWorkItemsCount()
+ {
+ _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
+
+ int count = Interlocked.Increment(ref _currentWorkItemsCount);
+ //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
+ if (count == 1)
+ {
+ //Trace.WriteLine("STP is NOT idle");
+ _isIdleWaitHandle.Reset();
+ }
+ }
+
+ private void DecrementWorkItemsCount()
+ {
+ ++_workItemsProcessed;
+
+ // The counter counts even if the work item was cancelled
+ _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
+
+ int count = Interlocked.Decrement(ref _currentWorkItemsCount);
+ //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
+ if (count == 0)
+ {
+ //Trace.WriteLine("STP is idle");
+ _isIdleWaitHandle.Set();
+ }
+ }
+
+ internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
+ {
+ _workItemsGroups[workItemsGroup] = workItemsGroup;
+ }
+
+ internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
+ {
+ if (_workItemsGroups.Contains(workItemsGroup))
+ {
+ _workItemsGroups.Remove(workItemsGroup);
+ }
+ }
+
+ ///
+ /// Inform that the current thread is about to quit or quiting.
+ /// The same thread may call this method more than once.
+ ///
+ private void InformCompleted()
+ {
+ // There is no need to lock the two methods together
+ // since only the current thread removes itself
+ // and the _workerThreads is a synchronized hashtable
+ if (_workerThreads.Contains(Thread.CurrentThread))
+ {
+ _workerThreads.Remove(Thread.CurrentThread);
+ _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
+ }
+ }
+
+ ///
+ /// Starts new threads
+ ///
+ /// The number of threads to start
+ private void StartThreads(int threadsCount)
+ {
+ if (_stpStartInfo.StartSuspended)
+ {
+ return;
+ }
+
+ lock(_workerThreads.SyncRoot)
+ {
+ // Don't start threads on shut down
+ if (_shutdown)
+ {
+ return;
+ }
+
+ for(int i = 0; i < threadsCount; ++i)
+ {
+ // Don't create more threads then the upper limit
+ if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
+ {
+ return;
+ }
+
+ // Create a new thread
+ Thread workerThread = new Thread(new ThreadStart(ProcessQueuedItems), _stpStartInfo.StackSize);
+
+ // Configure the new thread and start it
+ workerThread.Name = "STP " + Name + " Thread #" + _threadCounter;
+ workerThread.IsBackground = true;
+ workerThread.Priority = _stpStartInfo.ThreadPriority;
+ workerThread.Start();
+ ++_threadCounter;
+
+ // Add the new thread to the hashtable and update its creation
+ // time.
+ _workerThreads[workerThread] = DateTime.Now;
+ _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
+ }
+ }
+ }
+
+ ///
+ /// A worker thread method that processes work items from the work items queue.
+ ///
+ private void ProcessQueuedItems()
+ {
+ // Initialize the _smartThreadPool variable
+ _smartThreadPool = this;
+
+ try
+ {
+ bool bInUseWorkerThreadsWasIncremented = false;
+
+ // Process until shutdown.
+ while(!_shutdown)
+ {
+ // Update the last time this thread was seen alive.
+ // It's good for debugging.
+ _workerThreads[Thread.CurrentThread] = DateTime.Now;
+
+ // Wait for a work item, shutdown, or timeout
+ WorkItem workItem = Dequeue();
+
+ // Update the last time this thread was seen alive.
+ // It's good for debugging.
+ _workerThreads[Thread.CurrentThread] = DateTime.Now;
+
+ // On timeout or shut down.
+ if (null == workItem)
+ {
+ // Double lock for quit.
+ if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
+ {
+ lock(_workerThreads.SyncRoot)
+ {
+ if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
+ {
+ // Inform that the thread is quiting and then quit.
+ // This method must be called within this lock or else
+ // more threads will quit and the thread pool will go
+ // below the lower limit.
+ InformCompleted();
+ break;
+ }
+ }
+ }
+ }
+
+ // If we didn't quit then skip to the next iteration.
+ if (null == workItem)
+ {
+ continue;
+ }
+
+ try
+ {
+ // Initialize the value to false
+ bInUseWorkerThreadsWasIncremented = false;
+
+ // Change the state of the work item to 'in progress' if possible.
+ // We do it here so if the work item has been canceled we won't
+ // increment the _inUseWorkerThreads.
+ // The cancel mechanism doesn't delete items from the queue,
+ // it marks the work item as canceled, and when the work item
+ // is dequeued, we just skip it.
+ // If the post execute of work item is set to always or to
+ // call when the work item is canceled then the StartingWorkItem()
+ // will return true, so the post execute can run.
+ if (!workItem.StartingWorkItem())
+ {
+ continue;
+ }
+
+ // Execute the callback. Make sure to accurately
+ // record how many callbacks are currently executing.
+ int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads);
+ _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
+
+ // Mark that the _inUseWorkerThreads incremented, so in the finally{}
+ // statement we will decrement it correctly.
+ bInUseWorkerThreadsWasIncremented = true;
+
+ // Set the _currentWorkItem to the current work item
+ _currentWorkItem = workItem;
+
+ lock(workItem)
+ {
+ workItem.currentThread = Thread.CurrentThread;
+ }
+
+ ExecuteWorkItem(workItem);
+
+ lock(workItem)
+ {
+ workItem.currentThread = null;
+ }
+
+ }
+ catch(ThreadAbortException ex)
+ {
+ lock(workItem)
+ {
+ workItem.currentThread = null;
+ }
+ ex.GetHashCode();
+ Thread.ResetAbort();
+ }
+ catch(Exception ex)
+ {
+ ex.GetHashCode();
+ // Do nothing
+ }
+ finally
+ {
+ lock(workItem)
+ {
+ workItem.currentThread = null;
+ }
+
+ if (null != workItem)
+ {
+ workItem.DisposeOfState();
+ }
+
+ // Set the _currentWorkItem to null, since we
+ // no longer run user's code.
+ _currentWorkItem = null;
+
+ // Decrement the _inUseWorkerThreads only if we had
+ // incremented it. Note the cancelled work items don't
+ // increment _inUseWorkerThreads.
+ if (bInUseWorkerThreadsWasIncremented)
+ {
+ int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads);
+ _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
+ }
+
+ // Notify that the work item has been completed.
+ // WorkItemsGroup may enqueue their next work item.
+ workItem.FireWorkItemCompleted();
+
+ // Decrement the number of work items here so the idle
+ // ManualResetEvent won't fluctuate.
+ DecrementWorkItemsCount();
+ }
+ }
+ }
+ catch(ThreadAbortException tae)
+ {
+ tae.GetHashCode();
+ // Handle the abort exception gracfully.
+ Thread.ResetAbort();
+ }
+ catch(Exception e)
+ {
+ Debug.Assert(null != e);
+ }
+ finally
+ {
+ InformCompleted();
+ }
+ }
+
+ private void ExecuteWorkItem(WorkItem workItem)
+ {
+ _pcs.SampleWorkItemsWaitTime(workItem.WaitingTime);
+ try
+ {
+ workItem.Execute();
+ }
+ catch
+ {
+ throw;
+ }
+ finally
+ {
+ _pcs.SampleWorkItemsProcessTime(workItem.ProcessTime);
+ }
+ }
+
+
+ #endregion
+
+ #region Public Methods
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ /// The priority of the work item
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, workItemPriority);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// Work item info
+ /// A callback to execute
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ /// The work item priority
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, workItemPriority);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// Work item information
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback, state);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// The work item priority
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback,
+ WorkItemPriority workItemPriority)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// Indicates on which cases to call to the post execute callback
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback,
+ CallToPostExecute callToPostExecute)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Queue a work item
+ ///
+ /// A callback to execute
+ ///
+ /// The context object of the work item. Used for passing arguments to the work item.
+ ///
+ ///
+ /// A delegate to call after the callback completion
+ ///
+ /// Indicates on which cases to call to the post execute callback
+ /// The work item priority
+ /// Returns a work item result
+ public IWorkItemResult QueueWorkItem(
+ WorkItemCallback callback,
+ object state,
+ PostExecuteWorkItemCallback postExecuteWorkItemCallback,
+ CallToPostExecute callToPostExecute,
+ WorkItemPriority workItemPriority)
+ {
+ ValidateNotDisposed();
+ ValidateCallback(callback);
+ WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority);
+ Enqueue(workItem);
+ return workItem.GetWorkItemResult();
+ }
+
+ ///
+ /// Wait for the thread pool to be idle
+ ///
+ public void WaitForIdle()
+ {
+ WaitForIdle(Timeout.Infinite);
+ }
+
+ ///
+ /// Wait for the thread pool to be idle
+ ///
+ public bool WaitForIdle(TimeSpan timeout)
+ {
+ return WaitForIdle((int)timeout.TotalMilliseconds);
+ }
+
+ ///
+ /// Wait for the thread pool to be idle
+ ///
+ public bool WaitForIdle(int millisecondsTimeout)
+ {
+ ValidateWaitForIdle();
+ return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
+ }
+
+ private void ValidateWaitForIdle()
+ {
+ if(_smartThreadPool == this)
+ {
+ throw new NotSupportedException(
+ "WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock");
+ }
+ }
+
+ internal void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup)
+ {
+ ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, SmartThreadPool._currentWorkItem);
+ if ((null != workItemsGroup) &&
+ (null != SmartThreadPool._currentWorkItem) &&
+ SmartThreadPool._currentWorkItem.WasQueuedBy(workItemsGroup))
+ {
+ throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock");
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ private void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem)
+ {
+ if ((null != workItemsGroup) &&
+ (null != workItem) &&
+ workItem.WasQueuedBy(workItemsGroup))
+ {
+ throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock");
+ }
+ }
+
+
+
+ ///
+ /// Force the SmartThreadPool to shutdown
+ ///
+ public void Shutdown()
+ {
+ Shutdown(true, 0);
+ }
+
+ public void Shutdown(bool forceAbort, TimeSpan timeout)
+ {
+ Shutdown(forceAbort, (int)timeout.TotalMilliseconds);
+ }
+
+ ///
+ /// Empties the queue of work items and abort the threads in the pool.
+ ///
+ public void Shutdown(bool forceAbort, int millisecondsTimeout)
+ {
+ ValidateNotDisposed();
+
+ ISTPInstancePerformanceCounters pcs = _pcs;
+
+ if (NullSTPInstancePerformanceCounters.Instance != _pcs)
+ {
+ _pcs.Dispose();
+ // Set the _pcs to "null" to stop updating the performance
+ // counters
+ _pcs = NullSTPInstancePerformanceCounters.Instance;
+ }
+
+ Thread [] threads = null;
+ lock(_workerThreads.SyncRoot)
+ {
+ // Shutdown the work items queue
+ _workItemsQueue.Dispose();
+
+ // Signal the threads to exit
+ _shutdown = true;
+ _shuttingDownEvent.Set();
+
+ // Make a copy of the threads' references in the pool
+ threads = new Thread [_workerThreads.Count];
+ _workerThreads.Keys.CopyTo(threads, 0);
+ }
+
+ int millisecondsLeft = millisecondsTimeout;
+ DateTime start = DateTime.Now;
+ bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
+ bool timeout = false;
+
+ // Each iteration we update the time left for the timeout.
+ foreach(Thread thread in threads)
+ {
+ // Join don't work with negative numbers
+ if (!waitInfinitely && (millisecondsLeft < 0))
+ {
+ timeout = true;
+ break;
+ }
+
+ // Wait for the thread to terminate
+ bool success = thread.Join(millisecondsLeft);
+ if(!success)
+ {
+ timeout = true;
+ break;
+ }
+
+ if(!waitInfinitely)
+ {
+ // Update the time left to wait
+ TimeSpan ts = DateTime.Now - start;
+ millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds;
+ }
+ }
+
+ if (timeout && forceAbort)
+ {
+ // Abort the threads in the pool
+ foreach(Thread thread in threads)
+ {
+ if ((thread != null) && thread.IsAlive)
+ {
+ try
+ {
+ thread.Abort("Shutdown");
+ }
+ catch(SecurityException e)
+ {
+ e.GetHashCode();
+ }
+ catch(ThreadStateException ex)
+ {
+ ex.GetHashCode();
+ // In case the thread has been terminated
+ // after the check if it is alive.
+ }
+ }
+ }
+ }
+
+ // Dispose of the performance counters
+ pcs.Dispose();
+ }
+
+ ///
+ /// Wait for all work items to complete
+ ///
+ /// Array of work item result objects
+ ///
+ /// true when every work item in workItemResults has completed; otherwise false.
+ ///
+ public static bool WaitAll(
+ IWorkItemResult [] workItemResults)
+ {
+ return WaitAll(workItemResults, Timeout.Infinite, true);
+ }
+
+ ///
+ /// Wait for all work items to complete
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ ///
+ /// true when every work item in workItemResults has completed; otherwise false.
+ ///
+ public static bool WaitAll(
+ IWorkItemResult [] workItemResults,
+ TimeSpan timeout,
+ bool exitContext)
+ {
+ return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext);
+ }
+
+ ///
+ /// Wait for all work items to complete
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ /// A cancel wait handle to interrupt the wait if needed
+ ///
+ /// true when every work item in workItemResults has completed; otherwise false.
+ ///
+ public static bool WaitAll(
+ IWorkItemResult [] workItemResults,
+ TimeSpan timeout,
+ bool exitContext,
+ WaitHandle cancelWaitHandle)
+ {
+ return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
+ }
+
+ ///
+ /// Wait for all work items to complete
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ ///
+ /// true when every work item in workItemResults has completed; otherwise false.
+ ///
+ public static bool WaitAll(
+ IWorkItemResult [] workItemResults,
+ int millisecondsTimeout,
+ bool exitContext)
+ {
+ return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, null);
+ }
+
+ ///
+ /// Wait for all work items to complete
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ /// A cancel wait handle to interrupt the wait if needed
+ ///
+ /// true when every work item in workItemResults has completed; otherwise false.
+ ///
+ public static bool WaitAll(
+ IWorkItemResult [] workItemResults,
+ int millisecondsTimeout,
+ bool exitContext,
+ WaitHandle cancelWaitHandle)
+ {
+ return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle);
+ }
+
+
+ ///
+ /// Waits for any of the work items in the specified array to complete, cancel, or timeout
+ ///
+ /// Array of work item result objects
+ ///
+ /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
+ ///
+ public static int WaitAny(
+ IWorkItemResult [] workItemResults)
+ {
+ return WaitAny(workItemResults, Timeout.Infinite, true);
+ }
+
+ ///
+ /// Waits for any of the work items in the specified array to complete, cancel, or timeout
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ ///
+ /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
+ ///
+ public static int WaitAny(
+ IWorkItemResult [] workItemResults,
+ TimeSpan timeout,
+ bool exitContext)
+ {
+ return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext);
+ }
+
+ ///
+ /// Waits for any of the work items in the specified array to complete, cancel, or timeout
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ /// A cancel wait handle to interrupt the wait if needed
+ ///
+ /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
+ ///
+ public static int WaitAny(
+ IWorkItemResult [] workItemResults,
+ TimeSpan timeout,
+ bool exitContext,
+ WaitHandle cancelWaitHandle)
+ {
+ return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
+ }
+
+ ///
+ /// Waits for any of the work items in the specified array to complete, cancel, or timeout
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ ///
+ /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
+ ///
+ public static int WaitAny(
+ IWorkItemResult [] workItemResults,
+ int millisecondsTimeout,
+ bool exitContext)
+ {
+ return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, null);
+ }
+
+ ///
+ /// Waits for any of the work items in the specified array to complete, cancel, or timeout
+ ///
+ /// Array of work item result objects
+ /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.
+ ///
+ /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
+ ///
+ /// A cancel wait handle to interrupt the wait if needed
+ ///
+ /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
+ ///
+ public static int WaitAny(
+ IWorkItemResult [] workItemResults,
+ int millisecondsTimeout,
+ bool exitContext,
+ WaitHandle cancelWaitHandle)
+ {
+ return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle);
+ }
+
+ public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
+ {
+ IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo);
+ return workItemsGroup;
+ }
+
+ public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
+ {
+ IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo);
+ return workItemsGroup;
+ }
+
+ public event WorkItemsGroupIdleHandler OnIdle
+ {
+ add
+ {
+ throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
+ //_onIdle += value;
+ }
+ remove
+ {
+ throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
+ //_onIdle -= value;
+ }
+ }
+
+ public void Cancel()
+ {
+ ICollection workItemsGroups = _workItemsGroups.Values;
+ foreach(WorkItemsGroup workItemsGroup in workItemsGroups)
+ {
+ workItemsGroup.Cancel();
+ }
+ }
+
+ public void Start()
+ {
+ lock (this)
+ {
+ if (!this._stpStartInfo.StartSuspended)
+ {
+ return;
+ }
+ _stpStartInfo.StartSuspended = false;
+ }
+
+ ICollection workItemsGroups = _workItemsGroups.Values;
+ foreach(WorkItemsGroup workItemsGroup in workItemsGroups)
+ {
+ workItemsGroup.OnSTPIsStarting();
+ }
+
+ StartOptimalNumberOfThreads();
+ }
+
+ #endregion
+
+ #region Properties
+
+ ///
+ /// Get/Set the name of the SmartThreadPool instance
+ ///
+ public string Name
+ {
+ get
+ {
+ return _name;
+ }
+
+ set
+ {
+ _name = value;
+ }
+ }
+
+ ///
+ /// Get the lower limit of threads in the pool.
+ ///
+ public int MinThreads
+ {
+ get
+ {
+ ValidateNotDisposed();
+ return _stpStartInfo.MinWorkerThreads;
+ }
+ }
+
+ ///
+ /// Get the upper limit of threads in the pool.
+ ///
+ public int MaxThreads
+ {
+ get
+ {
+ ValidateNotDisposed();
+ return _stpStartInfo.MaxWorkerThreads;
+ }
+ }
+ ///
+ /// Get the number of threads in the thread pool.
+ /// Should be between the lower and the upper limits.
+ ///
+ public int ActiveThreads
+ {
+ get
+ {
+ ValidateNotDisposed();
+ return _workerThreads.Count;
+ }
+ }
+
+ ///
+ /// Get the number of busy (not idle) threads in the thread pool.
+ ///
+ public int InUseThreads
+ {
+ get
+ {
+ ValidateNotDisposed();
+ return _inUseWorkerThreads;
+ }
+ }
+
+ ///
+ /// Get the number of work items in the queue.
+ ///
+ public int WaitingCallbacks
+ {
+ get
+ {
+ ValidateNotDisposed();
+ return _workItemsQueue.Count;
+ }
+ }
+
+
+ public event EventHandler Idle
+ {
+ add
+ {
+ _stpIdle += value;
+ }
+
+ remove
+ {
+ _stpIdle -= value;
+ }
+ }
+
+ #endregion
+
+ #region IDisposable Members
+
+// ~SmartThreadPool()
+// {
+// Dispose();
+// }
+
+ public void Dispose()
+ {
+ if (!_isDisposed)
+ {
+ if (!_shutdown)
+ {
+ Shutdown();
+ }
+
+ if (null != _shuttingDownEvent)
+ {
+ _shuttingDownEvent.Close();
+ _shuttingDownEvent = null;
+ }
+ _workerThreads.Clear();
+ _isDisposed = true;
+ GC.SuppressFinalize(this);
+ }
+ }
+
+ private void ValidateNotDisposed()
+ {
+ if(_isDisposed)
+ {
+ throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
+ }
+ }
+ #endregion
+ }
+ #endregion
+}
--
cgit v1.1