aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItem.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItem.cs649
1 files changed, 303 insertions, 346 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItem.cs b/ThirdParty/SmartThreadPool/WorkItem.cs
index d0c0524..f229d1f 100644
--- a/ThirdParty/SmartThreadPool/WorkItem.cs
+++ b/ThirdParty/SmartThreadPool/WorkItem.cs
@@ -1,58 +1,13 @@
1// Ami Bar
2// amibar@gmail.com
3
4using System; 1using System;
5using System.Threading; 2using System.Threading;
6using System.Diagnostics; 3using System.Diagnostics;
7 4
8namespace Amib.Threading.Internal 5namespace Amib.Threading.Internal
9{ 6{
10 #region WorkItem Delegate
11
12 /// <summary>
13 /// An internal delegate to call when the WorkItem starts or completes
14 /// </summary>
15 internal delegate void WorkItemStateCallback(WorkItem workItem);
16
17 #endregion
18
19 #region IInternalWorkItemResult interface
20
21 public class CanceledWorkItemsGroup
22 {
23 public readonly static CanceledWorkItemsGroup NotCanceledWorkItemsGroup = new CanceledWorkItemsGroup();
24
25 private bool _isCanceled = false;
26 public bool IsCanceled
27 {
28 get { return _isCanceled; }
29 set { _isCanceled = value; }
30 }
31 }
32
33 internal interface IInternalWorkItemResult
34 {
35 event WorkItemStateCallback OnWorkItemStarted;
36 event WorkItemStateCallback OnWorkItemCompleted;
37 }
38
39 #endregion
40
41 #region IWorkItem interface
42
43 public interface IWorkItem
44 {
45
46 }
47
48 #endregion
49
50 #region WorkItem class
51
52 /// <summary> 7 /// <summary>
53 /// Holds a callback delegate and the state for that delegate. 8 /// Holds a callback delegate and the state for that delegate.
54 /// </summary> 9 /// </summary>
55 public class WorkItem : IHasWorkItemPriority, IWorkItem 10 public partial class WorkItem : IHasWorkItemPriority
56 { 11 {
57 #region WorkItemState enum 12 #region WorkItemState enum
58 13
@@ -61,33 +16,57 @@ namespace Amib.Threading.Internal
61 /// </summary> 16 /// </summary>
62 private enum WorkItemState 17 private enum WorkItemState
63 { 18 {
64 InQueue, 19 InQueue = 0, // Nexts: InProgress, Canceled
65 InProgress, 20 InProgress = 1, // Nexts: Completed, Canceled
66 Completed, 21 Completed = 2, // Stays Completed
67 Canceled, 22 Canceled = 3, // Stays Canceled
68 } 23 }
69 24
70 #endregion 25 private static bool IsValidStatesTransition(WorkItemState currentState, WorkItemState nextState)
26 {
27 bool valid = false;
28
29 switch (currentState)
30 {
31 case WorkItemState.InQueue:
32 valid = (WorkItemState.InProgress == nextState) || (WorkItemState.Canceled == nextState);
33 break;
34 case WorkItemState.InProgress:
35 valid = (WorkItemState.Completed == nextState) || (WorkItemState.Canceled == nextState);
36 break;
37 case WorkItemState.Completed:
38 case WorkItemState.Canceled:
39 // Cannot be changed
40 break;
41 default:
42 // Unknown state
43 Debug.Assert(false);
44 break;
45 }
71 46
72 #region Member Variables 47 return valid;
48 }
73 49
74 public Thread currentThread; 50 #endregion
51
52 #region Fields
75 53
76 /// <summary> 54 /// <summary>
77 /// Callback delegate for the callback. 55 /// Callback delegate for the callback.
78 /// </summary> 56 /// </summary>
79 private WorkItemCallback _callback; 57 private readonly WorkItemCallback _callback;
80 58
81 /// <summary> 59 /// <summary>
82 /// State with which to call the callback delegate. 60 /// State with which to call the callback delegate.
83 /// </summary> 61 /// </summary>
84 private object _state; 62 private object _state;
85 63
64#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
86 /// <summary> 65 /// <summary>
87 /// Stores the caller's context 66 /// Stores the caller's context
88 /// </summary> 67 /// </summary>
89 private CallerThreadContext _callerContext; 68 private readonly CallerThreadContext _callerContext;
90 69#endif
91 /// <summary> 70 /// <summary>
92 /// Holds the result of the mehtod 71 /// Holds the result of the mehtod
93 /// </summary> 72 /// </summary>
@@ -117,12 +96,12 @@ namespace Amib.Threading.Internal
117 /// <summary> 96 /// <summary>
118 /// Represents the result state of the work item 97 /// Represents the result state of the work item
119 /// </summary> 98 /// </summary>
120 private WorkItemResult _workItemResult; 99 private readonly WorkItemResult _workItemResult;
121 100
122 /// <summary> 101 /// <summary>
123 /// Work item info 102 /// Work item info
124 /// </summary> 103 /// </summary>
125 private WorkItemInfo _workItemInfo; 104 private readonly WorkItemInfo _workItemInfo;
126 105
127 /// <summary> 106 /// <summary>
128 /// Called when the WorkItem starts 107 /// Called when the WorkItem starts
@@ -141,30 +120,41 @@ namespace Amib.Threading.Internal
141 private CanceledWorkItemsGroup _canceledWorkItemsGroup = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup; 120 private CanceledWorkItemsGroup _canceledWorkItemsGroup = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup;
142 121
143 /// <summary> 122 /// <summary>
123 /// A reference to an object that indicates whatever the
124 /// SmartThreadPool has been canceled
125 /// </summary>
126 private CanceledWorkItemsGroup _canceledSmartThreadPool = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup;
127
128 /// <summary>
144 /// The work item group this work item belong to. 129 /// The work item group this work item belong to.
145 ///
146 /// </summary> 130 /// </summary>
147 private IWorkItemsGroup _workItemsGroup; 131 private readonly IWorkItemsGroup _workItemsGroup;
148 132
149 #region Performance Counter fields 133 /// <summary>
134 /// The thread that executes this workitem.
135 /// This field is available for the period when the work item is executed, before and after it is null.
136 /// </summary>
137 private Thread _executingThread;
150 138
151 /// <summary> 139 /// <summary>
152 /// The time when the work items is queued. 140 /// The absulote time when the work item will be timeout
153 /// Used with the performance counter.
154 /// </summary> 141 /// </summary>
155 private DateTime _queuedTime; 142 private long _expirationTime;
143
144 #region Performance Counter fields
145
146
147
156 148
157 /// <summary> 149 /// <summary>
158 /// The time when the work items starts its execution. 150 /// Stores how long the work item waited on the stp queue
159 /// Used with the performance counter.
160 /// </summary> 151 /// </summary>
161 private DateTime _beginProcessTime; 152 private Stopwatch _waitingOnQueueStopwatch;
162 153
163 /// <summary> 154 /// <summary>
164 /// The time when the work items ends its execution. 155 /// Stores how much time it took the work item to execute after it went out of the queue
165 /// Used with the performance counter.
166 /// </summary> 156 /// </summary>
167 private DateTime _endProcessTime; 157 private Stopwatch _processingStopwatch;
168 158
169 #endregion 159 #endregion
170 160
@@ -174,17 +164,25 @@ namespace Amib.Threading.Internal
174 164
175 public TimeSpan WaitingTime 165 public TimeSpan WaitingTime
176 { 166 {
177 get 167 get
178 { 168 {
179 return (_beginProcessTime - _queuedTime); 169 return _waitingOnQueueStopwatch.Elapsed;
180 } 170 }
181 } 171 }
182 172
183 public TimeSpan ProcessTime 173 public TimeSpan ProcessTime
184 { 174 {
185 get 175 get
176 {
177 return _processingStopwatch.Elapsed;
178 }
179 }
180
181 internal WorkItemInfo WorkItemInfo
182 {
183 get
186 { 184 {
187 return (_endProcessTime - _beginProcessTime); 185 return _workItemInfo;
188 } 186 }
189 } 187 }
190 188
@@ -195,6 +193,8 @@ namespace Amib.Threading.Internal
195 /// <summary> 193 /// <summary>
196 /// Initialize the callback holding object. 194 /// Initialize the callback holding object.
197 /// </summary> 195 /// </summary>
196 /// <param name="workItemsGroup">The workItemGroup of the workitem</param>
197 /// <param name="workItemInfo">The WorkItemInfo of te workitem</param>
198 /// <param name="callback">Callback delegate for the callback.</param> 198 /// <param name="callback">Callback delegate for the callback.</param>
199 /// <param name="state">State with which to call the callback delegate.</param> 199 /// <param name="state">State with which to call the callback delegate.</param>
200 /// 200 ///
@@ -203,16 +203,18 @@ namespace Amib.Threading.Internal
203 public WorkItem( 203 public WorkItem(
204 IWorkItemsGroup workItemsGroup, 204 IWorkItemsGroup workItemsGroup,
205 WorkItemInfo workItemInfo, 205 WorkItemInfo workItemInfo,
206 WorkItemCallback callback, 206 WorkItemCallback callback,
207 object state) 207 object state)
208 { 208 {
209 _workItemsGroup = workItemsGroup; 209 _workItemsGroup = workItemsGroup;
210 _workItemInfo = workItemInfo; 210 _workItemInfo = workItemInfo;
211 211
212#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
212 if (_workItemInfo.UseCallerCallContext || _workItemInfo.UseCallerHttpContext) 213 if (_workItemInfo.UseCallerCallContext || _workItemInfo.UseCallerHttpContext)
213 { 214 {
214 _callerContext = CallerThreadContext.Capture(_workItemInfo.UseCallerCallContext, _workItemInfo.UseCallerHttpContext); 215 _callerContext = CallerThreadContext.Capture(_workItemInfo.UseCallerCallContext, _workItemInfo.UseCallerHttpContext);
215 } 216 }
217#endif
216 218
217 _callback = callback; 219 _callback = callback;
218 _state = state; 220 _state = state;
@@ -222,9 +224,18 @@ namespace Amib.Threading.Internal
222 224
223 internal void Initialize() 225 internal void Initialize()
224 { 226 {
227 // The _workItemState is changed directly instead of using the SetWorkItemState
228 // method since we don't want to go throught IsValidStateTransition.
225 _workItemState = WorkItemState.InQueue; 229 _workItemState = WorkItemState.InQueue;
230
226 _workItemCompleted = null; 231 _workItemCompleted = null;
227 _workItemCompletedRefCount = 0; 232 _workItemCompletedRefCount = 0;
233 _waitingOnQueueStopwatch = new Stopwatch();
234 _processingStopwatch = new Stopwatch();
235 _expirationTime =
236 _workItemInfo.Timeout > 0 ?
237 DateTime.UtcNow.Ticks + _workItemInfo.Timeout * TimeSpan.TicksPerMillisecond :
238 long.MaxValue;
228 } 239 }
229 240
230 internal bool WasQueuedBy(IWorkItemsGroup workItemsGroup) 241 internal bool WasQueuedBy(IWorkItemsGroup workItemsGroup)
@@ -237,17 +248,16 @@ namespace Amib.Threading.Internal
237 248
238 #region Methods 249 #region Methods
239 250
240 public CanceledWorkItemsGroup CanceledWorkItemsGroup 251 internal CanceledWorkItemsGroup CanceledWorkItemsGroup
241 { 252 {
242 get 253 get { return _canceledWorkItemsGroup; }
243 { 254 set { _canceledWorkItemsGroup = value; }
244 return _canceledWorkItemsGroup; 255 }
245 }
246 256
247 set 257 internal CanceledWorkItemsGroup CanceledSmartThreadPool
248 { 258 {
249 _canceledWorkItemsGroup = value; 259 get { return _canceledSmartThreadPool; }
250 } 260 set { _canceledSmartThreadPool = value; }
251 } 261 }
252 262
253 /// <summary> 263 /// <summary>
@@ -259,9 +269,10 @@ namespace Amib.Threading.Internal
259 /// </returns> 269 /// </returns>
260 public bool StartingWorkItem() 270 public bool StartingWorkItem()
261 { 271 {
262 _beginProcessTime = DateTime.Now; 272 _waitingOnQueueStopwatch.Stop();
273 _processingStopwatch.Start();
263 274
264 lock(this) 275 lock (this)
265 { 276 {
266 if (IsCanceled) 277 if (IsCanceled)
267 { 278 {
@@ -277,6 +288,9 @@ namespace Amib.Threading.Internal
277 288
278 Debug.Assert(WorkItemState.InQueue == GetWorkItemState()); 289 Debug.Assert(WorkItemState.InQueue == GetWorkItemState());
279 290
291 // No need for a lock yet, only after the state has changed to InProgress
292 _executingThread = Thread.CurrentThread;
293
280 SetWorkItemState(WorkItemState.InProgress); 294 SetWorkItemState(WorkItemState.InProgress);
281 } 295 }
282 296
@@ -291,7 +305,7 @@ namespace Amib.Threading.Internal
291 CallToPostExecute currentCallToPostExecute = 0; 305 CallToPostExecute currentCallToPostExecute = 0;
292 306
293 // Execute the work item if we are in the correct state 307 // Execute the work item if we are in the correct state
294 switch(GetWorkItemState()) 308 switch (GetWorkItemState())
295 { 309 {
296 case WorkItemState.InProgress: 310 case WorkItemState.InProgress:
297 currentCallToPostExecute |= CallToPostExecute.WhenWorkItemNotCanceled; 311 currentCallToPostExecute |= CallToPostExecute.WhenWorkItemNotCanceled;
@@ -311,7 +325,7 @@ namespace Amib.Threading.Internal
311 PostExecute(); 325 PostExecute();
312 } 326 }
313 327
314 _endProcessTime = DateTime.Now; 328 _processingStopwatch.Stop();
315 } 329 }
316 330
317 internal void FireWorkItemCompleted() 331 internal void FireWorkItemCompleted()
@@ -323,8 +337,21 @@ namespace Amib.Threading.Internal
323 _workItemCompletedEvent(this); 337 _workItemCompletedEvent(this);
324 } 338 }
325 } 339 }
326 catch // Ignore exceptions 340 catch // Suppress exceptions
327 {} 341 { }
342 }
343
344 internal void FireWorkItemStarted()
345 {
346 try
347 {
348 if (null != _workItemStartedEvent)
349 {
350 _workItemStartedEvent(this);
351 }
352 }
353 catch // Suppress exceptions
354 { }
328 } 355 }
329 356
330 /// <summary> 357 /// <summary>
@@ -332,32 +359,70 @@ namespace Amib.Threading.Internal
332 /// </summary> 359 /// </summary>
333 private void ExecuteWorkItem() 360 private void ExecuteWorkItem()
334 { 361 {
362
363#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
335 CallerThreadContext ctc = null; 364 CallerThreadContext ctc = null;
336 if (null != _callerContext) 365 if (null != _callerContext)
337 { 366 {
338 ctc = CallerThreadContext.Capture(_callerContext.CapturedCallContext, _callerContext.CapturedHttpContext); 367 ctc = CallerThreadContext.Capture(_callerContext.CapturedCallContext, _callerContext.CapturedHttpContext);
339 CallerThreadContext.Apply(_callerContext); 368 CallerThreadContext.Apply(_callerContext);
340 } 369 }
370#endif
341 371
342 Exception exception = null; 372 Exception exception = null;
343 object result = null; 373 object result = null;
344 374
345 try 375 try
346 { 376 {
347 result = _callback(_state); 377 try
378 {
379 result = _callback(_state);
380 }
381 catch (Exception e)
382 {
383 // Save the exception so we can rethrow it later
384 exception = e;
385 }
386
387 // Remove the value of the execution thread, so it will be impossible to cancel the work item,
388 // since it is already completed.
389 // Cancelling a work item that already completed may cause the abortion of the next work item!!!
390 Thread executionThread = Interlocked.CompareExchange(ref _executingThread, null, _executingThread);
391
392 if (null == executionThread)
393 {
394 // Oops! we are going to be aborted..., Wait here so we can catch the ThreadAbortException
395 Thread.Sleep(60 * 1000);
396
397 // If after 1 minute this thread was not aborted then let it continue working.
398 }
348 } 399 }
349 catch (Exception e) 400 // We must treat the ThreadAbortException or else it will be stored in the exception variable
401 catch (ThreadAbortException tae)
350 { 402 {
351 // Save the exception so we can rethrow it later 403 tae.GetHashCode();
352 exception = e; 404 // Check if the work item was cancelled
405 // If we got a ThreadAbortException and the STP is not shutting down, it means the
406 // work items was cancelled.
407 if (!SmartThreadPool.CurrentThreadEntry.AssociatedSmartThreadPool.IsShuttingdown)
408 {
409#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
410 Thread.ResetAbort();
411#endif
412 }
353 } 413 }
354 414
415#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
355 if (null != _callerContext) 416 if (null != _callerContext)
356 { 417 {
357 CallerThreadContext.Apply(ctc); 418 CallerThreadContext.Apply(ctc);
358 } 419 }
420#endif
359 421
360 SetResult(result, exception); 422 if (!SmartThreadPool.IsWorkItemCanceled)
423 {
424 SetResult(result, exception);
425 }
361 } 426 }
362 427
363 /// <summary> 428 /// <summary>
@@ -369,9 +434,9 @@ namespace Amib.Threading.Internal
369 { 434 {
370 try 435 try
371 { 436 {
372 _workItemInfo.PostExecuteWorkItemCallback(this._workItemResult); 437 _workItemInfo.PostExecuteWorkItemCallback(_workItemResult);
373 } 438 }
374 catch (Exception e) 439 catch (Exception e)
375 { 440 {
376 Debug.Assert(null != e); 441 Debug.Assert(null != e);
377 } 442 }
@@ -382,6 +447,8 @@ namespace Amib.Threading.Internal
382 /// Set the result of the work item to return 447 /// Set the result of the work item to return
383 /// </summary> 448 /// </summary>
384 /// <param name="result">The result of the work item</param> 449 /// <param name="result">The result of the work item</param>
450 /// <param name="exception">The exception that was throw while the workitem executed, null
451 /// if there was no exception.</param>
385 internal void SetResult(object result, Exception exception) 452 internal void SetResult(object result, Exception exception)
386 { 453 {
387 _result = result; 454 _result = result;
@@ -401,48 +468,48 @@ namespace Amib.Threading.Internal
401 /// <summary> 468 /// <summary>
402 /// Wait for all work items to complete 469 /// Wait for all work items to complete
403 /// </summary> 470 /// </summary>
404 /// <param name="workItemResults">Array of work item result objects</param> 471 /// <param name="waitableResults">Array of work item result objects</param>
405 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 472 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
406 /// <param name="exitContext"> 473 /// <param name="exitContext">
407 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 474 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
408 /// </param> 475 /// </param>
409 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> 476 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
410 /// <returns> 477 /// <returns>
411 /// true when every work item in workItemResults has completed; otherwise false. 478 /// true when every work item in waitableResults has completed; otherwise false.
412 /// </returns> 479 /// </returns>
413 internal static bool WaitAll( 480 internal static bool WaitAll(
414 IWorkItemResult [] workItemResults, 481 IWaitableResult[] waitableResults,
415 int millisecondsTimeout, 482 int millisecondsTimeout,
416 bool exitContext, 483 bool exitContext,
417 WaitHandle cancelWaitHandle) 484 WaitHandle cancelWaitHandle)
418 { 485 {
419 if (0 == workItemResults.Length) 486 if (0 == waitableResults.Length)
420 { 487 {
421 return true; 488 return true;
422 } 489 }
423 490
424 bool success; 491 bool success;
425 WaitHandle [] waitHandles = new WaitHandle[workItemResults.Length];; 492 WaitHandle[] waitHandles = new WaitHandle[waitableResults.Length];
426 GetWaitHandles(workItemResults, waitHandles); 493 GetWaitHandles(waitableResults, waitHandles);
427 494
428 if ((null == cancelWaitHandle) && (waitHandles.Length <= 64)) 495 if ((null == cancelWaitHandle) && (waitHandles.Length <= 64))
429 { 496 {
430 success = WaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext); 497 success = STPEventWaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext);
431 } 498 }
432 else 499 else
433 { 500 {
434 success = true; 501 success = true;
435 int millisecondsLeft = millisecondsTimeout; 502 int millisecondsLeft = millisecondsTimeout;
436 DateTime start = DateTime.Now; 503 Stopwatch stopwatch = Stopwatch.StartNew();
437 504
438 WaitHandle [] whs; 505 WaitHandle[] whs;
439 if (null != cancelWaitHandle) 506 if (null != cancelWaitHandle)
440 { 507 {
441 whs = new WaitHandle [] { null, cancelWaitHandle }; 508 whs = new WaitHandle[] { null, cancelWaitHandle };
442 } 509 }
443 else 510 else
444 { 511 {
445 whs = new WaitHandle [] { null }; 512 whs = new WaitHandle[] { null };
446 } 513 }
447 514
448 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout); 515 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
@@ -450,7 +517,7 @@ namespace Amib.Threading.Internal
450 // We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle 517 // We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle
451 // won't affect it. 518 // won't affect it.
452 // Each iteration we update the time left for the timeout. 519 // Each iteration we update the time left for the timeout.
453 for(int i = 0; i < workItemResults.Length; ++i) 520 for (int i = 0; i < waitableResults.Length; ++i)
454 { 521 {
455 // WaitAny don't work with negative numbers 522 // WaitAny don't work with negative numbers
456 if (!waitInfinitely && (millisecondsLeft < 0)) 523 if (!waitInfinitely && (millisecondsLeft < 0))
@@ -460,23 +527,22 @@ namespace Amib.Threading.Internal
460 } 527 }
461 528
462 whs[0] = waitHandles[i]; 529 whs[0] = waitHandles[i];
463 int result = WaitHandle.WaitAny(whs, millisecondsLeft, exitContext); 530 int result = STPEventWaitHandle.WaitAny(whs, millisecondsLeft, exitContext);
464 if((result > 0) || (WaitHandle.WaitTimeout == result)) 531 if ((result > 0) || (STPEventWaitHandle.WaitTimeout == result))
465 { 532 {
466 success = false; 533 success = false;
467 break; 534 break;
468 } 535 }
469 536
470 if(!waitInfinitely) 537 if (!waitInfinitely)
471 { 538 {
472 // Update the time left to wait 539 // Update the time left to wait
473 TimeSpan ts = DateTime.Now - start; 540 millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds;
474 millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds;
475 } 541 }
476 } 542 }
477 } 543 }
478 // Release the wait handles 544 // Release the wait handles
479 ReleaseWaitHandles(workItemResults); 545 ReleaseWaitHandles(waitableResults);
480 546
481 return success; 547 return success;
482 } 548 }
@@ -484,7 +550,7 @@ namespace Amib.Threading.Internal
484 /// <summary> 550 /// <summary>
485 /// Waits for any of the work items in the specified array to complete, cancel, or timeout 551 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
486 /// </summary> 552 /// </summary>
487 /// <param name="workItemResults">Array of work item result objects</param> 553 /// <param name="waitableResults">Array of work item result objects</param>
488 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> 554 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
489 /// <param name="exitContext"> 555 /// <param name="exitContext">
490 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 556 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
@@ -493,38 +559,38 @@ namespace Amib.Threading.Internal
493 /// <returns> 559 /// <returns>
494 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. 560 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
495 /// </returns> 561 /// </returns>
496 internal static int WaitAny( 562 internal static int WaitAny(
497 IWorkItemResult [] workItemResults, 563 IWaitableResult[] waitableResults,
498 int millisecondsTimeout, 564 int millisecondsTimeout,
499 bool exitContext, 565 bool exitContext,
500 WaitHandle cancelWaitHandle) 566 WaitHandle cancelWaitHandle)
501 { 567 {
502 WaitHandle [] waitHandles = null; 568 WaitHandle[] waitHandles;
503 569
504 if (null != cancelWaitHandle) 570 if (null != cancelWaitHandle)
505 { 571 {
506 waitHandles = new WaitHandle[workItemResults.Length+1]; 572 waitHandles = new WaitHandle[waitableResults.Length + 1];
507 GetWaitHandles(workItemResults, waitHandles); 573 GetWaitHandles(waitableResults, waitHandles);
508 waitHandles[workItemResults.Length] = cancelWaitHandle; 574 waitHandles[waitableResults.Length] = cancelWaitHandle;
509 } 575 }
510 else 576 else
511 { 577 {
512 waitHandles = new WaitHandle[workItemResults.Length]; 578 waitHandles = new WaitHandle[waitableResults.Length];
513 GetWaitHandles(workItemResults, waitHandles); 579 GetWaitHandles(waitableResults, waitHandles);
514 } 580 }
515 581
516 int result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout, exitContext); 582 int result = STPEventWaitHandle.WaitAny(waitHandles, millisecondsTimeout, exitContext);
517 583
518 // Treat cancel as timeout 584 // Treat cancel as timeout
519 if (null != cancelWaitHandle) 585 if (null != cancelWaitHandle)
520 { 586 {
521 if (result == workItemResults.Length) 587 if (result == waitableResults.Length)
522 { 588 {
523 result = WaitHandle.WaitTimeout; 589 result = STPEventWaitHandle.WaitTimeout;
524 } 590 }
525 } 591 }
526 592
527 ReleaseWaitHandles(workItemResults); 593 ReleaseWaitHandles(waitableResults);
528 594
529 return result; 595 return result;
530 } 596 }
@@ -532,16 +598,16 @@ namespace Amib.Threading.Internal
532 /// <summary> 598 /// <summary>
533 /// Fill an array of wait handles with the work items wait handles. 599 /// Fill an array of wait handles with the work items wait handles.
534 /// </summary> 600 /// </summary>
535 /// <param name="workItemResults">An array of work item results</param> 601 /// <param name="waitableResults">An array of work item results</param>
536 /// <param name="waitHandles">An array of wait handles to fill</param> 602 /// <param name="waitHandles">An array of wait handles to fill</param>
537 private static void GetWaitHandles( 603 private static void GetWaitHandles(
538 IWorkItemResult [] workItemResults, 604 IWaitableResult[] waitableResults,
539 WaitHandle [] waitHandles) 605 WaitHandle[] waitHandles)
540 { 606 {
541 for(int i = 0; i < workItemResults.Length; ++i) 607 for (int i = 0; i < waitableResults.Length; ++i)
542 { 608 {
543 WorkItemResult wir = workItemResults[i] as WorkItemResult; 609 WorkItemResult wir = waitableResults[i].GetWorkItemResult() as WorkItemResult;
544 Debug.Assert(null != wir, "All workItemResults must be WorkItemResult objects"); 610 Debug.Assert(null != wir, "All waitableResults must be WorkItemResult objects");
545 611
546 waitHandles[i] = wir.GetWorkItem().GetWaitHandle(); 612 waitHandles[i] = wir.GetWorkItem().GetWaitHandle();
547 } 613 }
@@ -550,40 +616,64 @@ namespace Amib.Threading.Internal
550 /// <summary> 616 /// <summary>
551 /// Release the work items' wait handles 617 /// Release the work items' wait handles
552 /// </summary> 618 /// </summary>
553 /// <param name="workItemResults">An array of work item results</param> 619 /// <param name="waitableResults">An array of work item results</param>
554 private static void ReleaseWaitHandles(IWorkItemResult [] workItemResults) 620 private static void ReleaseWaitHandles(IWaitableResult[] waitableResults)
555 { 621 {
556 for(int i = 0; i < workItemResults.Length; ++i) 622 for (int i = 0; i < waitableResults.Length; ++i)
557 { 623 {
558 WorkItemResult wir = workItemResults[i] as WorkItemResult; 624 WorkItemResult wir = (WorkItemResult)waitableResults[i].GetWorkItemResult();
559 625
560 wir.GetWorkItem().ReleaseWaitHandle(); 626 wir.GetWorkItem().ReleaseWaitHandle();
561 } 627 }
562 } 628 }
563 629
564
565 #endregion 630 #endregion
566 631
567 #region Private Members 632 #region Private Members
568 633
569 private WorkItemState GetWorkItemState() 634 private WorkItemState GetWorkItemState()
570 { 635 {
571 if (_canceledWorkItemsGroup.IsCanceled) 636 lock (this)
572 { 637 {
573 return WorkItemState.Canceled; 638 if (WorkItemState.Completed == _workItemState)
574 } 639 {
575 return _workItemState; 640 return _workItemState;
641 }
642
643 long nowTicks = DateTime.UtcNow.Ticks;
576 644
645 if (WorkItemState.Canceled != _workItemState && nowTicks > _expirationTime)
646 {
647 _workItemState = WorkItemState.Canceled;
648 }
649
650 if (WorkItemState.InProgress == _workItemState)
651 {
652 return _workItemState;
653 }
654
655 if (CanceledSmartThreadPool.IsCanceled || CanceledWorkItemsGroup.IsCanceled)
656 {
657 return WorkItemState.Canceled;
658 }
659
660 return _workItemState;
661 }
577 } 662 }
663
664
578 /// <summary> 665 /// <summary>
579 /// Sets the work item's state 666 /// Sets the work item's state
580 /// </summary> 667 /// </summary>
581 /// <param name="workItemState">The state to set the work item to</param> 668 /// <param name="workItemState">The state to set the work item to</param>
582 private void SetWorkItemState(WorkItemState workItemState) 669 private void SetWorkItemState(WorkItemState workItemState)
583 { 670 {
584 lock(this) 671 lock (this)
585 { 672 {
586 _workItemState = workItemState; 673 if (IsValidStatesTransition(_workItemState, workItemState))
674 {
675 _workItemState = workItemState;
676 }
587 } 677 }
588 } 678 }
589 679
@@ -594,7 +684,7 @@ namespace Amib.Threading.Internal
594 private void SignalComplete(bool canceled) 684 private void SignalComplete(bool canceled)
595 { 685 {
596 SetWorkItemState(canceled ? WorkItemState.Canceled : WorkItemState.Completed); 686 SetWorkItemState(canceled ? WorkItemState.Canceled : WorkItemState.Completed);
597 lock(this) 687 lock (this)
598 { 688 {
599 // If someone is waiting then signal. 689 // If someone is waiting then signal.
600 if (null != _workItemCompleted) 690 if (null != _workItemCompleted)
@@ -606,40 +696,83 @@ namespace Amib.Threading.Internal
606 696
607 internal void WorkItemIsQueued() 697 internal void WorkItemIsQueued()
608 { 698 {
609 _queuedTime = DateTime.Now; 699 _waitingOnQueueStopwatch.Start();
610 } 700 }
611 701
612 #endregion 702 #endregion
613 703
614 #region Members exposed by WorkItemResult 704 #region Members exposed by WorkItemResult
615 705
616 /// <summary> 706 /// <summary>
617 /// Cancel the work item if it didn't start running yet. 707 /// Cancel the work item if it didn't start running yet.
618 /// </summary> 708 /// </summary>
619 /// <returns>Returns true on success or false if the work item is in progress or already completed</returns> 709 /// <returns>Returns true on success or false if the work item is in progress or already completed</returns>
620 private bool Cancel() 710 private bool Cancel(bool abortExecution)
621 { 711 {
622 lock(this) 712#if (_WINDOWS_CE)
713 if(abortExecution)
714 {
715 throw new ArgumentOutOfRangeException("abortExecution", "WindowsCE doesn't support this feature");
716 }
717#endif
718 bool success = false;
719 bool signalComplete = false;
720
721 lock (this)
623 { 722 {
624 switch(GetWorkItemState()) 723 switch (GetWorkItemState())
625 { 724 {
626 case WorkItemState.Canceled: 725 case WorkItemState.Canceled:
627 //Debug.WriteLine("Work item already canceled"); 726 //Debug.WriteLine("Work item already canceled");
628 return true; 727 if (abortExecution)
728 {
729 Thread executionThread = Interlocked.CompareExchange(ref _executingThread, null, _executingThread);
730 if (null != executionThread)
731 {
732 executionThread.Abort(); // "Cancel"
733 // No need to signalComplete, because we already cancelled this work item
734 // so it already signaled its completion.
735 //signalComplete = true;
736 }
737 }
738 success = true;
739 break;
629 case WorkItemState.Completed: 740 case WorkItemState.Completed:
630 case WorkItemState.InProgress:
631 //Debug.WriteLine("Work item cannot be canceled"); 741 //Debug.WriteLine("Work item cannot be canceled");
632 return false; 742 break;
743 case WorkItemState.InProgress:
744 if (abortExecution)
745 {
746 Thread executionThread = Interlocked.CompareExchange(ref _executingThread, null, _executingThread);
747 if (null != executionThread)
748 {
749 executionThread.Abort(); // "Cancel"
750 success = true;
751 signalComplete = true;
752 }
753 }
754 else
755 {
756 success = false;
757 signalComplete = false;
758 }
759 break;
633 case WorkItemState.InQueue: 760 case WorkItemState.InQueue:
634 // Signal to the wait for completion that the work 761 // Signal to the wait for completion that the work
635 // item has been completed (canceled). There is no 762 // item has been completed (canceled). There is no
636 // reason to wait for it to get out of the queue 763 // reason to wait for it to get out of the queue
637 SignalComplete(true); 764 signalComplete = true;
638 //Debug.WriteLine("Work item canceled"); 765 //Debug.WriteLine("Work item canceled");
639 return true; 766 success = true;
767 break;
768 }
769
770 if (signalComplete)
771 {
772 SignalComplete(true);
640 } 773 }
641 } 774 }
642 return false; 775 return success;
643 } 776 }
644 777
645 /// <summary> 778 /// <summary>
@@ -653,7 +786,7 @@ namespace Amib.Threading.Internal
653 bool exitContext, 786 bool exitContext,
654 WaitHandle cancelWaitHandle) 787 WaitHandle cancelWaitHandle)
655 { 788 {
656 Exception e = null; 789 Exception e;
657 object result = GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e); 790 object result = GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e);
658 if (null != e) 791 if (null != e)
659 { 792 {
@@ -694,7 +827,7 @@ namespace Amib.Threading.Internal
694 { 827 {
695 WaitHandle wh = GetWaitHandle(); 828 WaitHandle wh = GetWaitHandle();
696 829
697 bool timeout = !wh.WaitOne(millisecondsTimeout, exitContext); 830 bool timeout = !STPEventWaitHandle.WaitOne(wh, millisecondsTimeout, exitContext);
698 831
699 ReleaseWaitHandle(); 832 ReleaseWaitHandle();
700 833
@@ -706,10 +839,10 @@ namespace Amib.Threading.Internal
706 else 839 else
707 { 840 {
708 WaitHandle wh = GetWaitHandle(); 841 WaitHandle wh = GetWaitHandle();
709 int result = WaitHandle.WaitAny(new WaitHandle[] { wh, cancelWaitHandle }); 842 int result = STPEventWaitHandle.WaitAny(new WaitHandle[] { wh, cancelWaitHandle });
710 ReleaseWaitHandle(); 843 ReleaseWaitHandle();
711 844
712 switch(result) 845 switch (result)
713 { 846 {
714 case 0: 847 case 0:
715 // The work item signaled 848 // The work item signaled
@@ -717,7 +850,7 @@ namespace Amib.Threading.Internal
717 // work item (not the get result) 850 // work item (not the get result)
718 break; 851 break;
719 case 1: 852 case 1:
720 case WaitHandle.WaitTimeout: 853 case STPEventWaitHandle.WaitTimeout:
721 throw new WorkItemTimeoutException("Work item timeout"); 854 throw new WorkItemTimeoutException("Work item timeout");
722 default: 855 default:
723 Debug.Assert(false); 856 Debug.Assert(false);
@@ -745,11 +878,11 @@ namespace Amib.Threading.Internal
745 /// </summary> 878 /// </summary>
746 private WaitHandle GetWaitHandle() 879 private WaitHandle GetWaitHandle()
747 { 880 {
748 lock(this) 881 lock (this)
749 { 882 {
750 if (null == _workItemCompleted) 883 if (null == _workItemCompleted)
751 { 884 {
752 _workItemCompleted = new ManualResetEvent(IsCompleted); 885 _workItemCompleted = EventWaitHandleFactory.CreateManualResetEvent(IsCompleted);
753 } 886 }
754 ++_workItemCompletedRefCount; 887 ++_workItemCompletedRefCount;
755 } 888 }
@@ -758,7 +891,7 @@ namespace Amib.Threading.Internal
758 891
759 private void ReleaseWaitHandle() 892 private void ReleaseWaitHandle()
760 { 893 {
761 lock(this) 894 lock (this)
762 { 895 {
763 if (null != _workItemCompleted) 896 if (null != _workItemCompleted)
764 { 897 {
@@ -779,10 +912,10 @@ namespace Amib.Threading.Internal
779 { 912 {
780 get 913 get
781 { 914 {
782 lock(this) 915 lock (this)
783 { 916 {
784 WorkItemState workItemState = GetWorkItemState(); 917 WorkItemState workItemState = GetWorkItemState();
785 return ((workItemState == WorkItemState.Completed) || 918 return ((workItemState == WorkItemState.Completed) ||
786 (workItemState == WorkItemState.Canceled)); 919 (workItemState == WorkItemState.Canceled));
787 } 920 }
788 } 921 }
@@ -795,7 +928,7 @@ namespace Amib.Threading.Internal
795 { 928 {
796 get 929 get
797 { 930 {
798 lock(this) 931 lock (this)
799 { 932 {
800 return (GetWorkItemState() == WorkItemState.Canceled); 933 return (GetWorkItemState() == WorkItemState.Canceled);
801 } 934 }
@@ -843,172 +976,6 @@ namespace Amib.Threading.Internal
843 } 976 }
844 } 977 }
845 978
846
847 #region WorkItemResult class
848
849 private class WorkItemResult : IWorkItemResult, IInternalWorkItemResult
850 {
851 /// <summary>
852 /// A back reference to the work item
853 /// </summary>
854 private WorkItem _workItem;
855
856 public WorkItemResult(WorkItem workItem)
857 {
858 _workItem = workItem;
859 }
860
861 internal WorkItem GetWorkItem()
862 {
863 return _workItem;
864 }
865
866 #region IWorkItemResult Members
867
868 public bool IsCompleted
869 {
870 get
871 {
872 return _workItem.IsCompleted;
873 }
874 }
875
876 public void Abort()
877 {
878 _workItem.Abort();
879 }
880
881 public bool IsCanceled
882 {
883 get
884 {
885 return _workItem.IsCanceled;
886 }
887 }
888
889 public object GetResult()
890 {
891 return _workItem.GetResult(Timeout.Infinite, true, null);
892 }
893
894 public object GetResult(int millisecondsTimeout, bool exitContext)
895 {
896 return _workItem.GetResult(millisecondsTimeout, exitContext, null);
897 }
898
899 public object GetResult(TimeSpan timeout, bool exitContext)
900 {
901 return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, null);
902 }
903
904 public object GetResult(int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle)
905 {
906 return _workItem.GetResult(millisecondsTimeout, exitContext, cancelWaitHandle);
907 }
908
909 public object GetResult(TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle)
910 {
911 return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
912 }
913
914 public object GetResult(out Exception e)
915 {
916 return _workItem.GetResult(Timeout.Infinite, true, null, out e);
917 }
918
919 public object GetResult(int millisecondsTimeout, bool exitContext, out Exception e)
920 {
921 return _workItem.GetResult(millisecondsTimeout, exitContext, null, out e);
922 }
923
924 public object GetResult(TimeSpan timeout, bool exitContext, out Exception e)
925 {
926 return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, null, out e);
927 }
928
929 public object GetResult(int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e)
930 {
931 return _workItem.GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e);
932 }
933
934 public object GetResult(TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e)
935 {
936 return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle, out e);
937 }
938
939 public bool Cancel()
940 {
941 return _workItem.Cancel();
942 }
943
944 public object State
945 {
946 get
947 {
948 return _workItem._state;
949 }
950 }
951
952 public WorkItemPriority WorkItemPriority
953 {
954 get
955 {
956 return _workItem._workItemInfo.WorkItemPriority;
957 }
958 }
959
960 /// <summary>
961 /// Return the result, same as GetResult()
962 /// </summary>
963 public object Result
964 {
965 get { return GetResult(); }
966 }
967
968 /// <summary>
969 /// Returns the exception if occured otherwise returns null.
970 /// This value is valid only after the work item completed,
971 /// before that it is always null.
972 /// </summary>
973 public object Exception
974 {
975 get { return _workItem._exception; }
976 }
977
978 #endregion
979
980 #region IInternalWorkItemResult Members
981
982 public event WorkItemStateCallback OnWorkItemStarted
983 {
984 add
985 {
986 _workItem.OnWorkItemStarted += value;
987 }
988 remove
989 {
990 _workItem.OnWorkItemStarted -= value;
991 }
992 }
993
994
995 public event WorkItemStateCallback OnWorkItemCompleted
996 {
997 add
998 {
999 _workItem.OnWorkItemCompleted += value;
1000 }
1001 remove
1002 {
1003 _workItem.OnWorkItemCompleted -= value;
1004 }
1005 }
1006
1007 #endregion
1008 }
1009
1010 #endregion
1011
1012 public void DisposeOfState() 979 public void DisposeOfState()
1013 { 980 {
1014 if (_workItemInfo.DisposeOfStateObjects) 981 if (_workItemInfo.DisposeOfStateObjects)
@@ -1021,15 +988,5 @@ namespace Amib.Threading.Internal
1021 } 988 }
1022 } 989 }
1023 } 990 }
1024
1025 public void Abort()
1026 {
1027 lock (this)
1028 {
1029 if(currentThread != null)
1030 currentThread.Abort();
1031 }
1032 }
1033 } 991 }
1034 #endregion
1035} 992}