aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsGroup.cs759
1 files changed, 304 insertions, 455 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
index 01ac8dd..d9d34ac 100644
--- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
+++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
@@ -1,6 +1,3 @@
1// Ami Bar
2// amibar@gmail.com
3
4using System; 1using System;
5using System.Threading; 2using System.Threading;
6using System.Runtime.CompilerServices; 3using System.Runtime.CompilerServices;
@@ -8,505 +5,357 @@ using System.Diagnostics;
8 5
9namespace Amib.Threading.Internal 6namespace Amib.Threading.Internal
10{ 7{
11 #region WorkItemsGroup class
12
13 /// <summary>
14 /// Summary description for WorkItemsGroup.
15 /// </summary>
16 public class WorkItemsGroup : IWorkItemsGroup
17 {
18 #region Private members
19
20 private object _lock = new object();
21 /// <summary>
22 /// Contains the name of this instance of SmartThreadPool.
23 /// Can be changed by the user.
24 /// </summary>
25 private string _name = "WorkItemsGroup";
26
27 /// <summary>
28 /// A reference to the SmartThreadPool instance that created this
29 /// WorkItemsGroup.
30 /// </summary>
31 private SmartThreadPool _stp;
32
33 /// <summary>
34 /// The OnIdle event
35 /// </summary>
36 private event WorkItemsGroupIdleHandler _onIdle;
37 8
38 /// <summary> 9 #region WorkItemsGroup class
39 /// Defines how many work items of this WorkItemsGroup can run at once.
40 /// </summary>
41 private int _concurrency;
42 10
43 /// <summary> 11 /// <summary>
44 /// Priority queue to hold work items before they are passed 12 /// Summary description for WorkItemsGroup.
45 /// to the SmartThreadPool. 13 /// </summary>
46 /// </summary> 14 public class WorkItemsGroup : WorkItemsGroupBase
47 private PriorityQueue _workItemsQueue; 15 {
16 #region Private members
48 17
49 /// <summary> 18 private readonly object _lock = new object();
50 /// Indicate how many work items are waiting in the SmartThreadPool
51 /// queue.
52 /// This value is used to apply the concurrency.
53 /// </summary>
54 private int _workItemsInStpQueue;
55 19
56 /// <summary> 20 /// <summary>
57 /// Indicate how many work items are currently running in the SmartThreadPool. 21 /// A reference to the SmartThreadPool instance that created this
58 /// This value is used with the Cancel, to calculate if we can send new 22 /// WorkItemsGroup.
59 /// work items to the STP. 23 /// </summary>
60 /// </summary> 24 private readonly SmartThreadPool _stp;
61 private int _workItemsExecutingInStp = 0;
62 25
63 /// <summary> 26 /// <summary>
64 /// WorkItemsGroup start information 27 /// The OnIdle event
65 /// </summary> 28 /// </summary>
66 private WIGStartInfo _workItemsGroupStartInfo; 29 private event WorkItemsGroupIdleHandler _onIdle;
67 30
68 /// <summary> 31 /// <summary>
69 /// Signaled when all of the WorkItemsGroup's work item completed. 32 /// A flag to indicate if the Work Items Group is now suspended.
70 /// </summary> 33 /// </summary>
71 private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); 34 private bool _isSuspended;
72 35
73 /// <summary> 36 /// <summary>
74 /// A common object for all the work items that this work items group 37 /// Defines how many work items of this WorkItemsGroup can run at once.
75 /// generate so we can mark them to cancel in O(1) 38 /// </summary>
76 /// </summary> 39 private int _concurrency;
77 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); 40
78 41 /// <summary>
79 #endregion 42 /// Priority queue to hold work items before they are passed
80 43 /// to the SmartThreadPool.
81 #region Construction 44 /// </summary>
82 45 private readonly PriorityQueue _workItemsQueue;
83 public WorkItemsGroup( 46
84 SmartThreadPool stp, 47 /// <summary>
85 int concurrency, 48 /// Indicate how many work items are waiting in the SmartThreadPool
86 WIGStartInfo wigStartInfo) 49 /// queue.
50 /// This value is used to apply the concurrency.
51 /// </summary>
52 private int _workItemsInStpQueue;
53
54 /// <summary>
55 /// Indicate how many work items are currently running in the SmartThreadPool.
56 /// This value is used with the Cancel, to calculate if we can send new
57 /// work items to the STP.
58 /// </summary>
59 private int _workItemsExecutingInStp = 0;
60
61 /// <summary>
62 /// WorkItemsGroup start information
63 /// </summary>
64 private readonly WIGStartInfo _workItemsGroupStartInfo;
65
66 /// <summary>
67 /// Signaled when all of the WorkItemsGroup's work item completed.
68 /// </summary>
69 //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
70 private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
71
72 /// <summary>
73 /// A common object for all the work items that this work items group
74 /// generate so we can mark them to cancel in O(1)
75 /// </summary>
76 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
77
78 #endregion
79
80 #region Construction
81
82 public WorkItemsGroup(
83 SmartThreadPool stp,
84 int concurrency,
85 WIGStartInfo wigStartInfo)
86 {
87 if (concurrency <= 0)
88 {
89 throw new ArgumentOutOfRangeException(
90 "concurrency",
91#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
92 concurrency,
93#endif
94 "concurrency must be greater than zero");
95 }
96 _stp = stp;
97 _concurrency = concurrency;
98 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
99 _workItemsQueue = new PriorityQueue();
100 Name = "WorkItemsGroup";
101
102 // The _workItemsInStpQueue gets the number of currently executing work items,
103 // because once a work item is executing, it cannot be cancelled.
104 _workItemsInStpQueue = _workItemsExecutingInStp;
105
106 _isSuspended = _workItemsGroupStartInfo.StartSuspended;
107 }
108
109 #endregion
110
111 #region WorkItemsGroupBase Overrides
112
113 public override int Concurrency
87 { 114 {
88 if (concurrency <= 0) 115 get { return _concurrency; }
89 {
90 throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero");
91 }
92 _stp = stp;
93 _concurrency = concurrency;
94 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
95 _workItemsQueue = new PriorityQueue();
96
97 // The _workItemsInStpQueue gets the number of currently executing work items,
98 // because once a work item is executing, it cannot be cancelled.
99 _workItemsInStpQueue = _workItemsExecutingInStp;
100 }
101
102 #endregion
103
104 #region IWorkItemsGroup implementation
105
106 /// <summary>
107 /// Get/Set the name of the SmartThreadPool instance
108 /// </summary>
109 public string Name
110 {
111 get
112 {
113 return _name;
114 }
115
116 set 116 set
117 { 117 {
118 _name = value; 118 Debug.Assert(value > 0);
119 }
120 }
121
122 /// <summary>
123 /// Queue a work item
124 /// </summary>
125 /// <param name="callback">A callback to execute</param>
126 /// <returns>Returns a work item result</returns>
127 public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
128 {
129 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback);
130 EnqueueToSTPNextWorkItem(workItem);
131 return workItem.GetWorkItemResult();
132 }
133 119
134 /// <summary> 120 int diff = value - _concurrency;
135 /// Queue a work item 121 _concurrency = value;
136 /// </summary> 122 if (diff > 0)
137 /// <param name="callback">A callback to execute</param> 123 {
138 /// <param name="workItemPriority">The priority of the work item</param> 124 EnqueueToSTPNextNWorkItem(diff);
139 /// <returns>Returns a work item result</returns> 125 }
140 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) 126 }
141 {
142 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority);
143 EnqueueToSTPNextWorkItem(workItem);
144 return workItem.GetWorkItemResult();
145 }
146
147 /// <summary>
148 /// Queue a work item
149 /// </summary>
150 /// <param name="workItemInfo">Work item info</param>
151 /// <param name="callback">A callback to execute</param>
152 /// <returns>Returns a work item result</returns>
153 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
154 {
155 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback);
156 EnqueueToSTPNextWorkItem(workItem);
157 return workItem.GetWorkItemResult();
158 }
159
160 /// <summary>
161 /// Queue a work item
162 /// </summary>
163 /// <param name="callback">A callback to execute</param>
164 /// <param name="state">
165 /// The context object of the work item. Used for passing arguments to the work item.
166 /// </param>
167 /// <returns>Returns a work item result</returns>
168 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
169 {
170 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state);
171 EnqueueToSTPNextWorkItem(workItem);
172 return workItem.GetWorkItemResult();
173 }
174
175 /// <summary>
176 /// Queue a work item
177 /// </summary>
178 /// <param name="callback">A callback to execute</param>
179 /// <param name="state">
180 /// The context object of the work item. Used for passing arguments to the work item.
181 /// </param>
182 /// <param name="workItemPriority">The work item priority</param>
183 /// <returns>Returns a work item result</returns>
184 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
185 {
186 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority);
187 EnqueueToSTPNextWorkItem(workItem);
188 return workItem.GetWorkItemResult();
189 }
190
191 /// <summary>
192 /// Queue a work item
193 /// </summary>
194 /// <param name="workItemInfo">Work item information</param>
195 /// <param name="callback">A callback to execute</param>
196 /// <param name="state">
197 /// The context object of the work item. Used for passing arguments to the work item.
198 /// </param>
199 /// <returns>Returns a work item result</returns>
200 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
201 {
202 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state);
203 EnqueueToSTPNextWorkItem(workItem);
204 return workItem.GetWorkItemResult();
205 }
206
207 /// <summary>
208 /// Queue a work item
209 /// </summary>
210 /// <param name="callback">A callback to execute</param>
211 /// <param name="state">
212 /// The context object of the work item. Used for passing arguments to the work item.
213 /// </param>
214 /// <param name="postExecuteWorkItemCallback">
215 /// A delegate to call after the callback completion
216 /// </param>
217 /// <returns>Returns a work item result</returns>
218 public IWorkItemResult QueueWorkItem(
219 WorkItemCallback callback,
220 object state,
221 PostExecuteWorkItemCallback postExecuteWorkItemCallback)
222 {
223 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback);
224 EnqueueToSTPNextWorkItem(workItem);
225 return workItem.GetWorkItemResult();
226 }
227
228 /// <summary>
229 /// Queue a work item
230 /// </summary>
231 /// <param name="callback">A callback to execute</param>
232 /// <param name="state">
233 /// The context object of the work item. Used for passing arguments to the work item.
234 /// </param>
235 /// <param name="postExecuteWorkItemCallback">
236 /// A delegate to call after the callback completion
237 /// </param>
238 /// <param name="workItemPriority">The work item priority</param>
239 /// <returns>Returns a work item result</returns>
240 public IWorkItemResult QueueWorkItem(
241 WorkItemCallback callback,
242 object state,
243 PostExecuteWorkItemCallback postExecuteWorkItemCallback,
244 WorkItemPriority workItemPriority)
245 {
246 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority);
247 EnqueueToSTPNextWorkItem(workItem);
248 return workItem.GetWorkItemResult();
249 }
250
251 /// <summary>
252 /// Queue a work item
253 /// </summary>
254 /// <param name="callback">A callback to execute</param>
255 /// <param name="state">
256 /// The context object of the work item. Used for passing arguments to the work item.
257 /// </param>
258 /// <param name="postExecuteWorkItemCallback">
259 /// A delegate to call after the callback completion
260 /// </param>
261 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
262 /// <returns>Returns a work item result</returns>
263 public IWorkItemResult QueueWorkItem(
264 WorkItemCallback callback,
265 object state,
266 PostExecuteWorkItemCallback postExecuteWorkItemCallback,
267 CallToPostExecute callToPostExecute)
268 {
269 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute);
270 EnqueueToSTPNextWorkItem(workItem);
271 return workItem.GetWorkItemResult();
272 } 127 }
273 128
274 /// <summary> 129 public override int WaitingCallbacks
275 /// Queue a work item
276 /// </summary>
277 /// <param name="callback">A callback to execute</param>
278 /// <param name="state">
279 /// The context object of the work item. Used for passing arguments to the work item.
280 /// </param>
281 /// <param name="postExecuteWorkItemCallback">
282 /// A delegate to call after the callback completion
283 /// </param>
284 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
285 /// <param name="workItemPriority">The work item priority</param>
286 /// <returns>Returns a work item result</returns>
287 public IWorkItemResult QueueWorkItem(
288 WorkItemCallback callback,
289 object state,
290 PostExecuteWorkItemCallback postExecuteWorkItemCallback,
291 CallToPostExecute callToPostExecute,
292 WorkItemPriority workItemPriority)
293 { 130 {
294 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); 131 get { return _workItemsQueue.Count; }
295 EnqueueToSTPNextWorkItem(workItem);
296 return workItem.GetWorkItemResult();
297 } 132 }
298 133
299 /// <summary> 134 public override object[] GetStates()
300 /// Wait for the thread pool to be idle
301 /// </summary>
302 public void WaitForIdle()
303 { 135 {
304 WaitForIdle(Timeout.Infinite); 136 lock (_lock)
137 {
138 object[] states = new object[_workItemsQueue.Count];
139 int i = 0;
140 foreach (WorkItem workItem in _workItemsQueue)
141 {
142 states[i] = workItem.GetWorkItemResult().State;
143 ++i;
144 }
145 return states;
146 }
305 } 147 }
306 148
307 /// <summary> 149 /// <summary>
308 /// Wait for the thread pool to be idle 150 /// WorkItemsGroup start information
309 /// </summary> 151 /// </summary>
310 public bool WaitForIdle(TimeSpan timeout) 152 public override WIGStartInfo WIGStartInfo
311 { 153 {
312 return WaitForIdle((int)timeout.TotalMilliseconds); 154 get { return _workItemsGroupStartInfo; }
313 } 155 }
314 156
315 /// <summary> 157 /// <summary>
158 /// Start the Work Items Group if it was started suspended
159 /// </summary>
160 public override void Start()
161 {
162 // If the Work Items Group already started then quit
163 if (!_isSuspended)
164 {
165 return;
166 }
167 _isSuspended = false;
168
169 EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
170 }
171
172 public override void Cancel(bool abortExecution)
173 {
174 lock (_lock)
175 {
176 _canceledWorkItemsGroup.IsCanceled = true;
177 _workItemsQueue.Clear();
178 _workItemsInStpQueue = 0;
179 _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
180 }
181
182 if (abortExecution)
183 {
184 _stp.CancelAbortWorkItemsGroup(this);
185 }
186 }
187
188 /// <summary>
316 /// Wait for the thread pool to be idle 189 /// Wait for the thread pool to be idle
317 /// </summary> 190 /// </summary>
318 public bool WaitForIdle(int millisecondsTimeout) 191 public override bool WaitForIdle(int millisecondsTimeout)
319 { 192 {
320 _stp.ValidateWorkItemsGroupWaitForIdle(this); 193 SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
321 return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); 194 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
322 } 195 }
323 196
324 public int WaitingCallbacks 197 public override event WorkItemsGroupIdleHandler OnIdle
325 { 198 {
326 get 199 add { _onIdle += value; }
327 { 200 remove { _onIdle -= value; }
328 return _workItemsQueue.Count; 201 }
329 }
330 }
331 202
332 public event WorkItemsGroupIdleHandler OnIdle 203 #endregion
333 {
334 add
335 {
336 _onIdle += value;
337 }
338 remove
339 {
340 _onIdle -= value;
341 }
342 }
343 204
344 public void Cancel() 205 #region Private methods
345 {
346 lock(_lock)
347 {
348 _canceledWorkItemsGroup.IsCanceled = true;
349 _workItemsQueue.Clear();
350 _workItemsInStpQueue = 0;
351 _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
352 }
353 }
354 206
355 public void Start() 207 private void RegisterToWorkItemCompletion(IWorkItemResult wir)
356 { 208 {
357 lock (this) 209 IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
358 { 210 iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
359 if (!_workItemsGroupStartInfo.StartSuspended) 211 iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
360 { 212 }
361 return;
362 }
363 _workItemsGroupStartInfo.StartSuspended = false;
364 }
365
366 for(int i = 0; i < _concurrency; ++i)
367 {
368 EnqueueToSTPNextWorkItem(null, false);
369 }
370 }
371
372 #endregion
373 213
374 #region Private methods 214 public void OnSTPIsStarting()
375 215 {
376 private void RegisterToWorkItemCompletion(IWorkItemResult wir) 216 if (_isSuspended)
377 {
378 IInternalWorkItemResult iwir = wir as IInternalWorkItemResult;
379 iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
380 iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
381 }
382
383 public void OnSTPIsStarting()
384 {
385 lock (this)
386 {
387 if (_workItemsGroupStartInfo.StartSuspended)
388 {
389 return;
390 }
391 }
392
393 for(int i = 0; i < _concurrency; ++i)
394 {
395 EnqueueToSTPNextWorkItem(null, false);
396 }
397 }
398
399 private object FireOnIdle(object state)
400 {
401 FireOnIdleImpl(_onIdle);
402 return null;
403 }
404
405 [MethodImpl(MethodImplOptions.NoInlining)]
406 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
407 {
408 if(null == onIdle)
409 { 217 {
410 return; 218 return;
411 } 219 }
220
221 EnqueueToSTPNextNWorkItem(_concurrency);
222 }
412 223
413 Delegate[] delegates = onIdle.GetInvocationList(); 224 public void EnqueueToSTPNextNWorkItem(int count)
414 foreach(WorkItemsGroupIdleHandler eh in delegates)
415 {
416 try
417 {
418 eh(this);
419 }
420 // Ignore exceptions
421 catch{}
422 }
423 }
424
425 private void OnWorkItemStartedCallback(WorkItem workItem)
426 { 225 {
427 lock(_lock) 226 for (int i = 0; i < count; ++i)
428 { 227 {
429 ++_workItemsExecutingInStp; 228 EnqueueToSTPNextWorkItem(null, false);
430 } 229 }
431 } 230 }
432 231
433 private void OnWorkItemCompletedCallback(WorkItem workItem) 232 private object FireOnIdle(object state)
434 { 233 {
435 EnqueueToSTPNextWorkItem(null, true); 234 FireOnIdleImpl(_onIdle);
436 } 235 return null;
437 236 }
438 private void EnqueueToSTPNextWorkItem(WorkItem workItem) 237
238 [MethodImpl(MethodImplOptions.NoInlining)]
239 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
240 {
241 if(null == onIdle)
242 {
243 return;
244 }
245
246 Delegate[] delegates = onIdle.GetInvocationList();
247 foreach(WorkItemsGroupIdleHandler eh in delegates)
248 {
249 try
250 {
251 eh(this);
252 }
253 catch { } // Suppress exceptions
254 }
255 }
256
257 private void OnWorkItemStartedCallback(WorkItem workItem)
258 {
259 lock(_lock)
260 {
261 ++_workItemsExecutingInStp;
262 }
263 }
264
265 private void OnWorkItemCompletedCallback(WorkItem workItem)
266 {
267 EnqueueToSTPNextWorkItem(null, true);
268 }
269
270 internal override void Enqueue(WorkItem workItem)
439 { 271 {
440 EnqueueToSTPNextWorkItem(workItem, false); 272 EnqueueToSTPNextWorkItem(workItem);
441 } 273 }
442 274
443 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) 275 private void EnqueueToSTPNextWorkItem(WorkItem workItem)
444 { 276 {
445 lock(_lock) 277 EnqueueToSTPNextWorkItem(workItem, false);
446 { 278 }
447 // Got here from OnWorkItemCompletedCallback() 279
448 if (decrementWorkItemsInStpQueue) 280 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
449 { 281 {
450 --_workItemsInStpQueue; 282 lock(_lock)
451 283 {
452 if(_workItemsInStpQueue < 0) 284 // Got here from OnWorkItemCompletedCallback()
453 { 285 if (decrementWorkItemsInStpQueue)
454 _workItemsInStpQueue = 0; 286 {
455 } 287 --_workItemsInStpQueue;
456 288
457 --_workItemsExecutingInStp; 289 if(_workItemsInStpQueue < 0)
458 290 {
459 if(_workItemsExecutingInStp < 0) 291 _workItemsInStpQueue = 0;
460 { 292 }
461 _workItemsExecutingInStp = 0; 293
462 } 294 --_workItemsExecutingInStp;
463 } 295
464 296 if(_workItemsExecutingInStp < 0)
465 // If the work item is not null then enqueue it 297 {
466 if (null != workItem) 298 _workItemsExecutingInStp = 0;
467 { 299 }
468 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; 300 }
469 301
470 RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); 302 // If the work item is not null then enqueue it
471 _workItemsQueue.Enqueue(workItem); 303 if (null != workItem)
472 //_stp.IncrementWorkItemsCount(); 304 {
473 305 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
474 if ((1 == _workItemsQueue.Count) && 306
475 (0 == _workItemsInStpQueue)) 307 RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
476 { 308 _workItemsQueue.Enqueue(workItem);
477 _stp.RegisterWorkItemsGroup(this); 309 //_stp.IncrementWorkItemsCount();
478 Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle"); 310
311 if ((1 == _workItemsQueue.Count) &&
312 (0 == _workItemsInStpQueue))
313 {
314 _stp.RegisterWorkItemsGroup(this);
315 IsIdle = false;
479 _isIdleWaitHandle.Reset(); 316 _isIdleWaitHandle.Reset();
480 } 317 }
481 } 318 }
482 319
483 // If the work items queue of the group is empty than quit 320 // If the work items queue of the group is empty than quit
484 if (0 == _workItemsQueue.Count) 321 if (0 == _workItemsQueue.Count)
485 { 322 {
486 if (0 == _workItemsInStpQueue) 323 if (0 == _workItemsInStpQueue)
487 { 324 {
488 _stp.UnregisterWorkItemsGroup(this); 325 _stp.UnregisterWorkItemsGroup(this);
489 Trace.WriteLine("WorkItemsGroup " + Name + " is idle"); 326 IsIdle = true;
490 _isIdleWaitHandle.Set(); 327 _isIdleWaitHandle.Set();
491 _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle)); 328 if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
492 } 329 {
493 return; 330 _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
494 } 331 }
495 332 }
496 if (!_workItemsGroupStartInfo.StartSuspended) 333 return;
497 { 334 }
498 if (_workItemsInStpQueue < _concurrency) 335
499 { 336 if (!_isSuspended)
500 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; 337 {
501 _stp.Enqueue(nextWorkItem, true); 338 if (_workItemsInStpQueue < _concurrency)
502 ++_workItemsInStpQueue; 339 {
503 } 340 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
504 } 341 try
505 } 342 {
506 } 343 _stp.Enqueue(nextWorkItem);
507 344 }
508 #endregion 345 catch (ObjectDisposedException e)
346 {
347 e.GetHashCode();
348 // The STP has been shutdown
349 }
350
351 ++_workItemsInStpQueue;
352 }
353 }
354 }
355 }
356
357 #endregion
509 } 358 }
510 359
511 #endregion 360 #endregion
512} 361}