aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/SmartThreadPool.cs
diff options
context:
space:
mode:
authorJustin Clark-Casey (justincc)2013-05-01 19:01:43 +0100
committerJustin Clark-Casey (justincc)2013-05-01 19:01:43 +0100
commit206fb306a7820cf593570e35ddfa8e7c5a10e449 (patch)
tree0ef0fdf42ddc0b63224af52b62b0bad42f62e352 /ThirdParty/SmartThreadPool/SmartThreadPool.cs
parentFix CAPS to work like they should - do not send caps to the viewer if they're... (diff)
downloadopensim-SC-206fb306a7820cf593570e35ddfa8e7c5a10e449.zip
opensim-SC-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.gz
opensim-SC-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.bz2
opensim-SC-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.xz
Update SmartThreadPool to latest version 2.2.3 with a major and minor change.
SmartThreadPool code comes from http://www.codeproject.com/Articles/7933/Smart-Thread-Pool This version implements thread abort (via WorkItem.Cancel(true)), threadpool naming, max thread stack, etc. so we no longer need to manually patch those. However, two changes have been made to stock 2.2.3. Major change: WorkItem.Cancel(bool abortExecution) in our version does not succeed if the work item was in progress and thread abort was not specified. This is to match previous behaviour where we handle co-operative termination via another mechanism rather than checking WorkItem.IsCanceled. Minor change: Did not add STP's StopWatch implementation as this is only used WinCE and Silverlight and causes a build clash with System.Diagnostics.StopWatch The reason for updating is to see if this improves http://opensimulator.org/mantis/view.php?id=6557 and http://opensimulator.org/mantis/view.php?id=6586
Diffstat (limited to 'ThirdParty/SmartThreadPool/SmartThreadPool.cs')
-rw-r--r--ThirdParty/SmartThreadPool/SmartThreadPool.cs3180
1 files changed, 1732 insertions, 1448 deletions
diff --git a/ThirdParty/SmartThreadPool/SmartThreadPool.cs b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
index 19a0007..9256777 100644
--- a/ThirdParty/SmartThreadPool/SmartThreadPool.cs
+++ b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
@@ -1,1448 +1,1732 @@
1// Ami Bar 1#region Release History
2// amibar@gmail.com 2
3// 3// Smart Thread Pool
4// Smart thread pool in C#. 4// 7 Aug 2004 - Initial release
5// 7 Aug 2004 - Initial release 5//
6// 14 Sep 2004 - Bug fixes 6// 14 Sep 2004 - Bug fixes
7// 15 Oct 2004 - Added new features 7//
8// - Work items return result. 8// 15 Oct 2004 - Added new features
9// - Support waiting synchronization for multiple work items. 9// - Work items return result.
10// - Work items can be cancelled. 10// - Support waiting synchronization for multiple work items.
11// - Passage of the caller thread’s context to the thread in the pool. 11// - Work items can be cancelled.
12// - Minimal usage of WIN32 handles. 12// - Passage of the caller thread’s context to the thread in the pool.
13// - Minor bug fixes. 13// - Minimal usage of WIN32 handles.
14// 26 Dec 2004 - Changes: 14// - Minor bug fixes.
15// - Removed static constructors. 15//
16// - Added finalizers. 16// 26 Dec 2004 - Changes:
17// - Changed Exceptions so they are serializable. 17// - Removed static constructors.
18// - Fixed the bug in one of the SmartThreadPool constructors. 18// - Added finalizers.
19// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters. 19// - Changed Exceptions so they are serializable.
20// The SmartThreadPool.WaitAny() is still limited by the .NET Framework. 20// - Fixed the bug in one of the SmartThreadPool constructors.
21// - Added PostExecute with options on which cases to call it. 21// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
22// - Added option to dispose of the state objects. 22// The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
23// - Added a WaitForIdle() method that waits until the work items queue is empty. 23// - Added PostExecute with options on which cases to call it.
24// - Added an STPStartInfo class for the initialization of the thread pool. 24// - Added option to dispose of the state objects.
25// - Changed exception handling so if a work item throws an exception it 25// - Added a WaitForIdle() method that waits until the work items queue is empty.
26// is rethrown at GetResult(), rather then firing an UnhandledException event. 26// - Added an STPStartInfo class for the initialization of the thread pool.
27// Note that PostExecute exception are always ignored. 27// - Changed exception handling so if a work item throws an exception it
28// 25 Mar 2005 - Changes: 28// is rethrown at GetResult(), rather then firing an UnhandledException event.
29// - Fixed lost of work items bug 29// Note that PostExecute exception are always ignored.
30// 3 Jul 2005: Changes. 30//
31// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed. 31// 25 Mar 2005 - Changes:
32// 16 Aug 2005: Changes. 32// - Fixed lost of work items bug
33// - Fixed bug where the InUseThreads becomes negative when canceling work items. 33//
34// 34// 3 Jul 2005: Changes.
35// 31 Jan 2006 - Changes: 35// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
36// - Added work items priority 36//
37// - Removed support of chained delegates in callbacks and post executes (nobody really use this) 37// 16 Aug 2005: Changes.
38// - Added work items groups 38// - Fixed bug where the InUseThreads becomes negative when canceling work items.
39// - Added work items groups idle event 39//
40// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array 40// 31 Jan 2006 - Changes:
41// it returns true rather then throwing an exception. 41// - Added work items priority
42// - Added option to start the STP and the WIG as suspended 42// - Removed support of chained delegates in callbacks and post executes (nobody really use this)
43// - Exception behavior changed, the real exception is returned by an 43// - Added work items groups
44// inner exception 44// - Added work items groups idle event
45// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.) 45// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
46// - Added performance counters 46// it returns true rather then throwing an exception.
47// - Added priority to the threads in the pool 47// - Added option to start the STP and the WIG as suspended
48// 48// - Exception behavior changed, the real exception is returned by an
49// 13 Feb 2006 - Changes: 49// inner exception
50// - Added a call to the dispose of the Performance Counter so 50// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
51// their won't be a Performance Counter leak. 51// - Added performance counters
52// - Added exception catch in case the Performance Counters cannot 52// - Added priority to the threads in the pool
53// be created. 53//
54 54// 13 Feb 2006 - Changes:
55using System; 55// - Added a call to the dispose of the Performance Counter so
56using System.Security; 56// their won't be a Performance Counter leak.
57using System.Threading; 57// - Added exception catch in case the Performance Counters cannot
58using System.Collections; 58// be created.
59using System.Diagnostics; 59//
60using System.Runtime.CompilerServices; 60// 17 May 2008 - Changes:
61 61// - Changed the dispose behavior and removed the Finalizers.
62using Amib.Threading.Internal; 62// - Enabled the change of the MaxThreads and MinThreads at run time.
63 63// - Enabled the change of the Concurrency of a IWorkItemsGroup at run
64namespace Amib.Threading 64// time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency
65{ 65// refers to the MaxThreads.
66 #region SmartThreadPool class 66// - Improved the cancel behavior.
67 /// <summary> 67// - Added events for thread creation and termination.
68 /// Smart thread pool class. 68// - Fixed the HttpContext context capture.
69 /// </summary> 69// - Changed internal collections so they use generic collections
70 public class SmartThreadPool : IWorkItemsGroup, IDisposable 70// - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup
71 { 71// - Added support for WinCE
72 #region Default Constants 72// - Added support for Action<T> and Func<T>
73 73//
74 /// <summary> 74// 07 April 2009 - Changes:
75 /// Default minimum number of threads the thread pool contains. (0) 75// - Added support for Silverlight and Mono
76 /// </summary> 76// - Added Join, Choice, and Pipe to SmartThreadPool.
77 public const int DefaultMinWorkerThreads = 0; 77// - Added local performance counters (for Mono, Silverlight, and WindowsCE)
78 78// - Changed duration measures from DateTime.Now to Stopwatch.
79 /// <summary> 79// - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList<T>.
80 /// Default maximum number of threads the thread pool contains. (25) 80//
81 /// </summary> 81// 21 December 2009 - Changes:
82 public const int DefaultMaxWorkerThreads = 25; 82// - Added work item timeout (passive)
83 83//
84 /// <summary> 84// 20 August 2012 - Changes:
85 /// Default idle timeout in milliseconds. (One minute) 85// - Added set name to threads
86 /// </summary> 86// - Fixed the WorkItemsQueue.Dequeue.
87 public const int DefaultIdleTimeout = 60*1000; // One minute 87// Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... }
88 88// - Fixed SmartThreadPool.Pipe
89 /// <summary> 89// - Added IsBackground option to threads
90 /// Indicate to copy the security context of the caller and then use it in the call. (false) 90// - Added ApartmentState to threads
91 /// </summary> 91// - Fixed thread creation when queuing many work items at the same time.
92 public const bool DefaultUseCallerCallContext = false; 92//
93 93// 24 August 2012 - Changes:
94 /// <summary> 94// - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan
95 /// Indicate to copy the HTTP context of the caller and then use it in the call. (false) 95// - Added option to set MaxStackSize of threads
96 /// </summary> 96
97 public const bool DefaultUseCallerHttpContext = false; 97#endregion
98 98
99 /// <summary> 99using System;
100 /// Indicate to dispose of the state objects if they support the IDispose interface. (false) 100using System.Security;
101 /// </summary> 101using System.Threading;
102 public const bool DefaultDisposeOfStateObjects = false; 102using System.Collections;
103 103using System.Collections.Generic;
104 /// <summary> 104using System.Diagnostics;
105 /// The default option to run the post execute 105using System.Runtime.CompilerServices;
106 /// </summary> 106
107 public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always; 107using Amib.Threading.Internal;
108 108
109 /// <summary> 109namespace Amib.Threading
110 /// The default post execute method to run. 110{
111 /// When null it means not to call it. 111 #region SmartThreadPool class
112 /// </summary> 112 /// <summary>
113 public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null; 113 /// Smart thread pool class.
114 114 /// </summary>
115 /// <summary> 115 public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable
116 /// The default work item priority 116 {
117 /// </summary> 117 #region Public Default Constants
118 public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal; 118
119 119 /// <summary>
120 /// <summary> 120 /// Default minimum number of threads the thread pool contains. (0)
121 /// The default is to work on work items as soon as they arrive 121 /// </summary>
122 /// and not to wait for the start. 122 public const int DefaultMinWorkerThreads = 0;
123 /// </summary> 123
124 public const bool DefaultStartSuspended = false; 124 /// <summary>
125 125 /// Default maximum number of threads the thread pool contains. (25)
126 /// <summary> 126 /// </summary>
127 /// The default is not to use the performance counters 127 public const int DefaultMaxWorkerThreads = 25;
128 /// </summary> 128
129 public static readonly string DefaultPerformanceCounterInstanceName = null; 129 /// <summary>
130 130 /// Default idle timeout in milliseconds. (One minute)
131 public static readonly int DefaultStackSize = 0; 131 /// </summary>
132 132 public const int DefaultIdleTimeout = 60*1000; // One minute
133 /// <summary> 133
134 /// The default thread priority 134 /// <summary>
135 /// </summary> 135 /// Indicate to copy the security context of the caller and then use it in the call. (false)
136 public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal; 136 /// </summary>
137 137 public const bool DefaultUseCallerCallContext = false;
138 /// <summary> 138
139 /// The default thread pool name 139 /// <summary>
140 /// </summary> 140 /// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
141 public const string DefaultThreadPoolName = "SmartThreadPool"; 141 /// </summary>
142 142 public const bool DefaultUseCallerHttpContext = false;
143 #endregion 143
144 144 /// <summary>
145 #region Member Variables 145 /// Indicate to dispose of the state objects if they support the IDispose interface. (false)
146 146 /// </summary>
147 /// <summary> 147 public const bool DefaultDisposeOfStateObjects = false;
148 /// Contains the name of this instance of SmartThreadPool. 148
149 /// Can be changed by the user. 149 /// <summary>
150 /// </summary> 150 /// The default option to run the post execute (CallToPostExecute.Always)
151 private string _name = DefaultThreadPoolName; 151 /// </summary>
152 152 public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;
153 /// <summary> 153
154 /// Hashtable of all the threads in the thread pool. 154 /// <summary>
155 /// </summary> 155 /// The default post execute method to run. (None)
156 private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable()); 156 /// When null it means not to call it.
157 157 /// </summary>
158 /// <summary> 158 public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback;
159 /// Queue of work items. 159
160 /// </summary> 160 /// <summary>
161 private WorkItemsQueue _workItemsQueue = new WorkItemsQueue(); 161 /// The default work item priority (WorkItemPriority.Normal)
162 162 /// </summary>
163 /// <summary> 163 public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
164 /// Count the work items handled. 164
165 /// Used by the performance counter. 165 /// <summary>
166 /// </summary> 166 /// The default is to work on work items as soon as they arrive
167 private long _workItemsProcessed = 0; 167 /// and not to wait for the start. (false)
168 168 /// </summary>
169 /// <summary> 169 public const bool DefaultStartSuspended = false;
170 /// Number of threads that currently work (not idle). 170
171 /// </summary> 171 /// <summary>
172 private int _inUseWorkerThreads = 0; 172 /// The default name to use for the performance counters instance. (null)
173 173 /// </summary>
174 /// <summary> 174 public static readonly string DefaultPerformanceCounterInstanceName;
175 /// Start information to use. 175
176 /// It is simpler than providing many constructors. 176#if !(WINDOWS_PHONE)
177 /// </summary> 177
178 private STPStartInfo _stpStartInfo = new STPStartInfo(); 178 /// <summary>
179 179 /// The default thread priority (ThreadPriority.Normal)
180 /// <summary> 180 /// </summary>
181 /// Total number of work items that are stored in the work items queue 181 public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;
182 /// plus the work items that the threads in the pool are working on. 182#endif
183 /// </summary> 183 /// <summary>
184 private int _currentWorkItemsCount = 0; 184 /// The default thread pool name. (SmartThreadPool)
185 185 /// </summary>
186 /// <summary> 186 public const string DefaultThreadPoolName = "SmartThreadPool";
187 /// Signaled when the thread pool is idle, i.e. no thread is busy 187
188 /// and the work items queue is empty 188 /// <summary>
189 /// </summary> 189 /// The default Max Stack Size. (SmartThreadPool)
190 private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); 190 /// </summary>
191 191 public static readonly int? DefaultMaxStackSize = null;
192 /// <summary> 192
193 /// An event to signal all the threads to quit immediately. 193 /// <summary>
194 /// </summary> 194 /// The default fill state with params. (false)
195 private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false); 195 /// It is relevant only to QueueWorkItem of Action&lt;...&gt;/Func&lt;...&gt;
196 196 /// </summary>
197 /// <summary> 197 public const bool DefaultFillStateWithArgs = false;
198 /// A flag to indicate the threads to quit. 198
199 /// </summary> 199 /// <summary>
200 private bool _shutdown = false; 200 /// The default thread backgroundness. (true)
201 201 /// </summary>
202 /// <summary> 202 public const bool DefaultAreThreadsBackground = true;
203 /// Counts the threads created in the pool. 203
204 /// It is used to name the threads. 204#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
205 /// </summary> 205 /// <summary>
206 private int _threadCounter = 0; 206 /// The default apartment state of a thread in the thread pool.
207 207 /// The default is ApartmentState.Unknown which means the STP will not
208 /// <summary> 208 /// set the apartment of the thread. It will use the .NET default.
209 /// Indicate that the SmartThreadPool has been disposed 209 /// </summary>
210 /// </summary> 210 public const ApartmentState DefaultApartmentState = ApartmentState.Unknown;
211 private bool _isDisposed = false; 211#endif
212 212
213 /// <summary> 213 #endregion
214 /// Event to send that the thread pool is idle 214
215 /// </summary> 215 #region Member Variables
216 private event EventHandler _stpIdle; 216
217 217 /// <summary>
218 /// <summary> 218 /// Dictionary of all the threads in the thread pool.
219 /// On idle event 219 /// </summary>
220 /// </summary> 220 private readonly SynchronizedDictionary<Thread, ThreadEntry> _workerThreads = new SynchronizedDictionary<Thread, ThreadEntry>();
221 //private event WorkItemsGroupIdleHandler _onIdle; 221
222 222 /// <summary>
223 /// <summary> 223 /// Queue of work items.
224 /// Holds all the WorkItemsGroup instaces that have at least one 224 /// </summary>
225 /// work item int the SmartThreadPool 225 private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue();
226 /// This variable is used in case of Shutdown 226
227 /// </summary> 227 /// <summary>
228 private Hashtable _workItemsGroups = Hashtable.Synchronized(new Hashtable()); 228 /// Count the work items handled.
229 229 /// Used by the performance counter.
230 /// <summary> 230 /// </summary>
231 /// A reference from each thread in the thread pool to its SmartThreadPool 231 private int _workItemsProcessed;
232 /// object container. 232
233 /// With this variable a thread can know whatever it belongs to a 233 /// <summary>
234 /// SmartThreadPool. 234 /// Number of threads that currently work (not idle).
235 /// </summary> 235 /// </summary>
236 [ThreadStatic] 236 private int _inUseWorkerThreads;
237 private static SmartThreadPool _smartThreadPool; 237
238 238 /// <summary>
239 /// <summary> 239 /// Stores a copy of the original STPStartInfo.
240 /// A reference to the current work item a thread from the thread pool 240 /// It is used to change the MinThread and MaxThreads
241 /// is executing. 241 /// </summary>
242 /// </summary> 242 private STPStartInfo _stpStartInfo;
243 [ThreadStatic] 243
244 private static WorkItem _currentWorkItem; 244 /// <summary>
245 245 /// Total number of work items that are stored in the work items queue
246 /// <summary> 246 /// plus the work items that the threads in the pool are working on.
247 /// STP performance counters 247 /// </summary>
248 /// </summary> 248 private int _currentWorkItemsCount;
249 private ISTPInstancePerformanceCounters _pcs = NullSTPInstancePerformanceCounters.Instance; 249
250 250 /// <summary>
251 #endregion 251 /// Signaled when the thread pool is idle, i.e. no thread is busy
252 252 /// and the work items queue is empty
253 #region Construction and Finalization 253 /// </summary>
254 254 //private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
255 /// <summary> 255 private ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
256 /// Constructor 256
257 /// </summary> 257 /// <summary>
258 public SmartThreadPool() 258 /// An event to signal all the threads to quit immediately.
259 { 259 /// </summary>
260 Initialize(); 260 //private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
261 } 261 private ManualResetEvent _shuttingDownEvent = EventWaitHandleFactory.CreateManualResetEvent(false);
262 262
263 /// <summary> 263 /// <summary>
264 /// Constructor 264 /// A flag to indicate if the Smart Thread Pool is now suspended.
265 /// </summary> 265 /// </summary>
266 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 266 private bool _isSuspended;
267 public SmartThreadPool(int idleTimeout) 267
268 { 268 /// <summary>
269 _stpStartInfo.IdleTimeout = idleTimeout; 269 /// A flag to indicate the threads to quit.
270 Initialize(); 270 /// </summary>
271 } 271 private bool _shutdown;
272 272
273 /// <summary> 273 /// <summary>
274 /// Constructor 274 /// Counts the threads created in the pool.
275 /// </summary> 275 /// It is used to name the threads.
276 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 276 /// </summary>
277 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> 277 private int _threadCounter;
278 public SmartThreadPool( 278
279 int idleTimeout, 279 /// <summary>
280 int maxWorkerThreads) 280 /// Indicate that the SmartThreadPool has been disposed
281 { 281 /// </summary>
282 _stpStartInfo.IdleTimeout = idleTimeout; 282 private bool _isDisposed;
283 _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; 283
284 Initialize(); 284 /// <summary>
285 } 285 /// Holds all the WorkItemsGroup instaces that have at least one
286 286 /// work item int the SmartThreadPool
287 /// <summary> 287 /// This variable is used in case of Shutdown
288 /// Constructor 288 /// </summary>
289 /// </summary> 289 private readonly SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup> _workItemsGroups = new SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup>();
290 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 290
291 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> 291 /// <summary>
292 /// <param name="minWorkerThreads">Lower limit of threads in the pool</param> 292 /// A common object for all the work items int the STP
293 public SmartThreadPool( 293 /// so we can mark them to cancel in O(1)
294 int idleTimeout, 294 /// </summary>
295 int maxWorkerThreads, 295 private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup();
296 int minWorkerThreads) 296
297 { 297 /// <summary>
298 _stpStartInfo.IdleTimeout = idleTimeout; 298 /// Windows STP performance counters
299 _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; 299 /// </summary>
300 _stpStartInfo.MinWorkerThreads = minWorkerThreads; 300 private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
301 Initialize(); 301
302 } 302 /// <summary>
303 303 /// Local STP performance counters
304 /// <summary> 304 /// </summary>
305 /// Constructor 305 private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance;
306 /// </summary> 306
307 public SmartThreadPool(STPStartInfo stpStartInfo) 307
308 { 308#if (WINDOWS_PHONE)
309 _stpStartInfo = new STPStartInfo(stpStartInfo); 309 private static readonly Dictionary<int, ThreadEntry> _threadEntries = new Dictionary<int, ThreadEntry>();
310 Initialize(); 310#elif (_WINDOWS_CE)
311 } 311 private static LocalDataStoreSlot _threadEntrySlot = Thread.AllocateDataSlot();
312 312#else
313 private void Initialize() 313 [ThreadStatic]
314 { 314 private static ThreadEntry _threadEntry;
315 Name = _stpStartInfo.ThreadPoolName; 315
316 ValidateSTPStartInfo(); 316#endif
317 317
318 if (null != _stpStartInfo.PerformanceCounterInstanceName) 318 /// <summary>
319 { 319 /// An event to call after a thread is created, but before
320 try 320 /// it's first use.
321 { 321 /// </summary>
322 _pcs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName); 322 private event ThreadInitializationHandler _onThreadInitialization;
323 } 323
324 catch(Exception e) 324 /// <summary>
325 { 325 /// An event to call when a thread is about to exit, after
326 Debug.WriteLine("Unable to create Performance Counters: " + e.ToString()); 326 /// it is no longer belong to the pool.
327 _pcs = NullSTPInstancePerformanceCounters.Instance; 327 /// </summary>
328 } 328 private event ThreadTerminationHandler _onThreadTermination;
329 } 329
330 330 #endregion
331 StartOptimalNumberOfThreads(); 331
332 } 332 #region Per thread properties
333 333
334 private void StartOptimalNumberOfThreads() 334 /// <summary>
335 { 335 /// A reference to the current work item a thread from the thread pool
336 int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads); 336 /// is executing.
337 threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads); 337 /// </summary>
338 StartThreads(threadsCount); 338 internal static ThreadEntry CurrentThreadEntry
339 } 339 {
340 340#if (WINDOWS_PHONE)
341 private void ValidateSTPStartInfo() 341 get
342 { 342 {
343 if (_stpStartInfo.MinWorkerThreads < 0) 343 lock(_threadEntries)
344 { 344 {
345 throw new ArgumentOutOfRangeException( 345 ThreadEntry threadEntry;
346 "MinWorkerThreads", "MinWorkerThreads cannot be negative"); 346 if (_threadEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out threadEntry))
347 } 347 {
348 348 return threadEntry;
349 if (_stpStartInfo.MaxWorkerThreads <= 0) 349 }
350 { 350 }
351 throw new ArgumentOutOfRangeException( 351 return null;
352 "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero"); 352 }
353 } 353 set
354 354 {
355 if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads) 355 lock(_threadEntries)
356 { 356 {
357 throw new ArgumentOutOfRangeException( 357 _threadEntries[Thread.CurrentThread.ManagedThreadId] = value;
358 "MinWorkerThreads, maxWorkerThreads", 358 }
359 "MaxWorkerThreads must be greater or equal to MinWorkerThreads"); 359 }
360 } 360#elif (_WINDOWS_CE)
361 } 361 get
362 362 {
363 private void ValidateCallback(Delegate callback) 363 //Thread.CurrentThread.ManagedThreadId
364 { 364 return Thread.GetData(_threadEntrySlot) as ThreadEntry;
365 if(callback.GetInvocationList().Length > 1) 365 }
366 { 366 set
367 throw new NotSupportedException("SmartThreadPool doesn't support delegates chains"); 367 {
368 } 368 Thread.SetData(_threadEntrySlot, value);
369 } 369 }
370 370#else
371 #endregion 371 get
372 372 {
373 #region Thread Processing 373 return _threadEntry;
374 374 }
375 /// <summary> 375 set
376 /// Waits on the queue for a work item, shutdown, or timeout. 376 {
377 /// </summary> 377 _threadEntry = value;
378 /// <returns> 378 }
379 /// Returns the WaitingCallback or null in case of timeout or shutdown. 379#endif
380 /// </returns> 380 }
381 private WorkItem Dequeue() 381 #endregion
382 { 382
383 WorkItem workItem = 383 #region Construction and Finalization
384 _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent); 384
385 385 /// <summary>
386 return workItem; 386 /// Constructor
387 } 387 /// </summary>
388 388 public SmartThreadPool()
389 /// <summary> 389 {
390 /// Put a new work item in the queue 390 _stpStartInfo = new STPStartInfo();
391 /// </summary> 391 Initialize();
392 /// <param name="workItem">A work item to queue</param> 392 }
393 private void Enqueue(WorkItem workItem) 393
394 { 394 /// <summary>
395 Enqueue(workItem, true); 395 /// Constructor
396 } 396 /// </summary>
397 397 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
398 /// <summary> 398 public SmartThreadPool(int idleTimeout)
399 /// Put a new work item in the queue 399 {
400 /// </summary> 400 _stpStartInfo = new STPStartInfo
401 /// <param name="workItem">A work item to queue</param> 401 {
402 internal void Enqueue(WorkItem workItem, bool incrementWorkItems) 402 IdleTimeout = idleTimeout,
403 { 403 };
404 // Make sure the workItem is not null 404 Initialize();
405 Debug.Assert(null != workItem); 405 }
406 406
407 if (incrementWorkItems) 407 /// <summary>
408 { 408 /// Constructor
409 IncrementWorkItemsCount(); 409 /// </summary>
410 } 410 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
411 411 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
412 _workItemsQueue.EnqueueWorkItem(workItem); 412 public SmartThreadPool(
413 workItem.WorkItemIsQueued(); 413 int idleTimeout,
414 414 int maxWorkerThreads)
415 // If all the threads are busy then try to create a new one 415 {
416 if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count) 416 _stpStartInfo = new STPStartInfo
417 { 417 {
418 StartThreads(1); 418 IdleTimeout = idleTimeout,
419 } 419 MaxWorkerThreads = maxWorkerThreads,
420 } 420 };
421 421 Initialize();
422 private void IncrementWorkItemsCount() 422 }
423 { 423
424 _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); 424 /// <summary>
425 425 /// Constructor
426 int count = Interlocked.Increment(ref _currentWorkItemsCount); 426 /// </summary>
427 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); 427 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
428 if (count == 1) 428 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
429 { 429 /// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
430 //Trace.WriteLine("STP is NOT idle"); 430 public SmartThreadPool(
431 _isIdleWaitHandle.Reset(); 431 int idleTimeout,
432 } 432 int maxWorkerThreads,
433 } 433 int minWorkerThreads)
434 434 {
435 private void DecrementWorkItemsCount() 435 _stpStartInfo = new STPStartInfo
436 { 436 {
437 ++_workItemsProcessed; 437 IdleTimeout = idleTimeout,
438 438 MaxWorkerThreads = maxWorkerThreads,
439 // The counter counts even if the work item was cancelled 439 MinWorkerThreads = minWorkerThreads,
440 _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); 440 };
441 441 Initialize();
442 int count = Interlocked.Decrement(ref _currentWorkItemsCount); 442 }
443 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); 443
444 if (count == 0) 444 /// <summary>
445 { 445 /// Constructor
446 //Trace.WriteLine("STP is idle"); 446 /// </summary>
447 _isIdleWaitHandle.Set(); 447 /// <param name="stpStartInfo">A SmartThreadPool configuration that overrides the default behavior</param>
448 } 448 public SmartThreadPool(STPStartInfo stpStartInfo)
449 } 449 {
450 450 _stpStartInfo = new STPStartInfo(stpStartInfo);
451 internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) 451 Initialize();
452 { 452 }
453 _workItemsGroups[workItemsGroup] = workItemsGroup; 453
454 } 454 private void Initialize()
455 455 {
456 internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) 456 Name = _stpStartInfo.ThreadPoolName;
457 { 457 ValidateSTPStartInfo();
458 if (_workItemsGroups.Contains(workItemsGroup)) 458
459 { 459 // _stpStartInfoRW stores a read/write copy of the STPStartInfo.
460 _workItemsGroups.Remove(workItemsGroup); 460 // Actually only MaxWorkerThreads and MinWorkerThreads are overwritten
461 } 461
462 } 462 _isSuspended = _stpStartInfo.StartSuspended;
463 463
464 /// <summary> 464#if (_WINDOWS_CE) || (_SILVERLIGHT) || (_MONO) || (WINDOWS_PHONE)
465 /// Inform that the current thread is about to quit or quiting. 465 if (null != _stpStartInfo.PerformanceCounterInstanceName)
466 /// The same thread may call this method more than once. 466 {
467 /// </summary> 467 throw new NotSupportedException("Performance counters are not implemented for Compact Framework/Silverlight/Mono, instead use StpStartInfo.EnableLocalPerformanceCounters");
468 private void InformCompleted() 468 }
469 { 469#else
470 // There is no need to lock the two methods together 470 if (null != _stpStartInfo.PerformanceCounterInstanceName)
471 // since only the current thread removes itself 471 {
472 // and the _workerThreads is a synchronized hashtable 472 try
473 if (_workerThreads.Contains(Thread.CurrentThread)) 473 {
474 { 474 _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
475 _workerThreads.Remove(Thread.CurrentThread); 475 }
476 _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); 476 catch (Exception e)
477 } 477 {
478 } 478 Debug.WriteLine("Unable to create Performance Counters: " + e);
479 479 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
480 /// <summary> 480 }
481 /// Starts new threads 481 }
482 /// </summary> 482#endif
483 /// <param name="threadsCount">The number of threads to start</param> 483
484 private void StartThreads(int threadsCount) 484 if (_stpStartInfo.EnableLocalPerformanceCounters)
485 { 485 {
486 if (_stpStartInfo.StartSuspended) 486 _localPCs = new LocalSTPInstancePerformanceCounters();
487 { 487 }
488 return; 488
489 } 489 // If the STP is not started suspended then start the threads.
490 490 if (!_isSuspended)
491 lock(_workerThreads.SyncRoot) 491 {
492 { 492 StartOptimalNumberOfThreads();
493 // Don't start threads on shut down 493 }
494 if (_shutdown) 494 }
495 { 495
496 return; 496 private void StartOptimalNumberOfThreads()
497 } 497 {
498 498 int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
499 for(int i = 0; i < threadsCount; ++i) 499 threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
500 { 500 threadsCount -= _workerThreads.Count;
501 // Don't create more threads then the upper limit 501 if (threadsCount > 0)
502 if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads) 502 {
503 { 503 StartThreads(threadsCount);
504 return; 504 }
505 } 505 }
506 506
507 // Create a new thread 507 private void ValidateSTPStartInfo()
508 Thread workerThread; 508 {
509 if (_stpStartInfo.StackSize > 0) 509 if (_stpStartInfo.MinWorkerThreads < 0)
510 workerThread = new Thread(ProcessQueuedItems, _stpStartInfo.StackSize); 510 {
511 else 511 throw new ArgumentOutOfRangeException(
512 workerThread = new Thread(ProcessQueuedItems); 512 "MinWorkerThreads", "MinWorkerThreads cannot be negative");
513 513 }
514 // Configure the new thread and start it 514
515 workerThread.Name = "STP " + Name + " Thread #" + _threadCounter; 515 if (_stpStartInfo.MaxWorkerThreads <= 0)
516 workerThread.IsBackground = true; 516 {
517 workerThread.Priority = _stpStartInfo.ThreadPriority; 517 throw new ArgumentOutOfRangeException(
518 workerThread.Start(); 518 "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
519 ++_threadCounter; 519 }
520 520
521 // Add the new thread to the hashtable and update its creation 521 if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads)
522 // time. 522 {
523 _workerThreads[workerThread] = DateTime.Now; 523 throw new ArgumentOutOfRangeException(
524 _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); 524 "MinWorkerThreads, maxWorkerThreads",
525 } 525 "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
526 } 526 }
527 } 527 }
528 528
529 /// <summary> 529 private static void ValidateCallback(Delegate callback)
530 /// A worker thread method that processes work items from the work items queue. 530 {
531 /// </summary> 531 if(callback.GetInvocationList().Length > 1)
532 private void ProcessQueuedItems() 532 {
533 { 533 throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
534 // Initialize the _smartThreadPool variable 534 }
535 _smartThreadPool = this; 535 }
536 536
537 try 537 #endregion
538 { 538
539 bool bInUseWorkerThreadsWasIncremented = false; 539 #region Thread Processing
540 540
541 // Process until shutdown. 541 /// <summary>
542 while(!_shutdown) 542 /// Waits on the queue for a work item, shutdown, or timeout.
543 { 543 /// </summary>
544 // Update the last time this thread was seen alive. 544 /// <returns>
545 // It's good for debugging. 545 /// Returns the WaitingCallback or null in case of timeout or shutdown.
546 _workerThreads[Thread.CurrentThread] = DateTime.Now; 546 /// </returns>
547 547 private WorkItem Dequeue()
548 // Wait for a work item, shutdown, or timeout 548 {
549 WorkItem workItem = Dequeue(); 549 WorkItem workItem =
550 550 _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
551 // Update the last time this thread was seen alive. 551
552 // It's good for debugging. 552 return workItem;
553 _workerThreads[Thread.CurrentThread] = DateTime.Now; 553 }
554 554
555 // On timeout or shut down. 555 /// <summary>
556 if (null == workItem) 556 /// Put a new work item in the queue
557 { 557 /// </summary>
558 // Double lock for quit. 558 /// <param name="workItem">A work item to queue</param>
559 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) 559 internal override void Enqueue(WorkItem workItem)
560 { 560 {
561 lock(_workerThreads.SyncRoot) 561 // Make sure the workItem is not null
562 { 562 Debug.Assert(null != workItem);
563 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) 563
564 { 564 IncrementWorkItemsCount();
565 // Inform that the thread is quiting and then quit. 565
566 // This method must be called within this lock or else 566 workItem.CanceledSmartThreadPool = _canceledSmartThreadPool;
567 // more threads will quit and the thread pool will go 567 _workItemsQueue.EnqueueWorkItem(workItem);
568 // below the lower limit. 568 workItem.WorkItemIsQueued();
569 InformCompleted(); 569
570 break; 570 // If all the threads are busy then try to create a new one
571 } 571 if (_currentWorkItemsCount > _workerThreads.Count)
572 } 572 {
573 } 573 StartThreads(1);
574 } 574 }
575 575 }
576 // If we didn't quit then skip to the next iteration. 576
577 if (null == workItem) 577 private void IncrementWorkItemsCount()
578 { 578 {
579 continue; 579 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
580 } 580 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
581 581
582 try 582 int count = Interlocked.Increment(ref _currentWorkItemsCount);
583 { 583 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
584 // Initialize the value to false 584 if (count == 1)
585 bInUseWorkerThreadsWasIncremented = false; 585 {
586 586 IsIdle = false;
587 // Change the state of the work item to 'in progress' if possible. 587 _isIdleWaitHandle.Reset();
588 // We do it here so if the work item has been canceled we won't 588 }
589 // increment the _inUseWorkerThreads. 589 }
590 // The cancel mechanism doesn't delete items from the queue, 590
591 // it marks the work item as canceled, and when the work item 591 private void DecrementWorkItemsCount()
592 // is dequeued, we just skip it. 592 {
593 // If the post execute of work item is set to always or to 593 int count = Interlocked.Decrement(ref _currentWorkItemsCount);
594 // call when the work item is canceled then the StartingWorkItem() 594 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
595 // will return true, so the post execute can run. 595 if (count == 0)
596 if (!workItem.StartingWorkItem()) 596 {
597 { 597 IsIdle = true;
598 continue; 598 _isIdleWaitHandle.Set();
599 } 599 }
600 600
601 // Execute the callback. Make sure to accurately 601 Interlocked.Increment(ref _workItemsProcessed);
602 // record how many callbacks are currently executing. 602
603 int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads); 603 if (!_shutdown)
604 _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); 604 {
605 605 // The counter counts even if the work item was cancelled
606 // Mark that the _inUseWorkerThreads incremented, so in the finally{} 606 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
607 // statement we will decrement it correctly. 607 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
608 bInUseWorkerThreadsWasIncremented = true; 608 }
609 609
610 // Set the _currentWorkItem to the current work item 610 }
611 _currentWorkItem = workItem; 611
612 612 internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
613 lock(workItem) 613 {
614 { 614 _workItemsGroups[workItemsGroup] = workItemsGroup;
615 workItem.currentThread = Thread.CurrentThread; 615 }
616 } 616
617 617 internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
618 ExecuteWorkItem(workItem); 618 {
619 619 if (_workItemsGroups.Contains(workItemsGroup))
620 lock(workItem) 620 {
621 { 621 _workItemsGroups.Remove(workItemsGroup);
622 workItem.currentThread = null; 622 }
623 } 623 }
624 624
625 } 625 /// <summary>
626 catch(ThreadAbortException ex) 626 /// Inform that the current thread is about to quit or quiting.
627 { 627 /// The same thread may call this method more than once.
628 lock(workItem) 628 /// </summary>
629 { 629 private void InformCompleted()
630 workItem.currentThread = null; 630 {
631 } 631 // There is no need to lock the two methods together
632 ex.GetHashCode(); 632 // since only the current thread removes itself
633 Thread.ResetAbort(); 633 // and the _workerThreads is a synchronized dictionary
634 } 634 if (_workerThreads.Contains(Thread.CurrentThread))
635 catch(Exception ex) 635 {
636 { 636 _workerThreads.Remove(Thread.CurrentThread);
637 ex.GetHashCode(); 637 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
638 // Do nothing 638 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
639 } 639 }
640 finally 640 }
641 { 641
642 lock(workItem) 642 /// <summary>
643 { 643 /// Starts new threads
644 workItem.currentThread = null; 644 /// </summary>
645 } 645 /// <param name="threadsCount">The number of threads to start</param>
646 646 private void StartThreads(int threadsCount)
647 if (null != workItem) 647 {
648 { 648 if (_isSuspended)
649 workItem.DisposeOfState(); 649 {
650 } 650 return;
651 651 }
652 // Set the _currentWorkItem to null, since we 652
653 // no longer run user's code. 653 lock(_workerThreads.SyncRoot)
654 _currentWorkItem = null; 654 {
655 655 // Don't start threads on shut down
656 // Decrement the _inUseWorkerThreads only if we had 656 if (_shutdown)
657 // incremented it. Note the cancelled work items don't 657 {
658 // increment _inUseWorkerThreads. 658 return;
659 if (bInUseWorkerThreadsWasIncremented) 659 }
660 { 660
661 int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads); 661 for(int i = 0; i < threadsCount; ++i)
662 _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); 662 {
663 } 663 // Don't create more threads then the upper limit
664 664 if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
665 // Notify that the work item has been completed. 665 {
666 // WorkItemsGroup may enqueue their next work item. 666 return;
667 workItem.FireWorkItemCompleted(); 667 }
668 668
669 // Decrement the number of work items here so the idle 669 // Create a new thread
670 // ManualResetEvent won't fluctuate. 670
671 DecrementWorkItemsCount(); 671#if (_SILVERLIGHT) || (WINDOWS_PHONE)
672 } 672 Thread workerThread = new Thread(ProcessQueuedItems);
673 } 673#else
674 } 674 Thread workerThread =
675 catch(ThreadAbortException tae) 675 _stpStartInfo.MaxStackSize.HasValue
676 { 676 ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value)
677 tae.GetHashCode(); 677 : new Thread(ProcessQueuedItems);
678 // Handle the abort exception gracfully. 678#endif
679 Thread.ResetAbort(); 679 // Configure the new thread and start it
680 } 680 workerThread.Name = "STP " + Name + " Thread #" + _threadCounter;
681 catch(Exception e) 681 workerThread.IsBackground = _stpStartInfo.AreThreadsBackground;
682 { 682
683 Debug.Assert(null != e); 683#if !(_SILVERLIGHT) && !(_WINDOWS_CE) && !(WINDOWS_PHONE)
684 } 684 if (_stpStartInfo.ApartmentState != ApartmentState.Unknown)
685 finally 685 {
686 { 686 workerThread.SetApartmentState(_stpStartInfo.ApartmentState);
687 InformCompleted(); 687 }
688 } 688#endif
689 } 689
690 690#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
691 private void ExecuteWorkItem(WorkItem workItem) 691 workerThread.Priority = _stpStartInfo.ThreadPriority;
692 { 692#endif
693 _pcs.SampleWorkItemsWaitTime(workItem.WaitingTime); 693 workerThread.Start();
694 try 694 ++_threadCounter;
695 { 695
696 workItem.Execute(); 696 // Add it to the dictionary and update its creation time.
697 } 697 _workerThreads[workerThread] = new ThreadEntry(this);
698 catch 698
699 { 699 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
700 throw; 700 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
701 } 701 }
702 finally 702 }
703 { 703 }
704 _pcs.SampleWorkItemsProcessTime(workItem.ProcessTime); 704
705 } 705 /// <summary>
706 } 706 /// A worker thread method that processes work items from the work items queue.
707 707 /// </summary>
708 708 private void ProcessQueuedItems()
709 #endregion 709 {
710 710 // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks
711 #region Public Methods 711 // of the dictionary.
712 712 CurrentThreadEntry = _workerThreads[Thread.CurrentThread];
713 /// <summary> 713
714 /// Queue a work item 714 FireOnThreadInitialization();
715 /// </summary> 715
716 /// <param name="callback">A callback to execute</param> 716 try
717 /// <returns>Returns a work item result</returns> 717 {
718 public IWorkItemResult QueueWorkItem(WorkItemCallback callback) 718 bool bInUseWorkerThreadsWasIncremented = false;
719 { 719
720 ValidateNotDisposed(); 720 // Process until shutdown.
721 ValidateCallback(callback); 721 while(!_shutdown)
722 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback); 722 {
723 Enqueue(workItem); 723 // Update the last time this thread was seen alive.
724 return workItem.GetWorkItemResult(); 724 // It's good for debugging.
725 } 725 CurrentThreadEntry.IAmAlive();
726 726
727 /// <summary> 727 // The following block handles the when the MaxWorkerThreads has been
728 /// Queue a work item 728 // incremented by the user at run-time.
729 /// </summary> 729 // Double lock for quit.
730 /// <param name="callback">A callback to execute</param> 730 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
731 /// <param name="workItemPriority">The priority of the work item</param> 731 {
732 /// <returns>Returns a work item result</returns> 732 lock (_workerThreads.SyncRoot)
733 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) 733 {
734 { 734 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
735 ValidateNotDisposed(); 735 {
736 ValidateCallback(callback); 736 // Inform that the thread is quiting and then quit.
737 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, workItemPriority); 737 // This method must be called within this lock or else
738 Enqueue(workItem); 738 // more threads will quit and the thread pool will go
739 return workItem.GetWorkItemResult(); 739 // below the lower limit.
740 } 740 InformCompleted();
741 741 break;
742 /// <summary> 742 }
743 /// Queue a work item 743 }
744 /// </summary> 744 }
745 /// <param name="workItemInfo">Work item info</param> 745
746 /// <param name="callback">A callback to execute</param> 746 // Wait for a work item, shutdown, or timeout
747 /// <returns>Returns a work item result</returns> 747 WorkItem workItem = Dequeue();
748 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) 748
749 { 749 // Update the last time this thread was seen alive.
750 ValidateNotDisposed(); 750 // It's good for debugging.
751 ValidateCallback(callback); 751 CurrentThreadEntry.IAmAlive();
752 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback); 752
753 Enqueue(workItem); 753 // On timeout or shut down.
754 return workItem.GetWorkItemResult(); 754 if (null == workItem)
755 } 755 {
756 756 // Double lock for quit.
757 /// <summary> 757 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
758 /// Queue a work item 758 {
759 /// </summary> 759 lock(_workerThreads.SyncRoot)
760 /// <param name="callback">A callback to execute</param> 760 {
761 /// <param name="state"> 761 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
762 /// The context object of the work item. Used for passing arguments to the work item. 762 {
763 /// </param> 763 // Inform that the thread is quiting and then quit.
764 /// <returns>Returns a work item result</returns> 764 // This method must be called within this lock or else
765 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) 765 // more threads will quit and the thread pool will go
766 { 766 // below the lower limit.
767 ValidateNotDisposed(); 767 InformCompleted();
768 ValidateCallback(callback); 768 break;
769 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state); 769 }
770 Enqueue(workItem); 770 }
771 return workItem.GetWorkItemResult(); 771 }
772 } 772 }
773 773
774 /// <summary> 774 // If we didn't quit then skip to the next iteration.
775 /// Queue a work item 775 if (null == workItem)
776 /// </summary> 776 {
777 /// <param name="callback">A callback to execute</param> 777 continue;
778 /// <param name="state"> 778 }
779 /// The context object of the work item. Used for passing arguments to the work item. 779
780 /// </param> 780 try
781 /// <param name="workItemPriority">The work item priority</param> 781 {
782 /// <returns>Returns a work item result</returns> 782 // Initialize the value to false
783 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) 783 bInUseWorkerThreadsWasIncremented = false;
784 { 784
785 ValidateNotDisposed(); 785 // Set the Current Work Item of the thread.
786 ValidateCallback(callback); 786 // Store the Current Work Item before the workItem.StartingWorkItem() is called,
787 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, workItemPriority); 787 // so WorkItem.Cancel can work when the work item is between InQueue and InProgress
788 Enqueue(workItem); 788 // states.
789 return workItem.GetWorkItemResult(); 789 // If the work item has been cancelled BEFORE the workItem.StartingWorkItem()
790 } 790 // (work item is in InQueue state) then workItem.StartingWorkItem() will return false.
791 791 // If the work item has been cancelled AFTER the workItem.StartingWorkItem() then
792 /// <summary> 792 // (work item is in InProgress state) then the thread will be aborted
793 /// Queue a work item 793 CurrentThreadEntry.CurrentWorkItem = workItem;
794 /// </summary> 794
795 /// <param name="workItemInfo">Work item information</param> 795 // Change the state of the work item to 'in progress' if possible.
796 /// <param name="callback">A callback to execute</param> 796 // We do it here so if the work item has been canceled we won't
797 /// <param name="state"> 797 // increment the _inUseWorkerThreads.
798 /// The context object of the work item. Used for passing arguments to the work item. 798 // The cancel mechanism doesn't delete items from the queue,
799 /// </param> 799 // it marks the work item as canceled, and when the work item
800 /// <returns>Returns a work item result</returns> 800 // is dequeued, we just skip it.
801 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) 801 // If the post execute of work item is set to always or to
802 { 802 // call when the work item is canceled then the StartingWorkItem()
803 ValidateNotDisposed(); 803 // will return true, so the post execute can run.
804 ValidateCallback(callback); 804 if (!workItem.StartingWorkItem())
805 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback, state); 805 {
806 Enqueue(workItem); 806 continue;
807 return workItem.GetWorkItemResult(); 807 }
808 } 808
809 809 // Execute the callback. Make sure to accurately
810 /// <summary> 810 // record how many callbacks are currently executing.
811 /// Queue a work item 811 int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads);
812 /// </summary> 812 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
813 /// <param name="callback">A callback to execute</param> 813 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
814 /// <param name="state"> 814
815 /// The context object of the work item. Used for passing arguments to the work item. 815 // Mark that the _inUseWorkerThreads incremented, so in the finally{}
816 /// </param> 816 // statement we will decrement it correctly.
817 /// <param name="postExecuteWorkItemCallback"> 817 bInUseWorkerThreadsWasIncremented = true;
818 /// A delegate to call after the callback completion 818
819 /// </param> 819 workItem.FireWorkItemStarted();
820 /// <returns>Returns a work item result</returns> 820
821 public IWorkItemResult QueueWorkItem( 821 ExecuteWorkItem(workItem);
822 WorkItemCallback callback, 822 }
823 object state, 823 catch(Exception ex)
824 PostExecuteWorkItemCallback postExecuteWorkItemCallback) 824 {
825 { 825 ex.GetHashCode();
826 ValidateNotDisposed(); 826 // Do nothing
827 ValidateCallback(callback); 827 }
828 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback); 828 finally
829 Enqueue(workItem); 829 {
830 return workItem.GetWorkItemResult(); 830 workItem.DisposeOfState();
831 } 831
832 832 // Set the CurrentWorkItem to null, since we
833 /// <summary> 833 // no longer run user's code.
834 /// Queue a work item 834 CurrentThreadEntry.CurrentWorkItem = null;
835 /// </summary> 835
836 /// <param name="callback">A callback to execute</param> 836 // Decrement the _inUseWorkerThreads only if we had
837 /// <param name="state"> 837 // incremented it. Note the cancelled work items don't
838 /// The context object of the work item. Used for passing arguments to the work item. 838 // increment _inUseWorkerThreads.
839 /// </param> 839 if (bInUseWorkerThreadsWasIncremented)
840 /// <param name="postExecuteWorkItemCallback"> 840 {
841 /// A delegate to call after the callback completion 841 int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads);
842 /// </param> 842 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
843 /// <param name="workItemPriority">The work item priority</param> 843 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
844 /// <returns>Returns a work item result</returns> 844 }
845 public IWorkItemResult QueueWorkItem( 845
846 WorkItemCallback callback, 846 // Notify that the work item has been completed.
847 object state, 847 // WorkItemsGroup may enqueue their next work item.
848 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 848 workItem.FireWorkItemCompleted();
849 WorkItemPriority workItemPriority) 849
850 { 850 // Decrement the number of work items here so the idle
851 ValidateNotDisposed(); 851 // ManualResetEvent won't fluctuate.
852 ValidateCallback(callback); 852 DecrementWorkItemsCount();
853 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); 853 }
854 Enqueue(workItem); 854 }
855 return workItem.GetWorkItemResult(); 855 }
856 } 856 catch(ThreadAbortException tae)
857 857 {
858 /// <summary> 858 tae.GetHashCode();
859 /// Queue a work item 859 // Handle the abort exception gracfully.
860 /// </summary> 860#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
861 /// <param name="callback">A callback to execute</param> 861 Thread.ResetAbort();
862 /// <param name="state"> 862#endif
863 /// The context object of the work item. Used for passing arguments to the work item. 863 }
864 /// </param> 864 catch(Exception e)
865 /// <param name="postExecuteWorkItemCallback"> 865 {
866 /// A delegate to call after the callback completion 866 Debug.Assert(null != e);
867 /// </param> 867 }
868 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 868 finally
869 /// <returns>Returns a work item result</returns> 869 {
870 public IWorkItemResult QueueWorkItem( 870 InformCompleted();
871 WorkItemCallback callback, 871 FireOnThreadTermination();
872 object state, 872 }
873 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 873 }
874 CallToPostExecute callToPostExecute) 874
875 { 875 private void ExecuteWorkItem(WorkItem workItem)
876 ValidateNotDisposed(); 876 {
877 ValidateCallback(callback); 877 _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
878 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); 878 _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
879 Enqueue(workItem); 879 try
880 return workItem.GetWorkItemResult(); 880 {
881 } 881 workItem.Execute();
882 882 }
883 /// <summary> 883 finally
884 /// Queue a work item 884 {
885 /// </summary> 885 _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
886 /// <param name="callback">A callback to execute</param> 886 _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
887 /// <param name="state"> 887 }
888 /// The context object of the work item. Used for passing arguments to the work item. 888 }
889 /// </param> 889
890 /// <param name="postExecuteWorkItemCallback"> 890
891 /// A delegate to call after the callback completion 891 #endregion
892 /// </param> 892
893 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 893 #region Public Methods
894 /// <param name="workItemPriority">The work item priority</param> 894
895 /// <returns>Returns a work item result</returns> 895 private void ValidateWaitForIdle()
896 public IWorkItemResult QueueWorkItem( 896 {
897 WorkItemCallback callback, 897 if (null != CurrentThreadEntry && CurrentThreadEntry.AssociatedSmartThreadPool == this)
898 object state, 898 {
899 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 899 throw new NotSupportedException(
900 CallToPostExecute callToPostExecute, 900 "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
901 WorkItemPriority workItemPriority) 901 }
902 { 902 }
903 ValidateNotDisposed(); 903
904 ValidateCallback(callback); 904 internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup)
905 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); 905 {
906 Enqueue(workItem); 906 if (null == CurrentThreadEntry)
907 return workItem.GetWorkItemResult(); 907 {
908 } 908 return;
909 909 }
910 /// <summary> 910
911 /// Wait for the thread pool to be idle 911 WorkItem workItem = CurrentThreadEntry.CurrentWorkItem;
912 /// </summary> 912 ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem);
913 public void WaitForIdle() 913 if ((null != workItemsGroup) &&
914 { 914 (null != workItem) &&
915 WaitForIdle(Timeout.Infinite); 915 CurrentThreadEntry.CurrentWorkItem.WasQueuedBy(workItemsGroup))
916 } 916 {
917 917 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
918 /// <summary> 918 }
919 /// Wait for the thread pool to be idle 919 }
920 /// </summary> 920
921 public bool WaitForIdle(TimeSpan timeout) 921 [MethodImpl(MethodImplOptions.NoInlining)]
922 { 922 private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem)
923 return WaitForIdle((int)timeout.TotalMilliseconds); 923 {
924 } 924 if ((null != workItemsGroup) &&
925 925 (null != workItem) &&
926 /// <summary> 926 workItem.WasQueuedBy(workItemsGroup))
927 /// Wait for the thread pool to be idle 927 {
928 /// </summary> 928 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
929 public bool WaitForIdle(int millisecondsTimeout) 929 }
930 { 930 }
931 ValidateWaitForIdle(); 931
932 return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); 932 /// <summary>
933 } 933 /// Force the SmartThreadPool to shutdown
934 934 /// </summary>
935 private void ValidateWaitForIdle() 935 public void Shutdown()
936 { 936 {
937 if(_smartThreadPool == this) 937 Shutdown(true, 0);
938 { 938 }
939 throw new NotSupportedException( 939
940 "WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 940 /// <summary>
941 } 941 /// Force the SmartThreadPool to shutdown with timeout
942 } 942 /// </summary>
943 943 public void Shutdown(bool forceAbort, TimeSpan timeout)
944 internal void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup) 944 {
945 { 945 Shutdown(forceAbort, (int)timeout.TotalMilliseconds);
946 ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, SmartThreadPool._currentWorkItem); 946 }
947 if ((null != workItemsGroup) && 947
948 (null != SmartThreadPool._currentWorkItem) && 948 /// <summary>
949 SmartThreadPool._currentWorkItem.WasQueuedBy(workItemsGroup)) 949 /// Empties the queue of work items and abort the threads in the pool.
950 { 950 /// </summary>
951 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 951 public void Shutdown(bool forceAbort, int millisecondsTimeout)
952 } 952 {
953 } 953 ValidateNotDisposed();
954 954
955 [MethodImpl(MethodImplOptions.NoInlining)] 955 ISTPInstancePerformanceCounters pcs = _windowsPCs;
956 private void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem) 956
957 { 957 if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs)
958 if ((null != workItemsGroup) && 958 {
959 (null != workItem) && 959 // Set the _pcs to "null" to stop updating the performance
960 workItem.WasQueuedBy(workItemsGroup)) 960 // counters
961 { 961 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
962 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 962
963 } 963 pcs.Dispose();
964 } 964 }
965 965
966 966 Thread [] threads;
967 967 lock(_workerThreads.SyncRoot)
968 /// <summary> 968 {
969 /// Force the SmartThreadPool to shutdown 969 // Shutdown the work items queue
970 /// </summary> 970 _workItemsQueue.Dispose();
971 public void Shutdown() 971
972 { 972 // Signal the threads to exit
973 Shutdown(true, 0); 973 _shutdown = true;
974 } 974 _shuttingDownEvent.Set();
975 975
976 public void Shutdown(bool forceAbort, TimeSpan timeout) 976 // Make a copy of the threads' references in the pool
977 { 977 threads = new Thread [_workerThreads.Count];
978 Shutdown(forceAbort, (int)timeout.TotalMilliseconds); 978 _workerThreads.Keys.CopyTo(threads, 0);
979 } 979 }
980 980
981 /// <summary> 981 int millisecondsLeft = millisecondsTimeout;
982 /// Empties the queue of work items and abort the threads in the pool. 982 Stopwatch stopwatch = Stopwatch.StartNew();
983 /// </summary> 983 //DateTime start = DateTime.UtcNow;
984 public void Shutdown(bool forceAbort, int millisecondsTimeout) 984 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
985 { 985 bool timeout = false;
986 ValidateNotDisposed(); 986
987 987 // Each iteration we update the time left for the timeout.
988 ISTPInstancePerformanceCounters pcs = _pcs; 988 foreach(Thread thread in threads)
989 989 {
990 if (NullSTPInstancePerformanceCounters.Instance != _pcs) 990 // Join don't work with negative numbers
991 { 991 if (!waitInfinitely && (millisecondsLeft < 0))
992 _pcs.Dispose(); 992 {
993 // Set the _pcs to "null" to stop updating the performance 993 timeout = true;
994 // counters 994 break;
995 _pcs = NullSTPInstancePerformanceCounters.Instance; 995 }
996 } 996
997 997 // Wait for the thread to terminate
998 Thread [] threads = null; 998 bool success = thread.Join(millisecondsLeft);
999 lock(_workerThreads.SyncRoot) 999 if(!success)
1000 { 1000 {
1001 // Shutdown the work items queue 1001 timeout = true;
1002 _workItemsQueue.Dispose(); 1002 break;
1003 1003 }
1004 // Signal the threads to exit 1004
1005 _shutdown = true; 1005 if(!waitInfinitely)
1006 _shuttingDownEvent.Set(); 1006 {
1007 1007 // Update the time left to wait
1008 // Make a copy of the threads' references in the pool 1008 //TimeSpan ts = DateTime.UtcNow - start;
1009 threads = new Thread [_workerThreads.Count]; 1009 millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds;
1010 _workerThreads.Keys.CopyTo(threads, 0); 1010 }
1011 } 1011 }
1012 1012
1013 int millisecondsLeft = millisecondsTimeout; 1013 if (timeout && forceAbort)
1014 DateTime start = DateTime.Now; 1014 {
1015 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout); 1015 // Abort the threads in the pool
1016 bool timeout = false; 1016 foreach(Thread thread in threads)
1017 1017 {
1018 // Each iteration we update the time left for the timeout. 1018
1019 foreach(Thread thread in threads) 1019 if ((thread != null)
1020 { 1020#if !(_WINDOWS_CE)
1021 // Join don't work with negative numbers 1021 && thread.IsAlive
1022 if (!waitInfinitely && (millisecondsLeft < 0)) 1022#endif
1023 { 1023 )
1024 timeout = true; 1024 {
1025 break; 1025 try
1026 } 1026 {
1027 1027 thread.Abort(); // Shutdown
1028 // Wait for the thread to terminate 1028 }
1029 bool success = thread.Join(millisecondsLeft); 1029 catch(SecurityException e)
1030 if(!success) 1030 {
1031 { 1031 e.GetHashCode();
1032 timeout = true; 1032 }
1033 break; 1033 catch(ThreadStateException ex)
1034 } 1034 {
1035 1035 ex.GetHashCode();
1036 if(!waitInfinitely) 1036 // In case the thread has been terminated
1037 { 1037 // after the check if it is alive.
1038 // Update the time left to wait 1038 }
1039 TimeSpan ts = DateTime.Now - start; 1039 }
1040 millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds; 1040 }
1041 } 1041 }
1042 } 1042 }
1043 1043
1044 if (timeout && forceAbort) 1044 /// <summary>
1045 { 1045 /// Wait for all work items to complete
1046 // Abort the threads in the pool 1046 /// </summary>
1047 foreach(Thread thread in threads) 1047 /// <param name="waitableResults">Array of work item result objects</param>
1048 { 1048 /// <returns>
1049 if ((thread != null) && thread.IsAlive) 1049 /// true when every work item in workItemResults has completed; otherwise false.
1050 { 1050 /// </returns>
1051 try 1051 public static bool WaitAll(
1052 { 1052 IWaitableResult [] waitableResults)
1053 thread.Abort("Shutdown"); 1053 {
1054 } 1054 return WaitAll(waitableResults, Timeout.Infinite, true);
1055 catch(SecurityException e) 1055 }
1056 { 1056
1057 e.GetHashCode(); 1057 /// <summary>
1058 } 1058 /// Wait for all work items to complete
1059 catch(ThreadStateException ex) 1059 /// </summary>
1060 { 1060 /// <param name="waitableResults">Array of work item result objects</param>
1061 ex.GetHashCode(); 1061 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1062 // In case the thread has been terminated 1062 /// <param name="exitContext">
1063 // after the check if it is alive. 1063 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1064 } 1064 /// </param>
1065 } 1065 /// <returns>
1066 } 1066 /// true when every work item in workItemResults has completed; otherwise false.
1067 } 1067 /// </returns>
1068 1068 public static bool WaitAll(
1069 // Dispose of the performance counters 1069 IWaitableResult [] waitableResults,
1070 pcs.Dispose(); 1070 TimeSpan timeout,
1071 } 1071 bool exitContext)
1072 1072 {
1073 /// <summary> 1073 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1074 /// Wait for all work items to complete 1074 }
1075 /// </summary> 1075
1076 /// <param name="workItemResults">Array of work item result objects</param> 1076 /// <summary>
1077 /// <returns> 1077 /// Wait for all work items to complete
1078 /// true when every work item in workItemResults has completed; otherwise false. 1078 /// </summary>
1079 /// </returns> 1079 /// <param name="waitableResults">Array of work item result objects</param>
1080 public static bool WaitAll( 1080 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1081 IWorkItemResult [] workItemResults) 1081 /// <param name="exitContext">
1082 { 1082 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1083 return WaitAll(workItemResults, Timeout.Infinite, true); 1083 /// </param>
1084 } 1084 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1085 1085 /// <returns>
1086 /// <summary> 1086 /// true when every work item in workItemResults has completed; otherwise false.
1087 /// Wait for all work items to complete 1087 /// </returns>
1088 /// </summary> 1088 public static bool WaitAll(
1089 /// <param name="workItemResults">Array of work item result objects</param> 1089 IWaitableResult[] waitableResults,
1090 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1090 TimeSpan timeout,
1091 /// <param name="exitContext"> 1091 bool exitContext,
1092 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1092 WaitHandle cancelWaitHandle)
1093 /// </param> 1093 {
1094 /// <returns> 1094 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1095 /// true when every work item in workItemResults has completed; otherwise false. 1095 }
1096 /// </returns> 1096
1097 public static bool WaitAll( 1097 /// <summary>
1098 IWorkItemResult [] workItemResults, 1098 /// Wait for all work items to complete
1099 TimeSpan timeout, 1099 /// </summary>
1100 bool exitContext) 1100 /// <param name="waitableResults">Array of work item result objects</param>
1101 { 1101 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1102 return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext); 1102 /// <param name="exitContext">
1103 } 1103 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1104 1104 /// </param>
1105 /// <summary> 1105 /// <returns>
1106 /// Wait for all work items to complete 1106 /// true when every work item in workItemResults has completed; otherwise false.
1107 /// </summary> 1107 /// </returns>
1108 /// <param name="workItemResults">Array of work item result objects</param> 1108 public static bool WaitAll(
1109 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1109 IWaitableResult [] waitableResults,
1110 /// <param name="exitContext"> 1110 int millisecondsTimeout,
1111 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1111 bool exitContext)
1112 /// </param> 1112 {
1113 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1113 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null);
1114 /// <returns> 1114 }
1115 /// true when every work item in workItemResults has completed; otherwise false. 1115
1116 /// </returns> 1116 /// <summary>
1117 public static bool WaitAll( 1117 /// Wait for all work items to complete
1118 IWorkItemResult [] workItemResults, 1118 /// </summary>
1119 TimeSpan timeout, 1119 /// <param name="waitableResults">Array of work item result objects</param>
1120 bool exitContext, 1120 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1121 WaitHandle cancelWaitHandle) 1121 /// <param name="exitContext">
1122 { 1122 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1123 return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); 1123 /// </param>
1124 } 1124 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1125 1125 /// <returns>
1126 /// <summary> 1126 /// true when every work item in workItemResults has completed; otherwise false.
1127 /// Wait for all work items to complete 1127 /// </returns>
1128 /// </summary> 1128 public static bool WaitAll(
1129 /// <param name="workItemResults">Array of work item result objects</param> 1129 IWaitableResult[] waitableResults,
1130 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1130 int millisecondsTimeout,
1131 /// <param name="exitContext"> 1131 bool exitContext,
1132 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1132 WaitHandle cancelWaitHandle)
1133 /// </param> 1133 {
1134 /// <returns> 1134 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1135 /// true when every work item in workItemResults has completed; otherwise false. 1135 }
1136 /// </returns> 1136
1137 public static bool WaitAll( 1137
1138 IWorkItemResult [] workItemResults, 1138 /// <summary>
1139 int millisecondsTimeout, 1139 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1140 bool exitContext) 1140 /// </summary>
1141 { 1141 /// <param name="waitableResults">Array of work item result objects</param>
1142 return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, null); 1142 /// <returns>
1143 } 1143 /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
1144 1144 /// </returns>
1145 /// <summary> 1145 public static int WaitAny(
1146 /// Wait for all work items to complete 1146 IWaitableResult [] waitableResults)
1147 /// </summary> 1147 {
1148 /// <param name="workItemResults">Array of work item result objects</param> 1148 return WaitAny(waitableResults, Timeout.Infinite, true);
1149 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1149 }
1150 /// <param name="exitContext"> 1150
1151 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1151 /// <summary>
1152 /// </param> 1152 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1153 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1153 /// </summary>
1154 /// <returns> 1154 /// <param name="waitableResults">Array of work item result objects</param>
1155 /// true when every work item in workItemResults has completed; otherwise false. 1155 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1156 /// </returns> 1156 /// <param name="exitContext">
1157 public static bool WaitAll( 1157 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1158 IWorkItemResult [] workItemResults, 1158 /// </param>
1159 int millisecondsTimeout, 1159 /// <returns>
1160 bool exitContext, 1160 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1161 WaitHandle cancelWaitHandle) 1161 /// </returns>
1162 { 1162 public static int WaitAny(
1163 return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle); 1163 IWaitableResult[] waitableResults,
1164 } 1164 TimeSpan timeout,
1165 1165 bool exitContext)
1166 1166 {
1167 /// <summary> 1167 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1168 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1168 }
1169 /// </summary> 1169
1170 /// <param name="workItemResults">Array of work item result objects</param> 1170 /// <summary>
1171 /// <returns> 1171 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1172 /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled. 1172 /// </summary>
1173 /// </returns> 1173 /// <param name="waitableResults">Array of work item result objects</param>
1174 public static int WaitAny( 1174 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1175 IWorkItemResult [] workItemResults) 1175 /// <param name="exitContext">
1176 { 1176 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1177 return WaitAny(workItemResults, Timeout.Infinite, true); 1177 /// </param>
1178 } 1178 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1179 1179 /// <returns>
1180 /// <summary> 1180 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1181 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1181 /// </returns>
1182 /// </summary> 1182 public static int WaitAny(
1183 /// <param name="workItemResults">Array of work item result objects</param> 1183 IWaitableResult [] waitableResults,
1184 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1184 TimeSpan timeout,
1185 /// <param name="exitContext"> 1185 bool exitContext,
1186 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1186 WaitHandle cancelWaitHandle)
1187 /// </param> 1187 {
1188 /// <returns> 1188 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1189 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1189 }
1190 /// </returns> 1190
1191 public static int WaitAny( 1191 /// <summary>
1192 IWorkItemResult [] workItemResults, 1192 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1193 TimeSpan timeout, 1193 /// </summary>
1194 bool exitContext) 1194 /// <param name="waitableResults">Array of work item result objects</param>
1195 { 1195 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1196 return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext); 1196 /// <param name="exitContext">
1197 } 1197 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1198 1198 /// </param>
1199 /// <summary> 1199 /// <returns>
1200 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1200 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1201 /// </summary> 1201 /// </returns>
1202 /// <param name="workItemResults">Array of work item result objects</param> 1202 public static int WaitAny(
1203 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1203 IWaitableResult [] waitableResults,
1204 /// <param name="exitContext"> 1204 int millisecondsTimeout,
1205 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1205 bool exitContext)
1206 /// </param> 1206 {
1207 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1207 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null);
1208 /// <returns> 1208 }
1209 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1209
1210 /// </returns> 1210 /// <summary>
1211 public static int WaitAny( 1211 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1212 IWorkItemResult [] workItemResults, 1212 /// </summary>
1213 TimeSpan timeout, 1213 /// <param name="waitableResults">Array of work item result objects</param>
1214 bool exitContext, 1214 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1215 WaitHandle cancelWaitHandle) 1215 /// <param name="exitContext">
1216 { 1216 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1217 return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); 1217 /// </param>
1218 } 1218 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1219 1219 /// <returns>
1220 /// <summary> 1220 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1221 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1221 /// </returns>
1222 /// </summary> 1222 public static int WaitAny(
1223 /// <param name="workItemResults">Array of work item result objects</param> 1223 IWaitableResult [] waitableResults,
1224 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1224 int millisecondsTimeout,
1225 /// <param name="exitContext"> 1225 bool exitContext,
1226 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1226 WaitHandle cancelWaitHandle)
1227 /// </param> 1227 {
1228 /// <returns> 1228 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1229 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1229 }
1230 /// </returns> 1230
1231 public static int WaitAny( 1231 /// <summary>
1232 IWorkItemResult [] workItemResults, 1232 /// Creates a new WorkItemsGroup.
1233 int millisecondsTimeout, 1233 /// </summary>
1234 bool exitContext) 1234 /// <param name="concurrency">The number of work items that can be run concurrently</param>
1235 { 1235 /// <returns>A reference to the WorkItemsGroup</returns>
1236 return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, null); 1236 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
1237 } 1237 {
1238 1238 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo);
1239 /// <summary> 1239 return workItemsGroup;
1240 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1240 }
1241 /// </summary> 1241
1242 /// <param name="workItemResults">Array of work item result objects</param> 1242 /// <summary>
1243 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1243 /// Creates a new WorkItemsGroup.
1244 /// <param name="exitContext"> 1244 /// </summary>
1245 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1245 /// <param name="concurrency">The number of work items that can be run concurrently</param>
1246 /// </param> 1246 /// <param name="wigStartInfo">A WorkItemsGroup configuration that overrides the default behavior</param>
1247 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1247 /// <returns>A reference to the WorkItemsGroup</returns>
1248 /// <returns> 1248 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
1249 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1249 {
1250 /// </returns> 1250 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo);
1251 public static int WaitAny( 1251 return workItemsGroup;
1252 IWorkItemResult [] workItemResults, 1252 }
1253 int millisecondsTimeout, 1253
1254 bool exitContext, 1254 #region Fire Thread's Events
1255 WaitHandle cancelWaitHandle) 1255
1256 { 1256 private void FireOnThreadInitialization()
1257 return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle); 1257 {
1258 } 1258 if (null != _onThreadInitialization)
1259 1259 {
1260 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency) 1260 foreach (ThreadInitializationHandler tih in _onThreadInitialization.GetInvocationList())
1261 { 1261 {
1262 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo); 1262 try
1263 return workItemsGroup; 1263 {
1264 } 1264 tih();
1265 1265 }
1266 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo) 1266 catch (Exception e)
1267 { 1267 {
1268 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo); 1268 e.GetHashCode();
1269 return workItemsGroup; 1269 Debug.Assert(false);
1270 } 1270 throw;
1271 1271 }
1272 public event WorkItemsGroupIdleHandler OnIdle 1272 }
1273 { 1273 }
1274 add 1274 }
1275 { 1275
1276 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); 1276 private void FireOnThreadTermination()
1277 //_onIdle += value; 1277 {
1278 } 1278 if (null != _onThreadTermination)
1279 remove 1279 {
1280 { 1280 foreach (ThreadTerminationHandler tth in _onThreadTermination.GetInvocationList())
1281 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); 1281 {
1282 //_onIdle -= value; 1282 try
1283 } 1283 {
1284 } 1284 tth();
1285 1285 }
1286 public void Cancel() 1286 catch (Exception e)
1287 { 1287 {
1288 ICollection workItemsGroups = _workItemsGroups.Values; 1288 e.GetHashCode();
1289 foreach(WorkItemsGroup workItemsGroup in workItemsGroups) 1289 Debug.Assert(false);
1290 { 1290 throw;
1291 workItemsGroup.Cancel(); 1291 }
1292 } 1292 }
1293 } 1293 }
1294 1294 }
1295 public void Start() 1295
1296 { 1296 #endregion
1297 lock (this) 1297
1298 { 1298 /// <summary>
1299 if (!this._stpStartInfo.StartSuspended) 1299 /// This event is fired when a thread is created.
1300 { 1300 /// Use it to initialize a thread before the work items use it.
1301 return; 1301 /// </summary>
1302 } 1302 public event ThreadInitializationHandler OnThreadInitialization
1303 _stpStartInfo.StartSuspended = false; 1303 {
1304 } 1304 add { _onThreadInitialization += value; }
1305 1305 remove { _onThreadInitialization -= value; }
1306 ICollection workItemsGroups = _workItemsGroups.Values; 1306 }
1307 foreach(WorkItemsGroup workItemsGroup in workItemsGroups) 1307
1308 { 1308 /// <summary>
1309 workItemsGroup.OnSTPIsStarting(); 1309 /// This event is fired when a thread is terminating.
1310 } 1310 /// Use it for cleanup.
1311 1311 /// </summary>
1312 StartOptimalNumberOfThreads(); 1312 public event ThreadTerminationHandler OnThreadTermination
1313 } 1313 {
1314 1314 add { _onThreadTermination += value; }
1315 #endregion 1315 remove { _onThreadTermination -= value; }
1316 1316 }
1317 #region Properties 1317
1318 1318
1319 /// <summary> 1319 internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig)
1320 /// Get/Set the name of the SmartThreadPool instance 1320 {
1321 /// </summary> 1321 foreach (ThreadEntry threadEntry in _workerThreads.Values)
1322 public string Name 1322 {
1323 { 1323 WorkItem workItem = threadEntry.CurrentWorkItem;
1324 get 1324 if (null != workItem &&
1325 { 1325 workItem.WasQueuedBy(wig) &&
1326 return _name; 1326 !workItem.IsCanceled)
1327 } 1327 {
1328 1328 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1329 set 1329 }
1330 { 1330 }
1331 _name = value; 1331 }
1332 } 1332
1333 } 1333
1334 1334
1335 /// <summary> 1335 #endregion
1336 /// Get the lower limit of threads in the pool. 1336
1337 /// </summary> 1337 #region Properties
1338 public int MinThreads 1338
1339 { 1339 /// <summary>
1340 get 1340 /// Get/Set the lower limit of threads in the pool.
1341 { 1341 /// </summary>
1342 ValidateNotDisposed(); 1342 public int MinThreads
1343 return _stpStartInfo.MinWorkerThreads; 1343 {
1344 } 1344 get
1345 } 1345 {
1346 1346 ValidateNotDisposed();
1347 /// <summary> 1347 return _stpStartInfo.MinWorkerThreads;
1348 /// Get the upper limit of threads in the pool. 1348 }
1349 /// </summary> 1349 set
1350 public int MaxThreads 1350 {
1351 { 1351 Debug.Assert(value >= 0);
1352 get 1352 Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads);
1353 { 1353 if (_stpStartInfo.MaxWorkerThreads < value)
1354 ValidateNotDisposed(); 1354 {
1355 return _stpStartInfo.MaxWorkerThreads; 1355 _stpStartInfo.MaxWorkerThreads = value;
1356 } 1356 }
1357 } 1357 _stpStartInfo.MinWorkerThreads = value;
1358 /// <summary> 1358 StartOptimalNumberOfThreads();
1359 /// Get the number of threads in the thread pool. 1359 }
1360 /// Should be between the lower and the upper limits. 1360 }
1361 /// </summary> 1361
1362 public int ActiveThreads 1362 /// <summary>
1363 { 1363 /// Get/Set the upper limit of threads in the pool.
1364 get 1364 /// </summary>
1365 { 1365 public int MaxThreads
1366 ValidateNotDisposed(); 1366 {
1367 return _workerThreads.Count; 1367 get
1368 } 1368 {
1369 } 1369 ValidateNotDisposed();
1370 1370 return _stpStartInfo.MaxWorkerThreads;
1371 /// <summary> 1371 }
1372 /// Get the number of busy (not idle) threads in the thread pool. 1372
1373 /// </summary> 1373 set
1374 public int InUseThreads 1374 {
1375 { 1375 Debug.Assert(value > 0);
1376 get 1376 Debug.Assert(value >= _stpStartInfo.MinWorkerThreads);
1377 { 1377 if (_stpStartInfo.MinWorkerThreads > value)
1378 ValidateNotDisposed(); 1378 {
1379 return _inUseWorkerThreads; 1379 _stpStartInfo.MinWorkerThreads = value;
1380 } 1380 }
1381 } 1381 _stpStartInfo.MaxWorkerThreads = value;
1382 1382 StartOptimalNumberOfThreads();
1383 /// <summary> 1383 }
1384 /// Get the number of work items in the queue. 1384 }
1385 /// </summary> 1385 /// <summary>
1386 public int WaitingCallbacks 1386 /// Get the number of threads in the thread pool.
1387 { 1387 /// Should be between the lower and the upper limits.
1388 get 1388 /// </summary>
1389 { 1389 public int ActiveThreads
1390 ValidateNotDisposed(); 1390 {
1391 return _workItemsQueue.Count; 1391 get
1392 } 1392 {
1393 } 1393 ValidateNotDisposed();
1394 1394 return _workerThreads.Count;
1395 1395 }
1396 public event EventHandler Idle 1396 }
1397 { 1397
1398 add 1398 /// <summary>
1399 { 1399 /// Get the number of busy (not idle) threads in the thread pool.
1400 _stpIdle += value; 1400 /// </summary>
1401 } 1401 public int InUseThreads
1402 1402 {
1403 remove 1403 get
1404 { 1404 {
1405 _stpIdle -= value; 1405 ValidateNotDisposed();
1406 } 1406 return _inUseWorkerThreads;
1407 } 1407 }
1408 1408 }
1409 #endregion 1409
1410 1410 /// <summary>
1411 #region IDisposable Members 1411 /// Returns true if the current running work item has been cancelled.
1412 1412 /// Must be used within the work item's callback method.
1413// ~SmartThreadPool() 1413 /// The work item should sample this value in order to know if it
1414// { 1414 /// needs to quit before its completion.
1415// Dispose(); 1415 /// </summary>
1416// } 1416 public static bool IsWorkItemCanceled
1417 1417 {
1418 public void Dispose() 1418 get
1419 { 1419 {
1420 if (!_isDisposed) 1420 return CurrentThreadEntry.CurrentWorkItem.IsCanceled;
1421 { 1421 }
1422 if (!_shutdown) 1422 }
1423 { 1423
1424 Shutdown(); 1424 /// <summary>
1425 } 1425 /// Checks if the work item has been cancelled, and if yes then abort the thread.
1426 1426 /// Can be used with Cancel and timeout
1427 if (null != _shuttingDownEvent) 1427 /// </summary>
1428 { 1428 public static void AbortOnWorkItemCancel()
1429 _shuttingDownEvent.Close(); 1429 {
1430 _shuttingDownEvent = null; 1430 if (IsWorkItemCanceled)
1431 } 1431 {
1432 _workerThreads.Clear(); 1432 Thread.CurrentThread.Abort();
1433 _isDisposed = true; 1433 }
1434 GC.SuppressFinalize(this); 1434 }
1435 } 1435
1436 } 1436 /// <summary>
1437 1437 /// Thread Pool start information (readonly)
1438 private void ValidateNotDisposed() 1438 /// </summary>
1439 { 1439 public STPStartInfo STPStartInfo
1440 if(_isDisposed) 1440 {
1441 { 1441 get
1442 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); 1442 {
1443 } 1443 return _stpStartInfo.AsReadOnly();
1444 } 1444 }
1445 #endregion 1445 }
1446 } 1446
1447 #endregion 1447 public bool IsShuttingdown
1448} 1448 {
1449 get { return _shutdown; }
1450 }
1451
1452 /// <summary>
1453 /// Return the local calculated performance counters
1454 /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true.
1455 /// </summary>
1456 public ISTPPerformanceCountersReader PerformanceCountersReader
1457 {
1458 get { return (ISTPPerformanceCountersReader)_localPCs; }
1459 }
1460
1461 #endregion
1462
1463 #region IDisposable Members
1464
1465 public void Dispose()
1466 {
1467 if (!_isDisposed)
1468 {
1469 if (!_shutdown)
1470 {
1471 Shutdown();
1472 }
1473
1474 if (null != _shuttingDownEvent)
1475 {
1476 _shuttingDownEvent.Close();
1477 _shuttingDownEvent = null;
1478 }
1479 _workerThreads.Clear();
1480
1481 if (null != _isIdleWaitHandle)
1482 {
1483 _isIdleWaitHandle.Close();
1484 _isIdleWaitHandle = null;
1485 }
1486
1487 _isDisposed = true;
1488 }
1489 }
1490
1491 private void ValidateNotDisposed()
1492 {
1493 if(_isDisposed)
1494 {
1495 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
1496 }
1497 }
1498 #endregion
1499
1500 #region WorkItemsGroupBase Overrides
1501
1502 /// <summary>
1503 /// Get/Set the maximum number of work items that execute cocurrency on the thread pool
1504 /// </summary>
1505 public override int Concurrency
1506 {
1507 get { return MaxThreads; }
1508 set { MaxThreads = value; }
1509 }
1510
1511 /// <summary>
1512 /// Get the number of work items in the queue.
1513 /// </summary>
1514 public override int WaitingCallbacks
1515 {
1516 get
1517 {
1518 ValidateNotDisposed();
1519 return _workItemsQueue.Count;
1520 }
1521 }
1522
1523 /// <summary>
1524 /// Get an array with all the state objects of the currently running items.
1525 /// The array represents a snap shot and impact performance.
1526 /// </summary>
1527 public override object[] GetStates()
1528 {
1529 object[] states = _workItemsQueue.GetStates();
1530 return states;
1531 }
1532
1533 /// <summary>
1534 /// WorkItemsGroup start information (readonly)
1535 /// </summary>
1536 public override WIGStartInfo WIGStartInfo
1537 {
1538 get { return _stpStartInfo.AsReadOnly(); }
1539 }
1540
1541 /// <summary>
1542 /// Start the thread pool if it was started suspended.
1543 /// If it is already running, this method is ignored.
1544 /// </summary>
1545 public override void Start()
1546 {
1547 if (!_isSuspended)
1548 {
1549 return;
1550 }
1551 _isSuspended = false;
1552
1553 ICollection workItemsGroups = _workItemsGroups.Values;
1554 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1555 {
1556 workItemsGroup.OnSTPIsStarting();
1557 }
1558
1559 StartOptimalNumberOfThreads();
1560 }
1561
1562 /// <summary>
1563 /// Cancel all work items using thread abortion
1564 /// </summary>
1565 /// <param name="abortExecution">True to stop work items by raising ThreadAbortException</param>
1566 public override void Cancel(bool abortExecution)
1567 {
1568 _canceledSmartThreadPool.IsCanceled = true;
1569 _canceledSmartThreadPool = new CanceledWorkItemsGroup();
1570
1571 ICollection workItemsGroups = _workItemsGroups.Values;
1572 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1573 {
1574 workItemsGroup.Cancel(abortExecution);
1575 }
1576
1577 if (abortExecution)
1578 {
1579 foreach (ThreadEntry threadEntry in _workerThreads.Values)
1580 {
1581 WorkItem workItem = threadEntry.CurrentWorkItem;
1582 if (null != workItem &&
1583 threadEntry.AssociatedSmartThreadPool == this &&
1584 !workItem.IsCanceled)
1585 {
1586 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1587 }
1588 }
1589 }
1590 }
1591
1592 /// <summary>
1593 /// Wait for the thread pool to be idle
1594 /// </summary>
1595 public override bool WaitForIdle(int millisecondsTimeout)
1596 {
1597 ValidateWaitForIdle();
1598 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
1599 }
1600
1601 /// <summary>
1602 /// This event is fired when all work items are completed.
1603 /// (When IsIdle changes to true)
1604 /// This event only work on WorkItemsGroup. On SmartThreadPool
1605 /// it throws the NotImplementedException.
1606 /// </summary>
1607 public override event WorkItemsGroupIdleHandler OnIdle
1608 {
1609 add
1610 {
1611 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
1612 //_onIdle += value;
1613 }
1614 remove
1615 {
1616 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
1617 //_onIdle -= value;
1618 }
1619 }
1620
1621 internal override void PreQueueWorkItem()
1622 {
1623 ValidateNotDisposed();
1624 }
1625
1626 #endregion
1627
1628 #region Join, Choice, Pipe, etc.
1629
1630 /// <summary>
1631 /// Executes all actions in parallel.
1632 /// Returns when they all finish.
1633 /// </summary>
1634 /// <param name="actions">Actions to execute</param>
1635 public void Join(IEnumerable<Action> actions)
1636 {
1637 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1638 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1639 foreach (Action action in actions)
1640 {
1641 workItemsGroup.QueueWorkItem(action);
1642 }
1643 workItemsGroup.Start();
1644 workItemsGroup.WaitForIdle();
1645 }
1646
1647 /// <summary>
1648 /// Executes all actions in parallel.
1649 /// Returns when they all finish.
1650 /// </summary>
1651 /// <param name="actions">Actions to execute</param>
1652 public void Join(params Action[] actions)
1653 {
1654 Join((IEnumerable<Action>)actions);
1655 }
1656
1657 private class ChoiceIndex
1658 {
1659 public int _index = -1;
1660 }
1661
1662 /// <summary>
1663 /// Executes all actions in parallel
1664 /// Returns when the first one completes
1665 /// </summary>
1666 /// <param name="actions">Actions to execute</param>
1667 public int Choice(IEnumerable<Action> actions)
1668 {
1669 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1670 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1671
1672 ManualResetEvent anActionCompleted = new ManualResetEvent(false);
1673
1674 ChoiceIndex choiceIndex = new ChoiceIndex();
1675
1676 int i = 0;
1677 foreach (Action action in actions)
1678 {
1679 Action act = action;
1680 int value = i;
1681 workItemsGroup.QueueWorkItem(() => { act(); Interlocked.CompareExchange(ref choiceIndex._index, value, -1); anActionCompleted.Set(); });
1682 ++i;
1683 }
1684 workItemsGroup.Start();
1685 anActionCompleted.WaitOne();
1686
1687 return choiceIndex._index;
1688 }
1689
1690 /// <summary>
1691 /// Executes all actions in parallel
1692 /// Returns when the first one completes
1693 /// </summary>
1694 /// <param name="actions">Actions to execute</param>
1695 public int Choice(params Action[] actions)
1696 {
1697 return Choice((IEnumerable<Action>)actions);
1698 }
1699
1700 /// <summary>
1701 /// Executes actions in sequence asynchronously.
1702 /// Returns immediately.
1703 /// </summary>
1704 /// <param name="pipeState">A state context that passes </param>
1705 /// <param name="actions">Actions to execute in the order they should run</param>
1706 public void Pipe<T>(T pipeState, IEnumerable<Action<T>> actions)
1707 {
1708 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1709 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(1, wigStartInfo);
1710 foreach (Action<T> action in actions)
1711 {
1712 Action<T> act = action;
1713 workItemsGroup.QueueWorkItem(() => act(pipeState));
1714 }
1715 workItemsGroup.Start();
1716 workItemsGroup.WaitForIdle();
1717 }
1718
1719 /// <summary>
1720 /// Executes actions in sequence asynchronously.
1721 /// Returns immediately.
1722 /// </summary>
1723 /// <param name="pipeState"></param>
1724 /// <param name="actions">Actions to execute in the order they should run</param>
1725 public void Pipe<T>(T pipeState, params Action<T>[] actions)
1726 {
1727 Pipe(pipeState, (IEnumerable<Action<T>>)actions);
1728 }
1729 #endregion
1730 }
1731 #endregion
1732}