aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
diff options
context:
space:
mode:
authorMelanie2013-05-01 21:37:17 +0100
committerMelanie2013-05-01 21:37:17 +0100
commitc6d50cd431796fe81ba815541878e19b47b7bc08 (patch)
tree99689cc6142b821919d48efcfd03e857883c0b5f /ThirdParty/SmartThreadPool/WorkItemsGroup.cs
parentFix the long standing bug of items being delivered to lost and found or trash... (diff)
parentAdd in-code exaplanation for the change in cancellation signalling in STP 2.2... (diff)
downloadopensim-SC_OLD-c6d50cd431796fe81ba815541878e19b47b7bc08.zip
opensim-SC_OLD-c6d50cd431796fe81ba815541878e19b47b7bc08.tar.gz
opensim-SC_OLD-c6d50cd431796fe81ba815541878e19b47b7bc08.tar.bz2
opensim-SC_OLD-c6d50cd431796fe81ba815541878e19b47b7bc08.tar.xz
Merge branch 'master' of melanie@opensimulator.org:/var/git/opensim
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsGroup.cs873
1 files changed, 361 insertions, 512 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
index 01ac8dd..67dcbdd 100644
--- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
+++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
@@ -1,512 +1,361 @@
1// Ami Bar 1using System;
2// amibar@gmail.com 2using System.Threading;
3 3using System.Runtime.CompilerServices;
4using System; 4using System.Diagnostics;
5using System.Threading; 5
6using System.Runtime.CompilerServices; 6namespace Amib.Threading.Internal
7using System.Diagnostics; 7{
8 8
9namespace Amib.Threading.Internal 9 #region WorkItemsGroup class
10{ 10
11 #region WorkItemsGroup class 11 /// <summary>
12 12 /// Summary description for WorkItemsGroup.
13 /// <summary> 13 /// </summary>
14 /// Summary description for WorkItemsGroup. 14 public class WorkItemsGroup : WorkItemsGroupBase
15 /// </summary> 15 {
16 public class WorkItemsGroup : IWorkItemsGroup 16 #region Private members
17 { 17
18 #region Private members 18 private readonly object _lock = new object();
19 19
20 private object _lock = new object(); 20 /// <summary>
21 /// <summary> 21 /// A reference to the SmartThreadPool instance that created this
22 /// Contains the name of this instance of SmartThreadPool. 22 /// WorkItemsGroup.
23 /// Can be changed by the user. 23 /// </summary>
24 /// </summary> 24 private readonly SmartThreadPool _stp;
25 private string _name = "WorkItemsGroup"; 25
26 26 /// <summary>
27 /// <summary> 27 /// The OnIdle event
28 /// A reference to the SmartThreadPool instance that created this 28 /// </summary>
29 /// WorkItemsGroup. 29 private event WorkItemsGroupIdleHandler _onIdle;
30 /// </summary> 30
31 private SmartThreadPool _stp; 31 /// <summary>
32 32 /// A flag to indicate if the Work Items Group is now suspended.
33 /// <summary> 33 /// </summary>
34 /// The OnIdle event 34 private bool _isSuspended;
35 /// </summary> 35
36 private event WorkItemsGroupIdleHandler _onIdle; 36 /// <summary>
37 37 /// Defines how many work items of this WorkItemsGroup can run at once.
38 /// <summary> 38 /// </summary>
39 /// Defines how many work items of this WorkItemsGroup can run at once. 39 private int _concurrency;
40 /// </summary> 40
41 private int _concurrency; 41 /// <summary>
42 42 /// Priority queue to hold work items before they are passed
43 /// <summary> 43 /// to the SmartThreadPool.
44 /// Priority queue to hold work items before they are passed 44 /// </summary>
45 /// to the SmartThreadPool. 45 private readonly PriorityQueue _workItemsQueue;
46 /// </summary> 46
47 private PriorityQueue _workItemsQueue; 47 /// <summary>
48 48 /// Indicate how many work items are waiting in the SmartThreadPool
49 /// <summary> 49 /// queue.
50 /// Indicate how many work items are waiting in the SmartThreadPool 50 /// This value is used to apply the concurrency.
51 /// queue. 51 /// </summary>
52 /// This value is used to apply the concurrency. 52 private int _workItemsInStpQueue;
53 /// </summary> 53
54 private int _workItemsInStpQueue; 54 /// <summary>
55 55 /// Indicate how many work items are currently running in the SmartThreadPool.
56 /// <summary> 56 /// This value is used with the Cancel, to calculate if we can send new
57 /// Indicate how many work items are currently running in the SmartThreadPool. 57 /// work items to the STP.
58 /// This value is used with the Cancel, to calculate if we can send new 58 /// </summary>
59 /// work items to the STP. 59 private int _workItemsExecutingInStp = 0;
60 /// </summary> 60
61 private int _workItemsExecutingInStp = 0; 61 /// <summary>
62 62 /// WorkItemsGroup start information
63 /// <summary> 63 /// </summary>
64 /// WorkItemsGroup start information 64 private readonly WIGStartInfo _workItemsGroupStartInfo;
65 /// </summary> 65
66 private WIGStartInfo _workItemsGroupStartInfo; 66 /// <summary>
67 67 /// Signaled when all of the WorkItemsGroup's work item completed.
68 /// <summary> 68 /// </summary>
69 /// Signaled when all of the WorkItemsGroup's work item completed. 69 //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
70 /// </summary> 70 private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
71 private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); 71
72 72 /// <summary>
73 /// <summary> 73 /// A common object for all the work items that this work items group
74 /// A common object for all the work items that this work items group 74 /// generate so we can mark them to cancel in O(1)
75 /// generate so we can mark them to cancel in O(1) 75 /// </summary>
76 /// </summary> 76 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
77 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); 77
78 78 #endregion
79 #endregion 79
80 80 #region Construction
81 #region Construction 81
82 82 public WorkItemsGroup(
83 public WorkItemsGroup( 83 SmartThreadPool stp,
84 SmartThreadPool stp, 84 int concurrency,
85 int concurrency, 85 WIGStartInfo wigStartInfo)
86 WIGStartInfo wigStartInfo) 86 {
87 { 87 if (concurrency <= 0)
88 if (concurrency <= 0) 88 {
89 { 89 throw new ArgumentOutOfRangeException(
90 throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero"); 90 "concurrency",
91 } 91#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
92 _stp = stp; 92 concurrency,
93 _concurrency = concurrency; 93#endif
94 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo); 94 "concurrency must be greater than zero");
95 _workItemsQueue = new PriorityQueue(); 95 }
96 96 _stp = stp;
97 // The _workItemsInStpQueue gets the number of currently executing work items, 97 _concurrency = concurrency;
98 // because once a work item is executing, it cannot be cancelled. 98 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
99 _workItemsInStpQueue = _workItemsExecutingInStp; 99 _workItemsQueue = new PriorityQueue();
100 } 100 Name = "WorkItemsGroup";
101 101
102 #endregion 102 // The _workItemsInStpQueue gets the number of currently executing work items,
103 103 // because once a work item is executing, it cannot be cancelled.
104 #region IWorkItemsGroup implementation 104 _workItemsInStpQueue = _workItemsExecutingInStp;
105 105
106 /// <summary> 106 _isSuspended = _workItemsGroupStartInfo.StartSuspended;
107 /// Get/Set the name of the SmartThreadPool instance 107 }
108 /// </summary> 108
109 public string Name 109 #endregion
110 { 110
111 get 111 #region WorkItemsGroupBase Overrides
112 { 112
113 return _name; 113 public override int Concurrency
114 } 114 {
115 115 get { return _concurrency; }
116 set 116 set
117 { 117 {
118 _name = value; 118 Debug.Assert(value > 0);
119 } 119
120 } 120 int diff = value - _concurrency;
121 121 _concurrency = value;
122 /// <summary> 122 if (diff > 0)
123 /// Queue a work item 123 {
124 /// </summary> 124 EnqueueToSTPNextNWorkItem(diff);
125 /// <param name="callback">A callback to execute</param> 125 }
126 /// <returns>Returns a work item result</returns> 126 }
127 public IWorkItemResult QueueWorkItem(WorkItemCallback callback) 127 }
128 { 128
129 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback); 129 public override int WaitingCallbacks
130 EnqueueToSTPNextWorkItem(workItem); 130 {
131 return workItem.GetWorkItemResult(); 131 get { return _workItemsQueue.Count; }
132 } 132 }
133 133
134 /// <summary> 134 public override object[] GetStates()
135 /// Queue a work item 135 {
136 /// </summary> 136 lock (_lock)
137 /// <param name="callback">A callback to execute</param> 137 {
138 /// <param name="workItemPriority">The priority of the work item</param> 138 object[] states = new object[_workItemsQueue.Count];
139 /// <returns>Returns a work item result</returns> 139 int i = 0;
140 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) 140 foreach (WorkItem workItem in _workItemsQueue)
141 { 141 {
142 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority); 142 states[i] = workItem.GetWorkItemResult().State;
143 EnqueueToSTPNextWorkItem(workItem); 143 ++i;
144 return workItem.GetWorkItemResult(); 144 }
145 } 145 return states;
146 146 }
147 /// <summary> 147 }
148 /// Queue a work item 148
149 /// </summary> 149 /// <summary>
150 /// <param name="workItemInfo">Work item info</param> 150 /// WorkItemsGroup start information
151 /// <param name="callback">A callback to execute</param> 151 /// </summary>
152 /// <returns>Returns a work item result</returns> 152 public override WIGStartInfo WIGStartInfo
153 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) 153 {
154 { 154 get { return _workItemsGroupStartInfo; }
155 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback); 155 }
156 EnqueueToSTPNextWorkItem(workItem); 156
157 return workItem.GetWorkItemResult(); 157 /// <summary>
158 } 158 /// Start the Work Items Group if it was started suspended
159 159 /// </summary>
160 /// <summary> 160 public override void Start()
161 /// Queue a work item 161 {
162 /// </summary> 162 // If the Work Items Group already started then quit
163 /// <param name="callback">A callback to execute</param> 163 if (!_isSuspended)
164 /// <param name="state"> 164 {
165 /// The context object of the work item. Used for passing arguments to the work item. 165 return;
166 /// </param> 166 }
167 /// <returns>Returns a work item result</returns> 167 _isSuspended = false;
168 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) 168
169 { 169 EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
170 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state); 170 }
171 EnqueueToSTPNextWorkItem(workItem); 171
172 return workItem.GetWorkItemResult(); 172 public override void Cancel(bool abortExecution)
173 } 173 {
174 174 lock (_lock)
175 /// <summary> 175 {
176 /// Queue a work item 176 _canceledWorkItemsGroup.IsCanceled = true;
177 /// </summary> 177 _workItemsQueue.Clear();
178 /// <param name="callback">A callback to execute</param> 178 _workItemsInStpQueue = 0;
179 /// <param name="state"> 179 _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
180 /// The context object of the work item. Used for passing arguments to the work item. 180 }
181 /// </param> 181
182 /// <param name="workItemPriority">The work item priority</param> 182 if (abortExecution)
183 /// <returns>Returns a work item result</returns> 183 {
184 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) 184 _stp.CancelAbortWorkItemsGroup(this);
185 { 185 }
186 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority); 186 }
187 EnqueueToSTPNextWorkItem(workItem); 187
188 return workItem.GetWorkItemResult(); 188 /// <summary>
189 } 189 /// Wait for the thread pool to be idle
190 190 /// </summary>
191 /// <summary> 191 public override bool WaitForIdle(int millisecondsTimeout)
192 /// Queue a work item 192 {
193 /// </summary> 193 SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
194 /// <param name="workItemInfo">Work item information</param> 194 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
195 /// <param name="callback">A callback to execute</param> 195 }
196 /// <param name="state"> 196
197 /// The context object of the work item. Used for passing arguments to the work item. 197 public override event WorkItemsGroupIdleHandler OnIdle
198 /// </param> 198 {
199 /// <returns>Returns a work item result</returns> 199 add { _onIdle += value; }
200 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) 200 remove { _onIdle -= value; }
201 { 201 }
202 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state); 202
203 EnqueueToSTPNextWorkItem(workItem); 203 #endregion
204 return workItem.GetWorkItemResult(); 204
205 } 205 #region Private methods
206 206
207 /// <summary> 207 private void RegisterToWorkItemCompletion(IWorkItemResult wir)
208 /// Queue a work item 208 {
209 /// </summary> 209 IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
210 /// <param name="callback">A callback to execute</param> 210 iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
211 /// <param name="state"> 211 iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
212 /// The context object of the work item. Used for passing arguments to the work item. 212 }
213 /// </param> 213
214 /// <param name="postExecuteWorkItemCallback"> 214 public void OnSTPIsStarting()
215 /// A delegate to call after the callback completion 215 {
216 /// </param> 216 if (_isSuspended)
217 /// <returns>Returns a work item result</returns> 217 {
218 public IWorkItemResult QueueWorkItem( 218 return;
219 WorkItemCallback callback, 219 }
220 object state, 220
221 PostExecuteWorkItemCallback postExecuteWorkItemCallback) 221 EnqueueToSTPNextNWorkItem(_concurrency);
222 { 222 }
223 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback); 223
224 EnqueueToSTPNextWorkItem(workItem); 224 public void EnqueueToSTPNextNWorkItem(int count)
225 return workItem.GetWorkItemResult(); 225 {
226 } 226 for (int i = 0; i < count; ++i)
227 227 {
228 /// <summary> 228 EnqueueToSTPNextWorkItem(null, false);
229 /// Queue a work item 229 }
230 /// </summary> 230 }
231 /// <param name="callback">A callback to execute</param> 231
232 /// <param name="state"> 232 private object FireOnIdle(object state)
233 /// The context object of the work item. Used for passing arguments to the work item. 233 {
234 /// </param> 234 FireOnIdleImpl(_onIdle);
235 /// <param name="postExecuteWorkItemCallback"> 235 return null;
236 /// A delegate to call after the callback completion 236 }
237 /// </param> 237
238 /// <param name="workItemPriority">The work item priority</param> 238 [MethodImpl(MethodImplOptions.NoInlining)]
239 /// <returns>Returns a work item result</returns> 239 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
240 public IWorkItemResult QueueWorkItem( 240 {
241 WorkItemCallback callback, 241 if(null == onIdle)
242 object state, 242 {
243 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 243 return;
244 WorkItemPriority workItemPriority) 244 }
245 { 245
246 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); 246 Delegate[] delegates = onIdle.GetInvocationList();
247 EnqueueToSTPNextWorkItem(workItem); 247 foreach(WorkItemsGroupIdleHandler eh in delegates)
248 return workItem.GetWorkItemResult(); 248 {
249 } 249 try
250 250 {
251 /// <summary> 251 eh(this);
252 /// Queue a work item 252 }
253 /// </summary> 253 catch { } // Suppress exceptions
254 /// <param name="callback">A callback to execute</param> 254 }
255 /// <param name="state"> 255 }
256 /// The context object of the work item. Used for passing arguments to the work item. 256
257 /// </param> 257 private void OnWorkItemStartedCallback(WorkItem workItem)
258 /// <param name="postExecuteWorkItemCallback"> 258 {
259 /// A delegate to call after the callback completion 259 lock(_lock)
260 /// </param> 260 {
261 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 261 ++_workItemsExecutingInStp;
262 /// <returns>Returns a work item result</returns> 262 }
263 public IWorkItemResult QueueWorkItem( 263 }
264 WorkItemCallback callback, 264
265 object state, 265 private void OnWorkItemCompletedCallback(WorkItem workItem)
266 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 266 {
267 CallToPostExecute callToPostExecute) 267 EnqueueToSTPNextWorkItem(null, true);
268 { 268 }
269 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); 269
270 EnqueueToSTPNextWorkItem(workItem); 270 internal override void Enqueue(WorkItem workItem)
271 return workItem.GetWorkItemResult(); 271 {
272 } 272 EnqueueToSTPNextWorkItem(workItem);
273 273 }
274 /// <summary> 274
275 /// Queue a work item 275 private void EnqueueToSTPNextWorkItem(WorkItem workItem)
276 /// </summary> 276 {
277 /// <param name="callback">A callback to execute</param> 277 EnqueueToSTPNextWorkItem(workItem, false);
278 /// <param name="state"> 278 }
279 /// The context object of the work item. Used for passing arguments to the work item. 279
280 /// </param> 280 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
281 /// <param name="postExecuteWorkItemCallback"> 281 {
282 /// A delegate to call after the callback completion 282 lock(_lock)
283 /// </param> 283 {
284 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 284 // Got here from OnWorkItemCompletedCallback()
285 /// <param name="workItemPriority">The work item priority</param> 285 if (decrementWorkItemsInStpQueue)
286 /// <returns>Returns a work item result</returns> 286 {
287 public IWorkItemResult QueueWorkItem( 287 --_workItemsInStpQueue;
288 WorkItemCallback callback, 288
289 object state, 289 if(_workItemsInStpQueue < 0)
290 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 290 {
291 CallToPostExecute callToPostExecute, 291 _workItemsInStpQueue = 0;
292 WorkItemPriority workItemPriority) 292 }
293 { 293
294 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); 294 --_workItemsExecutingInStp;
295 EnqueueToSTPNextWorkItem(workItem); 295
296 return workItem.GetWorkItemResult(); 296 if(_workItemsExecutingInStp < 0)
297 } 297 {
298 298 _workItemsExecutingInStp = 0;
299 /// <summary> 299 }
300 /// Wait for the thread pool to be idle 300 }
301 /// </summary> 301
302 public void WaitForIdle() 302 // If the work item is not null then enqueue it
303 { 303 if (null != workItem)
304 WaitForIdle(Timeout.Infinite); 304 {
305 } 305 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
306 306
307 /// <summary> 307 RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
308 /// Wait for the thread pool to be idle 308 _workItemsQueue.Enqueue(workItem);
309 /// </summary> 309 //_stp.IncrementWorkItemsCount();
310 public bool WaitForIdle(TimeSpan timeout) 310
311 { 311 if ((1 == _workItemsQueue.Count) &&
312 return WaitForIdle((int)timeout.TotalMilliseconds); 312 (0 == _workItemsInStpQueue))
313 } 313 {
314 314 _stp.RegisterWorkItemsGroup(this);
315 /// <summary> 315 IsIdle = false;
316 /// Wait for the thread pool to be idle 316 _isIdleWaitHandle.Reset();
317 /// </summary> 317 }
318 public bool WaitForIdle(int millisecondsTimeout) 318 }
319 { 319
320 _stp.ValidateWorkItemsGroupWaitForIdle(this); 320 // If the work items queue of the group is empty than quit
321 return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); 321 if (0 == _workItemsQueue.Count)
322 } 322 {
323 323 if (0 == _workItemsInStpQueue)
324 public int WaitingCallbacks 324 {
325 { 325 _stp.UnregisterWorkItemsGroup(this);
326 get 326 IsIdle = true;
327 { 327 _isIdleWaitHandle.Set();
328 return _workItemsQueue.Count; 328 if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
329 } 329 {
330 } 330 _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
331 331 }
332 public event WorkItemsGroupIdleHandler OnIdle 332 }
333 { 333 return;
334 add 334 }
335 { 335
336 _onIdle += value; 336 if (!_isSuspended)
337 } 337 {
338 remove 338 if (_workItemsInStpQueue < _concurrency)
339 { 339 {
340 _onIdle -= value; 340 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
341 } 341 try
342 } 342 {
343 343 _stp.Enqueue(nextWorkItem);
344 public void Cancel() 344 }
345 { 345 catch (ObjectDisposedException e)
346 lock(_lock) 346 {
347 { 347 e.GetHashCode();
348 _canceledWorkItemsGroup.IsCanceled = true; 348 // The STP has been shutdown
349 _workItemsQueue.Clear(); 349 }
350 _workItemsInStpQueue = 0; 350
351 _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); 351 ++_workItemsInStpQueue;
352 } 352 }
353 } 353 }
354 354 }
355 public void Start() 355 }
356 { 356
357 lock (this) 357 #endregion
358 { 358 }
359 if (!_workItemsGroupStartInfo.StartSuspended) 359
360 { 360 #endregion
361 return; 361}
362 }
363 _workItemsGroupStartInfo.StartSuspended = false;
364 }
365
366 for(int i = 0; i < _concurrency; ++i)
367 {
368 EnqueueToSTPNextWorkItem(null, false);
369 }
370 }
371
372 #endregion
373
374 #region Private methods
375
376 private void RegisterToWorkItemCompletion(IWorkItemResult wir)
377 {
378 IInternalWorkItemResult iwir = wir as IInternalWorkItemResult;
379 iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
380 iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
381 }
382
383 public void OnSTPIsStarting()
384 {
385 lock (this)
386 {
387 if (_workItemsGroupStartInfo.StartSuspended)
388 {
389 return;
390 }
391 }
392
393 for(int i = 0; i < _concurrency; ++i)
394 {
395 EnqueueToSTPNextWorkItem(null, false);
396 }
397 }
398
399 private object FireOnIdle(object state)
400 {
401 FireOnIdleImpl(_onIdle);
402 return null;
403 }
404
405 [MethodImpl(MethodImplOptions.NoInlining)]
406 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
407 {
408 if(null == onIdle)
409 {
410 return;
411 }
412
413 Delegate[] delegates = onIdle.GetInvocationList();
414 foreach(WorkItemsGroupIdleHandler eh in delegates)
415 {
416 try
417 {
418 eh(this);
419 }
420 // Ignore exceptions
421 catch{}
422 }
423 }
424
425 private void OnWorkItemStartedCallback(WorkItem workItem)
426 {
427 lock(_lock)
428 {
429 ++_workItemsExecutingInStp;
430 }
431 }
432
433 private void OnWorkItemCompletedCallback(WorkItem workItem)
434 {
435 EnqueueToSTPNextWorkItem(null, true);
436 }
437
438 private void EnqueueToSTPNextWorkItem(WorkItem workItem)
439 {
440 EnqueueToSTPNextWorkItem(workItem, false);
441 }
442
443 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
444 {
445 lock(_lock)
446 {
447 // Got here from OnWorkItemCompletedCallback()
448 if (decrementWorkItemsInStpQueue)
449 {
450 --_workItemsInStpQueue;
451
452 if(_workItemsInStpQueue < 0)
453 {
454 _workItemsInStpQueue = 0;
455 }
456
457 --_workItemsExecutingInStp;
458
459 if(_workItemsExecutingInStp < 0)
460 {
461 _workItemsExecutingInStp = 0;
462 }
463 }
464
465 // If the work item is not null then enqueue it
466 if (null != workItem)
467 {
468 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
469
470 RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
471 _workItemsQueue.Enqueue(workItem);
472 //_stp.IncrementWorkItemsCount();
473
474 if ((1 == _workItemsQueue.Count) &&
475 (0 == _workItemsInStpQueue))
476 {
477 _stp.RegisterWorkItemsGroup(this);
478 Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
479 _isIdleWaitHandle.Reset();
480 }
481 }
482
483 // If the work items queue of the group is empty than quit
484 if (0 == _workItemsQueue.Count)
485 {
486 if (0 == _workItemsInStpQueue)
487 {
488 _stp.UnregisterWorkItemsGroup(this);
489 Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
490 _isIdleWaitHandle.Set();
491 _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
492 }
493 return;
494 }
495
496 if (!_workItemsGroupStartInfo.StartSuspended)
497 {
498 if (_workItemsInStpQueue < _concurrency)
499 {
500 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
501 _stp.Enqueue(nextWorkItem, true);
502 ++_workItemsInStpQueue;
503 }
504 }
505 }
506 }
507
508 #endregion
509 }
510
511 #endregion
512}