aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/SmartThreadPool.cs
diff options
context:
space:
mode:
Diffstat (limited to 'ThirdParty/SmartThreadPool/SmartThreadPool.cs')
-rw-r--r--ThirdParty/SmartThreadPool/SmartThreadPool.cs2562
1 files changed, 1423 insertions, 1139 deletions
diff --git a/ThirdParty/SmartThreadPool/SmartThreadPool.cs b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
index 19a0007..a4f4ce5 100644
--- a/ThirdParty/SmartThreadPool/SmartThreadPool.cs
+++ b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
@@ -1,61 +1,106 @@
1// Ami Bar 1#region Release History
2// amibar@gmail.com 2
3// 3// Smart Thread Pool
4// Smart thread pool in C#.
5// 7 Aug 2004 - Initial release 4// 7 Aug 2004 - Initial release
5//
6// 14 Sep 2004 - Bug fixes 6// 14 Sep 2004 - Bug fixes
7//
7// 15 Oct 2004 - Added new features 8// 15 Oct 2004 - Added new features
8// - Work items return result. 9// - Work items return result.
9// - Support waiting synchronization for multiple work items. 10// - Support waiting synchronization for multiple work items.
10// - Work items can be cancelled. 11// - Work items can be cancelled.
11// - Passage of the caller thread’s context to the thread in the pool. 12// - Passage of the caller thread’s context to the thread in the pool.
12// - Minimal usage of WIN32 handles. 13// - Minimal usage of WIN32 handles.
13// - Minor bug fixes. 14// - Minor bug fixes.
15//
14// 26 Dec 2004 - Changes: 16// 26 Dec 2004 - Changes:
15// - Removed static constructors. 17// - Removed static constructors.
16// - Added finalizers. 18// - Added finalizers.
17// - Changed Exceptions so they are serializable. 19// - Changed Exceptions so they are serializable.
18// - Fixed the bug in one of the SmartThreadPool constructors. 20// - Fixed the bug in one of the SmartThreadPool constructors.
19// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters. 21// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
20// The SmartThreadPool.WaitAny() is still limited by the .NET Framework. 22// The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
21// - Added PostExecute with options on which cases to call it. 23// - Added PostExecute with options on which cases to call it.
22// - Added option to dispose of the state objects. 24// - Added option to dispose of the state objects.
23// - Added a WaitForIdle() method that waits until the work items queue is empty. 25// - Added a WaitForIdle() method that waits until the work items queue is empty.
24// - Added an STPStartInfo class for the initialization of the thread pool. 26// - Added an STPStartInfo class for the initialization of the thread pool.
25// - Changed exception handling so if a work item throws an exception it 27// - Changed exception handling so if a work item throws an exception it
26// is rethrown at GetResult(), rather then firing an UnhandledException event. 28// is rethrown at GetResult(), rather then firing an UnhandledException event.
27// Note that PostExecute exception are always ignored. 29// Note that PostExecute exception are always ignored.
30//
28// 25 Mar 2005 - Changes: 31// 25 Mar 2005 - Changes:
29// - Fixed lost of work items bug 32// - Fixed lost of work items bug
33//
30// 3 Jul 2005: Changes. 34// 3 Jul 2005: Changes.
31// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed. 35// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
36//
32// 16 Aug 2005: Changes. 37// 16 Aug 2005: Changes.
33// - Fixed bug where the InUseThreads becomes negative when canceling work items. 38// - Fixed bug where the InUseThreads becomes negative when canceling work items.
34// 39//
35// 31 Jan 2006 - Changes: 40// 31 Jan 2006 - Changes:
36// - Added work items priority 41// - Added work items priority
37// - Removed support of chained delegates in callbacks and post executes (nobody really use this) 42// - Removed support of chained delegates in callbacks and post executes (nobody really use this)
38// - Added work items groups 43// - Added work items groups
39// - Added work items groups idle event 44// - Added work items groups idle event
40// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array 45// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
41// it returns true rather then throwing an exception. 46// it returns true rather then throwing an exception.
42// - Added option to start the STP and the WIG as suspended 47// - Added option to start the STP and the WIG as suspended
43// - Exception behavior changed, the real exception is returned by an 48// - Exception behavior changed, the real exception is returned by an
44// inner exception 49// inner exception
45// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.) 50// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
46// - Added performance counters 51// - Added performance counters
47// - Added priority to the threads in the pool 52// - Added priority to the threads in the pool
48// 53//
49// 13 Feb 2006 - Changes: 54// 13 Feb 2006 - Changes:
50// - Added a call to the dispose of the Performance Counter so 55// - Added a call to the dispose of the Performance Counter so
51// their won't be a Performance Counter leak. 56// their won't be a Performance Counter leak.
52// - Added exception catch in case the Performance Counters cannot 57// - Added exception catch in case the Performance Counters cannot
53// be created. 58// be created.
59//
60// 17 May 2008 - Changes:
61// - Changed the dispose behavior and removed the Finalizers.
62// - Enabled the change of the MaxThreads and MinThreads at run time.
63// - Enabled the change of the Concurrency of a IWorkItemsGroup at run
64// time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency
65// refers to the MaxThreads.
66// - Improved the cancel behavior.
67// - Added events for thread creation and termination.
68// - Fixed the HttpContext context capture.
69// - Changed internal collections so they use generic collections
70// - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup
71// - Added support for WinCE
72// - Added support for Action<T> and Func<T>
73//
74// 07 April 2009 - Changes:
75// - Added support for Silverlight and Mono
76// - Added Join, Choice, and Pipe to SmartThreadPool.
77// - Added local performance counters (for Mono, Silverlight, and WindowsCE)
78// - Changed duration measures from DateTime.Now to Stopwatch.
79// - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList<T>.
80//
81// 21 December 2009 - Changes:
82// - Added work item timeout (passive)
83//
84// 20 August 2012 - Changes:
85// - Added set name to threads
86// - Fixed the WorkItemsQueue.Dequeue.
87// Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... }
88// - Fixed SmartThreadPool.Pipe
89// - Added IsBackground option to threads
90// - Added ApartmentState to threads
91// - Fixed thread creation when queuing many work items at the same time.
92//
93// 24 August 2012 - Changes:
94// - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan
95// - Added option to set MaxStackSize of threads
96
97#endregion
54 98
55using System; 99using System;
56using System.Security; 100using System.Security;
57using System.Threading; 101using System.Threading;
58using System.Collections; 102using System.Collections;
103using System.Collections.Generic;
59using System.Diagnostics; 104using System.Diagnostics;
60using System.Runtime.CompilerServices; 105using System.Runtime.CompilerServices;
61 106
@@ -63,1213 +108,1503 @@ using Amib.Threading.Internal;
63 108
64namespace Amib.Threading 109namespace Amib.Threading
65{ 110{
66 #region SmartThreadPool class 111 #region SmartThreadPool class
67 /// <summary> 112 /// <summary>
68 /// Smart thread pool class. 113 /// Smart thread pool class.
69 /// </summary> 114 /// </summary>
70 public class SmartThreadPool : IWorkItemsGroup, IDisposable 115 public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable
71 { 116 {
72 #region Default Constants 117 #region Public Default Constants
73 118
74 /// <summary> 119 /// <summary>
75 /// Default minimum number of threads the thread pool contains. (0) 120 /// Default minimum number of threads the thread pool contains. (0)
76 /// </summary> 121 /// </summary>
77 public const int DefaultMinWorkerThreads = 0; 122 public const int DefaultMinWorkerThreads = 0;
78 123
79 /// <summary> 124 /// <summary>
80 /// Default maximum number of threads the thread pool contains. (25) 125 /// Default maximum number of threads the thread pool contains. (25)
81 /// </summary> 126 /// </summary>
82 public const int DefaultMaxWorkerThreads = 25; 127 public const int DefaultMaxWorkerThreads = 25;
83 128
84 /// <summary> 129 /// <summary>
85 /// Default idle timeout in milliseconds. (One minute) 130 /// Default idle timeout in milliseconds. (One minute)
131 /// </summary>
132 public const int DefaultIdleTimeout = 60*1000; // One minute
133
134 /// <summary>
135 /// Indicate to copy the security context of the caller and then use it in the call. (false)
136 /// </summary>
137 public const bool DefaultUseCallerCallContext = false;
138
139 /// <summary>
140 /// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
141 /// </summary>
142 public const bool DefaultUseCallerHttpContext = false;
143
144 /// <summary>
145 /// Indicate to dispose of the state objects if they support the IDispose interface. (false)
146 /// </summary>
147 public const bool DefaultDisposeOfStateObjects = false;
148
149 /// <summary>
150 /// The default option to run the post execute (CallToPostExecute.Always)
151 /// </summary>
152 public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;
153
154 /// <summary>
155 /// The default post execute method to run. (None)
156 /// When null it means not to call it.
157 /// </summary>
158 public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback;
159
160 /// <summary>
161 /// The default work item priority (WorkItemPriority.Normal)
162 /// </summary>
163 public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
164
165 /// <summary>
166 /// The default is to work on work items as soon as they arrive
167 /// and not to wait for the start. (false)
168 /// </summary>
169 public const bool DefaultStartSuspended = false;
170
171 /// <summary>
172 /// The default name to use for the performance counters instance. (null)
173 /// </summary>
174 public static readonly string DefaultPerformanceCounterInstanceName;
175
176#if !(WINDOWS_PHONE)
177
178 /// <summary>
179 /// The default thread priority (ThreadPriority.Normal)
180 /// </summary>
181 public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;
182#endif
183 /// <summary>
184 /// The default thread pool name. (SmartThreadPool)
86 /// </summary> 185 /// </summary>
87 public const int DefaultIdleTimeout = 60*1000; // One minute 186 public const string DefaultThreadPoolName = "SmartThreadPool";
88 187
89 /// <summary> 188 /// <summary>
90 /// Indicate to copy the security context of the caller and then use it in the call. (false) 189 /// The default Max Stack Size. (SmartThreadPool)
91 /// </summary> 190 /// </summary>
92 public const bool DefaultUseCallerCallContext = false; 191 public static readonly int? DefaultMaxStackSize = null;
93 192
94 /// <summary> 193 /// <summary>
95 /// Indicate to copy the HTTP context of the caller and then use it in the call. (false) 194 /// The default fill state with params. (false)
195 /// It is relevant only to QueueWorkItem of Action&lt;...&gt;/Func&lt;...&gt;
96 /// </summary> 196 /// </summary>
97 public const bool DefaultUseCallerHttpContext = false; 197 public const bool DefaultFillStateWithArgs = false;
98 198
99 /// <summary> 199 /// <summary>
100 /// Indicate to dispose of the state objects if they support the IDispose interface. (false) 200 /// The default thread backgroundness. (true)
101 /// </summary> 201 /// </summary>
102 public const bool DefaultDisposeOfStateObjects = false; 202 public const bool DefaultAreThreadsBackground = true;
103 203
204#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
104 /// <summary> 205 /// <summary>
105 /// The default option to run the post execute 206 /// The default apartment state of a thread in the thread pool.
207 /// The default is ApartmentState.Unknown which means the STP will not
208 /// set the apartment of the thread. It will use the .NET default.
106 /// </summary> 209 /// </summary>
107 public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always; 210 public const ApartmentState DefaultApartmentState = ApartmentState.Unknown;
211#endif
108 212
109 /// <summary> 213 #endregion
110 /// The default post execute method to run.
111 /// When null it means not to call it.
112 /// </summary>
113 public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null;
114 214
115 /// <summary> 215 #region Member Variables
116 /// The default work item priority
117 /// </summary>
118 public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
119 216
120 /// <summary> 217 /// <summary>
121 /// The default is to work on work items as soon as they arrive 218 /// Dictionary of all the threads in the thread pool.
122 /// and not to wait for the start. 219 /// </summary>
123 /// </summary> 220 private readonly SynchronizedDictionary<Thread, ThreadEntry> _workerThreads = new SynchronizedDictionary<Thread, ThreadEntry>();
124 public const bool DefaultStartSuspended = false;
125 221
126 /// <summary> 222 /// <summary>
127 /// The default is not to use the performance counters 223 /// Queue of work items.
128 /// </summary> 224 /// </summary>
129 public static readonly string DefaultPerformanceCounterInstanceName = null; 225 private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue();
130 226
131 public static readonly int DefaultStackSize = 0; 227 /// <summary>
228 /// Count the work items handled.
229 /// Used by the performance counter.
230 /// </summary>
231 private int _workItemsProcessed;
132 232
133 /// <summary> 233 /// <summary>
134 /// The default thread priority 234 /// Number of threads that currently work (not idle).
135 /// </summary> 235 /// </summary>
136 public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal; 236 private int _inUseWorkerThreads;
137 237
138 /// <summary> 238 /// <summary>
139 /// The default thread pool name 239 /// Stores a copy of the original STPStartInfo.
240 /// It is used to change the MinThread and MaxThreads
140 /// </summary> 241 /// </summary>
141 public const string DefaultThreadPoolName = "SmartThreadPool"; 242 private STPStartInfo _stpStartInfo;
142 243
143 #endregion 244 /// <summary>
245 /// Total number of work items that are stored in the work items queue
246 /// plus the work items that the threads in the pool are working on.
247 /// </summary>
248 private int _currentWorkItemsCount;
144 249
145 #region Member Variables 250 /// <summary>
251 /// Signaled when the thread pool is idle, i.e. no thread is busy
252 /// and the work items queue is empty
253 /// </summary>
254 //private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
255 private ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
146 256
147 /// <summary> 257 /// <summary>
148 /// Contains the name of this instance of SmartThreadPool. 258 /// An event to signal all the threads to quit immediately.
149 /// Can be changed by the user. 259 /// </summary>
150 /// </summary> 260 //private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
151 private string _name = DefaultThreadPoolName; 261 private ManualResetEvent _shuttingDownEvent = EventWaitHandleFactory.CreateManualResetEvent(false);
152 262
153 /// <summary> 263 /// <summary>
154 /// Hashtable of all the threads in the thread pool. 264 /// A flag to indicate if the Smart Thread Pool is now suspended.
155 /// </summary> 265 /// </summary>
156 private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable()); 266 private bool _isSuspended;
157 267
158 /// <summary> 268 /// <summary>
159 /// Queue of work items. 269 /// A flag to indicate the threads to quit.
160 /// </summary> 270 /// </summary>
161 private WorkItemsQueue _workItemsQueue = new WorkItemsQueue(); 271 private bool _shutdown;
162 272
163 /// <summary> 273 /// <summary>
164 /// Count the work items handled. 274 /// Counts the threads created in the pool.
165 /// Used by the performance counter. 275 /// It is used to name the threads.
166 /// </summary> 276 /// </summary>
167 private long _workItemsProcessed = 0; 277 private int _threadCounter;
168 278
169 /// <summary> 279 /// <summary>
170 /// Number of threads that currently work (not idle). 280 /// Indicate that the SmartThreadPool has been disposed
171 /// </summary> 281 /// </summary>
172 private int _inUseWorkerThreads = 0; 282 private bool _isDisposed;
173 283
174 /// <summary> 284 /// <summary>
175 /// Start information to use. 285 /// Holds all the WorkItemsGroup instaces that have at least one
176 /// It is simpler than providing many constructors. 286 /// work item int the SmartThreadPool
177 /// </summary> 287 /// This variable is used in case of Shutdown
178 private STPStartInfo _stpStartInfo = new STPStartInfo(); 288 /// </summary>
289 private readonly SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup> _workItemsGroups = new SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup>();
179 290
180 /// <summary> 291 /// <summary>
181 /// Total number of work items that are stored in the work items queue 292 /// A common object for all the work items int the STP
182 /// plus the work items that the threads in the pool are working on. 293 /// so we can mark them to cancel in O(1)
183 /// </summary> 294 /// </summary>
184 private int _currentWorkItemsCount = 0; 295 private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup();
185 296
186 /// <summary> 297 /// <summary>
187 /// Signaled when the thread pool is idle, i.e. no thread is busy 298 /// Windows STP performance counters
188 /// and the work items queue is empty
189 /// </summary> 299 /// </summary>
190 private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); 300 private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
191 301
192 /// <summary> 302 /// <summary>
193 /// An event to signal all the threads to quit immediately. 303 /// Local STP performance counters
194 /// </summary> 304 /// </summary>
195 private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false); 305 private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance;
196 306
197 /// <summary>
198 /// A flag to indicate the threads to quit.
199 /// </summary>
200 private bool _shutdown = false;
201 307
202 /// <summary> 308#if (WINDOWS_PHONE)
203 /// Counts the threads created in the pool. 309 private static readonly Dictionary<int, ThreadEntry> _threadEntries = new Dictionary<int, ThreadEntry>();
204 /// It is used to name the threads. 310#elif (_WINDOWS_CE)
205 /// </summary> 311 private static LocalDataStoreSlot _threadEntrySlot = Thread.AllocateDataSlot();
206 private int _threadCounter = 0; 312#else
313 [ThreadStatic]
314 private static ThreadEntry _threadEntry;
207 315
208 /// <summary> 316#endif
209 /// Indicate that the SmartThreadPool has been disposed
210 /// </summary>
211 private bool _isDisposed = false;
212 317
213 /// <summary> 318 /// <summary>
214 /// Event to send that the thread pool is idle 319 /// An event to call after a thread is created, but before
320 /// it's first use.
215 /// </summary> 321 /// </summary>
216 private event EventHandler _stpIdle; 322 private event ThreadInitializationHandler _onThreadInitialization;
217 323
218 /// <summary> 324 /// <summary>
219 /// On idle event 325 /// An event to call when a thread is about to exit, after
326 /// it is no longer belong to the pool.
220 /// </summary> 327 /// </summary>
221 //private event WorkItemsGroupIdleHandler _onIdle; 328 private event ThreadTerminationHandler _onThreadTermination;
222 329
223 /// <summary> 330 #endregion
224 /// Holds all the WorkItemsGroup instaces that have at least one
225 /// work item int the SmartThreadPool
226 /// This variable is used in case of Shutdown
227 /// </summary>
228 private Hashtable _workItemsGroups = Hashtable.Synchronized(new Hashtable());
229 331
230 /// <summary> 332 #region Per thread properties
231 /// A reference from each thread in the thread pool to its SmartThreadPool
232 /// object container.
233 /// With this variable a thread can know whatever it belongs to a
234 /// SmartThreadPool.
235 /// </summary>
236 [ThreadStatic]
237 private static SmartThreadPool _smartThreadPool;
238 333
239 /// <summary> 334 /// <summary>
240 /// A reference to the current work item a thread from the thread pool 335 /// A reference to the current work item a thread from the thread pool
241 /// is executing. 336 /// is executing.
242 /// </summary> 337 /// </summary>
243 [ThreadStatic] 338 internal static ThreadEntry CurrentThreadEntry
244 private static WorkItem _currentWorkItem; 339 {
245 340#if (WINDOWS_PHONE)
246 /// <summary> 341 get
247 /// STP performance counters 342 {
248 /// </summary> 343 lock(_threadEntries)
249 private ISTPInstancePerformanceCounters _pcs = NullSTPInstancePerformanceCounters.Instance; 344 {
250 345 ThreadEntry threadEntry;
346 if (_threadEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out threadEntry))
347 {
348 return threadEntry;
349 }
350 }
351 return null;
352 }
353 set
354 {
355 lock(_threadEntries)
356 {
357 _threadEntries[Thread.CurrentThread.ManagedThreadId] = value;
358 }
359 }
360#elif (_WINDOWS_CE)
361 get
362 {
363 //Thread.CurrentThread.ManagedThreadId
364 return Thread.GetData(_threadEntrySlot) as ThreadEntry;
365 }
366 set
367 {
368 Thread.SetData(_threadEntrySlot, value);
369 }
370#else
371 get
372 {
373 return _threadEntry;
374 }
375 set
376 {
377 _threadEntry = value;
378 }
379#endif
380 }
251 #endregion 381 #endregion
252 382
253 #region Construction and Finalization 383 #region Construction and Finalization
254 384
255 /// <summary> 385 /// <summary>
256 /// Constructor 386 /// Constructor
257 /// </summary> 387 /// </summary>
258 public SmartThreadPool() 388 public SmartThreadPool()
259 { 389 {
390 _stpStartInfo = new STPStartInfo();
260 Initialize(); 391 Initialize();
261 } 392 }
262 393
263 /// <summary> 394 /// <summary>
264 /// Constructor 395 /// Constructor
265 /// </summary> 396 /// </summary>
266 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 397 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
267 public SmartThreadPool(int idleTimeout) 398 public SmartThreadPool(int idleTimeout)
268 { 399 {
269 _stpStartInfo.IdleTimeout = idleTimeout; 400 _stpStartInfo = new STPStartInfo
270 Initialize(); 401 {
271 } 402 IdleTimeout = idleTimeout,
403 };
404 Initialize();
405 }
406
407 /// <summary>
408 /// Constructor
409 /// </summary>
410 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
411 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
412 public SmartThreadPool(
413 int idleTimeout,
414 int maxWorkerThreads)
415 {
416 _stpStartInfo = new STPStartInfo
417 {
418 IdleTimeout = idleTimeout,
419 MaxWorkerThreads = maxWorkerThreads,
420 };
421 Initialize();
422 }
423
424 /// <summary>
425 /// Constructor
426 /// </summary>
427 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
428 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
429 /// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
430 public SmartThreadPool(
431 int idleTimeout,
432 int maxWorkerThreads,
433 int minWorkerThreads)
434 {
435 _stpStartInfo = new STPStartInfo
436 {
437 IdleTimeout = idleTimeout,
438 MaxWorkerThreads = maxWorkerThreads,
439 MinWorkerThreads = minWorkerThreads,
440 };
441 Initialize();
442 }
272 443
273 /// <summary> 444 /// <summary>
274 /// Constructor 445 /// Constructor
275 /// </summary> 446 /// </summary>
276 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 447 /// <param name="stpStartInfo">A SmartThreadPool configuration that overrides the default behavior</param>
277 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> 448 public SmartThreadPool(STPStartInfo stpStartInfo)
278 public SmartThreadPool( 449 {
279 int idleTimeout, 450 _stpStartInfo = new STPStartInfo(stpStartInfo);
280 int maxWorkerThreads) 451 Initialize();
281 { 452 }
282 _stpStartInfo.IdleTimeout = idleTimeout;
283 _stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
284 Initialize();
285 }
286 453
287 /// <summary> 454 private void Initialize()
288 /// Constructor 455 {
289 /// </summary> 456 Name = _stpStartInfo.ThreadPoolName;
290 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 457 ValidateSTPStartInfo();
291 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
292 /// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
293 public SmartThreadPool(
294 int idleTimeout,
295 int maxWorkerThreads,
296 int minWorkerThreads)
297 {
298 _stpStartInfo.IdleTimeout = idleTimeout;
299 _stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
300 _stpStartInfo.MinWorkerThreads = minWorkerThreads;
301 Initialize();
302 }
303 458
304 /// <summary> 459 // _stpStartInfoRW stores a read/write copy of the STPStartInfo.
305 /// Constructor 460 // Actually only MaxWorkerThreads and MinWorkerThreads are overwritten
306 /// </summary>
307 public SmartThreadPool(STPStartInfo stpStartInfo)
308 {
309 _stpStartInfo = new STPStartInfo(stpStartInfo);
310 Initialize();
311 }
312 461
313 private void Initialize() 462 _isSuspended = _stpStartInfo.StartSuspended;
314 {
315 Name = _stpStartInfo.ThreadPoolName;
316 ValidateSTPStartInfo();
317 463
464#if (_WINDOWS_CE) || (_SILVERLIGHT) || (_MONO) || (WINDOWS_PHONE)
465 if (null != _stpStartInfo.PerformanceCounterInstanceName)
466 {
467 throw new NotSupportedException("Performance counters are not implemented for Compact Framework/Silverlight/Mono, instead use StpStartInfo.EnableLocalPerformanceCounters");
468 }
469#else
318 if (null != _stpStartInfo.PerformanceCounterInstanceName) 470 if (null != _stpStartInfo.PerformanceCounterInstanceName)
319 { 471 {
320 try 472 try
321 { 473 {
322 _pcs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName); 474 _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
323 } 475 }
324 catch(Exception e) 476 catch (Exception e)
325 { 477 {
326 Debug.WriteLine("Unable to create Performance Counters: " + e.ToString()); 478 Debug.WriteLine("Unable to create Performance Counters: " + e);
327 _pcs = NullSTPInstancePerformanceCounters.Instance; 479 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
328 } 480 }
329 } 481 }
482#endif
330 483
331 StartOptimalNumberOfThreads(); 484 if (_stpStartInfo.EnableLocalPerformanceCounters)
332 }
333
334 private void StartOptimalNumberOfThreads()
335 {
336 int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
337 threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
338 StartThreads(threadsCount);
339 }
340
341 private void ValidateSTPStartInfo()
342 {
343 if (_stpStartInfo.MinWorkerThreads < 0)
344 {
345 throw new ArgumentOutOfRangeException(
346 "MinWorkerThreads", "MinWorkerThreads cannot be negative");
347 }
348
349 if (_stpStartInfo.MaxWorkerThreads <= 0)
350 { 485 {
351 throw new ArgumentOutOfRangeException( 486 _localPCs = new LocalSTPInstancePerformanceCounters();
352 "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
353 } 487 }
354 488
355 if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads) 489 // If the STP is not started suspended then start the threads.
490 if (!_isSuspended)
356 { 491 {
357 throw new ArgumentOutOfRangeException( 492 StartOptimalNumberOfThreads();
358 "MinWorkerThreads, maxWorkerThreads",
359 "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
360 } 493 }
361 } 494 }
362 495
363 private void ValidateCallback(Delegate callback) 496 private void StartOptimalNumberOfThreads()
364 { 497 {
365 if(callback.GetInvocationList().Length > 1) 498 int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
499 threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
500 threadsCount -= _workerThreads.Count;
501 if (threadsCount > 0)
366 { 502 {
367 throw new NotSupportedException("SmartThreadPool doesn't support delegates chains"); 503 StartThreads(threadsCount);
368 } 504 }
369 } 505 }
370 506
371 #endregion 507 private void ValidateSTPStartInfo()
508 {
509 if (_stpStartInfo.MinWorkerThreads < 0)
510 {
511 throw new ArgumentOutOfRangeException(
512 "MinWorkerThreads", "MinWorkerThreads cannot be negative");
513 }
372 514
373 #region Thread Processing 515 if (_stpStartInfo.MaxWorkerThreads <= 0)
516 {
517 throw new ArgumentOutOfRangeException(
518 "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
519 }
374 520
375 /// <summary> 521 if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads)
376 /// Waits on the queue for a work item, shutdown, or timeout. 522 {
377 /// </summary> 523 throw new ArgumentOutOfRangeException(
378 /// <returns> 524 "MinWorkerThreads, maxWorkerThreads",
379 /// Returns the WaitingCallback or null in case of timeout or shutdown. 525 "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
380 /// </returns> 526 }
381 private WorkItem Dequeue() 527 }
382 { 528
383 WorkItem workItem = 529 private static void ValidateCallback(Delegate callback)
530 {
531 if(callback.GetInvocationList().Length > 1)
532 {
533 throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
534 }
535 }
536
537 #endregion
538
539 #region Thread Processing
540
541 /// <summary>
542 /// Waits on the queue for a work item, shutdown, or timeout.
543 /// </summary>
544 /// <returns>
545 /// Returns the WaitingCallback or null in case of timeout or shutdown.
546 /// </returns>
547 private WorkItem Dequeue()
548 {
549 WorkItem workItem =
384 _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent); 550 _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
385 551
386 return workItem; 552 return workItem;
387 } 553 }
388 554
389 /// <summary> 555 /// <summary>
390 /// Put a new work item in the queue 556 /// Put a new work item in the queue
391 /// </summary> 557 /// </summary>
392 /// <param name="workItem">A work item to queue</param> 558 /// <param name="workItem">A work item to queue</param>
393 private void Enqueue(WorkItem workItem) 559 internal override void Enqueue(WorkItem workItem)
394 { 560 {
395 Enqueue(workItem, true); 561 // Make sure the workItem is not null
396 } 562 Debug.Assert(null != workItem);
397 563
398 /// <summary> 564 IncrementWorkItemsCount();
399 /// Put a new work item in the queue 565
400 /// </summary> 566 workItem.CanceledSmartThreadPool = _canceledSmartThreadPool;
401 /// <param name="workItem">A work item to queue</param> 567 _workItemsQueue.EnqueueWorkItem(workItem);
402 internal void Enqueue(WorkItem workItem, bool incrementWorkItems) 568 workItem.WorkItemIsQueued();
403 { 569
404 // Make sure the workItem is not null 570 // If all the threads are busy then try to create a new one
405 Debug.Assert(null != workItem); 571 if (_currentWorkItemsCount > _workerThreads.Count)
406 572 {
407 if (incrementWorkItems) 573 StartThreads(1);
408 { 574 }
409 IncrementWorkItemsCount(); 575 }
410 } 576
411 577 private void IncrementWorkItemsCount()
412 _workItemsQueue.EnqueueWorkItem(workItem); 578 {
413 workItem.WorkItemIsQueued(); 579 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
414 580 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
415 // If all the threads are busy then try to create a new one 581
416 if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count) 582 int count = Interlocked.Increment(ref _currentWorkItemsCount);
417 { 583 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
418 StartThreads(1); 584 if (count == 1)
419 } 585 {
420 } 586 IsIdle = false;
421
422 private void IncrementWorkItemsCount()
423 {
424 _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
425
426 int count = Interlocked.Increment(ref _currentWorkItemsCount);
427 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
428 if (count == 1)
429 {
430 //Trace.WriteLine("STP is NOT idle");
431 _isIdleWaitHandle.Reset(); 587 _isIdleWaitHandle.Reset();
432 } 588 }
433 } 589 }
434
435 private void DecrementWorkItemsCount()
436 {
437 ++_workItemsProcessed;
438
439 // The counter counts even if the work item was cancelled
440 _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
441 590
591 private void DecrementWorkItemsCount()
592 {
442 int count = Interlocked.Decrement(ref _currentWorkItemsCount); 593 int count = Interlocked.Decrement(ref _currentWorkItemsCount);
443 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); 594 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
444 if (count == 0) 595 if (count == 0)
445 { 596 {
446 //Trace.WriteLine("STP is idle"); 597 IsIdle = true;
447 _isIdleWaitHandle.Set(); 598 _isIdleWaitHandle.Set();
448 } 599 }
449 }
450 600
451 internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) 601 Interlocked.Increment(ref _workItemsProcessed);
452 {
453 _workItemsGroups[workItemsGroup] = workItemsGroup;
454 }
455
456 internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
457 {
458 if (_workItemsGroups.Contains(workItemsGroup))
459 {
460 _workItemsGroups.Remove(workItemsGroup);
461 }
462 }
463 602
464 /// <summary> 603 if (!_shutdown)
465 /// Inform that the current thread is about to quit or quiting.
466 /// The same thread may call this method more than once.
467 /// </summary>
468 private void InformCompleted()
469 {
470 // There is no need to lock the two methods together
471 // since only the current thread removes itself
472 // and the _workerThreads is a synchronized hashtable
473 if (_workerThreads.Contains(Thread.CurrentThread))
474 { 604 {
475 _workerThreads.Remove(Thread.CurrentThread); 605 // The counter counts even if the work item was cancelled
476 _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); 606 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
607 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
477 } 608 }
478 }
479 609
480 /// <summary> 610 }
481 /// Starts new threads 611
482 /// </summary> 612 internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
483 /// <param name="threadsCount">The number of threads to start</param> 613 {
484 private void StartThreads(int threadsCount) 614 _workItemsGroups[workItemsGroup] = workItemsGroup;
485 { 615 }
486 if (_stpStartInfo.StartSuspended) 616
487 { 617 internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
488 return; 618 {
489 } 619 if (_workItemsGroups.Contains(workItemsGroup))
620 {
621 _workItemsGroups.Remove(workItemsGroup);
622 }
623 }
624
625 /// <summary>
626 /// Inform that the current thread is about to quit or quiting.
627 /// The same thread may call this method more than once.
628 /// </summary>
629 private void InformCompleted()
630 {
631 // There is no need to lock the two methods together
632 // since only the current thread removes itself
633 // and the _workerThreads is a synchronized dictionary
634 if (_workerThreads.Contains(Thread.CurrentThread))
635 {
636 _workerThreads.Remove(Thread.CurrentThread);
637 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
638 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
639 }
640 }
641
642 /// <summary>
643 /// Starts new threads
644 /// </summary>
645 /// <param name="threadsCount">The number of threads to start</param>
646 private void StartThreads(int threadsCount)
647 {
648 if (_isSuspended)
649 {
650 return;
651 }
652
653 lock(_workerThreads.SyncRoot)
654 {
655 // Don't start threads on shut down
656 if (_shutdown)
657 {
658 return;
659 }
660
661 for(int i = 0; i < threadsCount; ++i)
662 {
663 // Don't create more threads then the upper limit
664 if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
665 {
666 return;
667 }
490 668
491 lock(_workerThreads.SyncRoot) 669 // Create a new thread
492 {
493 // Don't start threads on shut down
494 if (_shutdown)
495 {
496 return;
497 }
498 670
499 for(int i = 0; i < threadsCount; ++i) 671#if (_SILVERLIGHT) || (WINDOWS_PHONE)
500 { 672 Thread workerThread = new Thread(ProcessQueuedItems);
501 // Don't create more threads then the upper limit 673#else
502 if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads) 674 Thread workerThread =
675 _stpStartInfo.MaxStackSize.HasValue
676 ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value)
677 : new Thread(ProcessQueuedItems);
678#endif
679 // Configure the new thread and start it
680 workerThread.Name = "STP " + Name + " Thread #" + _threadCounter;
681 workerThread.IsBackground = _stpStartInfo.AreThreadsBackground;
682
683#if !(_SILVERLIGHT) && !(_WINDOWS_CE) && !(WINDOWS_PHONE)
684 if (_stpStartInfo.ApartmentState != ApartmentState.Unknown)
503 { 685 {
504 return; 686 workerThread.SetApartmentState(_stpStartInfo.ApartmentState);
505 } 687 }
688#endif
506 689
507 // Create a new thread 690#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
508 Thread workerThread;
509 if (_stpStartInfo.StackSize > 0)
510 workerThread = new Thread(ProcessQueuedItems, _stpStartInfo.StackSize);
511 else
512 workerThread = new Thread(ProcessQueuedItems);
513
514 // Configure the new thread and start it
515 workerThread.Name = "STP " + Name + " Thread #" + _threadCounter;
516 workerThread.IsBackground = true;
517 workerThread.Priority = _stpStartInfo.ThreadPriority; 691 workerThread.Priority = _stpStartInfo.ThreadPriority;
692#endif
518 workerThread.Start(); 693 workerThread.Start();
519 ++_threadCounter; 694 ++_threadCounter;
520 695
521 // Add the new thread to the hashtable and update its creation 696 // Add it to the dictionary and update its creation time.
522 // time. 697 _workerThreads[workerThread] = new ThreadEntry(this);
523 _workerThreads[workerThread] = DateTime.Now; 698
524 _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); 699 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
525 } 700 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
526 } 701 }
527 } 702 }
528 703 }
529 /// <summary> 704
530 /// A worker thread method that processes work items from the work items queue. 705 /// <summary>
531 /// </summary> 706 /// A worker thread method that processes work items from the work items queue.
532 private void ProcessQueuedItems() 707 /// </summary>
533 { 708 private void ProcessQueuedItems()
534 // Initialize the _smartThreadPool variable 709 {
535 _smartThreadPool = this; 710 // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks
536 711 // of the dictionary.
537 try 712 CurrentThreadEntry = _workerThreads[Thread.CurrentThread];
538 { 713
539 bool bInUseWorkerThreadsWasIncremented = false; 714 FireOnThreadInitialization();
540 715
541 // Process until shutdown. 716 try
542 while(!_shutdown) 717 {
543 { 718 bool bInUseWorkerThreadsWasIncremented = false;
544 // Update the last time this thread was seen alive. 719
545 // It's good for debugging. 720 // Process until shutdown.
546 _workerThreads[Thread.CurrentThread] = DateTime.Now; 721 while(!_shutdown)
547 722 {
548 // Wait for a work item, shutdown, or timeout 723 // Update the last time this thread was seen alive.
549 WorkItem workItem = Dequeue(); 724 // It's good for debugging.
550 725 CurrentThreadEntry.IAmAlive();
551 // Update the last time this thread was seen alive. 726
552 // It's good for debugging. 727 // The following block handles the when the MaxWorkerThreads has been
553 _workerThreads[Thread.CurrentThread] = DateTime.Now; 728 // incremented by the user at run-time.
554 729 // Double lock for quit.
555 // On timeout or shut down. 730 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
556 if (null == workItem)
557 { 731 {
558 // Double lock for quit. 732 lock (_workerThreads.SyncRoot)
559 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
560 { 733 {
561 lock(_workerThreads.SyncRoot) 734 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
562 { 735 {
563 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) 736 // Inform that the thread is quiting and then quit.
564 { 737 // This method must be called within this lock or else
565 // Inform that the thread is quiting and then quit. 738 // more threads will quit and the thread pool will go
566 // This method must be called within this lock or else 739 // below the lower limit.
567 // more threads will quit and the thread pool will go 740 InformCompleted();
568 // below the lower limit. 741 break;
569 InformCompleted();
570 break;
571 }
572 } 742 }
573 } 743 }
574 } 744 }
575 745
576 // If we didn't quit then skip to the next iteration. 746 // Wait for a work item, shutdown, or timeout
577 if (null == workItem) 747 WorkItem workItem = Dequeue();
578 {
579 continue;
580 }
581
582 try
583 {
584 // Initialize the value to false
585 bInUseWorkerThreadsWasIncremented = false;
586
587 // Change the state of the work item to 'in progress' if possible.
588 // We do it here so if the work item has been canceled we won't
589 // increment the _inUseWorkerThreads.
590 // The cancel mechanism doesn't delete items from the queue,
591 // it marks the work item as canceled, and when the work item
592 // is dequeued, we just skip it.
593 // If the post execute of work item is set to always or to
594 // call when the work item is canceled then the StartingWorkItem()
595 // will return true, so the post execute can run.
596 if (!workItem.StartingWorkItem())
597 {
598 continue;
599 }
600 748
601 // Execute the callback. Make sure to accurately 749 // Update the last time this thread was seen alive.
602 // record how many callbacks are currently executing. 750 // It's good for debugging.
603 int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads); 751 CurrentThreadEntry.IAmAlive();
604 _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
605 752
606 // Mark that the _inUseWorkerThreads incremented, so in the finally{} 753 // On timeout or shut down.
607 // statement we will decrement it correctly. 754 if (null == workItem)
608 bInUseWorkerThreadsWasIncremented = true; 755 {
609 756 // Double lock for quit.
610 // Set the _currentWorkItem to the current work item 757 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
611 _currentWorkItem = workItem; 758 {
612 759 lock(_workerThreads.SyncRoot)
613 lock(workItem) 760 {
614 { 761 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
615 workItem.currentThread = Thread.CurrentThread; 762 {
616 } 763 // Inform that the thread is quiting and then quit.
617 764 // This method must be called within this lock or else
618 ExecuteWorkItem(workItem); 765 // more threads will quit and the thread pool will go
619 766 // below the lower limit.
620 lock(workItem) 767 InformCompleted();
621 { 768 break;
622 workItem.currentThread = null; 769 }
623 } 770 }
624 771 }
625 } 772 }
626 catch(ThreadAbortException ex) 773
627 { 774 // If we didn't quit then skip to the next iteration.
628 lock(workItem) 775 if (null == workItem)
629 { 776 {
630 workItem.currentThread = null; 777 continue;
631 } 778 }
632 ex.GetHashCode(); 779
633 Thread.ResetAbort(); 780 try
634 } 781 {
635 catch(Exception ex) 782 // Initialize the value to false
636 { 783 bInUseWorkerThreadsWasIncremented = false;
784
785 // Set the Current Work Item of the thread.
786 // Store the Current Work Item before the workItem.StartingWorkItem() is called,
787 // so WorkItem.Cancel can work when the work item is between InQueue and InProgress
788 // states.
789 // If the work item has been cancelled BEFORE the workItem.StartingWorkItem()
790 // (work item is in InQueue state) then workItem.StartingWorkItem() will return false.
791 // If the work item has been cancelled AFTER the workItem.StartingWorkItem() then
792 // (work item is in InProgress state) then the thread will be aborted
793 CurrentThreadEntry.CurrentWorkItem = workItem;
794
795 // Change the state of the work item to 'in progress' if possible.
796 // We do it here so if the work item has been canceled we won't
797 // increment the _inUseWorkerThreads.
798 // The cancel mechanism doesn't delete items from the queue,
799 // it marks the work item as canceled, and when the work item
800 // is dequeued, we just skip it.
801 // If the post execute of work item is set to always or to
802 // call when the work item is canceled then the StartingWorkItem()
803 // will return true, so the post execute can run.
804 if (!workItem.StartingWorkItem())
805 {
806 continue;
807 }
808
809 // Execute the callback. Make sure to accurately
810 // record how many callbacks are currently executing.
811 int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads);
812 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
813 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
814
815 // Mark that the _inUseWorkerThreads incremented, so in the finally{}
816 // statement we will decrement it correctly.
817 bInUseWorkerThreadsWasIncremented = true;
818
819 workItem.FireWorkItemStarted();
820
821 ExecuteWorkItem(workItem);
822 }
823 catch(Exception ex)
824 {
637 ex.GetHashCode(); 825 ex.GetHashCode();
638 // Do nothing 826 // Do nothing
639 } 827 }
640 finally 828 finally
641 { 829 {
642 lock(workItem) 830 workItem.DisposeOfState();
643 { 831
644 workItem.currentThread = null; 832 // Set the CurrentWorkItem to null, since we
645 } 833 // no longer run user's code.
646 834 CurrentThreadEntry.CurrentWorkItem = null;
647 if (null != workItem) 835
648 { 836 // Decrement the _inUseWorkerThreads only if we had
649 workItem.DisposeOfState(); 837 // incremented it. Note the cancelled work items don't
650 } 838 // increment _inUseWorkerThreads.
651 839 if (bInUseWorkerThreadsWasIncremented)
652 // Set the _currentWorkItem to null, since we 840 {
653 // no longer run user's code. 841 int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads);
654 _currentWorkItem = null; 842 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
655 843 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
656 // Decrement the _inUseWorkerThreads only if we had 844 }
657 // incremented it. Note the cancelled work items don't 845
658 // increment _inUseWorkerThreads. 846 // Notify that the work item has been completed.
659 if (bInUseWorkerThreadsWasIncremented) 847 // WorkItemsGroup may enqueue their next work item.
660 { 848 workItem.FireWorkItemCompleted();
661 int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads); 849
662 _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); 850 // Decrement the number of work items here so the idle
663 } 851 // ManualResetEvent won't fluctuate.
664 852 DecrementWorkItemsCount();
665 // Notify that the work item has been completed. 853 }
666 // WorkItemsGroup may enqueue their next work item. 854 }
667 workItem.FireWorkItemCompleted(); 855 }
668 856 catch(ThreadAbortException tae)
669 // Decrement the number of work items here so the idle 857 {
670 // ManualResetEvent won't fluctuate.
671 DecrementWorkItemsCount();
672 }
673 }
674 }
675 catch(ThreadAbortException tae)
676 {
677 tae.GetHashCode(); 858 tae.GetHashCode();
678 // Handle the abort exception gracfully. 859 // Handle the abort exception gracfully.
679 Thread.ResetAbort(); 860#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
680 } 861 Thread.ResetAbort();
681 catch(Exception e) 862#endif
682 {
683 Debug.Assert(null != e);
684 }
685 finally
686 {
687 InformCompleted();
688 }
689 }
690
691 private void ExecuteWorkItem(WorkItem workItem)
692 {
693 _pcs.SampleWorkItemsWaitTime(workItem.WaitingTime);
694 try
695 {
696 workItem.Execute();
697 }
698 catch
699 {
700 throw;
701 } 863 }
702 finally 864 catch(Exception e)
865 {
866 Debug.Assert(null != e);
867 }
868 finally
869 {
870 InformCompleted();
871 FireOnThreadTermination();
872 }
873 }
874
875 private void ExecuteWorkItem(WorkItem workItem)
876 {
877 _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
878 _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
879 try
880 {
881 workItem.Execute();
882 }
883 finally
884 {
885 _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
886 _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
887 }
888 }
889
890
891 #endregion
892
893 #region Public Methods
894
895 private void ValidateWaitForIdle()
896 {
897 if (null != CurrentThreadEntry && CurrentThreadEntry.AssociatedSmartThreadPool == this)
898 {
899 throw new NotSupportedException(
900 "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
901 }
902 }
903
904 internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup)
905 {
906 if (null == CurrentThreadEntry)
703 { 907 {
704 _pcs.SampleWorkItemsProcessTime(workItem.ProcessTime); 908 return;
705 } 909 }
706 }
707
708
709 #endregion
710 910
711 #region Public Methods 911 WorkItem workItem = CurrentThreadEntry.CurrentWorkItem;
712 912 ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem);
713 /// <summary> 913 if ((null != workItemsGroup) &&
714 /// Queue a work item 914 (null != workItem) &&
915 CurrentThreadEntry.CurrentWorkItem.WasQueuedBy(workItemsGroup))
916 {
917 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
918 }
919 }
920
921 [MethodImpl(MethodImplOptions.NoInlining)]
922 private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem)
923 {
924 if ((null != workItemsGroup) &&
925 (null != workItem) &&
926 workItem.WasQueuedBy(workItemsGroup))
927 {
928 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
929 }
930 }
931
932 /// <summary>
933 /// Force the SmartThreadPool to shutdown
934 /// </summary>
935 public void Shutdown()
936 {
937 Shutdown(true, 0);
938 }
939
940 /// <summary>
941 /// Force the SmartThreadPool to shutdown with timeout
715 /// </summary> 942 /// </summary>
716 /// <param name="callback">A callback to execute</param> 943 public void Shutdown(bool forceAbort, TimeSpan timeout)
717 /// <returns>Returns a work item result</returns> 944 {
718 public IWorkItemResult QueueWorkItem(WorkItemCallback callback) 945 Shutdown(forceAbort, (int)timeout.TotalMilliseconds);
719 { 946 }
720 ValidateNotDisposed(); 947
721 ValidateCallback(callback); 948 /// <summary>
722 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback); 949 /// Empties the queue of work items and abort the threads in the pool.
723 Enqueue(workItem); 950 /// </summary>
724 return workItem.GetWorkItemResult(); 951 public void Shutdown(bool forceAbort, int millisecondsTimeout)
725 } 952 {
953 ValidateNotDisposed();
954
955 ISTPInstancePerformanceCounters pcs = _windowsPCs;
956
957 if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs)
958 {
959 // Set the _pcs to "null" to stop updating the performance
960 // counters
961 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
962
963 pcs.Dispose();
964 }
965
966 Thread [] threads;
967 lock(_workerThreads.SyncRoot)
968 {
969 // Shutdown the work items queue
970 _workItemsQueue.Dispose();
971
972 // Signal the threads to exit
973 _shutdown = true;
974 _shuttingDownEvent.Set();
975
976 // Make a copy of the threads' references in the pool
977 threads = new Thread [_workerThreads.Count];
978 _workerThreads.Keys.CopyTo(threads, 0);
979 }
980
981 int millisecondsLeft = millisecondsTimeout;
982 Stopwatch stopwatch = Stopwatch.StartNew();
983 //DateTime start = DateTime.UtcNow;
984 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
985 bool timeout = false;
986
987 // Each iteration we update the time left for the timeout.
988 foreach(Thread thread in threads)
989 {
990 // Join don't work with negative numbers
991 if (!waitInfinitely && (millisecondsLeft < 0))
992 {
993 timeout = true;
994 break;
995 }
996
997 // Wait for the thread to terminate
998 bool success = thread.Join(millisecondsLeft);
999 if(!success)
1000 {
1001 timeout = true;
1002 break;
1003 }
1004
1005 if(!waitInfinitely)
1006 {
1007 // Update the time left to wait
1008 //TimeSpan ts = DateTime.UtcNow - start;
1009 millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds;
1010 }
1011 }
1012
1013 if (timeout && forceAbort)
1014 {
1015 // Abort the threads in the pool
1016 foreach(Thread thread in threads)
1017 {
1018
1019 if ((thread != null)
1020#if !(_WINDOWS_CE)
1021 && thread.IsAlive
1022#endif
1023 )
1024 {
1025 try
1026 {
1027 thread.Abort(); // Shutdown
1028 }
1029 catch(SecurityException e)
1030 {
1031 e.GetHashCode();
1032 }
1033 catch(ThreadStateException ex)
1034 {
1035 ex.GetHashCode();
1036 // In case the thread has been terminated
1037 // after the check if it is alive.
1038 }
1039 }
1040 }
1041 }
1042 }
1043
1044 /// <summary>
1045 /// Wait for all work items to complete
1046 /// </summary>
1047 /// <param name="waitableResults">Array of work item result objects</param>
1048 /// <returns>
1049 /// true when every work item in workItemResults has completed; otherwise false.
1050 /// </returns>
1051 public static bool WaitAll(
1052 IWaitableResult [] waitableResults)
1053 {
1054 return WaitAll(waitableResults, Timeout.Infinite, true);
1055 }
1056
1057 /// <summary>
1058 /// Wait for all work items to complete
1059 /// </summary>
1060 /// <param name="waitableResults">Array of work item result objects</param>
1061 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1062 /// <param name="exitContext">
1063 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1064 /// </param>
1065 /// <returns>
1066 /// true when every work item in workItemResults has completed; otherwise false.
1067 /// </returns>
1068 public static bool WaitAll(
1069 IWaitableResult [] waitableResults,
1070 TimeSpan timeout,
1071 bool exitContext)
1072 {
1073 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1074 }
1075
1076 /// <summary>
1077 /// Wait for all work items to complete
1078 /// </summary>
1079 /// <param name="waitableResults">Array of work item result objects</param>
1080 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1081 /// <param name="exitContext">
1082 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1083 /// </param>
1084 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1085 /// <returns>
1086 /// true when every work item in workItemResults has completed; otherwise false.
1087 /// </returns>
1088 public static bool WaitAll(
1089 IWaitableResult[] waitableResults,
1090 TimeSpan timeout,
1091 bool exitContext,
1092 WaitHandle cancelWaitHandle)
1093 {
1094 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1095 }
1096
1097 /// <summary>
1098 /// Wait for all work items to complete
1099 /// </summary>
1100 /// <param name="waitableResults">Array of work item result objects</param>
1101 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1102 /// <param name="exitContext">
1103 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1104 /// </param>
1105 /// <returns>
1106 /// true when every work item in workItemResults has completed; otherwise false.
1107 /// </returns>
1108 public static bool WaitAll(
1109 IWaitableResult [] waitableResults,
1110 int millisecondsTimeout,
1111 bool exitContext)
1112 {
1113 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null);
1114 }
1115
1116 /// <summary>
1117 /// Wait for all work items to complete
1118 /// </summary>
1119 /// <param name="waitableResults">Array of work item result objects</param>
1120 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1121 /// <param name="exitContext">
1122 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1123 /// </param>
1124 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1125 /// <returns>
1126 /// true when every work item in workItemResults has completed; otherwise false.
1127 /// </returns>
1128 public static bool WaitAll(
1129 IWaitableResult[] waitableResults,
1130 int millisecondsTimeout,
1131 bool exitContext,
1132 WaitHandle cancelWaitHandle)
1133 {
1134 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1135 }
1136
1137
1138 /// <summary>
1139 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1140 /// </summary>
1141 /// <param name="waitableResults">Array of work item result objects</param>
1142 /// <returns>
1143 /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
1144 /// </returns>
1145 public static int WaitAny(
1146 IWaitableResult [] waitableResults)
1147 {
1148 return WaitAny(waitableResults, Timeout.Infinite, true);
1149 }
1150
1151 /// <summary>
1152 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1153 /// </summary>
1154 /// <param name="waitableResults">Array of work item result objects</param>
1155 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1156 /// <param name="exitContext">
1157 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1158 /// </param>
1159 /// <returns>
1160 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1161 /// </returns>
1162 public static int WaitAny(
1163 IWaitableResult[] waitableResults,
1164 TimeSpan timeout,
1165 bool exitContext)
1166 {
1167 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1168 }
1169
1170 /// <summary>
1171 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1172 /// </summary>
1173 /// <param name="waitableResults">Array of work item result objects</param>
1174 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1175 /// <param name="exitContext">
1176 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1177 /// </param>
1178 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1179 /// <returns>
1180 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1181 /// </returns>
1182 public static int WaitAny(
1183 IWaitableResult [] waitableResults,
1184 TimeSpan timeout,
1185 bool exitContext,
1186 WaitHandle cancelWaitHandle)
1187 {
1188 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1189 }
1190
1191 /// <summary>
1192 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1193 /// </summary>
1194 /// <param name="waitableResults">Array of work item result objects</param>
1195 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1196 /// <param name="exitContext">
1197 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1198 /// </param>
1199 /// <returns>
1200 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1201 /// </returns>
1202 public static int WaitAny(
1203 IWaitableResult [] waitableResults,
1204 int millisecondsTimeout,
1205 bool exitContext)
1206 {
1207 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null);
1208 }
1209
1210 /// <summary>
1211 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1212 /// </summary>
1213 /// <param name="waitableResults">Array of work item result objects</param>
1214 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1215 /// <param name="exitContext">
1216 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1217 /// </param>
1218 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1219 /// <returns>
1220 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1221 /// </returns>
1222 public static int WaitAny(
1223 IWaitableResult [] waitableResults,
1224 int millisecondsTimeout,
1225 bool exitContext,
1226 WaitHandle cancelWaitHandle)
1227 {
1228 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1229 }
1230
1231 /// <summary>
1232 /// Creates a new WorkItemsGroup.
1233 /// </summary>
1234 /// <param name="concurrency">The number of work items that can be run concurrently</param>
1235 /// <returns>A reference to the WorkItemsGroup</returns>
1236 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
1237 {
1238 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo);
1239 return workItemsGroup;
1240 }
726 1241
727 /// <summary> 1242 /// <summary>
728 /// Queue a work item 1243 /// Creates a new WorkItemsGroup.
729 /// </summary> 1244 /// </summary>
730 /// <param name="callback">A callback to execute</param> 1245 /// <param name="concurrency">The number of work items that can be run concurrently</param>
731 /// <param name="workItemPriority">The priority of the work item</param> 1246 /// <param name="wigStartInfo">A WorkItemsGroup configuration that overrides the default behavior</param>
732 /// <returns>Returns a work item result</returns> 1247 /// <returns>A reference to the WorkItemsGroup</returns>
733 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) 1248 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
734 { 1249 {
735 ValidateNotDisposed(); 1250 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo);
736 ValidateCallback(callback); 1251 return workItemsGroup;
737 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, workItemPriority); 1252 }
738 Enqueue(workItem);
739 return workItem.GetWorkItemResult();
740 }
741 1253
742 /// <summary> 1254 #region Fire Thread's Events
743 /// Queue a work item
744 /// </summary>
745 /// <param name="workItemInfo">Work item info</param>
746 /// <param name="callback">A callback to execute</param>
747 /// <returns>Returns a work item result</returns>
748 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
749 {
750 ValidateNotDisposed();
751 ValidateCallback(callback);
752 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback);
753 Enqueue(workItem);
754 return workItem.GetWorkItemResult();
755 }
756 1255
757 /// <summary> 1256 private void FireOnThreadInitialization()
758 /// Queue a work item
759 /// </summary>
760 /// <param name="callback">A callback to execute</param>
761 /// <param name="state">
762 /// The context object of the work item. Used for passing arguments to the work item.
763 /// </param>
764 /// <returns>Returns a work item result</returns>
765 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
766 { 1257 {
767 ValidateNotDisposed(); 1258 if (null != _onThreadInitialization)
768 ValidateCallback(callback); 1259 {
769 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state); 1260 foreach (ThreadInitializationHandler tih in _onThreadInitialization.GetInvocationList())
770 Enqueue(workItem); 1261 {
771 return workItem.GetWorkItemResult(); 1262 try
1263 {
1264 tih();
1265 }
1266 catch (Exception e)
1267 {
1268 e.GetHashCode();
1269 Debug.Assert(false);
1270 throw;
1271 }
1272 }
1273 }
772 } 1274 }
773 1275
774 /// <summary> 1276 private void FireOnThreadTermination()
775 /// Queue a work item
776 /// </summary>
777 /// <param name="callback">A callback to execute</param>
778 /// <param name="state">
779 /// The context object of the work item. Used for passing arguments to the work item.
780 /// </param>
781 /// <param name="workItemPriority">The work item priority</param>
782 /// <returns>Returns a work item result</returns>
783 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
784 { 1277 {
785 ValidateNotDisposed(); 1278 if (null != _onThreadTermination)
786 ValidateCallback(callback); 1279 {
787 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, workItemPriority); 1280 foreach (ThreadTerminationHandler tth in _onThreadTermination.GetInvocationList())
788 Enqueue(workItem); 1281 {
789 return workItem.GetWorkItemResult(); 1282 try
1283 {
1284 tth();
1285 }
1286 catch (Exception e)
1287 {
1288 e.GetHashCode();
1289 Debug.Assert(false);
1290 throw;
1291 }
1292 }
1293 }
790 } 1294 }
791 1295
792 /// <summary> 1296 #endregion
793 /// Queue a work item
794 /// </summary>
795 /// <param name="workItemInfo">Work item information</param>
796 /// <param name="callback">A callback to execute</param>
797 /// <param name="state">
798 /// The context object of the work item. Used for passing arguments to the work item.
799 /// </param>
800 /// <returns>Returns a work item result</returns>
801 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
802 {
803 ValidateNotDisposed();
804 ValidateCallback(callback);
805 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback, state);
806 Enqueue(workItem);
807 return workItem.GetWorkItemResult();
808 }
809 1297
810 /// <summary> 1298 /// <summary>
811 /// Queue a work item 1299 /// This event is fired when a thread is created.
1300 /// Use it to initialize a thread before the work items use it.
812 /// </summary> 1301 /// </summary>
813 /// <param name="callback">A callback to execute</param> 1302 public event ThreadInitializationHandler OnThreadInitialization
814 /// <param name="state">
815 /// The context object of the work item. Used for passing arguments to the work item.
816 /// </param>
817 /// <param name="postExecuteWorkItemCallback">
818 /// A delegate to call after the callback completion
819 /// </param>
820 /// <returns>Returns a work item result</returns>
821 public IWorkItemResult QueueWorkItem(
822 WorkItemCallback callback,
823 object state,
824 PostExecuteWorkItemCallback postExecuteWorkItemCallback)
825 { 1303 {
826 ValidateNotDisposed(); 1304 add { _onThreadInitialization += value; }
827 ValidateCallback(callback); 1305 remove { _onThreadInitialization -= value; }
828 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback);
829 Enqueue(workItem);
830 return workItem.GetWorkItemResult();
831 } 1306 }
832 1307
833 /// <summary> 1308 /// <summary>
834 /// Queue a work item 1309 /// This event is fired when a thread is terminating.
1310 /// Use it for cleanup.
835 /// </summary> 1311 /// </summary>
836 /// <param name="callback">A callback to execute</param> 1312 public event ThreadTerminationHandler OnThreadTermination
837 /// <param name="state">
838 /// The context object of the work item. Used for passing arguments to the work item.
839 /// </param>
840 /// <param name="postExecuteWorkItemCallback">
841 /// A delegate to call after the callback completion
842 /// </param>
843 /// <param name="workItemPriority">The work item priority</param>
844 /// <returns>Returns a work item result</returns>
845 public IWorkItemResult QueueWorkItem(
846 WorkItemCallback callback,
847 object state,
848 PostExecuteWorkItemCallback postExecuteWorkItemCallback,
849 WorkItemPriority workItemPriority)
850 { 1313 {
851 ValidateNotDisposed(); 1314 add { _onThreadTermination += value; }
852 ValidateCallback(callback); 1315 remove { _onThreadTermination -= value; }
853 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority);
854 Enqueue(workItem);
855 return workItem.GetWorkItemResult();
856 } 1316 }
857 1317
858 /// <summary>
859 /// Queue a work item
860 /// </summary>
861 /// <param name="callback">A callback to execute</param>
862 /// <param name="state">
863 /// The context object of the work item. Used for passing arguments to the work item.
864 /// </param>
865 /// <param name="postExecuteWorkItemCallback">
866 /// A delegate to call after the callback completion
867 /// </param>
868 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
869 /// <returns>Returns a work item result</returns>
870 public IWorkItemResult QueueWorkItem(
871 WorkItemCallback callback,
872 object state,
873 PostExecuteWorkItemCallback postExecuteWorkItemCallback,
874 CallToPostExecute callToPostExecute)
875 {
876 ValidateNotDisposed();
877 ValidateCallback(callback);
878 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute);
879 Enqueue(workItem);
880 return workItem.GetWorkItemResult();
881 }
882 1318
883 /// <summary> 1319 internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig)
884 /// Queue a work item
885 /// </summary>
886 /// <param name="callback">A callback to execute</param>
887 /// <param name="state">
888 /// The context object of the work item. Used for passing arguments to the work item.
889 /// </param>
890 /// <param name="postExecuteWorkItemCallback">
891 /// A delegate to call after the callback completion
892 /// </param>
893 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
894 /// <param name="workItemPriority">The work item priority</param>
895 /// <returns>Returns a work item result</returns>
896 public IWorkItemResult QueueWorkItem(
897 WorkItemCallback callback,
898 object state,
899 PostExecuteWorkItemCallback postExecuteWorkItemCallback,
900 CallToPostExecute callToPostExecute,
901 WorkItemPriority workItemPriority)
902 { 1320 {
903 ValidateNotDisposed(); 1321 foreach (ThreadEntry threadEntry in _workerThreads.Values)
904 ValidateCallback(callback); 1322 {
905 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); 1323 WorkItem workItem = threadEntry.CurrentWorkItem;
906 Enqueue(workItem); 1324 if (null != workItem &&
907 return workItem.GetWorkItemResult(); 1325 workItem.WasQueuedBy(wig) &&
1326 !workItem.IsCanceled)
1327 {
1328 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1329 }
1330 }
908 } 1331 }
909 1332
910 /// <summary> 1333
911 /// Wait for the thread pool to be idle
912 /// </summary>
913 public void WaitForIdle()
914 {
915 WaitForIdle(Timeout.Infinite);
916 }
917 1334
918 /// <summary> 1335 #endregion
919 /// Wait for the thread pool to be idle
920 /// </summary>
921 public bool WaitForIdle(TimeSpan timeout)
922 {
923 return WaitForIdle((int)timeout.TotalMilliseconds);
924 }
925 1336
926 /// <summary> 1337 #region Properties
927 /// Wait for the thread pool to be idle
928 /// </summary>
929 public bool WaitForIdle(int millisecondsTimeout)
930 {
931 ValidateWaitForIdle();
932 return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
933 }
934 1338
935 private void ValidateWaitForIdle() 1339 /// <summary>
936 { 1340 /// Get/Set the lower limit of threads in the pool.
937 if(_smartThreadPool == this) 1341 /// </summary>
1342 public int MinThreads
1343 {
1344 get
1345 {
1346 ValidateNotDisposed();
1347 return _stpStartInfo.MinWorkerThreads;
1348 }
1349 set
938 { 1350 {
939 throw new NotSupportedException( 1351 Debug.Assert(value >= 0);
940 "WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 1352 Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads);
1353 if (_stpStartInfo.MaxWorkerThreads < value)
1354 {
1355 _stpStartInfo.MaxWorkerThreads = value;
1356 }
1357 _stpStartInfo.MinWorkerThreads = value;
1358 StartOptimalNumberOfThreads();
941 } 1359 }
942 } 1360 }
1361
1362 /// <summary>
1363 /// Get/Set the upper limit of threads in the pool.
1364 /// </summary>
1365 public int MaxThreads
1366 {
1367 get
1368 {
1369 ValidateNotDisposed();
1370 return _stpStartInfo.MaxWorkerThreads;
1371 }
943 1372
944 internal void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup) 1373 set
1374 {
1375 Debug.Assert(value > 0);
1376 Debug.Assert(value >= _stpStartInfo.MinWorkerThreads);
1377 if (_stpStartInfo.MinWorkerThreads > value)
1378 {
1379 _stpStartInfo.MinWorkerThreads = value;
1380 }
1381 _stpStartInfo.MaxWorkerThreads = value;
1382 StartOptimalNumberOfThreads();
1383 }
1384 }
1385 /// <summary>
1386 /// Get the number of threads in the thread pool.
1387 /// Should be between the lower and the upper limits.
1388 /// </summary>
1389 public int ActiveThreads
1390 {
1391 get
1392 {
1393 ValidateNotDisposed();
1394 return _workerThreads.Count;
1395 }
1396 }
1397
1398 /// <summary>
1399 /// Get the number of busy (not idle) threads in the thread pool.
1400 /// </summary>
1401 public int InUseThreads
1402 {
1403 get
1404 {
1405 ValidateNotDisposed();
1406 return _inUseWorkerThreads;
1407 }
1408 }
1409
1410 /// <summary>
1411 /// Returns true if the current running work item has been cancelled.
1412 /// Must be used within the work item's callback method.
1413 /// The work item should sample this value in order to know if it
1414 /// needs to quit before its completion.
1415 /// </summary>
1416 public static bool IsWorkItemCanceled
945 { 1417 {
946 ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, SmartThreadPool._currentWorkItem); 1418 get
947 if ((null != workItemsGroup) &&
948 (null != SmartThreadPool._currentWorkItem) &&
949 SmartThreadPool._currentWorkItem.WasQueuedBy(workItemsGroup))
950 { 1419 {
951 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 1420 return CurrentThreadEntry.CurrentWorkItem.IsCanceled;
952 } 1421 }
953 } 1422 }
954 1423
955 [MethodImpl(MethodImplOptions.NoInlining)] 1424 /// <summary>
956 private void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem) 1425 /// Checks if the work item has been cancelled, and if yes then abort the thread.
1426 /// Can be used with Cancel and timeout
1427 /// </summary>
1428 public static void AbortOnWorkItemCancel()
957 { 1429 {
958 if ((null != workItemsGroup) && 1430 if (IsWorkItemCanceled)
959 (null != workItem) &&
960 workItem.WasQueuedBy(workItemsGroup))
961 { 1431 {
962 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 1432 Thread.CurrentThread.Abort();
963 } 1433 }
964 } 1434 }
965 1435
966
967
968 /// <summary> 1436 /// <summary>
969 /// Force the SmartThreadPool to shutdown 1437 /// Thread Pool start information (readonly)
970 /// </summary> 1438 /// </summary>
971 public void Shutdown() 1439 public STPStartInfo STPStartInfo
972 { 1440 {
973 Shutdown(true, 0); 1441 get
1442 {
1443 return _stpStartInfo.AsReadOnly();
1444 }
974 } 1445 }
975 1446
976 public void Shutdown(bool forceAbort, TimeSpan timeout) 1447 public bool IsShuttingdown
977 { 1448 {
978 Shutdown(forceAbort, (int)timeout.TotalMilliseconds); 1449 get { return _shutdown; }
979 } 1450 }
980 1451
981 /// <summary> 1452 /// <summary>
982 /// Empties the queue of work items and abort the threads in the pool. 1453 /// Return the local calculated performance counters
1454 /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true.
983 /// </summary> 1455 /// </summary>
984 public void Shutdown(bool forceAbort, int millisecondsTimeout) 1456 public ISTPPerformanceCountersReader PerformanceCountersReader
985 { 1457 {
986 ValidateNotDisposed(); 1458 get { return (ISTPPerformanceCountersReader)_localPCs; }
987 1459 }
988 ISTPInstancePerformanceCounters pcs = _pcs;
989
990 if (NullSTPInstancePerformanceCounters.Instance != _pcs)
991 {
992 _pcs.Dispose();
993 // Set the _pcs to "null" to stop updating the performance
994 // counters
995 _pcs = NullSTPInstancePerformanceCounters.Instance;
996 }
997
998 Thread [] threads = null;
999 lock(_workerThreads.SyncRoot)
1000 {
1001 // Shutdown the work items queue
1002 _workItemsQueue.Dispose();
1003
1004 // Signal the threads to exit
1005 _shutdown = true;
1006 _shuttingDownEvent.Set();
1007 1460
1008 // Make a copy of the threads' references in the pool 1461 #endregion
1009 threads = new Thread [_workerThreads.Count];
1010 _workerThreads.Keys.CopyTo(threads, 0);
1011 }
1012 1462
1013 int millisecondsLeft = millisecondsTimeout; 1463 #region IDisposable Members
1014 DateTime start = DateTime.Now;
1015 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
1016 bool timeout = false;
1017 1464
1018 // Each iteration we update the time left for the timeout. 1465 public void Dispose()
1019 foreach(Thread thread in threads) 1466 {
1467 if (!_isDisposed)
1020 { 1468 {
1021 // Join don't work with negative numbers 1469 if (!_shutdown)
1022 if (!waitInfinitely && (millisecondsLeft < 0))
1023 { 1470 {
1024 timeout = true; 1471 Shutdown();
1025 break;
1026 } 1472 }
1027 1473
1028 // Wait for the thread to terminate 1474 if (null != _shuttingDownEvent)
1029 bool success = thread.Join(millisecondsLeft);
1030 if(!success)
1031 { 1475 {
1032 timeout = true; 1476 _shuttingDownEvent.Close();
1033 break; 1477 _shuttingDownEvent = null;
1034 } 1478 }
1035 1479 _workerThreads.Clear();
1036 if(!waitInfinitely) 1480
1481 if (null != _isIdleWaitHandle)
1037 { 1482 {
1038 // Update the time left to wait 1483 _isIdleWaitHandle.Close();
1039 TimeSpan ts = DateTime.Now - start; 1484 _isIdleWaitHandle = null;
1040 millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds;
1041 } 1485 }
1042 }
1043 1486
1044 if (timeout && forceAbort) 1487 _isDisposed = true;
1045 {
1046 // Abort the threads in the pool
1047 foreach(Thread thread in threads)
1048 {
1049 if ((thread != null) && thread.IsAlive)
1050 {
1051 try
1052 {
1053 thread.Abort("Shutdown");
1054 }
1055 catch(SecurityException e)
1056 {
1057 e.GetHashCode();
1058 }
1059 catch(ThreadStateException ex)
1060 {
1061 ex.GetHashCode();
1062 // In case the thread has been terminated
1063 // after the check if it is alive.
1064 }
1065 }
1066 }
1067 } 1488 }
1068
1069 // Dispose of the performance counters
1070 pcs.Dispose();
1071 } 1489 }
1072 1490
1073 /// <summary> 1491 private void ValidateNotDisposed()
1074 /// Wait for all work items to complete
1075 /// </summary>
1076 /// <param name="workItemResults">Array of work item result objects</param>
1077 /// <returns>
1078 /// true when every work item in workItemResults has completed; otherwise false.
1079 /// </returns>
1080 public static bool WaitAll(
1081 IWorkItemResult [] workItemResults)
1082 { 1492 {
1083 return WaitAll(workItemResults, Timeout.Infinite, true); 1493 if(_isDisposed)
1494 {
1495 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
1496 }
1084 } 1497 }
1498 #endregion
1499
1500 #region WorkItemsGroupBase Overrides
1085 1501
1086 /// <summary> 1502 /// <summary>
1087 /// Wait for all work items to complete 1503 /// Get/Set the maximum number of work items that execute cocurrency on the thread pool
1088 /// </summary> 1504 /// </summary>
1089 /// <param name="workItemResults">Array of work item result objects</param> 1505 public override int Concurrency
1090 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1506 {
1091 /// <param name="exitContext"> 1507 get { return MaxThreads; }
1092 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1508 set { MaxThreads = value; }
1093 /// </param> 1509 }
1094 /// <returns> 1510
1095 /// true when every work item in workItemResults has completed; otherwise false. 1511 /// <summary>
1096 /// </returns> 1512 /// Get the number of work items in the queue.
1097 public static bool WaitAll( 1513 /// </summary>
1098 IWorkItemResult [] workItemResults, 1514 public override int WaitingCallbacks
1099 TimeSpan timeout, 1515 {
1100 bool exitContext) 1516 get
1101 { 1517 {
1102 return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext); 1518 ValidateNotDisposed();
1103 } 1519 return _workItemsQueue.Count;
1520 }
1521 }
1104 1522
1105 /// <summary> 1523 /// <summary>
1106 /// Wait for all work items to complete 1524 /// Get an array with all the state objects of the currently running items.
1525 /// The array represents a snap shot and impact performance.
1107 /// </summary> 1526 /// </summary>
1108 /// <param name="workItemResults">Array of work item result objects</param> 1527 public override object[] GetStates()
1109 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1110 /// <param name="exitContext">
1111 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1112 /// </param>
1113 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1114 /// <returns>
1115 /// true when every work item in workItemResults has completed; otherwise false.
1116 /// </returns>
1117 public static bool WaitAll(
1118 IWorkItemResult [] workItemResults,
1119 TimeSpan timeout,
1120 bool exitContext,
1121 WaitHandle cancelWaitHandle)
1122 { 1528 {
1123 return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); 1529 object[] states = _workItemsQueue.GetStates();
1530 return states;
1124 } 1531 }
1125 1532
1126 /// <summary> 1533 /// <summary>
1127 /// Wait for all work items to complete 1534 /// WorkItemsGroup start information (readonly)
1128 /// </summary> 1535 /// </summary>
1129 /// <param name="workItemResults">Array of work item result objects</param> 1536 public override WIGStartInfo WIGStartInfo
1130 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1131 /// <param name="exitContext">
1132 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1133 /// </param>
1134 /// <returns>
1135 /// true when every work item in workItemResults has completed; otherwise false.
1136 /// </returns>
1137 public static bool WaitAll(
1138 IWorkItemResult [] workItemResults,
1139 int millisecondsTimeout,
1140 bool exitContext)
1141 { 1537 {
1142 return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, null); 1538 get { return _stpStartInfo.AsReadOnly(); }
1143 } 1539 }
1144 1540
1145 /// <summary> 1541 /// <summary>
1146 /// Wait for all work items to complete 1542 /// Start the thread pool if it was started suspended.
1543 /// If it is already running, this method is ignored.
1147 /// </summary> 1544 /// </summary>
1148 /// <param name="workItemResults">Array of work item result objects</param> 1545 public override void Start()
1149 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1150 /// <param name="exitContext">
1151 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1152 /// </param>
1153 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1154 /// <returns>
1155 /// true when every work item in workItemResults has completed; otherwise false.
1156 /// </returns>
1157 public static bool WaitAll(
1158 IWorkItemResult [] workItemResults,
1159 int millisecondsTimeout,
1160 bool exitContext,
1161 WaitHandle cancelWaitHandle)
1162 { 1546 {
1163 return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle); 1547 if (!_isSuspended)
1164 } 1548 {
1549 return;
1550 }
1551 _isSuspended = false;
1165 1552
1553 ICollection workItemsGroups = _workItemsGroups.Values;
1554 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1555 {
1556 workItemsGroup.OnSTPIsStarting();
1557 }
1166 1558
1167 /// <summary> 1559 StartOptimalNumberOfThreads();
1168 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1169 /// </summary>
1170 /// <param name="workItemResults">Array of work item result objects</param>
1171 /// <returns>
1172 /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
1173 /// </returns>
1174 public static int WaitAny(
1175 IWorkItemResult [] workItemResults)
1176 {
1177 return WaitAny(workItemResults, Timeout.Infinite, true);
1178 } 1560 }
1179 1561
1180 /// <summary> 1562 /// <summary>
1181 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1563 /// Cancel all work items using thread abortion
1182 /// </summary> 1564 /// </summary>
1183 /// <param name="workItemResults">Array of work item result objects</param> 1565 /// <param name="abortExecution">True to stop work items by raising ThreadAbortException</param>
1184 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1566 public override void Cancel(bool abortExecution)
1185 /// <param name="exitContext">
1186 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1187 /// </param>
1188 /// <returns>
1189 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1190 /// </returns>
1191 public static int WaitAny(
1192 IWorkItemResult [] workItemResults,
1193 TimeSpan timeout,
1194 bool exitContext)
1195 { 1567 {
1196 return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext); 1568 _canceledSmartThreadPool.IsCanceled = true;
1197 } 1569 _canceledSmartThreadPool = new CanceledWorkItemsGroup();
1198 1570
1199 /// <summary> 1571 ICollection workItemsGroups = _workItemsGroups.Values;
1200 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1572 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1201 /// </summary> 1573 {
1202 /// <param name="workItemResults">Array of work item result objects</param> 1574 workItemsGroup.Cancel(abortExecution);
1203 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1575 }
1204 /// <param name="exitContext"> 1576
1205 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1577 if (abortExecution)
1206 /// </param> 1578 {
1207 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1579 foreach (ThreadEntry threadEntry in _workerThreads.Values)
1208 /// <returns> 1580 {
1209 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1581 WorkItem workItem = threadEntry.CurrentWorkItem;
1210 /// </returns> 1582 if (null != workItem &&
1211 public static int WaitAny( 1583 threadEntry.AssociatedSmartThreadPool == this &&
1212 IWorkItemResult [] workItemResults, 1584 !workItem.IsCanceled)
1213 TimeSpan timeout, 1585 {
1214 bool exitContext, 1586 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1215 WaitHandle cancelWaitHandle) 1587 }
1216 { 1588 }
1217 return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); 1589 }
1218 } 1590 }
1219 1591
1220 /// <summary> 1592 /// <summary>
1221 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1593 /// Wait for the thread pool to be idle
1222 /// </summary> 1594 /// </summary>
1223 /// <param name="workItemResults">Array of work item result objects</param> 1595 public override bool WaitForIdle(int millisecondsTimeout)
1224 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1225 /// <param name="exitContext">
1226 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1227 /// </param>
1228 /// <returns>
1229 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1230 /// </returns>
1231 public static int WaitAny(
1232 IWorkItemResult [] workItemResults,
1233 int millisecondsTimeout,
1234 bool exitContext)
1235 { 1596 {
1236 return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, null); 1597 ValidateWaitForIdle();
1598 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
1237 } 1599 }
1238 1600
1239 /// <summary> 1601 /// <summary>
1240 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1602 /// This event is fired when all work items are completed.
1603 /// (When IsIdle changes to true)
1604 /// This event only work on WorkItemsGroup. On SmartThreadPool
1605 /// it throws the NotImplementedException.
1241 /// </summary> 1606 /// </summary>
1242 /// <param name="workItemResults">Array of work item result objects</param> 1607 public override event WorkItemsGroupIdleHandler OnIdle
1243 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1244 /// <param name="exitContext">
1245 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1246 /// </param>
1247 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1248 /// <returns>
1249 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1250 /// </returns>
1251 public static int WaitAny(
1252 IWorkItemResult [] workItemResults,
1253 int millisecondsTimeout,
1254 bool exitContext,
1255 WaitHandle cancelWaitHandle)
1256 {
1257 return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1258 }
1259
1260 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
1261 {
1262 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo);
1263 return workItemsGroup;
1264 }
1265
1266 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
1267 {
1268 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo);
1269 return workItemsGroup;
1270 }
1271
1272 public event WorkItemsGroupIdleHandler OnIdle
1273 { 1608 {
1274 add 1609 add
1275 { 1610 {
@@ -1283,166 +1618,115 @@ namespace Amib.Threading
1283 } 1618 }
1284 } 1619 }
1285 1620
1286 public void Cancel() 1621 internal override void PreQueueWorkItem()
1287 {
1288 ICollection workItemsGroups = _workItemsGroups.Values;
1289 foreach(WorkItemsGroup workItemsGroup in workItemsGroups)
1290 {
1291 workItemsGroup.Cancel();
1292 }
1293 }
1294
1295 public void Start()
1296 { 1622 {
1297 lock (this) 1623 ValidateNotDisposed();
1298 {
1299 if (!this._stpStartInfo.StartSuspended)
1300 {
1301 return;
1302 }
1303 _stpStartInfo.StartSuspended = false;
1304 }
1305
1306 ICollection workItemsGroups = _workItemsGroups.Values;
1307 foreach(WorkItemsGroup workItemsGroup in workItemsGroups)
1308 {
1309 workItemsGroup.OnSTPIsStarting();
1310 }
1311
1312 StartOptimalNumberOfThreads();
1313 } 1624 }
1314 1625
1315 #endregion 1626 #endregion
1316 1627
1317 #region Properties 1628 #region Join, Choice, Pipe, etc.
1318 1629
1319 /// <summary> 1630 /// <summary>
1320 /// Get/Set the name of the SmartThreadPool instance 1631 /// Executes all actions in parallel.
1632 /// Returns when they all finish.
1321 /// </summary> 1633 /// </summary>
1322 public string Name 1634 /// <param name="actions">Actions to execute</param>
1323 { 1635 public void Join(IEnumerable<Action> actions)
1324 get 1636 {
1325 { 1637 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1326 return _name; 1638 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1327 } 1639 foreach (Action action in actions)
1328
1329 set
1330 {
1331 _name = value;
1332 }
1333 }
1334
1335 /// <summary>
1336 /// Get the lower limit of threads in the pool.
1337 /// </summary>
1338 public int MinThreads
1339 {
1340 get
1341 { 1640 {
1342 ValidateNotDisposed(); 1641 workItemsGroup.QueueWorkItem(action);
1343 return _stpStartInfo.MinWorkerThreads;
1344 } 1642 }
1643 workItemsGroup.Start();
1644 workItemsGroup.WaitForIdle();
1345 } 1645 }
1346 1646
1347 /// <summary> 1647 /// <summary>
1348 /// Get the upper limit of threads in the pool. 1648 /// Executes all actions in parallel.
1349 /// </summary> 1649 /// Returns when they all finish.
1350 public int MaxThreads
1351 {
1352 get
1353 {
1354 ValidateNotDisposed();
1355 return _stpStartInfo.MaxWorkerThreads;
1356 }
1357 }
1358 /// <summary>
1359 /// Get the number of threads in the thread pool.
1360 /// Should be between the lower and the upper limits.
1361 /// </summary> 1650 /// </summary>
1362 public int ActiveThreads 1651 /// <param name="actions">Actions to execute</param>
1363 { 1652 public void Join(params Action[] actions)
1364 get 1653 {
1365 { 1654 Join((IEnumerable<Action>)actions);
1366 ValidateNotDisposed();
1367 return _workerThreads.Count;
1368 }
1369 } 1655 }
1370 1656
1371 /// <summary> 1657 private class ChoiceIndex
1372 /// Get the number of busy (not idle) threads in the thread pool. 1658 {
1373 /// </summary> 1659 public int _index = -1;
1374 public int InUseThreads
1375 {
1376 get
1377 {
1378 ValidateNotDisposed();
1379 return _inUseWorkerThreads;
1380 }
1381 } 1660 }
1382 1661
1383 /// <summary> 1662 /// <summary>
1384 /// Get the number of work items in the queue. 1663 /// Executes all actions in parallel
1664 /// Returns when the first one completes
1385 /// </summary> 1665 /// </summary>
1386 public int WaitingCallbacks 1666 /// <param name="actions">Actions to execute</param>
1387 { 1667 public int Choice(IEnumerable<Action> actions)
1388 get 1668 {
1389 { 1669 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1390 ValidateNotDisposed(); 1670 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1391 return _workItemsQueue.Count;
1392 }
1393 }
1394 1671
1672 ManualResetEvent anActionCompleted = new ManualResetEvent(false);
1395 1673
1396 public event EventHandler Idle 1674 ChoiceIndex choiceIndex = new ChoiceIndex();
1397 { 1675
1398 add 1676 int i = 0;
1677 foreach (Action action in actions)
1399 { 1678 {
1400 _stpIdle += value; 1679 Action act = action;
1680 int value = i;
1681 workItemsGroup.QueueWorkItem(() => { act(); Interlocked.CompareExchange(ref choiceIndex._index, value, -1); anActionCompleted.Set(); });
1682 ++i;
1401 } 1683 }
1684 workItemsGroup.Start();
1685 anActionCompleted.WaitOne();
1402 1686
1403 remove 1687 return choiceIndex._index;
1404 {
1405 _stpIdle -= value;
1406 }
1407 } 1688 }
1408 1689
1409 #endregion 1690 /// <summary>
1410 1691 /// Executes all actions in parallel
1411 #region IDisposable Members 1692 /// Returns when the first one completes
1412 1693 /// </summary>
1413// ~SmartThreadPool() 1694 /// <param name="actions">Actions to execute</param>
1414// { 1695 public int Choice(params Action[] actions)
1415// Dispose(); 1696 {
1416// } 1697 return Choice((IEnumerable<Action>)actions);
1698 }
1417 1699
1418 public void Dispose() 1700 /// <summary>
1701 /// Executes actions in sequence asynchronously.
1702 /// Returns immediately.
1703 /// </summary>
1704 /// <param name="pipeState">A state context that passes </param>
1705 /// <param name="actions">Actions to execute in the order they should run</param>
1706 public void Pipe<T>(T pipeState, IEnumerable<Action<T>> actions)
1419 { 1707 {
1420 if (!_isDisposed) 1708 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1709 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(1, wigStartInfo);
1710 foreach (Action<T> action in actions)
1421 { 1711 {
1422 if (!_shutdown) 1712 Action<T> act = action;
1423 { 1713 workItemsGroup.QueueWorkItem(() => act(pipeState));
1424 Shutdown();
1425 }
1426
1427 if (null != _shuttingDownEvent)
1428 {
1429 _shuttingDownEvent.Close();
1430 _shuttingDownEvent = null;
1431 }
1432 _workerThreads.Clear();
1433 _isDisposed = true;
1434 GC.SuppressFinalize(this);
1435 } 1714 }
1715 workItemsGroup.Start();
1716 workItemsGroup.WaitForIdle();
1436 } 1717 }
1437 1718
1438 private void ValidateNotDisposed() 1719 /// <summary>
1720 /// Executes actions in sequence asynchronously.
1721 /// Returns immediately.
1722 /// </summary>
1723 /// <param name="pipeState"></param>
1724 /// <param name="actions">Actions to execute in the order they should run</param>
1725 public void Pipe<T>(T pipeState, params Action<T>[] actions)
1439 { 1726 {
1440 if(_isDisposed) 1727 Pipe(pipeState, (IEnumerable<Action<T>>)actions);
1441 {
1442 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
1443 }
1444 } 1728 }
1445 #endregion 1729 #endregion
1446 } 1730 }
1447 #endregion 1731 #endregion
1448} 1732}