aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/SmartThreadPool.cs
diff options
context:
space:
mode:
Diffstat (limited to 'ThirdParty/SmartThreadPool/SmartThreadPool.cs')
-rw-r--r--ThirdParty/SmartThreadPool/SmartThreadPool.cs3177
1 files changed, 1732 insertions, 1445 deletions
diff --git a/ThirdParty/SmartThreadPool/SmartThreadPool.cs b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
index 2c061d7..9256777 100644
--- a/ThirdParty/SmartThreadPool/SmartThreadPool.cs
+++ b/ThirdParty/SmartThreadPool/SmartThreadPool.cs
@@ -1,1445 +1,1732 @@
1// Ami Bar 1#region Release History
2// amibar@gmail.com 2
3// 3// Smart Thread Pool
4// Smart thread pool in C#. 4// 7 Aug 2004 - Initial release
5// 7 Aug 2004 - Initial release 5//
6// 14 Sep 2004 - Bug fixes 6// 14 Sep 2004 - Bug fixes
7// 15 Oct 2004 - Added new features 7//
8// - Work items return result. 8// 15 Oct 2004 - Added new features
9// - Support waiting synchronization for multiple work items. 9// - Work items return result.
10// - Work items can be cancelled. 10// - Support waiting synchronization for multiple work items.
11// - Passage of the caller thread’s context to the thread in the pool. 11// - Work items can be cancelled.
12// - Minimal usage of WIN32 handles. 12// - Passage of the caller thread’s context to the thread in the pool.
13// - Minor bug fixes. 13// - Minimal usage of WIN32 handles.
14// 26 Dec 2004 - Changes: 14// - Minor bug fixes.
15// - Removed static constructors. 15//
16// - Added finalizers. 16// 26 Dec 2004 - Changes:
17// - Changed Exceptions so they are serializable. 17// - Removed static constructors.
18// - Fixed the bug in one of the SmartThreadPool constructors. 18// - Added finalizers.
19// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters. 19// - Changed Exceptions so they are serializable.
20// The SmartThreadPool.WaitAny() is still limited by the .NET Framework. 20// - Fixed the bug in one of the SmartThreadPool constructors.
21// - Added PostExecute with options on which cases to call it. 21// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
22// - Added option to dispose of the state objects. 22// The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
23// - Added a WaitForIdle() method that waits until the work items queue is empty. 23// - Added PostExecute with options on which cases to call it.
24// - Added an STPStartInfo class for the initialization of the thread pool. 24// - Added option to dispose of the state objects.
25// - Changed exception handling so if a work item throws an exception it 25// - Added a WaitForIdle() method that waits until the work items queue is empty.
26// is rethrown at GetResult(), rather then firing an UnhandledException event. 26// - Added an STPStartInfo class for the initialization of the thread pool.
27// Note that PostExecute exception are always ignored. 27// - Changed exception handling so if a work item throws an exception it
28// 25 Mar 2005 - Changes: 28// is rethrown at GetResult(), rather then firing an UnhandledException event.
29// - Fixed lost of work items bug 29// Note that PostExecute exception are always ignored.
30// 3 Jul 2005: Changes. 30//
31// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed. 31// 25 Mar 2005 - Changes:
32// 16 Aug 2005: Changes. 32// - Fixed lost of work items bug
33// - Fixed bug where the InUseThreads becomes negative when canceling work items. 33//
34// 34// 3 Jul 2005: Changes.
35// 31 Jan 2006 - Changes: 35// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
36// - Added work items priority 36//
37// - Removed support of chained delegates in callbacks and post executes (nobody really use this) 37// 16 Aug 2005: Changes.
38// - Added work items groups 38// - Fixed bug where the InUseThreads becomes negative when canceling work items.
39// - Added work items groups idle event 39//
40// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array 40// 31 Jan 2006 - Changes:
41// it returns true rather then throwing an exception. 41// - Added work items priority
42// - Added option to start the STP and the WIG as suspended 42// - Removed support of chained delegates in callbacks and post executes (nobody really use this)
43// - Exception behavior changed, the real exception is returned by an 43// - Added work items groups
44// inner exception 44// - Added work items groups idle event
45// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.) 45// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
46// - Added performance counters 46// it returns true rather then throwing an exception.
47// - Added priority to the threads in the pool 47// - Added option to start the STP and the WIG as suspended
48// 48// - Exception behavior changed, the real exception is returned by an
49// 13 Feb 2006 - Changes: 49// inner exception
50// - Added a call to the dispose of the Performance Counter so 50// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
51// their won't be a Performance Counter leak. 51// - Added performance counters
52// - Added exception catch in case the Performance Counters cannot 52// - Added priority to the threads in the pool
53// be created. 53//
54 54// 13 Feb 2006 - Changes:
55using System; 55// - Added a call to the dispose of the Performance Counter so
56using System.Security; 56// their won't be a Performance Counter leak.
57using System.Threading; 57// - Added exception catch in case the Performance Counters cannot
58using System.Collections; 58// be created.
59using System.Diagnostics; 59//
60using System.Runtime.CompilerServices; 60// 17 May 2008 - Changes:
61 61// - Changed the dispose behavior and removed the Finalizers.
62using Amib.Threading.Internal; 62// - Enabled the change of the MaxThreads and MinThreads at run time.
63 63// - Enabled the change of the Concurrency of a IWorkItemsGroup at run
64namespace Amib.Threading 64// time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency
65{ 65// refers to the MaxThreads.
66 #region SmartThreadPool class 66// - Improved the cancel behavior.
67 /// <summary> 67// - Added events for thread creation and termination.
68 /// Smart thread pool class. 68// - Fixed the HttpContext context capture.
69 /// </summary> 69// - Changed internal collections so they use generic collections
70 public class SmartThreadPool : IWorkItemsGroup, IDisposable 70// - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup
71 { 71// - Added support for WinCE
72 #region Default Constants 72// - Added support for Action<T> and Func<T>
73 73//
74 /// <summary> 74// 07 April 2009 - Changes:
75 /// Default minimum number of threads the thread pool contains. (0) 75// - Added support for Silverlight and Mono
76 /// </summary> 76// - Added Join, Choice, and Pipe to SmartThreadPool.
77 public const int DefaultMinWorkerThreads = 0; 77// - Added local performance counters (for Mono, Silverlight, and WindowsCE)
78 78// - Changed duration measures from DateTime.Now to Stopwatch.
79 /// <summary> 79// - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList<T>.
80 /// Default maximum number of threads the thread pool contains. (25) 80//
81 /// </summary> 81// 21 December 2009 - Changes:
82 public const int DefaultMaxWorkerThreads = 25; 82// - Added work item timeout (passive)
83 83//
84 /// <summary> 84// 20 August 2012 - Changes:
85 /// Default idle timeout in milliseconds. (One minute) 85// - Added set name to threads
86 /// </summary> 86// - Fixed the WorkItemsQueue.Dequeue.
87 public const int DefaultIdleTimeout = 60*1000; // One minute 87// Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... }
88 88// - Fixed SmartThreadPool.Pipe
89 /// <summary> 89// - Added IsBackground option to threads
90 /// Indicate to copy the security context of the caller and then use it in the call. (false) 90// - Added ApartmentState to threads
91 /// </summary> 91// - Fixed thread creation when queuing many work items at the same time.
92 public const bool DefaultUseCallerCallContext = false; 92//
93 93// 24 August 2012 - Changes:
94 /// <summary> 94// - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan
95 /// Indicate to copy the HTTP context of the caller and then use it in the call. (false) 95// - Added option to set MaxStackSize of threads
96 /// </summary> 96
97 public const bool DefaultUseCallerHttpContext = false; 97#endregion
98 98
99 /// <summary> 99using System;
100 /// Indicate to dispose of the state objects if they support the IDispose interface. (false) 100using System.Security;
101 /// </summary> 101using System.Threading;
102 public const bool DefaultDisposeOfStateObjects = false; 102using System.Collections;
103 103using System.Collections.Generic;
104 /// <summary> 104using System.Diagnostics;
105 /// The default option to run the post execute 105using System.Runtime.CompilerServices;
106 /// </summary> 106
107 public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always; 107using Amib.Threading.Internal;
108 108
109 /// <summary> 109namespace Amib.Threading
110 /// The default post execute method to run. 110{
111 /// When null it means not to call it. 111 #region SmartThreadPool class
112 /// </summary> 112 /// <summary>
113 public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null; 113 /// Smart thread pool class.
114 114 /// </summary>
115 /// <summary> 115 public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable
116 /// The default work item priority 116 {
117 /// </summary> 117 #region Public Default Constants
118 public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal; 118
119 119 /// <summary>
120 /// <summary> 120 /// Default minimum number of threads the thread pool contains. (0)
121 /// The default is to work on work items as soon as they arrive 121 /// </summary>
122 /// and not to wait for the start. 122 public const int DefaultMinWorkerThreads = 0;
123 /// </summary> 123
124 public const bool DefaultStartSuspended = false; 124 /// <summary>
125 125 /// Default maximum number of threads the thread pool contains. (25)
126 /// <summary> 126 /// </summary>
127 /// The default is not to use the performance counters 127 public const int DefaultMaxWorkerThreads = 25;
128 /// </summary> 128
129 public static readonly string DefaultPerformanceCounterInstanceName = null; 129 /// <summary>
130 130 /// Default idle timeout in milliseconds. (One minute)
131 public static readonly int DefaultStackSize = 0; 131 /// </summary>
132 132 public const int DefaultIdleTimeout = 60*1000; // One minute
133 /// <summary> 133
134 /// The default thread priority 134 /// <summary>
135 /// </summary> 135 /// Indicate to copy the security context of the caller and then use it in the call. (false)
136 public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal; 136 /// </summary>
137 137 public const bool DefaultUseCallerCallContext = false;
138 /// <summary> 138
139 /// The default thread pool name 139 /// <summary>
140 /// </summary> 140 /// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
141 public const string DefaultThreadPoolName = "SmartThreadPool"; 141 /// </summary>
142 142 public const bool DefaultUseCallerHttpContext = false;
143 #endregion 143
144 144 /// <summary>
145 #region Member Variables 145 /// Indicate to dispose of the state objects if they support the IDispose interface. (false)
146 146 /// </summary>
147 /// <summary> 147 public const bool DefaultDisposeOfStateObjects = false;
148 /// Contains the name of this instance of SmartThreadPool. 148
149 /// Can be changed by the user. 149 /// <summary>
150 /// </summary> 150 /// The default option to run the post execute (CallToPostExecute.Always)
151 private string _name = DefaultThreadPoolName; 151 /// </summary>
152 152 public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;
153 /// <summary> 153
154 /// Hashtable of all the threads in the thread pool. 154 /// <summary>
155 /// </summary> 155 /// The default post execute method to run. (None)
156 private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable()); 156 /// When null it means not to call it.
157 157 /// </summary>
158 /// <summary> 158 public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback;
159 /// Queue of work items. 159
160 /// </summary> 160 /// <summary>
161 private WorkItemsQueue _workItemsQueue = new WorkItemsQueue(); 161 /// The default work item priority (WorkItemPriority.Normal)
162 162 /// </summary>
163 /// <summary> 163 public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
164 /// Count the work items handled. 164
165 /// Used by the performance counter. 165 /// <summary>
166 /// </summary> 166 /// The default is to work on work items as soon as they arrive
167 private long _workItemsProcessed = 0; 167 /// and not to wait for the start. (false)
168 168 /// </summary>
169 /// <summary> 169 public const bool DefaultStartSuspended = false;
170 /// Number of threads that currently work (not idle). 170
171 /// </summary> 171 /// <summary>
172 private int _inUseWorkerThreads = 0; 172 /// The default name to use for the performance counters instance. (null)
173 173 /// </summary>
174 /// <summary> 174 public static readonly string DefaultPerformanceCounterInstanceName;
175 /// Start information to use. 175
176 /// It is simpler than providing many constructors. 176#if !(WINDOWS_PHONE)
177 /// </summary> 177
178 private STPStartInfo _stpStartInfo = new STPStartInfo(); 178 /// <summary>
179 179 /// The default thread priority (ThreadPriority.Normal)
180 /// <summary> 180 /// </summary>
181 /// Total number of work items that are stored in the work items queue 181 public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;
182 /// plus the work items that the threads in the pool are working on. 182#endif
183 /// </summary> 183 /// <summary>
184 private int _currentWorkItemsCount = 0; 184 /// The default thread pool name. (SmartThreadPool)
185 185 /// </summary>
186 /// <summary> 186 public const string DefaultThreadPoolName = "SmartThreadPool";
187 /// Signaled when the thread pool is idle, i.e. no thread is busy 187
188 /// and the work items queue is empty 188 /// <summary>
189 /// </summary> 189 /// The default Max Stack Size. (SmartThreadPool)
190 private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); 190 /// </summary>
191 191 public static readonly int? DefaultMaxStackSize = null;
192 /// <summary> 192
193 /// An event to signal all the threads to quit immediately. 193 /// <summary>
194 /// </summary> 194 /// The default fill state with params. (false)
195 private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false); 195 /// It is relevant only to QueueWorkItem of Action&lt;...&gt;/Func&lt;...&gt;
196 196 /// </summary>
197 /// <summary> 197 public const bool DefaultFillStateWithArgs = false;
198 /// A flag to indicate the threads to quit. 198
199 /// </summary> 199 /// <summary>
200 private bool _shutdown = false; 200 /// The default thread backgroundness. (true)
201 201 /// </summary>
202 /// <summary> 202 public const bool DefaultAreThreadsBackground = true;
203 /// Counts the threads created in the pool. 203
204 /// It is used to name the threads. 204#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
205 /// </summary> 205 /// <summary>
206 private int _threadCounter = 0; 206 /// The default apartment state of a thread in the thread pool.
207 207 /// The default is ApartmentState.Unknown which means the STP will not
208 /// <summary> 208 /// set the apartment of the thread. It will use the .NET default.
209 /// Indicate that the SmartThreadPool has been disposed 209 /// </summary>
210 /// </summary> 210 public const ApartmentState DefaultApartmentState = ApartmentState.Unknown;
211 private bool _isDisposed = false; 211#endif
212 212
213 /// <summary> 213 #endregion
214 /// Event to send that the thread pool is idle 214
215 /// </summary> 215 #region Member Variables
216 private event EventHandler _stpIdle; 216
217 217 /// <summary>
218 /// <summary> 218 /// Dictionary of all the threads in the thread pool.
219 /// On idle event 219 /// </summary>
220 /// </summary> 220 private readonly SynchronizedDictionary<Thread, ThreadEntry> _workerThreads = new SynchronizedDictionary<Thread, ThreadEntry>();
221 //private event WorkItemsGroupIdleHandler _onIdle; 221
222 222 /// <summary>
223 /// <summary> 223 /// Queue of work items.
224 /// Holds all the WorkItemsGroup instaces that have at least one 224 /// </summary>
225 /// work item int the SmartThreadPool 225 private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue();
226 /// This variable is used in case of Shutdown 226
227 /// </summary> 227 /// <summary>
228 private Hashtable _workItemsGroups = Hashtable.Synchronized(new Hashtable()); 228 /// Count the work items handled.
229 229 /// Used by the performance counter.
230 /// <summary> 230 /// </summary>
231 /// A reference from each thread in the thread pool to its SmartThreadPool 231 private int _workItemsProcessed;
232 /// object container. 232
233 /// With this variable a thread can know whatever it belongs to a 233 /// <summary>
234 /// SmartThreadPool. 234 /// Number of threads that currently work (not idle).
235 /// </summary> 235 /// </summary>
236 [ThreadStatic] 236 private int _inUseWorkerThreads;
237 private static SmartThreadPool _smartThreadPool; 237
238 238 /// <summary>
239 /// <summary> 239 /// Stores a copy of the original STPStartInfo.
240 /// A reference to the current work item a thread from the thread pool 240 /// It is used to change the MinThread and MaxThreads
241 /// is executing. 241 /// </summary>
242 /// </summary> 242 private STPStartInfo _stpStartInfo;
243 [ThreadStatic] 243
244 private static WorkItem _currentWorkItem; 244 /// <summary>
245 245 /// Total number of work items that are stored in the work items queue
246 /// <summary> 246 /// plus the work items that the threads in the pool are working on.
247 /// STP performance counters 247 /// </summary>
248 /// </summary> 248 private int _currentWorkItemsCount;
249 private ISTPInstancePerformanceCounters _pcs = NullSTPInstancePerformanceCounters.Instance; 249
250 250 /// <summary>
251 #endregion 251 /// Signaled when the thread pool is idle, i.e. no thread is busy
252 252 /// and the work items queue is empty
253 #region Construction and Finalization 253 /// </summary>
254 254 //private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
255 /// <summary> 255 private ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
256 /// Constructor 256
257 /// </summary> 257 /// <summary>
258 public SmartThreadPool() 258 /// An event to signal all the threads to quit immediately.
259 { 259 /// </summary>
260 Initialize(); 260 //private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
261 } 261 private ManualResetEvent _shuttingDownEvent = EventWaitHandleFactory.CreateManualResetEvent(false);
262 262
263 /// <summary> 263 /// <summary>
264 /// Constructor 264 /// A flag to indicate if the Smart Thread Pool is now suspended.
265 /// </summary> 265 /// </summary>
266 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 266 private bool _isSuspended;
267 public SmartThreadPool(int idleTimeout) 267
268 { 268 /// <summary>
269 _stpStartInfo.IdleTimeout = idleTimeout; 269 /// A flag to indicate the threads to quit.
270 Initialize(); 270 /// </summary>
271 } 271 private bool _shutdown;
272 272
273 /// <summary> 273 /// <summary>
274 /// Constructor 274 /// Counts the threads created in the pool.
275 /// </summary> 275 /// It is used to name the threads.
276 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 276 /// </summary>
277 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> 277 private int _threadCounter;
278 public SmartThreadPool( 278
279 int idleTimeout, 279 /// <summary>
280 int maxWorkerThreads) 280 /// Indicate that the SmartThreadPool has been disposed
281 { 281 /// </summary>
282 _stpStartInfo.IdleTimeout = idleTimeout; 282 private bool _isDisposed;
283 _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; 283
284 Initialize(); 284 /// <summary>
285 } 285 /// Holds all the WorkItemsGroup instaces that have at least one
286 286 /// work item int the SmartThreadPool
287 /// <summary> 287 /// This variable is used in case of Shutdown
288 /// Constructor 288 /// </summary>
289 /// </summary> 289 private readonly SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup> _workItemsGroups = new SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup>();
290 /// <param name="idleTimeout">Idle timeout in milliseconds</param> 290
291 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> 291 /// <summary>
292 /// <param name="minWorkerThreads">Lower limit of threads in the pool</param> 292 /// A common object for all the work items int the STP
293 public SmartThreadPool( 293 /// so we can mark them to cancel in O(1)
294 int idleTimeout, 294 /// </summary>
295 int maxWorkerThreads, 295 private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup();
296 int minWorkerThreads) 296
297 { 297 /// <summary>
298 _stpStartInfo.IdleTimeout = idleTimeout; 298 /// Windows STP performance counters
299 _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; 299 /// </summary>
300 _stpStartInfo.MinWorkerThreads = minWorkerThreads; 300 private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
301 Initialize(); 301
302 } 302 /// <summary>
303 303 /// Local STP performance counters
304 /// <summary> 304 /// </summary>
305 /// Constructor 305 private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance;
306 /// </summary> 306
307 public SmartThreadPool(STPStartInfo stpStartInfo) 307
308 { 308#if (WINDOWS_PHONE)
309 _stpStartInfo = new STPStartInfo(stpStartInfo); 309 private static readonly Dictionary<int, ThreadEntry> _threadEntries = new Dictionary<int, ThreadEntry>();
310 Initialize(); 310#elif (_WINDOWS_CE)
311 } 311 private static LocalDataStoreSlot _threadEntrySlot = Thread.AllocateDataSlot();
312 312#else
313 private void Initialize() 313 [ThreadStatic]
314 { 314 private static ThreadEntry _threadEntry;
315 Name = _stpStartInfo.ThreadPoolName; 315
316 ValidateSTPStartInfo(); 316#endif
317 317
318 if (null != _stpStartInfo.PerformanceCounterInstanceName) 318 /// <summary>
319 { 319 /// An event to call after a thread is created, but before
320 try 320 /// it's first use.
321 { 321 /// </summary>
322 _pcs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName); 322 private event ThreadInitializationHandler _onThreadInitialization;
323 } 323
324 catch(Exception e) 324 /// <summary>
325 { 325 /// An event to call when a thread is about to exit, after
326 Debug.WriteLine("Unable to create Performance Counters: " + e.ToString()); 326 /// it is no longer belong to the pool.
327 _pcs = NullSTPInstancePerformanceCounters.Instance; 327 /// </summary>
328 } 328 private event ThreadTerminationHandler _onThreadTermination;
329 } 329
330 330 #endregion
331 StartOptimalNumberOfThreads(); 331
332 } 332 #region Per thread properties
333 333
334 private void StartOptimalNumberOfThreads() 334 /// <summary>
335 { 335 /// A reference to the current work item a thread from the thread pool
336 int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads); 336 /// is executing.
337 threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads); 337 /// </summary>
338 StartThreads(threadsCount); 338 internal static ThreadEntry CurrentThreadEntry
339 } 339 {
340 340#if (WINDOWS_PHONE)
341 private void ValidateSTPStartInfo() 341 get
342 { 342 {
343 if (_stpStartInfo.MinWorkerThreads < 0) 343 lock(_threadEntries)
344 { 344 {
345 throw new ArgumentOutOfRangeException( 345 ThreadEntry threadEntry;
346 "MinWorkerThreads", "MinWorkerThreads cannot be negative"); 346 if (_threadEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out threadEntry))
347 } 347 {
348 348 return threadEntry;
349 if (_stpStartInfo.MaxWorkerThreads <= 0) 349 }
350 { 350 }
351 throw new ArgumentOutOfRangeException( 351 return null;
352 "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero"); 352 }
353 } 353 set
354 354 {
355 if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads) 355 lock(_threadEntries)
356 { 356 {
357 throw new ArgumentOutOfRangeException( 357 _threadEntries[Thread.CurrentThread.ManagedThreadId] = value;
358 "MinWorkerThreads, maxWorkerThreads", 358 }
359 "MaxWorkerThreads must be greater or equal to MinWorkerThreads"); 359 }
360 } 360#elif (_WINDOWS_CE)
361 } 361 get
362 362 {
363 private void ValidateCallback(Delegate callback) 363 //Thread.CurrentThread.ManagedThreadId
364 { 364 return Thread.GetData(_threadEntrySlot) as ThreadEntry;
365 if(callback.GetInvocationList().Length > 1) 365 }
366 { 366 set
367 throw new NotSupportedException("SmartThreadPool doesn't support delegates chains"); 367 {
368 } 368 Thread.SetData(_threadEntrySlot, value);
369 } 369 }
370 370#else
371 #endregion 371 get
372 372 {
373 #region Thread Processing 373 return _threadEntry;
374 374 }
375 /// <summary> 375 set
376 /// Waits on the queue for a work item, shutdown, or timeout. 376 {
377 /// </summary> 377 _threadEntry = value;
378 /// <returns> 378 }
379 /// Returns the WaitingCallback or null in case of timeout or shutdown. 379#endif
380 /// </returns> 380 }
381 private WorkItem Dequeue() 381 #endregion
382 { 382
383 WorkItem workItem = 383 #region Construction and Finalization
384 _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent); 384
385 385 /// <summary>
386 return workItem; 386 /// Constructor
387 } 387 /// </summary>
388 388 public SmartThreadPool()
389 /// <summary> 389 {
390 /// Put a new work item in the queue 390 _stpStartInfo = new STPStartInfo();
391 /// </summary> 391 Initialize();
392 /// <param name="workItem">A work item to queue</param> 392 }
393 private void Enqueue(WorkItem workItem) 393
394 { 394 /// <summary>
395 Enqueue(workItem, true); 395 /// Constructor
396 } 396 /// </summary>
397 397 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
398 /// <summary> 398 public SmartThreadPool(int idleTimeout)
399 /// Put a new work item in the queue 399 {
400 /// </summary> 400 _stpStartInfo = new STPStartInfo
401 /// <param name="workItem">A work item to queue</param> 401 {
402 internal void Enqueue(WorkItem workItem, bool incrementWorkItems) 402 IdleTimeout = idleTimeout,
403 { 403 };
404 // Make sure the workItem is not null 404 Initialize();
405 Debug.Assert(null != workItem); 405 }
406 406
407 if (incrementWorkItems) 407 /// <summary>
408 { 408 /// Constructor
409 IncrementWorkItemsCount(); 409 /// </summary>
410 } 410 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
411 411 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
412 _workItemsQueue.EnqueueWorkItem(workItem); 412 public SmartThreadPool(
413 workItem.WorkItemIsQueued(); 413 int idleTimeout,
414 414 int maxWorkerThreads)
415 // If all the threads are busy then try to create a new one 415 {
416 if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count) 416 _stpStartInfo = new STPStartInfo
417 { 417 {
418 StartThreads(1); 418 IdleTimeout = idleTimeout,
419 } 419 MaxWorkerThreads = maxWorkerThreads,
420 } 420 };
421 421 Initialize();
422 private void IncrementWorkItemsCount() 422 }
423 { 423
424 _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); 424 /// <summary>
425 425 /// Constructor
426 int count = Interlocked.Increment(ref _currentWorkItemsCount); 426 /// </summary>
427 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); 427 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
428 if (count == 1) 428 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
429 { 429 /// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
430 //Trace.WriteLine("STP is NOT idle"); 430 public SmartThreadPool(
431 _isIdleWaitHandle.Reset(); 431 int idleTimeout,
432 } 432 int maxWorkerThreads,
433 } 433 int minWorkerThreads)
434 434 {
435 private void DecrementWorkItemsCount() 435 _stpStartInfo = new STPStartInfo
436 { 436 {
437 ++_workItemsProcessed; 437 IdleTimeout = idleTimeout,
438 438 MaxWorkerThreads = maxWorkerThreads,
439 // The counter counts even if the work item was cancelled 439 MinWorkerThreads = minWorkerThreads,
440 _pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); 440 };
441 441 Initialize();
442 int count = Interlocked.Decrement(ref _currentWorkItemsCount); 442 }
443 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); 443
444 if (count == 0) 444 /// <summary>
445 { 445 /// Constructor
446 //Trace.WriteLine("STP is idle"); 446 /// </summary>
447 _isIdleWaitHandle.Set(); 447 /// <param name="stpStartInfo">A SmartThreadPool configuration that overrides the default behavior</param>
448 } 448 public SmartThreadPool(STPStartInfo stpStartInfo)
449 } 449 {
450 450 _stpStartInfo = new STPStartInfo(stpStartInfo);
451 internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) 451 Initialize();
452 { 452 }
453 _workItemsGroups[workItemsGroup] = workItemsGroup; 453
454 } 454 private void Initialize()
455 455 {
456 internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) 456 Name = _stpStartInfo.ThreadPoolName;
457 { 457 ValidateSTPStartInfo();
458 if (_workItemsGroups.Contains(workItemsGroup)) 458
459 { 459 // _stpStartInfoRW stores a read/write copy of the STPStartInfo.
460 _workItemsGroups.Remove(workItemsGroup); 460 // Actually only MaxWorkerThreads and MinWorkerThreads are overwritten
461 } 461
462 } 462 _isSuspended = _stpStartInfo.StartSuspended;
463 463
464 /// <summary> 464#if (_WINDOWS_CE) || (_SILVERLIGHT) || (_MONO) || (WINDOWS_PHONE)
465 /// Inform that the current thread is about to quit or quiting. 465 if (null != _stpStartInfo.PerformanceCounterInstanceName)
466 /// The same thread may call this method more than once. 466 {
467 /// </summary> 467 throw new NotSupportedException("Performance counters are not implemented for Compact Framework/Silverlight/Mono, instead use StpStartInfo.EnableLocalPerformanceCounters");
468 private void InformCompleted() 468 }
469 { 469#else
470 // There is no need to lock the two methods together 470 if (null != _stpStartInfo.PerformanceCounterInstanceName)
471 // since only the current thread removes itself 471 {
472 // and the _workerThreads is a synchronized hashtable 472 try
473 if (_workerThreads.Contains(Thread.CurrentThread)) 473 {
474 { 474 _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
475 _workerThreads.Remove(Thread.CurrentThread); 475 }
476 _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); 476 catch (Exception e)
477 } 477 {
478 } 478 Debug.WriteLine("Unable to create Performance Counters: " + e);
479 479 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
480 /// <summary> 480 }
481 /// Starts new threads 481 }
482 /// </summary> 482#endif
483 /// <param name="threadsCount">The number of threads to start</param> 483
484 private void StartThreads(int threadsCount) 484 if (_stpStartInfo.EnableLocalPerformanceCounters)
485 { 485 {
486 if (_stpStartInfo.StartSuspended) 486 _localPCs = new LocalSTPInstancePerformanceCounters();
487 { 487 }
488 return; 488
489 } 489 // If the STP is not started suspended then start the threads.
490 490 if (!_isSuspended)
491 lock(_workerThreads.SyncRoot) 491 {
492 { 492 StartOptimalNumberOfThreads();
493 // Don't start threads on shut down 493 }
494 if (_shutdown) 494 }
495 { 495
496 return; 496 private void StartOptimalNumberOfThreads()
497 } 497 {
498 498 int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
499 for(int i = 0; i < threadsCount; ++i) 499 threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
500 { 500 threadsCount -= _workerThreads.Count;
501 // Don't create more threads then the upper limit 501 if (threadsCount > 0)
502 if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads) 502 {
503 { 503 StartThreads(threadsCount);
504 return; 504 }
505 } 505 }
506 506
507 // Create a new thread 507 private void ValidateSTPStartInfo()
508 Thread workerThread; 508 {
509 if (_stpStartInfo.StackSize > 0) 509 if (_stpStartInfo.MinWorkerThreads < 0)
510 workerThread = new Thread(ProcessQueuedItems, _stpStartInfo.StackSize); 510 {
511 else 511 throw new ArgumentOutOfRangeException(
512 workerThread = new Thread(ProcessQueuedItems); 512 "MinWorkerThreads", "MinWorkerThreads cannot be negative");
513 513 }
514 // Configure the new thread and start it 514
515 workerThread.Name = "STP " + Name + " Thread #" + _threadCounter; 515 if (_stpStartInfo.MaxWorkerThreads <= 0)
516 workerThread.IsBackground = true; 516 {
517 workerThread.Priority = _stpStartInfo.ThreadPriority; 517 throw new ArgumentOutOfRangeException(
518 workerThread.Start(); 518 "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
519 ++_threadCounter; 519 }
520 520
521 // Add the new thread to the hashtable and update its creation 521 if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads)
522 // time. 522 {
523 _workerThreads[workerThread] = DateTime.Now; 523 throw new ArgumentOutOfRangeException(
524 _pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); 524 "MinWorkerThreads, maxWorkerThreads",
525 } 525 "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
526 } 526 }
527 } 527 }
528 528
529 /// <summary> 529 private static void ValidateCallback(Delegate callback)
530 /// A worker thread method that processes work items from the work items queue. 530 {
531 /// </summary> 531 if(callback.GetInvocationList().Length > 1)
532 private void ProcessQueuedItems() 532 {
533 { 533 throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
534 // Initialize the _smartThreadPool variable 534 }
535 _smartThreadPool = this; 535 }
536 536
537 try 537 #endregion
538 { 538
539 bool bInUseWorkerThreadsWasIncremented = false; 539 #region Thread Processing
540 540
541 // Process until shutdown. 541 /// <summary>
542 while(!_shutdown) 542 /// Waits on the queue for a work item, shutdown, or timeout.
543 { 543 /// </summary>
544 544 /// <returns>
545 // Wait for a work item, shutdown, or timeout 545 /// Returns the WaitingCallback or null in case of timeout or shutdown.
546 WorkItem workItem = Dequeue(); 546 /// </returns>
547 547 private WorkItem Dequeue()
548 // On timeout or shut down. 548 {
549 if (null == workItem) 549 WorkItem workItem =
550 { 550 _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
551 // Double lock for quit. 551
552 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) 552 return workItem;
553 { 553 }
554 lock(_workerThreads.SyncRoot) 554
555 { 555 /// <summary>
556 // Update the last time this thread was seen alive. 556 /// Put a new work item in the queue
557 // It's good for debugging. 557 /// </summary>
558 _workerThreads[Thread.CurrentThread] = DateTime.Now; 558 /// <param name="workItem">A work item to queue</param>
559 559 internal override void Enqueue(WorkItem workItem)
560 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) 560 {
561 { 561 // Make sure the workItem is not null
562 // Inform that the thread is quiting and then quit. 562 Debug.Assert(null != workItem);
563 // This method must be called within this lock or else 563
564 // more threads will quit and the thread pool will go 564 IncrementWorkItemsCount();
565 // below the lower limit. 565
566 InformCompleted(); 566 workItem.CanceledSmartThreadPool = _canceledSmartThreadPool;
567 break; 567 _workItemsQueue.EnqueueWorkItem(workItem);
568 } 568 workItem.WorkItemIsQueued();
569 } 569
570 } 570 // If all the threads are busy then try to create a new one
571 } 571 if (_currentWorkItemsCount > _workerThreads.Count)
572 572 {
573 // If we didn't quit then skip to the next iteration. 573 StartThreads(1);
574 if (null == workItem) 574 }
575 { 575 }
576 continue; 576
577 } 577 private void IncrementWorkItemsCount()
578 578 {
579 try 579 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
580 { 580 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
581 // Initialize the value to false 581
582 bInUseWorkerThreadsWasIncremented = false; 582 int count = Interlocked.Increment(ref _currentWorkItemsCount);
583 583 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
584 // Change the state of the work item to 'in progress' if possible. 584 if (count == 1)
585 // We do it here so if the work item has been canceled we won't 585 {
586 // increment the _inUseWorkerThreads. 586 IsIdle = false;
587 // The cancel mechanism doesn't delete items from the queue, 587 _isIdleWaitHandle.Reset();
588 // it marks the work item as canceled, and when the work item 588 }
589 // is dequeued, we just skip it. 589 }
590 // If the post execute of work item is set to always or to 590
591 // call when the work item is canceled then the StartingWorkItem() 591 private void DecrementWorkItemsCount()
592 // will return true, so the post execute can run. 592 {
593 if (!workItem.StartingWorkItem()) 593 int count = Interlocked.Decrement(ref _currentWorkItemsCount);
594 { 594 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
595 continue; 595 if (count == 0)
596 } 596 {
597 597 IsIdle = true;
598 // Execute the callback. Make sure to accurately 598 _isIdleWaitHandle.Set();
599 // record how many callbacks are currently executing. 599 }
600 int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads); 600
601 _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); 601 Interlocked.Increment(ref _workItemsProcessed);
602 602
603 // Mark that the _inUseWorkerThreads incremented, so in the finally{} 603 if (!_shutdown)
604 // statement we will decrement it correctly. 604 {
605 bInUseWorkerThreadsWasIncremented = true; 605 // The counter counts even if the work item was cancelled
606 606 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
607 // Set the _currentWorkItem to the current work item 607 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
608 _currentWorkItem = workItem; 608 }
609 609
610 lock(workItem) 610 }
611 { 611
612 workItem.currentThread = Thread.CurrentThread; 612 internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
613 } 613 {
614 614 _workItemsGroups[workItemsGroup] = workItemsGroup;
615 ExecuteWorkItem(workItem); 615 }
616 616
617 lock(workItem) 617 internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
618 { 618 {
619 workItem.currentThread = null; 619 if (_workItemsGroups.Contains(workItemsGroup))
620 } 620 {
621 621 _workItemsGroups.Remove(workItemsGroup);
622 } 622 }
623 catch(ThreadAbortException ex) 623 }
624 { 624
625 lock(workItem) 625 /// <summary>
626 { 626 /// Inform that the current thread is about to quit or quiting.
627 workItem.currentThread = null; 627 /// The same thread may call this method more than once.
628 } 628 /// </summary>
629 ex.GetHashCode(); 629 private void InformCompleted()
630 Thread.ResetAbort(); 630 {
631 } 631 // There is no need to lock the two methods together
632 catch(Exception ex) 632 // since only the current thread removes itself
633 { 633 // and the _workerThreads is a synchronized dictionary
634 ex.GetHashCode(); 634 if (_workerThreads.Contains(Thread.CurrentThread))
635 // Do nothing 635 {
636 } 636 _workerThreads.Remove(Thread.CurrentThread);
637 finally 637 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
638 { 638 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
639 lock(workItem) 639 }
640 { 640 }
641 workItem.currentThread = null; 641
642 } 642 /// <summary>
643 643 /// Starts new threads
644 if (null != workItem) 644 /// </summary>
645 { 645 /// <param name="threadsCount">The number of threads to start</param>
646 workItem.DisposeOfState(); 646 private void StartThreads(int threadsCount)
647 } 647 {
648 648 if (_isSuspended)
649 // Set the _currentWorkItem to null, since we 649 {
650 // no longer run user's code. 650 return;
651 _currentWorkItem = null; 651 }
652 652
653 // Decrement the _inUseWorkerThreads only if we had 653 lock(_workerThreads.SyncRoot)
654 // incremented it. Note the cancelled work items don't 654 {
655 // increment _inUseWorkerThreads. 655 // Don't start threads on shut down
656 if (bInUseWorkerThreadsWasIncremented) 656 if (_shutdown)
657 { 657 {
658 int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads); 658 return;
659 _pcs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); 659 }
660 } 660
661 661 for(int i = 0; i < threadsCount; ++i)
662 // Notify that the work item has been completed. 662 {
663 // WorkItemsGroup may enqueue their next work item. 663 // Don't create more threads then the upper limit
664 workItem.FireWorkItemCompleted(); 664 if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
665 665 {
666 // Decrement the number of work items here so the idle 666 return;
667 // ManualResetEvent won't fluctuate. 667 }
668 DecrementWorkItemsCount(); 668
669 } 669 // Create a new thread
670 } 670
671 } 671#if (_SILVERLIGHT) || (WINDOWS_PHONE)
672 catch(ThreadAbortException tae) 672 Thread workerThread = new Thread(ProcessQueuedItems);
673 { 673#else
674 tae.GetHashCode(); 674 Thread workerThread =
675 // Handle the abort exception gracfully. 675 _stpStartInfo.MaxStackSize.HasValue
676 Thread.ResetAbort(); 676 ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value)
677 } 677 : new Thread(ProcessQueuedItems);
678 catch(Exception e) 678#endif
679 { 679 // Configure the new thread and start it
680 Debug.Assert(null != e); 680 workerThread.Name = "STP " + Name + " Thread #" + _threadCounter;
681 } 681 workerThread.IsBackground = _stpStartInfo.AreThreadsBackground;
682 finally 682
683 { 683#if !(_SILVERLIGHT) && !(_WINDOWS_CE) && !(WINDOWS_PHONE)
684 InformCompleted(); 684 if (_stpStartInfo.ApartmentState != ApartmentState.Unknown)
685 } 685 {
686 } 686 workerThread.SetApartmentState(_stpStartInfo.ApartmentState);
687 687 }
688 private void ExecuteWorkItem(WorkItem workItem) 688#endif
689 { 689
690 _pcs.SampleWorkItemsWaitTime(workItem.WaitingTime); 690#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
691 try 691 workerThread.Priority = _stpStartInfo.ThreadPriority;
692 { 692#endif
693 workItem.Execute(); 693 workerThread.Start();
694 } 694 ++_threadCounter;
695 catch 695
696 { 696 // Add it to the dictionary and update its creation time.
697 throw; 697 _workerThreads[workerThread] = new ThreadEntry(this);
698 } 698
699 finally 699 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
700 { 700 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
701 _pcs.SampleWorkItemsProcessTime(workItem.ProcessTime); 701 }
702 } 702 }
703 } 703 }
704 704
705 705 /// <summary>
706 #endregion 706 /// A worker thread method that processes work items from the work items queue.
707 707 /// </summary>
708 #region Public Methods 708 private void ProcessQueuedItems()
709 709 {
710 /// <summary> 710 // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks
711 /// Queue a work item 711 // of the dictionary.
712 /// </summary> 712 CurrentThreadEntry = _workerThreads[Thread.CurrentThread];
713 /// <param name="callback">A callback to execute</param> 713
714 /// <returns>Returns a work item result</returns> 714 FireOnThreadInitialization();
715 public IWorkItemResult QueueWorkItem(WorkItemCallback callback) 715
716 { 716 try
717 ValidateNotDisposed(); 717 {
718 ValidateCallback(callback); 718 bool bInUseWorkerThreadsWasIncremented = false;
719 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback); 719
720 Enqueue(workItem); 720 // Process until shutdown.
721 return workItem.GetWorkItemResult(); 721 while(!_shutdown)
722 } 722 {
723 723 // Update the last time this thread was seen alive.
724 /// <summary> 724 // It's good for debugging.
725 /// Queue a work item 725 CurrentThreadEntry.IAmAlive();
726 /// </summary> 726
727 /// <param name="callback">A callback to execute</param> 727 // The following block handles the when the MaxWorkerThreads has been
728 /// <param name="workItemPriority">The priority of the work item</param> 728 // incremented by the user at run-time.
729 /// <returns>Returns a work item result</returns> 729 // Double lock for quit.
730 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) 730 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
731 { 731 {
732 ValidateNotDisposed(); 732 lock (_workerThreads.SyncRoot)
733 ValidateCallback(callback); 733 {
734 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, workItemPriority); 734 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
735 Enqueue(workItem); 735 {
736 return workItem.GetWorkItemResult(); 736 // Inform that the thread is quiting and then quit.
737 } 737 // This method must be called within this lock or else
738 738 // more threads will quit and the thread pool will go
739 /// <summary> 739 // below the lower limit.
740 /// Queue a work item 740 InformCompleted();
741 /// </summary> 741 break;
742 /// <param name="workItemInfo">Work item info</param> 742 }
743 /// <param name="callback">A callback to execute</param> 743 }
744 /// <returns>Returns a work item result</returns> 744 }
745 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) 745
746 { 746 // Wait for a work item, shutdown, or timeout
747 ValidateNotDisposed(); 747 WorkItem workItem = Dequeue();
748 ValidateCallback(callback); 748
749 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback); 749 // Update the last time this thread was seen alive.
750 Enqueue(workItem); 750 // It's good for debugging.
751 return workItem.GetWorkItemResult(); 751 CurrentThreadEntry.IAmAlive();
752 } 752
753 753 // On timeout or shut down.
754 /// <summary> 754 if (null == workItem)
755 /// Queue a work item 755 {
756 /// </summary> 756 // Double lock for quit.
757 /// <param name="callback">A callback to execute</param> 757 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
758 /// <param name="state"> 758 {
759 /// The context object of the work item. Used for passing arguments to the work item. 759 lock(_workerThreads.SyncRoot)
760 /// </param> 760 {
761 /// <returns>Returns a work item result</returns> 761 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
762 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) 762 {
763 { 763 // Inform that the thread is quiting and then quit.
764 ValidateNotDisposed(); 764 // This method must be called within this lock or else
765 ValidateCallback(callback); 765 // more threads will quit and the thread pool will go
766 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state); 766 // below the lower limit.
767 Enqueue(workItem); 767 InformCompleted();
768 return workItem.GetWorkItemResult(); 768 break;
769 } 769 }
770 770 }
771 /// <summary> 771 }
772 /// Queue a work item 772 }
773 /// </summary> 773
774 /// <param name="callback">A callback to execute</param> 774 // If we didn't quit then skip to the next iteration.
775 /// <param name="state"> 775 if (null == workItem)
776 /// The context object of the work item. Used for passing arguments to the work item. 776 {
777 /// </param> 777 continue;
778 /// <param name="workItemPriority">The work item priority</param> 778 }
779 /// <returns>Returns a work item result</returns> 779
780 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) 780 try
781 { 781 {
782 ValidateNotDisposed(); 782 // Initialize the value to false
783 ValidateCallback(callback); 783 bInUseWorkerThreadsWasIncremented = false;
784 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, workItemPriority); 784
785 Enqueue(workItem); 785 // Set the Current Work Item of the thread.
786 return workItem.GetWorkItemResult(); 786 // Store the Current Work Item before the workItem.StartingWorkItem() is called,
787 } 787 // so WorkItem.Cancel can work when the work item is between InQueue and InProgress
788 788 // states.
789 /// <summary> 789 // If the work item has been cancelled BEFORE the workItem.StartingWorkItem()
790 /// Queue a work item 790 // (work item is in InQueue state) then workItem.StartingWorkItem() will return false.
791 /// </summary> 791 // If the work item has been cancelled AFTER the workItem.StartingWorkItem() then
792 /// <param name="workItemInfo">Work item information</param> 792 // (work item is in InProgress state) then the thread will be aborted
793 /// <param name="callback">A callback to execute</param> 793 CurrentThreadEntry.CurrentWorkItem = workItem;
794 /// <param name="state"> 794
795 /// The context object of the work item. Used for passing arguments to the work item. 795 // Change the state of the work item to 'in progress' if possible.
796 /// </param> 796 // We do it here so if the work item has been canceled we won't
797 /// <returns>Returns a work item result</returns> 797 // increment the _inUseWorkerThreads.
798 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) 798 // The cancel mechanism doesn't delete items from the queue,
799 { 799 // it marks the work item as canceled, and when the work item
800 ValidateNotDisposed(); 800 // is dequeued, we just skip it.
801 ValidateCallback(callback); 801 // If the post execute of work item is set to always or to
802 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, workItemInfo, callback, state); 802 // call when the work item is canceled then the StartingWorkItem()
803 Enqueue(workItem); 803 // will return true, so the post execute can run.
804 return workItem.GetWorkItemResult(); 804 if (!workItem.StartingWorkItem())
805 } 805 {
806 806 continue;
807 /// <summary> 807 }
808 /// Queue a work item 808
809 /// </summary> 809 // Execute the callback. Make sure to accurately
810 /// <param name="callback">A callback to execute</param> 810 // record how many callbacks are currently executing.
811 /// <param name="state"> 811 int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads);
812 /// The context object of the work item. Used for passing arguments to the work item. 812 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
813 /// </param> 813 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
814 /// <param name="postExecuteWorkItemCallback"> 814
815 /// A delegate to call after the callback completion 815 // Mark that the _inUseWorkerThreads incremented, so in the finally{}
816 /// </param> 816 // statement we will decrement it correctly.
817 /// <returns>Returns a work item result</returns> 817 bInUseWorkerThreadsWasIncremented = true;
818 public IWorkItemResult QueueWorkItem( 818
819 WorkItemCallback callback, 819 workItem.FireWorkItemStarted();
820 object state, 820
821 PostExecuteWorkItemCallback postExecuteWorkItemCallback) 821 ExecuteWorkItem(workItem);
822 { 822 }
823 ValidateNotDisposed(); 823 catch(Exception ex)
824 ValidateCallback(callback); 824 {
825 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback); 825 ex.GetHashCode();
826 Enqueue(workItem); 826 // Do nothing
827 return workItem.GetWorkItemResult(); 827 }
828 } 828 finally
829 829 {
830 /// <summary> 830 workItem.DisposeOfState();
831 /// Queue a work item 831
832 /// </summary> 832 // Set the CurrentWorkItem to null, since we
833 /// <param name="callback">A callback to execute</param> 833 // no longer run user's code.
834 /// <param name="state"> 834 CurrentThreadEntry.CurrentWorkItem = null;
835 /// The context object of the work item. Used for passing arguments to the work item. 835
836 /// </param> 836 // Decrement the _inUseWorkerThreads only if we had
837 /// <param name="postExecuteWorkItemCallback"> 837 // incremented it. Note the cancelled work items don't
838 /// A delegate to call after the callback completion 838 // increment _inUseWorkerThreads.
839 /// </param> 839 if (bInUseWorkerThreadsWasIncremented)
840 /// <param name="workItemPriority">The work item priority</param> 840 {
841 /// <returns>Returns a work item result</returns> 841 int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads);
842 public IWorkItemResult QueueWorkItem( 842 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
843 WorkItemCallback callback, 843 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
844 object state, 844 }
845 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 845
846 WorkItemPriority workItemPriority) 846 // Notify that the work item has been completed.
847 { 847 // WorkItemsGroup may enqueue their next work item.
848 ValidateNotDisposed(); 848 workItem.FireWorkItemCompleted();
849 ValidateCallback(callback); 849
850 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); 850 // Decrement the number of work items here so the idle
851 Enqueue(workItem); 851 // ManualResetEvent won't fluctuate.
852 return workItem.GetWorkItemResult(); 852 DecrementWorkItemsCount();
853 } 853 }
854 854 }
855 /// <summary> 855 }
856 /// Queue a work item 856 catch(ThreadAbortException tae)
857 /// </summary> 857 {
858 /// <param name="callback">A callback to execute</param> 858 tae.GetHashCode();
859 /// <param name="state"> 859 // Handle the abort exception gracfully.
860 /// The context object of the work item. Used for passing arguments to the work item. 860#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
861 /// </param> 861 Thread.ResetAbort();
862 /// <param name="postExecuteWorkItemCallback"> 862#endif
863 /// A delegate to call after the callback completion 863 }
864 /// </param> 864 catch(Exception e)
865 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 865 {
866 /// <returns>Returns a work item result</returns> 866 Debug.Assert(null != e);
867 public IWorkItemResult QueueWorkItem( 867 }
868 WorkItemCallback callback, 868 finally
869 object state, 869 {
870 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 870 InformCompleted();
871 CallToPostExecute callToPostExecute) 871 FireOnThreadTermination();
872 { 872 }
873 ValidateNotDisposed(); 873 }
874 ValidateCallback(callback); 874
875 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); 875 private void ExecuteWorkItem(WorkItem workItem)
876 Enqueue(workItem); 876 {
877 return workItem.GetWorkItemResult(); 877 _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
878 } 878 _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
879 879 try
880 /// <summary> 880 {
881 /// Queue a work item 881 workItem.Execute();
882 /// </summary> 882 }
883 /// <param name="callback">A callback to execute</param> 883 finally
884 /// <param name="state"> 884 {
885 /// The context object of the work item. Used for passing arguments to the work item. 885 _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
886 /// </param> 886 _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
887 /// <param name="postExecuteWorkItemCallback"> 887 }
888 /// A delegate to call after the callback completion 888 }
889 /// </param> 889
890 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 890
891 /// <param name="workItemPriority">The work item priority</param> 891 #endregion
892 /// <returns>Returns a work item result</returns> 892
893 public IWorkItemResult QueueWorkItem( 893 #region Public Methods
894 WorkItemCallback callback, 894
895 object state, 895 private void ValidateWaitForIdle()
896 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 896 {
897 CallToPostExecute callToPostExecute, 897 if (null != CurrentThreadEntry && CurrentThreadEntry.AssociatedSmartThreadPool == this)
898 WorkItemPriority workItemPriority) 898 {
899 { 899 throw new NotSupportedException(
900 ValidateNotDisposed(); 900 "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
901 ValidateCallback(callback); 901 }
902 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _stpStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); 902 }
903 Enqueue(workItem); 903
904 return workItem.GetWorkItemResult(); 904 internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup)
905 } 905 {
906 906 if (null == CurrentThreadEntry)
907 /// <summary> 907 {
908 /// Wait for the thread pool to be idle 908 return;
909 /// </summary> 909 }
910 public void WaitForIdle() 910
911 { 911 WorkItem workItem = CurrentThreadEntry.CurrentWorkItem;
912 WaitForIdle(Timeout.Infinite); 912 ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem);
913 } 913 if ((null != workItemsGroup) &&
914 914 (null != workItem) &&
915 /// <summary> 915 CurrentThreadEntry.CurrentWorkItem.WasQueuedBy(workItemsGroup))
916 /// Wait for the thread pool to be idle 916 {
917 /// </summary> 917 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
918 public bool WaitForIdle(TimeSpan timeout) 918 }
919 { 919 }
920 return WaitForIdle((int)timeout.TotalMilliseconds); 920
921 } 921 [MethodImpl(MethodImplOptions.NoInlining)]
922 922 private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem)
923 /// <summary> 923 {
924 /// Wait for the thread pool to be idle 924 if ((null != workItemsGroup) &&
925 /// </summary> 925 (null != workItem) &&
926 public bool WaitForIdle(int millisecondsTimeout) 926 workItem.WasQueuedBy(workItemsGroup))
927 { 927 {
928 ValidateWaitForIdle(); 928 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
929 return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); 929 }
930 } 930 }
931 931
932 private void ValidateWaitForIdle() 932 /// <summary>
933 { 933 /// Force the SmartThreadPool to shutdown
934 if(_smartThreadPool == this) 934 /// </summary>
935 { 935 public void Shutdown()
936 throw new NotSupportedException( 936 {
937 "WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 937 Shutdown(true, 0);
938 } 938 }
939 } 939
940 940 /// <summary>
941 internal void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup) 941 /// Force the SmartThreadPool to shutdown with timeout
942 { 942 /// </summary>
943 ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, SmartThreadPool._currentWorkItem); 943 public void Shutdown(bool forceAbort, TimeSpan timeout)
944 if ((null != workItemsGroup) && 944 {
945 (null != SmartThreadPool._currentWorkItem) && 945 Shutdown(forceAbort, (int)timeout.TotalMilliseconds);
946 SmartThreadPool._currentWorkItem.WasQueuedBy(workItemsGroup)) 946 }
947 { 947
948 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 948 /// <summary>
949 } 949 /// Empties the queue of work items and abort the threads in the pool.
950 } 950 /// </summary>
951 951 public void Shutdown(bool forceAbort, int millisecondsTimeout)
952 [MethodImpl(MethodImplOptions.NoInlining)] 952 {
953 private void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem) 953 ValidateNotDisposed();
954 { 954
955 if ((null != workItemsGroup) && 955 ISTPInstancePerformanceCounters pcs = _windowsPCs;
956 (null != workItem) && 956
957 workItem.WasQueuedBy(workItemsGroup)) 957 if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs)
958 { 958 {
959 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it will cause may cause a deadlock"); 959 // Set the _pcs to "null" to stop updating the performance
960 } 960 // counters
961 } 961 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
962 962
963 963 pcs.Dispose();
964 964 }
965 /// <summary> 965
966 /// Force the SmartThreadPool to shutdown 966 Thread [] threads;
967 /// </summary> 967 lock(_workerThreads.SyncRoot)
968 public void Shutdown() 968 {
969 { 969 // Shutdown the work items queue
970 Shutdown(true, 0); 970 _workItemsQueue.Dispose();
971 } 971
972 972 // Signal the threads to exit
973 public void Shutdown(bool forceAbort, TimeSpan timeout) 973 _shutdown = true;
974 { 974 _shuttingDownEvent.Set();
975 Shutdown(forceAbort, (int)timeout.TotalMilliseconds); 975
976 } 976 // Make a copy of the threads' references in the pool
977 977 threads = new Thread [_workerThreads.Count];
978 /// <summary> 978 _workerThreads.Keys.CopyTo(threads, 0);
979 /// Empties the queue of work items and abort the threads in the pool. 979 }
980 /// </summary> 980
981 public void Shutdown(bool forceAbort, int millisecondsTimeout) 981 int millisecondsLeft = millisecondsTimeout;
982 { 982 Stopwatch stopwatch = Stopwatch.StartNew();
983 ValidateNotDisposed(); 983 //DateTime start = DateTime.UtcNow;
984 984 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
985 ISTPInstancePerformanceCounters pcs = _pcs; 985 bool timeout = false;
986 986
987 if (NullSTPInstancePerformanceCounters.Instance != _pcs) 987 // Each iteration we update the time left for the timeout.
988 { 988 foreach(Thread thread in threads)
989 _pcs.Dispose(); 989 {
990 // Set the _pcs to "null" to stop updating the performance 990 // Join don't work with negative numbers
991 // counters 991 if (!waitInfinitely && (millisecondsLeft < 0))
992 _pcs = NullSTPInstancePerformanceCounters.Instance; 992 {
993 } 993 timeout = true;
994 994 break;
995 Thread [] threads = null; 995 }
996 lock(_workerThreads.SyncRoot) 996
997 { 997 // Wait for the thread to terminate
998 // Shutdown the work items queue 998 bool success = thread.Join(millisecondsLeft);
999 _workItemsQueue.Dispose(); 999 if(!success)
1000 1000 {
1001 // Signal the threads to exit 1001 timeout = true;
1002 _shutdown = true; 1002 break;
1003 _shuttingDownEvent.Set(); 1003 }
1004 1004
1005 // Make a copy of the threads' references in the pool 1005 if(!waitInfinitely)
1006 threads = new Thread [_workerThreads.Count]; 1006 {
1007 _workerThreads.Keys.CopyTo(threads, 0); 1007 // Update the time left to wait
1008 } 1008 //TimeSpan ts = DateTime.UtcNow - start;
1009 1009 millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds;
1010 int millisecondsLeft = millisecondsTimeout; 1010 }
1011 DateTime start = DateTime.Now; 1011 }
1012 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout); 1012
1013 bool timeout = false; 1013 if (timeout && forceAbort)
1014 1014 {
1015 // Each iteration we update the time left for the timeout. 1015 // Abort the threads in the pool
1016 foreach(Thread thread in threads) 1016 foreach(Thread thread in threads)
1017 { 1017 {
1018 // Join don't work with negative numbers 1018
1019 if (!waitInfinitely && (millisecondsLeft < 0)) 1019 if ((thread != null)
1020 { 1020#if !(_WINDOWS_CE)
1021 timeout = true; 1021 && thread.IsAlive
1022 break; 1022#endif
1023 } 1023 )
1024 1024 {
1025 // Wait for the thread to terminate 1025 try
1026 bool success = thread.Join(millisecondsLeft); 1026 {
1027 if(!success) 1027 thread.Abort(); // Shutdown
1028 { 1028 }
1029 timeout = true; 1029 catch(SecurityException e)
1030 break; 1030 {
1031 } 1031 e.GetHashCode();
1032 1032 }
1033 if(!waitInfinitely) 1033 catch(ThreadStateException ex)
1034 { 1034 {
1035 // Update the time left to wait 1035 ex.GetHashCode();
1036 TimeSpan ts = DateTime.Now - start; 1036 // In case the thread has been terminated
1037 millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds; 1037 // after the check if it is alive.
1038 } 1038 }
1039 } 1039 }
1040 1040 }
1041 if (timeout && forceAbort) 1041 }
1042 { 1042 }
1043 // Abort the threads in the pool 1043
1044 foreach(Thread thread in threads) 1044 /// <summary>
1045 { 1045 /// Wait for all work items to complete
1046 if ((thread != null) && thread.IsAlive) 1046 /// </summary>
1047 { 1047 /// <param name="waitableResults">Array of work item result objects</param>
1048 try 1048 /// <returns>
1049 { 1049 /// true when every work item in workItemResults has completed; otherwise false.
1050 thread.Abort("Shutdown"); 1050 /// </returns>
1051 } 1051 public static bool WaitAll(
1052 catch(SecurityException e) 1052 IWaitableResult [] waitableResults)
1053 { 1053 {
1054 e.GetHashCode(); 1054 return WaitAll(waitableResults, Timeout.Infinite, true);
1055 } 1055 }
1056 catch(ThreadStateException ex) 1056
1057 { 1057 /// <summary>
1058 ex.GetHashCode(); 1058 /// Wait for all work items to complete
1059 // In case the thread has been terminated 1059 /// </summary>
1060 // after the check if it is alive. 1060 /// <param name="waitableResults">Array of work item result objects</param>
1061 } 1061 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1062 } 1062 /// <param name="exitContext">
1063 } 1063 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1064 } 1064 /// </param>
1065 1065 /// <returns>
1066 // Dispose of the performance counters 1066 /// true when every work item in workItemResults has completed; otherwise false.
1067 pcs.Dispose(); 1067 /// </returns>
1068 } 1068 public static bool WaitAll(
1069 1069 IWaitableResult [] waitableResults,
1070 /// <summary> 1070 TimeSpan timeout,
1071 /// Wait for all work items to complete 1071 bool exitContext)
1072 /// </summary> 1072 {
1073 /// <param name="workItemResults">Array of work item result objects</param> 1073 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1074 /// <returns> 1074 }
1075 /// true when every work item in workItemResults has completed; otherwise false. 1075
1076 /// </returns> 1076 /// <summary>
1077 public static bool WaitAll( 1077 /// Wait for all work items to complete
1078 IWorkItemResult [] workItemResults) 1078 /// </summary>
1079 { 1079 /// <param name="waitableResults">Array of work item result objects</param>
1080 return WaitAll(workItemResults, Timeout.Infinite, true); 1080 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1081 } 1081 /// <param name="exitContext">
1082 1082 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1083 /// <summary> 1083 /// </param>
1084 /// Wait for all work items to complete 1084 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1085 /// </summary> 1085 /// <returns>
1086 /// <param name="workItemResults">Array of work item result objects</param> 1086 /// true when every work item in workItemResults has completed; otherwise false.
1087 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1087 /// </returns>
1088 /// <param name="exitContext"> 1088 public static bool WaitAll(
1089 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1089 IWaitableResult[] waitableResults,
1090 /// </param> 1090 TimeSpan timeout,
1091 /// <returns> 1091 bool exitContext,
1092 /// true when every work item in workItemResults has completed; otherwise false. 1092 WaitHandle cancelWaitHandle)
1093 /// </returns> 1093 {
1094 public static bool WaitAll( 1094 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1095 IWorkItemResult [] workItemResults, 1095 }
1096 TimeSpan timeout, 1096
1097 bool exitContext) 1097 /// <summary>
1098 { 1098 /// Wait for all work items to complete
1099 return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext); 1099 /// </summary>
1100 } 1100 /// <param name="waitableResults">Array of work item result objects</param>
1101 1101 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1102 /// <summary> 1102 /// <param name="exitContext">
1103 /// Wait for all work items to complete 1103 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1104 /// </summary> 1104 /// </param>
1105 /// <param name="workItemResults">Array of work item result objects</param> 1105 /// <returns>
1106 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1106 /// true when every work item in workItemResults has completed; otherwise false.
1107 /// <param name="exitContext"> 1107 /// </returns>
1108 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1108 public static bool WaitAll(
1109 /// </param> 1109 IWaitableResult [] waitableResults,
1110 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1110 int millisecondsTimeout,
1111 /// <returns> 1111 bool exitContext)
1112 /// true when every work item in workItemResults has completed; otherwise false. 1112 {
1113 /// </returns> 1113 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null);
1114 public static bool WaitAll( 1114 }
1115 IWorkItemResult [] workItemResults, 1115
1116 TimeSpan timeout, 1116 /// <summary>
1117 bool exitContext, 1117 /// Wait for all work items to complete
1118 WaitHandle cancelWaitHandle) 1118 /// </summary>
1119 { 1119 /// <param name="waitableResults">Array of work item result objects</param>
1120 return WaitAll(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); 1120 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1121 } 1121 /// <param name="exitContext">
1122 1122 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1123 /// <summary> 1123 /// </param>
1124 /// Wait for all work items to complete 1124 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1125 /// </summary> 1125 /// <returns>
1126 /// <param name="workItemResults">Array of work item result objects</param> 1126 /// true when every work item in workItemResults has completed; otherwise false.
1127 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1127 /// </returns>
1128 /// <param name="exitContext"> 1128 public static bool WaitAll(
1129 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1129 IWaitableResult[] waitableResults,
1130 /// </param> 1130 int millisecondsTimeout,
1131 /// <returns> 1131 bool exitContext,
1132 /// true when every work item in workItemResults has completed; otherwise false. 1132 WaitHandle cancelWaitHandle)
1133 /// </returns> 1133 {
1134 public static bool WaitAll( 1134 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1135 IWorkItemResult [] workItemResults, 1135 }
1136 int millisecondsTimeout, 1136
1137 bool exitContext) 1137
1138 { 1138 /// <summary>
1139 return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, null); 1139 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1140 } 1140 /// </summary>
1141 1141 /// <param name="waitableResults">Array of work item result objects</param>
1142 /// <summary> 1142 /// <returns>
1143 /// Wait for all work items to complete 1143 /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
1144 /// </summary> 1144 /// </returns>
1145 /// <param name="workItemResults">Array of work item result objects</param> 1145 public static int WaitAny(
1146 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1146 IWaitableResult [] waitableResults)
1147 /// <param name="exitContext"> 1147 {
1148 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1148 return WaitAny(waitableResults, Timeout.Infinite, true);
1149 /// </param> 1149 }
1150 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1150
1151 /// <returns> 1151 /// <summary>
1152 /// true when every work item in workItemResults has completed; otherwise false. 1152 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1153 /// </returns> 1153 /// </summary>
1154 public static bool WaitAll( 1154 /// <param name="waitableResults">Array of work item result objects</param>
1155 IWorkItemResult [] workItemResults, 1155 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1156 int millisecondsTimeout, 1156 /// <param name="exitContext">
1157 bool exitContext, 1157 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1158 WaitHandle cancelWaitHandle) 1158 /// </param>
1159 { 1159 /// <returns>
1160 return WorkItem.WaitAll(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle); 1160 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1161 } 1161 /// </returns>
1162 1162 public static int WaitAny(
1163 1163 IWaitableResult[] waitableResults,
1164 /// <summary> 1164 TimeSpan timeout,
1165 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1165 bool exitContext)
1166 /// </summary> 1166 {
1167 /// <param name="workItemResults">Array of work item result objects</param> 1167 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1168 /// <returns> 1168 }
1169 /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled. 1169
1170 /// </returns> 1170 /// <summary>
1171 public static int WaitAny( 1171 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1172 IWorkItemResult [] workItemResults) 1172 /// </summary>
1173 { 1173 /// <param name="waitableResults">Array of work item result objects</param>
1174 return WaitAny(workItemResults, Timeout.Infinite, true); 1174 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1175 } 1175 /// <param name="exitContext">
1176 1176 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1177 /// <summary> 1177 /// </param>
1178 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1178 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1179 /// </summary> 1179 /// <returns>
1180 /// <param name="workItemResults">Array of work item result objects</param> 1180 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1181 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1181 /// </returns>
1182 /// <param name="exitContext"> 1182 public static int WaitAny(
1183 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1183 IWaitableResult [] waitableResults,
1184 /// </param> 1184 TimeSpan timeout,
1185 /// <returns> 1185 bool exitContext,
1186 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1186 WaitHandle cancelWaitHandle)
1187 /// </returns> 1187 {
1188 public static int WaitAny( 1188 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1189 IWorkItemResult [] workItemResults, 1189 }
1190 TimeSpan timeout, 1190
1191 bool exitContext) 1191 /// <summary>
1192 { 1192 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1193 return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext); 1193 /// </summary>
1194 } 1194 /// <param name="waitableResults">Array of work item result objects</param>
1195 1195 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1196 /// <summary> 1196 /// <param name="exitContext">
1197 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1197 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1198 /// </summary> 1198 /// </param>
1199 /// <param name="workItemResults">Array of work item result objects</param> 1199 /// <returns>
1200 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param> 1200 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1201 /// <param name="exitContext"> 1201 /// </returns>
1202 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1202 public static int WaitAny(
1203 /// </param> 1203 IWaitableResult [] waitableResults,
1204 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1204 int millisecondsTimeout,
1205 /// <returns> 1205 bool exitContext)
1206 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1206 {
1207 /// </returns> 1207 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null);
1208 public static int WaitAny( 1208 }
1209 IWorkItemResult [] workItemResults, 1209
1210 TimeSpan timeout, 1210 /// <summary>
1211 bool exitContext, 1211 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1212 WaitHandle cancelWaitHandle) 1212 /// </summary>
1213 { 1213 /// <param name="waitableResults">Array of work item result objects</param>
1214 return WaitAny(workItemResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); 1214 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1215 } 1215 /// <param name="exitContext">
1216 1216 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1217 /// <summary> 1217 /// </param>
1218 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1218 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1219 /// </summary> 1219 /// <returns>
1220 /// <param name="workItemResults">Array of work item result objects</param> 1220 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1221 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1221 /// </returns>
1222 /// <param name="exitContext"> 1222 public static int WaitAny(
1223 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1223 IWaitableResult [] waitableResults,
1224 /// </param> 1224 int millisecondsTimeout,
1225 /// <returns> 1225 bool exitContext,
1226 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1226 WaitHandle cancelWaitHandle)
1227 /// </returns> 1227 {
1228 public static int WaitAny( 1228 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1229 IWorkItemResult [] workItemResults, 1229 }
1230 int millisecondsTimeout, 1230
1231 bool exitContext) 1231 /// <summary>
1232 { 1232 /// Creates a new WorkItemsGroup.
1233 return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, null); 1233 /// </summary>
1234 } 1234 /// <param name="concurrency">The number of work items that can be run concurrently</param>
1235 1235 /// <returns>A reference to the WorkItemsGroup</returns>
1236 /// <summary> 1236 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
1237 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 1237 {
1238 /// </summary> 1238 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo);
1239 /// <param name="workItemResults">Array of work item result objects</param> 1239 return workItemsGroup;
1240 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 1240 }
1241 /// <param name="exitContext"> 1241
1242 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 1242 /// <summary>
1243 /// </param> 1243 /// Creates a new WorkItemsGroup.
1244 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 1244 /// </summary>
1245 /// <returns> 1245 /// <param name="concurrency">The number of work items that can be run concurrently</param>
1246 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 1246 /// <param name="wigStartInfo">A WorkItemsGroup configuration that overrides the default behavior</param>
1247 /// </returns> 1247 /// <returns>A reference to the WorkItemsGroup</returns>
1248 public static int WaitAny( 1248 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
1249 IWorkItemResult [] workItemResults, 1249 {
1250 int millisecondsTimeout, 1250 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo);
1251 bool exitContext, 1251 return workItemsGroup;
1252 WaitHandle cancelWaitHandle) 1252 }
1253 { 1253
1254 return WorkItem.WaitAny(workItemResults, millisecondsTimeout, exitContext, cancelWaitHandle); 1254 #region Fire Thread's Events
1255 } 1255
1256 1256 private void FireOnThreadInitialization()
1257 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency) 1257 {
1258 { 1258 if (null != _onThreadInitialization)
1259 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo); 1259 {
1260 return workItemsGroup; 1260 foreach (ThreadInitializationHandler tih in _onThreadInitialization.GetInvocationList())
1261 } 1261 {
1262 1262 try
1263 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo) 1263 {
1264 { 1264 tih();
1265 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo); 1265 }
1266 return workItemsGroup; 1266 catch (Exception e)
1267 } 1267 {
1268 1268 e.GetHashCode();
1269 public event WorkItemsGroupIdleHandler OnIdle 1269 Debug.Assert(false);
1270 { 1270 throw;
1271 add 1271 }
1272 { 1272 }
1273 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); 1273 }
1274 //_onIdle += value; 1274 }
1275 } 1275
1276 remove 1276 private void FireOnThreadTermination()
1277 { 1277 {
1278 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); 1278 if (null != _onThreadTermination)
1279 //_onIdle -= value; 1279 {
1280 } 1280 foreach (ThreadTerminationHandler tth in _onThreadTermination.GetInvocationList())
1281 } 1281 {
1282 1282 try
1283 public void Cancel() 1283 {
1284 { 1284 tth();
1285 ICollection workItemsGroups = _workItemsGroups.Values; 1285 }
1286 foreach(WorkItemsGroup workItemsGroup in workItemsGroups) 1286 catch (Exception e)
1287 { 1287 {
1288 workItemsGroup.Cancel(); 1288 e.GetHashCode();
1289 } 1289 Debug.Assert(false);
1290 } 1290 throw;
1291 1291 }
1292 public void Start() 1292 }
1293 { 1293 }
1294 lock (this) 1294 }
1295 { 1295
1296 if (!this._stpStartInfo.StartSuspended) 1296 #endregion
1297 { 1297
1298 return; 1298 /// <summary>
1299 } 1299 /// This event is fired when a thread is created.
1300 _stpStartInfo.StartSuspended = false; 1300 /// Use it to initialize a thread before the work items use it.
1301 } 1301 /// </summary>
1302 1302 public event ThreadInitializationHandler OnThreadInitialization
1303 ICollection workItemsGroups = _workItemsGroups.Values; 1303 {
1304 foreach(WorkItemsGroup workItemsGroup in workItemsGroups) 1304 add { _onThreadInitialization += value; }
1305 { 1305 remove { _onThreadInitialization -= value; }
1306 workItemsGroup.OnSTPIsStarting(); 1306 }
1307 } 1307
1308 1308 /// <summary>
1309 StartOptimalNumberOfThreads(); 1309 /// This event is fired when a thread is terminating.
1310 } 1310 /// Use it for cleanup.
1311 1311 /// </summary>
1312 #endregion 1312 public event ThreadTerminationHandler OnThreadTermination
1313 1313 {
1314 #region Properties 1314 add { _onThreadTermination += value; }
1315 1315 remove { _onThreadTermination -= value; }
1316 /// <summary> 1316 }
1317 /// Get/Set the name of the SmartThreadPool instance 1317
1318 /// </summary> 1318
1319 public string Name 1319 internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig)
1320 { 1320 {
1321 get 1321 foreach (ThreadEntry threadEntry in _workerThreads.Values)
1322 { 1322 {
1323 return _name; 1323 WorkItem workItem = threadEntry.CurrentWorkItem;
1324 } 1324 if (null != workItem &&
1325 1325 workItem.WasQueuedBy(wig) &&
1326 set 1326 !workItem.IsCanceled)
1327 { 1327 {
1328 _name = value; 1328 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1329 } 1329 }
1330 } 1330 }
1331 1331 }
1332 /// <summary> 1332
1333 /// Get the lower limit of threads in the pool. 1333
1334 /// </summary> 1334
1335 public int MinThreads 1335 #endregion
1336 { 1336
1337 get 1337 #region Properties
1338 { 1338
1339 ValidateNotDisposed(); 1339 /// <summary>
1340 return _stpStartInfo.MinWorkerThreads; 1340 /// Get/Set the lower limit of threads in the pool.
1341 } 1341 /// </summary>
1342 } 1342 public int MinThreads
1343 1343 {
1344 /// <summary> 1344 get
1345 /// Get the upper limit of threads in the pool. 1345 {
1346 /// </summary> 1346 ValidateNotDisposed();
1347 public int MaxThreads 1347 return _stpStartInfo.MinWorkerThreads;
1348 { 1348 }
1349 get 1349 set
1350 { 1350 {
1351 ValidateNotDisposed(); 1351 Debug.Assert(value >= 0);
1352 return _stpStartInfo.MaxWorkerThreads; 1352 Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads);
1353 } 1353 if (_stpStartInfo.MaxWorkerThreads < value)
1354 } 1354 {
1355 /// <summary> 1355 _stpStartInfo.MaxWorkerThreads = value;
1356 /// Get the number of threads in the thread pool. 1356 }
1357 /// Should be between the lower and the upper limits. 1357 _stpStartInfo.MinWorkerThreads = value;
1358 /// </summary> 1358 StartOptimalNumberOfThreads();
1359 public int ActiveThreads 1359 }
1360 { 1360 }
1361 get 1361
1362 { 1362 /// <summary>
1363 ValidateNotDisposed(); 1363 /// Get/Set the upper limit of threads in the pool.
1364 return _workerThreads.Count; 1364 /// </summary>
1365 } 1365 public int MaxThreads
1366 } 1366 {
1367 1367 get
1368 /// <summary> 1368 {
1369 /// Get the number of busy (not idle) threads in the thread pool. 1369 ValidateNotDisposed();
1370 /// </summary> 1370 return _stpStartInfo.MaxWorkerThreads;
1371 public int InUseThreads 1371 }
1372 { 1372
1373 get 1373 set
1374 { 1374 {
1375 ValidateNotDisposed(); 1375 Debug.Assert(value > 0);
1376 return _inUseWorkerThreads; 1376 Debug.Assert(value >= _stpStartInfo.MinWorkerThreads);
1377 } 1377 if (_stpStartInfo.MinWorkerThreads > value)
1378 } 1378 {
1379 1379 _stpStartInfo.MinWorkerThreads = value;
1380 /// <summary> 1380 }
1381 /// Get the number of work items in the queue. 1381 _stpStartInfo.MaxWorkerThreads = value;
1382 /// </summary> 1382 StartOptimalNumberOfThreads();
1383 public int WaitingCallbacks 1383 }
1384 { 1384 }
1385 get 1385 /// <summary>
1386 { 1386 /// Get the number of threads in the thread pool.
1387 ValidateNotDisposed(); 1387 /// Should be between the lower and the upper limits.
1388 return _workItemsQueue.Count; 1388 /// </summary>
1389 } 1389 public int ActiveThreads
1390 } 1390 {
1391 1391 get
1392 1392 {
1393 public event EventHandler Idle 1393 ValidateNotDisposed();
1394 { 1394 return _workerThreads.Count;
1395 add 1395 }
1396 { 1396 }
1397 _stpIdle += value; 1397
1398 } 1398 /// <summary>
1399 1399 /// Get the number of busy (not idle) threads in the thread pool.
1400 remove 1400 /// </summary>
1401 { 1401 public int InUseThreads
1402 _stpIdle -= value; 1402 {
1403 } 1403 get
1404 } 1404 {
1405 1405 ValidateNotDisposed();
1406 #endregion 1406 return _inUseWorkerThreads;
1407 1407 }
1408 #region IDisposable Members 1408 }
1409 1409
1410// ~SmartThreadPool() 1410 /// <summary>
1411// { 1411 /// Returns true if the current running work item has been cancelled.
1412// Dispose(); 1412 /// Must be used within the work item's callback method.
1413// } 1413 /// The work item should sample this value in order to know if it
1414 1414 /// needs to quit before its completion.
1415 public void Dispose() 1415 /// </summary>
1416 { 1416 public static bool IsWorkItemCanceled
1417 if (!_isDisposed) 1417 {
1418 { 1418 get
1419 if (!_shutdown) 1419 {
1420 { 1420 return CurrentThreadEntry.CurrentWorkItem.IsCanceled;
1421 Shutdown(); 1421 }
1422 } 1422 }
1423 1423
1424 if (null != _shuttingDownEvent) 1424 /// <summary>
1425 { 1425 /// Checks if the work item has been cancelled, and if yes then abort the thread.
1426 _shuttingDownEvent.Close(); 1426 /// Can be used with Cancel and timeout
1427 _shuttingDownEvent = null; 1427 /// </summary>
1428 } 1428 public static void AbortOnWorkItemCancel()
1429 _workerThreads.Clear(); 1429 {
1430 _isDisposed = true; 1430 if (IsWorkItemCanceled)
1431 GC.SuppressFinalize(this); 1431 {
1432 } 1432 Thread.CurrentThread.Abort();
1433 } 1433 }
1434 1434 }
1435 private void ValidateNotDisposed() 1435
1436 { 1436 /// <summary>
1437 if(_isDisposed) 1437 /// Thread Pool start information (readonly)
1438 { 1438 /// </summary>
1439 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); 1439 public STPStartInfo STPStartInfo
1440 } 1440 {
1441 } 1441 get
1442 #endregion 1442 {
1443 } 1443 return _stpStartInfo.AsReadOnly();
1444 #endregion 1444 }
1445} 1445 }
1446
1447 public bool IsShuttingdown
1448 {
1449 get { return _shutdown; }
1450 }
1451
1452 /// <summary>
1453 /// Return the local calculated performance counters
1454 /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true.
1455 /// </summary>
1456 public ISTPPerformanceCountersReader PerformanceCountersReader
1457 {
1458 get { return (ISTPPerformanceCountersReader)_localPCs; }
1459 }
1460
1461 #endregion
1462
1463 #region IDisposable Members
1464
1465 public void Dispose()
1466 {
1467 if (!_isDisposed)
1468 {
1469 if (!_shutdown)
1470 {
1471 Shutdown();
1472 }
1473
1474 if (null != _shuttingDownEvent)
1475 {
1476 _shuttingDownEvent.Close();
1477 _shuttingDownEvent = null;
1478 }
1479 _workerThreads.Clear();
1480
1481 if (null != _isIdleWaitHandle)
1482 {
1483 _isIdleWaitHandle.Close();
1484 _isIdleWaitHandle = null;
1485 }
1486
1487 _isDisposed = true;
1488 }
1489 }
1490
1491 private void ValidateNotDisposed()
1492 {
1493 if(_isDisposed)
1494 {
1495 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
1496 }
1497 }
1498 #endregion
1499
1500 #region WorkItemsGroupBase Overrides
1501
1502 /// <summary>
1503 /// Get/Set the maximum number of work items that execute cocurrency on the thread pool
1504 /// </summary>
1505 public override int Concurrency
1506 {
1507 get { return MaxThreads; }
1508 set { MaxThreads = value; }
1509 }
1510
1511 /// <summary>
1512 /// Get the number of work items in the queue.
1513 /// </summary>
1514 public override int WaitingCallbacks
1515 {
1516 get
1517 {
1518 ValidateNotDisposed();
1519 return _workItemsQueue.Count;
1520 }
1521 }
1522
1523 /// <summary>
1524 /// Get an array with all the state objects of the currently running items.
1525 /// The array represents a snap shot and impact performance.
1526 /// </summary>
1527 public override object[] GetStates()
1528 {
1529 object[] states = _workItemsQueue.GetStates();
1530 return states;
1531 }
1532
1533 /// <summary>
1534 /// WorkItemsGroup start information (readonly)
1535 /// </summary>
1536 public override WIGStartInfo WIGStartInfo
1537 {
1538 get { return _stpStartInfo.AsReadOnly(); }
1539 }
1540
1541 /// <summary>
1542 /// Start the thread pool if it was started suspended.
1543 /// If it is already running, this method is ignored.
1544 /// </summary>
1545 public override void Start()
1546 {
1547 if (!_isSuspended)
1548 {
1549 return;
1550 }
1551 _isSuspended = false;
1552
1553 ICollection workItemsGroups = _workItemsGroups.Values;
1554 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1555 {
1556 workItemsGroup.OnSTPIsStarting();
1557 }
1558
1559 StartOptimalNumberOfThreads();
1560 }
1561
1562 /// <summary>
1563 /// Cancel all work items using thread abortion
1564 /// </summary>
1565 /// <param name="abortExecution">True to stop work items by raising ThreadAbortException</param>
1566 public override void Cancel(bool abortExecution)
1567 {
1568 _canceledSmartThreadPool.IsCanceled = true;
1569 _canceledSmartThreadPool = new CanceledWorkItemsGroup();
1570
1571 ICollection workItemsGroups = _workItemsGroups.Values;
1572 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1573 {
1574 workItemsGroup.Cancel(abortExecution);
1575 }
1576
1577 if (abortExecution)
1578 {
1579 foreach (ThreadEntry threadEntry in _workerThreads.Values)
1580 {
1581 WorkItem workItem = threadEntry.CurrentWorkItem;
1582 if (null != workItem &&
1583 threadEntry.AssociatedSmartThreadPool == this &&
1584 !workItem.IsCanceled)
1585 {
1586 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1587 }
1588 }
1589 }
1590 }
1591
1592 /// <summary>
1593 /// Wait for the thread pool to be idle
1594 /// </summary>
1595 public override bool WaitForIdle(int millisecondsTimeout)
1596 {
1597 ValidateWaitForIdle();
1598 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
1599 }
1600
1601 /// <summary>
1602 /// This event is fired when all work items are completed.
1603 /// (When IsIdle changes to true)
1604 /// This event only work on WorkItemsGroup. On SmartThreadPool
1605 /// it throws the NotImplementedException.
1606 /// </summary>
1607 public override event WorkItemsGroupIdleHandler OnIdle
1608 {
1609 add
1610 {
1611 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
1612 //_onIdle += value;
1613 }
1614 remove
1615 {
1616 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
1617 //_onIdle -= value;
1618 }
1619 }
1620
1621 internal override void PreQueueWorkItem()
1622 {
1623 ValidateNotDisposed();
1624 }
1625
1626 #endregion
1627
1628 #region Join, Choice, Pipe, etc.
1629
1630 /// <summary>
1631 /// Executes all actions in parallel.
1632 /// Returns when they all finish.
1633 /// </summary>
1634 /// <param name="actions">Actions to execute</param>
1635 public void Join(IEnumerable<Action> actions)
1636 {
1637 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1638 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1639 foreach (Action action in actions)
1640 {
1641 workItemsGroup.QueueWorkItem(action);
1642 }
1643 workItemsGroup.Start();
1644 workItemsGroup.WaitForIdle();
1645 }
1646
1647 /// <summary>
1648 /// Executes all actions in parallel.
1649 /// Returns when they all finish.
1650 /// </summary>
1651 /// <param name="actions">Actions to execute</param>
1652 public void Join(params Action[] actions)
1653 {
1654 Join((IEnumerable<Action>)actions);
1655 }
1656
1657 private class ChoiceIndex
1658 {
1659 public int _index = -1;
1660 }
1661
1662 /// <summary>
1663 /// Executes all actions in parallel
1664 /// Returns when the first one completes
1665 /// </summary>
1666 /// <param name="actions">Actions to execute</param>
1667 public int Choice(IEnumerable<Action> actions)
1668 {
1669 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1670 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1671
1672 ManualResetEvent anActionCompleted = new ManualResetEvent(false);
1673
1674 ChoiceIndex choiceIndex = new ChoiceIndex();
1675
1676 int i = 0;
1677 foreach (Action action in actions)
1678 {
1679 Action act = action;
1680 int value = i;
1681 workItemsGroup.QueueWorkItem(() => { act(); Interlocked.CompareExchange(ref choiceIndex._index, value, -1); anActionCompleted.Set(); });
1682 ++i;
1683 }
1684 workItemsGroup.Start();
1685 anActionCompleted.WaitOne();
1686
1687 return choiceIndex._index;
1688 }
1689
1690 /// <summary>
1691 /// Executes all actions in parallel
1692 /// Returns when the first one completes
1693 /// </summary>
1694 /// <param name="actions">Actions to execute</param>
1695 public int Choice(params Action[] actions)
1696 {
1697 return Choice((IEnumerable<Action>)actions);
1698 }
1699
1700 /// <summary>
1701 /// Executes actions in sequence asynchronously.
1702 /// Returns immediately.
1703 /// </summary>
1704 /// <param name="pipeState">A state context that passes </param>
1705 /// <param name="actions">Actions to execute in the order they should run</param>
1706 public void Pipe<T>(T pipeState, IEnumerable<Action<T>> actions)
1707 {
1708 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1709 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(1, wigStartInfo);
1710 foreach (Action<T> action in actions)
1711 {
1712 Action<T> act = action;
1713 workItemsGroup.QueueWorkItem(() => act(pipeState));
1714 }
1715 workItemsGroup.Start();
1716 workItemsGroup.WaitForIdle();
1717 }
1718
1719 /// <summary>
1720 /// Executes actions in sequence asynchronously.
1721 /// Returns immediately.
1722 /// </summary>
1723 /// <param name="pipeState"></param>
1724 /// <param name="actions">Actions to execute in the order they should run</param>
1725 public void Pipe<T>(T pipeState, params Action<T>[] actions)
1726 {
1727 Pipe(pipeState, (IEnumerable<Action<T>>)actions);
1728 }
1729 #endregion
1730 }
1731 #endregion
1732}