aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
diff options
context:
space:
mode:
authorJustin Clark-Casey (justincc)2013-05-01 19:01:43 +0100
committerJustin Clark-Casey (justincc)2013-05-01 19:01:43 +0100
commit206fb306a7820cf593570e35ddfa8e7c5a10e449 (patch)
tree0ef0fdf42ddc0b63224af52b62b0bad42f62e352 /ThirdParty/SmartThreadPool/WorkItemsGroup.cs
parentFix CAPS to work like they should - do not send caps to the viewer if they're... (diff)
downloadopensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.zip
opensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.gz
opensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.bz2
opensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.xz
Update SmartThreadPool to latest version 2.2.3 with a major and minor change.
SmartThreadPool code comes from http://www.codeproject.com/Articles/7933/Smart-Thread-Pool This version implements thread abort (via WorkItem.Cancel(true)), threadpool naming, max thread stack, etc. so we no longer need to manually patch those. However, two changes have been made to stock 2.2.3. Major change: WorkItem.Cancel(bool abortExecution) in our version does not succeed if the work item was in progress and thread abort was not specified. This is to match previous behaviour where we handle co-operative termination via another mechanism rather than checking WorkItem.IsCanceled. Minor change: Did not add STP's StopWatch implementation as this is only used WinCE and Silverlight and causes a build clash with System.Diagnostics.StopWatch The reason for updating is to see if this improves http://opensimulator.org/mantis/view.php?id=6557 and http://opensimulator.org/mantis/view.php?id=6586
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsGroup.cs873
1 files changed, 361 insertions, 512 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
index 01ac8dd..67dcbdd 100644
--- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
+++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
@@ -1,512 +1,361 @@
1// Ami Bar 1using System;
2// amibar@gmail.com 2using System.Threading;
3 3using System.Runtime.CompilerServices;
4using System; 4using System.Diagnostics;
5using System.Threading; 5
6using System.Runtime.CompilerServices; 6namespace Amib.Threading.Internal
7using System.Diagnostics; 7{
8 8
9namespace Amib.Threading.Internal 9 #region WorkItemsGroup class
10{ 10
11 #region WorkItemsGroup class 11 /// <summary>
12 12 /// Summary description for WorkItemsGroup.
13 /// <summary> 13 /// </summary>
14 /// Summary description for WorkItemsGroup. 14 public class WorkItemsGroup : WorkItemsGroupBase
15 /// </summary> 15 {
16 public class WorkItemsGroup : IWorkItemsGroup 16 #region Private members
17 { 17
18 #region Private members 18 private readonly object _lock = new object();
19 19
20 private object _lock = new object(); 20 /// <summary>
21 /// <summary> 21 /// A reference to the SmartThreadPool instance that created this
22 /// Contains the name of this instance of SmartThreadPool. 22 /// WorkItemsGroup.
23 /// Can be changed by the user. 23 /// </summary>
24 /// </summary> 24 private readonly SmartThreadPool _stp;
25 private string _name = "WorkItemsGroup"; 25
26 26 /// <summary>
27 /// <summary> 27 /// The OnIdle event
28 /// A reference to the SmartThreadPool instance that created this 28 /// </summary>
29 /// WorkItemsGroup. 29 private event WorkItemsGroupIdleHandler _onIdle;
30 /// </summary> 30
31 private SmartThreadPool _stp; 31 /// <summary>
32 32 /// A flag to indicate if the Work Items Group is now suspended.
33 /// <summary> 33 /// </summary>
34 /// The OnIdle event 34 private bool _isSuspended;
35 /// </summary> 35
36 private event WorkItemsGroupIdleHandler _onIdle; 36 /// <summary>
37 37 /// Defines how many work items of this WorkItemsGroup can run at once.
38 /// <summary> 38 /// </summary>
39 /// Defines how many work items of this WorkItemsGroup can run at once. 39 private int _concurrency;
40 /// </summary> 40
41 private int _concurrency; 41 /// <summary>
42 42 /// Priority queue to hold work items before they are passed
43 /// <summary> 43 /// to the SmartThreadPool.
44 /// Priority queue to hold work items before they are passed 44 /// </summary>
45 /// to the SmartThreadPool. 45 private readonly PriorityQueue _workItemsQueue;
46 /// </summary> 46
47 private PriorityQueue _workItemsQueue; 47 /// <summary>
48 48 /// Indicate how many work items are waiting in the SmartThreadPool
49 /// <summary> 49 /// queue.
50 /// Indicate how many work items are waiting in the SmartThreadPool 50 /// This value is used to apply the concurrency.
51 /// queue. 51 /// </summary>
52 /// This value is used to apply the concurrency. 52 private int _workItemsInStpQueue;
53 /// </summary> 53
54 private int _workItemsInStpQueue; 54 /// <summary>
55 55 /// Indicate how many work items are currently running in the SmartThreadPool.
56 /// <summary> 56 /// This value is used with the Cancel, to calculate if we can send new
57 /// Indicate how many work items are currently running in the SmartThreadPool. 57 /// work items to the STP.
58 /// This value is used with the Cancel, to calculate if we can send new 58 /// </summary>
59 /// work items to the STP. 59 private int _workItemsExecutingInStp = 0;
60 /// </summary> 60
61 private int _workItemsExecutingInStp = 0; 61 /// <summary>
62 62 /// WorkItemsGroup start information
63 /// <summary> 63 /// </summary>
64 /// WorkItemsGroup start information 64 private readonly WIGStartInfo _workItemsGroupStartInfo;
65 /// </summary> 65
66 private WIGStartInfo _workItemsGroupStartInfo; 66 /// <summary>
67 67 /// Signaled when all of the WorkItemsGroup's work item completed.
68 /// <summary> 68 /// </summary>
69 /// Signaled when all of the WorkItemsGroup's work item completed. 69 //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
70 /// </summary> 70 private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
71 private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); 71
72 72 /// <summary>
73 /// <summary> 73 /// A common object for all the work items that this work items group
74 /// A common object for all the work items that this work items group 74 /// generate so we can mark them to cancel in O(1)
75 /// generate so we can mark them to cancel in O(1) 75 /// </summary>
76 /// </summary> 76 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
77 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); 77
78 78 #endregion
79 #endregion 79
80 80 #region Construction
81 #region Construction 81
82 82 public WorkItemsGroup(
83 public WorkItemsGroup( 83 SmartThreadPool stp,
84 SmartThreadPool stp, 84 int concurrency,
85 int concurrency, 85 WIGStartInfo wigStartInfo)
86 WIGStartInfo wigStartInfo) 86 {
87 { 87 if (concurrency <= 0)
88 if (concurrency <= 0) 88 {
89 { 89 throw new ArgumentOutOfRangeException(
90 throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero"); 90 "concurrency",
91 } 91#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
92 _stp = stp; 92 concurrency,
93 _concurrency = concurrency; 93#endif
94 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo); 94 "concurrency must be greater than zero");
95 _workItemsQueue = new PriorityQueue(); 95 }
96 96 _stp = stp;
97 // The _workItemsInStpQueue gets the number of currently executing work items, 97 _concurrency = concurrency;
98 // because once a work item is executing, it cannot be cancelled. 98 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
99 _workItemsInStpQueue = _workItemsExecutingInStp; 99 _workItemsQueue = new PriorityQueue();
100 } 100 Name = "WorkItemsGroup";
101 101
102 #endregion 102 // The _workItemsInStpQueue gets the number of currently executing work items,
103 103 // because once a work item is executing, it cannot be cancelled.
104 #region IWorkItemsGroup implementation 104 _workItemsInStpQueue = _workItemsExecutingInStp;
105 105
106 /// <summary> 106 _isSuspended = _workItemsGroupStartInfo.StartSuspended;
107 /// Get/Set the name of the SmartThreadPool instance 107 }
108 /// </summary> 108
109 public string Name 109 #endregion
110 { 110
111 get 111 #region WorkItemsGroupBase Overrides
112 { 112
113 return _name; 113 public override int Concurrency
114 } 114 {
115 115 get { return _concurrency; }
116 set 116 set
117 { 117 {
118 _name = value; 118 Debug.Assert(value > 0);
119 } 119
120 } 120 int diff = value - _concurrency;
121 121 _concurrency = value;
122 /// <summary> 122 if (diff > 0)
123 /// Queue a work item 123 {
124 /// </summary> 124 EnqueueToSTPNextNWorkItem(diff);
125 /// <param name="callback">A callback to execute</param> 125 }
126 /// <returns>Returns a work item result</returns> 126 }
127 public IWorkItemResult QueueWorkItem(WorkItemCallback callback) 127 }
128 { 128
129 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback); 129 public override int WaitingCallbacks
130 EnqueueToSTPNextWorkItem(workItem); 130 {
131 return workItem.GetWorkItemResult(); 131 get { return _workItemsQueue.Count; }
132 } 132 }
133 133
134 /// <summary> 134 public override object[] GetStates()
135 /// Queue a work item 135 {
136 /// </summary> 136 lock (_lock)
137 /// <param name="callback">A callback to execute</param> 137 {
138 /// <param name="workItemPriority">The priority of the work item</param> 138 object[] states = new object[_workItemsQueue.Count];
139 /// <returns>Returns a work item result</returns> 139 int i = 0;
140 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) 140 foreach (WorkItem workItem in _workItemsQueue)
141 { 141 {
142 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority); 142 states[i] = workItem.GetWorkItemResult().State;
143 EnqueueToSTPNextWorkItem(workItem); 143 ++i;
144 return workItem.GetWorkItemResult(); 144 }
145 } 145 return states;
146 146 }
147 /// <summary> 147 }
148 /// Queue a work item 148
149 /// </summary> 149 /// <summary>
150 /// <param name="workItemInfo">Work item info</param> 150 /// WorkItemsGroup start information
151 /// <param name="callback">A callback to execute</param> 151 /// </summary>
152 /// <returns>Returns a work item result</returns> 152 public override WIGStartInfo WIGStartInfo
153 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) 153 {
154 { 154 get { return _workItemsGroupStartInfo; }
155 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback); 155 }
156 EnqueueToSTPNextWorkItem(workItem); 156
157 return workItem.GetWorkItemResult(); 157 /// <summary>
158 } 158 /// Start the Work Items Group if it was started suspended
159 159 /// </summary>
160 /// <summary> 160 public override void Start()
161 /// Queue a work item 161 {
162 /// </summary> 162 // If the Work Items Group already started then quit
163 /// <param name="callback">A callback to execute</param> 163 if (!_isSuspended)
164 /// <param name="state"> 164 {
165 /// The context object of the work item. Used for passing arguments to the work item. 165 return;
166 /// </param> 166 }
167 /// <returns>Returns a work item result</returns> 167 _isSuspended = false;
168 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) 168
169 { 169 EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
170 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state); 170 }
171 EnqueueToSTPNextWorkItem(workItem); 171
172 return workItem.GetWorkItemResult(); 172 public override void Cancel(bool abortExecution)
173 } 173 {
174 174 lock (_lock)
175 /// <summary> 175 {
176 /// Queue a work item 176 _canceledWorkItemsGroup.IsCanceled = true;
177 /// </summary> 177 _workItemsQueue.Clear();
178 /// <param name="callback">A callback to execute</param> 178 _workItemsInStpQueue = 0;
179 /// <param name="state"> 179 _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
180 /// The context object of the work item. Used for passing arguments to the work item. 180 }
181 /// </param> 181
182 /// <param name="workItemPriority">The work item priority</param> 182 if (abortExecution)
183 /// <returns>Returns a work item result</returns> 183 {
184 public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) 184 _stp.CancelAbortWorkItemsGroup(this);
185 { 185 }
186 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority); 186 }
187 EnqueueToSTPNextWorkItem(workItem); 187
188 return workItem.GetWorkItemResult(); 188 /// <summary>
189 } 189 /// Wait for the thread pool to be idle
190 190 /// </summary>
191 /// <summary> 191 public override bool WaitForIdle(int millisecondsTimeout)
192 /// Queue a work item 192 {
193 /// </summary> 193 SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
194 /// <param name="workItemInfo">Work item information</param> 194 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
195 /// <param name="callback">A callback to execute</param> 195 }
196 /// <param name="state"> 196
197 /// The context object of the work item. Used for passing arguments to the work item. 197 public override event WorkItemsGroupIdleHandler OnIdle
198 /// </param> 198 {
199 /// <returns>Returns a work item result</returns> 199 add { _onIdle += value; }
200 public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) 200 remove { _onIdle -= value; }
201 { 201 }
202 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state); 202
203 EnqueueToSTPNextWorkItem(workItem); 203 #endregion
204 return workItem.GetWorkItemResult(); 204
205 } 205 #region Private methods
206 206
207 /// <summary> 207 private void RegisterToWorkItemCompletion(IWorkItemResult wir)
208 /// Queue a work item 208 {
209 /// </summary> 209 IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
210 /// <param name="callback">A callback to execute</param> 210 iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
211 /// <param name="state"> 211 iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
212 /// The context object of the work item. Used for passing arguments to the work item. 212 }
213 /// </param> 213
214 /// <param name="postExecuteWorkItemCallback"> 214 public void OnSTPIsStarting()
215 /// A delegate to call after the callback completion 215 {
216 /// </param> 216 if (_isSuspended)
217 /// <returns>Returns a work item result</returns> 217 {
218 public IWorkItemResult QueueWorkItem( 218 return;
219 WorkItemCallback callback, 219 }
220 object state, 220
221 PostExecuteWorkItemCallback postExecuteWorkItemCallback) 221 EnqueueToSTPNextNWorkItem(_concurrency);
222 { 222 }
223 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback); 223
224 EnqueueToSTPNextWorkItem(workItem); 224 public void EnqueueToSTPNextNWorkItem(int count)
225 return workItem.GetWorkItemResult(); 225 {
226 } 226 for (int i = 0; i < count; ++i)
227 227 {
228 /// <summary> 228 EnqueueToSTPNextWorkItem(null, false);
229 /// Queue a work item 229 }
230 /// </summary> 230 }
231 /// <param name="callback">A callback to execute</param> 231
232 /// <param name="state"> 232 private object FireOnIdle(object state)
233 /// The context object of the work item. Used for passing arguments to the work item. 233 {
234 /// </param> 234 FireOnIdleImpl(_onIdle);
235 /// <param name="postExecuteWorkItemCallback"> 235 return null;
236 /// A delegate to call after the callback completion 236 }
237 /// </param> 237
238 /// <param name="workItemPriority">The work item priority</param> 238 [MethodImpl(MethodImplOptions.NoInlining)]
239 /// <returns>Returns a work item result</returns> 239 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
240 public IWorkItemResult QueueWorkItem( 240 {
241 WorkItemCallback callback, 241 if(null == onIdle)
242 object state, 242 {
243 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 243 return;
244 WorkItemPriority workItemPriority) 244 }
245 { 245
246 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); 246 Delegate[] delegates = onIdle.GetInvocationList();
247 EnqueueToSTPNextWorkItem(workItem); 247 foreach(WorkItemsGroupIdleHandler eh in delegates)
248 return workItem.GetWorkItemResult(); 248 {
249 } 249 try
250 250 {
251 /// <summary> 251 eh(this);
252 /// Queue a work item 252 }
253 /// </summary> 253 catch { } // Suppress exceptions
254 /// <param name="callback">A callback to execute</param> 254 }
255 /// <param name="state"> 255 }
256 /// The context object of the work item. Used for passing arguments to the work item. 256
257 /// </param> 257 private void OnWorkItemStartedCallback(WorkItem workItem)
258 /// <param name="postExecuteWorkItemCallback"> 258 {
259 /// A delegate to call after the callback completion 259 lock(_lock)
260 /// </param> 260 {
261 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 261 ++_workItemsExecutingInStp;
262 /// <returns>Returns a work item result</returns> 262 }
263 public IWorkItemResult QueueWorkItem( 263 }
264 WorkItemCallback callback, 264
265 object state, 265 private void OnWorkItemCompletedCallback(WorkItem workItem)
266 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 266 {
267 CallToPostExecute callToPostExecute) 267 EnqueueToSTPNextWorkItem(null, true);
268 { 268 }
269 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); 269
270 EnqueueToSTPNextWorkItem(workItem); 270 internal override void Enqueue(WorkItem workItem)
271 return workItem.GetWorkItemResult(); 271 {
272 } 272 EnqueueToSTPNextWorkItem(workItem);
273 273 }
274 /// <summary> 274
275 /// Queue a work item 275 private void EnqueueToSTPNextWorkItem(WorkItem workItem)
276 /// </summary> 276 {
277 /// <param name="callback">A callback to execute</param> 277 EnqueueToSTPNextWorkItem(workItem, false);
278 /// <param name="state"> 278 }
279 /// The context object of the work item. Used for passing arguments to the work item. 279
280 /// </param> 280 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
281 /// <param name="postExecuteWorkItemCallback"> 281 {
282 /// A delegate to call after the callback completion 282 lock(_lock)
283 /// </param> 283 {
284 /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> 284 // Got here from OnWorkItemCompletedCallback()
285 /// <param name="workItemPriority">The work item priority</param> 285 if (decrementWorkItemsInStpQueue)
286 /// <returns>Returns a work item result</returns> 286 {
287 public IWorkItemResult QueueWorkItem( 287 --_workItemsInStpQueue;
288 WorkItemCallback callback, 288
289 object state, 289 if(_workItemsInStpQueue < 0)
290 PostExecuteWorkItemCallback postExecuteWorkItemCallback, 290 {
291 CallToPostExecute callToPostExecute, 291 _workItemsInStpQueue = 0;
292 WorkItemPriority workItemPriority) 292 }
293 { 293
294 WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); 294 --_workItemsExecutingInStp;
295 EnqueueToSTPNextWorkItem(workItem); 295
296 return workItem.GetWorkItemResult(); 296 if(_workItemsExecutingInStp < 0)
297 } 297 {
298 298 _workItemsExecutingInStp = 0;
299 /// <summary> 299 }
300 /// Wait for the thread pool to be idle 300 }
301 /// </summary> 301
302 public void WaitForIdle() 302 // If the work item is not null then enqueue it
303 { 303 if (null != workItem)
304 WaitForIdle(Timeout.Infinite); 304 {
305 } 305 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
306 306
307 /// <summary> 307 RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
308 /// Wait for the thread pool to be idle 308 _workItemsQueue.Enqueue(workItem);
309 /// </summary> 309 //_stp.IncrementWorkItemsCount();
310 public bool WaitForIdle(TimeSpan timeout) 310
311 { 311 if ((1 == _workItemsQueue.Count) &&
312 return WaitForIdle((int)timeout.TotalMilliseconds); 312 (0 == _workItemsInStpQueue))
313 } 313 {
314 314 _stp.RegisterWorkItemsGroup(this);
315 /// <summary> 315 IsIdle = false;
316 /// Wait for the thread pool to be idle 316 _isIdleWaitHandle.Reset();
317 /// </summary> 317 }
318 public bool WaitForIdle(int millisecondsTimeout) 318 }
319 { 319
320 _stp.ValidateWorkItemsGroupWaitForIdle(this); 320 // If the work items queue of the group is empty than quit
321 return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); 321 if (0 == _workItemsQueue.Count)
322 } 322 {
323 323 if (0 == _workItemsInStpQueue)
324 public int WaitingCallbacks 324 {
325 { 325 _stp.UnregisterWorkItemsGroup(this);
326 get 326 IsIdle = true;
327 { 327 _isIdleWaitHandle.Set();
328 return _workItemsQueue.Count; 328 if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
329 } 329 {
330 } 330 _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
331 331 }
332 public event WorkItemsGroupIdleHandler OnIdle 332 }
333 { 333 return;
334 add 334 }
335 { 335
336 _onIdle += value; 336 if (!_isSuspended)
337 } 337 {
338 remove 338 if (_workItemsInStpQueue < _concurrency)
339 { 339 {
340 _onIdle -= value; 340 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
341 } 341 try
342 } 342 {
343 343 _stp.Enqueue(nextWorkItem);
344 public void Cancel() 344 }
345 { 345 catch (ObjectDisposedException e)
346 lock(_lock) 346 {
347 { 347 e.GetHashCode();
348 _canceledWorkItemsGroup.IsCanceled = true; 348 // The STP has been shutdown
349 _workItemsQueue.Clear(); 349 }
350 _workItemsInStpQueue = 0; 350
351 _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); 351 ++_workItemsInStpQueue;
352 } 352 }
353 } 353 }
354 354 }
355 public void Start() 355 }
356 { 356
357 lock (this) 357 #endregion
358 { 358 }
359 if (!_workItemsGroupStartInfo.StartSuspended) 359
360 { 360 #endregion
361 return; 361}
362 }
363 _workItemsGroupStartInfo.StartSuspended = false;
364 }
365
366 for(int i = 0; i < _concurrency; ++i)
367 {
368 EnqueueToSTPNextWorkItem(null, false);
369 }
370 }
371
372 #endregion
373
374 #region Private methods
375
376 private void RegisterToWorkItemCompletion(IWorkItemResult wir)
377 {
378 IInternalWorkItemResult iwir = wir as IInternalWorkItemResult;
379 iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
380 iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
381 }
382
383 public void OnSTPIsStarting()
384 {
385 lock (this)
386 {
387 if (_workItemsGroupStartInfo.StartSuspended)
388 {
389 return;
390 }
391 }
392
393 for(int i = 0; i < _concurrency; ++i)
394 {
395 EnqueueToSTPNextWorkItem(null, false);
396 }
397 }
398
399 private object FireOnIdle(object state)
400 {
401 FireOnIdleImpl(_onIdle);
402 return null;
403 }
404
405 [MethodImpl(MethodImplOptions.NoInlining)]
406 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
407 {
408 if(null == onIdle)
409 {
410 return;
411 }
412
413 Delegate[] delegates = onIdle.GetInvocationList();
414 foreach(WorkItemsGroupIdleHandler eh in delegates)
415 {
416 try
417 {
418 eh(this);
419 }
420 // Ignore exceptions
421 catch{}
422 }
423 }
424
425 private void OnWorkItemStartedCallback(WorkItem workItem)
426 {
427 lock(_lock)
428 {
429 ++_workItemsExecutingInStp;
430 }
431 }
432
433 private void OnWorkItemCompletedCallback(WorkItem workItem)
434 {
435 EnqueueToSTPNextWorkItem(null, true);
436 }
437
438 private void EnqueueToSTPNextWorkItem(WorkItem workItem)
439 {
440 EnqueueToSTPNextWorkItem(workItem, false);
441 }
442
443 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
444 {
445 lock(_lock)
446 {
447 // Got here from OnWorkItemCompletedCallback()
448 if (decrementWorkItemsInStpQueue)
449 {
450 --_workItemsInStpQueue;
451
452 if(_workItemsInStpQueue < 0)
453 {
454 _workItemsInStpQueue = 0;
455 }
456
457 --_workItemsExecutingInStp;
458
459 if(_workItemsExecutingInStp < 0)
460 {
461 _workItemsExecutingInStp = 0;
462 }
463 }
464
465 // If the work item is not null then enqueue it
466 if (null != workItem)
467 {
468 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
469
470 RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
471 _workItemsQueue.Enqueue(workItem);
472 //_stp.IncrementWorkItemsCount();
473
474 if ((1 == _workItemsQueue.Count) &&
475 (0 == _workItemsInStpQueue))
476 {
477 _stp.RegisterWorkItemsGroup(this);
478 Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
479 _isIdleWaitHandle.Reset();
480 }
481 }
482
483 // If the work items queue of the group is empty than quit
484 if (0 == _workItemsQueue.Count)
485 {
486 if (0 == _workItemsInStpQueue)
487 {
488 _stp.UnregisterWorkItemsGroup(this);
489 Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
490 _isIdleWaitHandle.Set();
491 _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
492 }
493 return;
494 }
495
496 if (!_workItemsGroupStartInfo.StartSuspended)
497 {
498 if (_workItemsInStpQueue < _concurrency)
499 {
500 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
501 _stp.Enqueue(nextWorkItem, true);
502 ++_workItemsInStpQueue;
503 }
504 }
505 }
506 }
507
508 #endregion
509 }
510
511 #endregion
512}