diff options
Diffstat (limited to '')
-rw-r--r-- | ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 759 |
1 files changed, 304 insertions, 455 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs index 01ac8dd..d9d34ac 100644 --- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs +++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs | |||
@@ -1,6 +1,3 @@ | |||
1 | // Ami Bar | ||
2 | // amibar@gmail.com | ||
3 | |||
4 | using System; | 1 | using System; |
5 | using System.Threading; | 2 | using System.Threading; |
6 | using System.Runtime.CompilerServices; | 3 | using System.Runtime.CompilerServices; |
@@ -8,505 +5,357 @@ using System.Diagnostics; | |||
8 | 5 | ||
9 | namespace Amib.Threading.Internal | 6 | namespace Amib.Threading.Internal |
10 | { | 7 | { |
11 | #region WorkItemsGroup class | ||
12 | |||
13 | /// <summary> | ||
14 | /// Summary description for WorkItemsGroup. | ||
15 | /// </summary> | ||
16 | public class WorkItemsGroup : IWorkItemsGroup | ||
17 | { | ||
18 | #region Private members | ||
19 | |||
20 | private object _lock = new object(); | ||
21 | /// <summary> | ||
22 | /// Contains the name of this instance of SmartThreadPool. | ||
23 | /// Can be changed by the user. | ||
24 | /// </summary> | ||
25 | private string _name = "WorkItemsGroup"; | ||
26 | |||
27 | /// <summary> | ||
28 | /// A reference to the SmartThreadPool instance that created this | ||
29 | /// WorkItemsGroup. | ||
30 | /// </summary> | ||
31 | private SmartThreadPool _stp; | ||
32 | |||
33 | /// <summary> | ||
34 | /// The OnIdle event | ||
35 | /// </summary> | ||
36 | private event WorkItemsGroupIdleHandler _onIdle; | ||
37 | 8 | ||
38 | /// <summary> | 9 | #region WorkItemsGroup class |
39 | /// Defines how many work items of this WorkItemsGroup can run at once. | ||
40 | /// </summary> | ||
41 | private int _concurrency; | ||
42 | 10 | ||
43 | /// <summary> | 11 | /// <summary> |
44 | /// Priority queue to hold work items before they are passed | 12 | /// Summary description for WorkItemsGroup. |
45 | /// to the SmartThreadPool. | 13 | /// </summary> |
46 | /// </summary> | 14 | public class WorkItemsGroup : WorkItemsGroupBase |
47 | private PriorityQueue _workItemsQueue; | 15 | { |
16 | #region Private members | ||
48 | 17 | ||
49 | /// <summary> | 18 | private readonly object _lock = new object(); |
50 | /// Indicate how many work items are waiting in the SmartThreadPool | ||
51 | /// queue. | ||
52 | /// This value is used to apply the concurrency. | ||
53 | /// </summary> | ||
54 | private int _workItemsInStpQueue; | ||
55 | 19 | ||
56 | /// <summary> | 20 | /// <summary> |
57 | /// Indicate how many work items are currently running in the SmartThreadPool. | 21 | /// A reference to the SmartThreadPool instance that created this |
58 | /// This value is used with the Cancel, to calculate if we can send new | 22 | /// WorkItemsGroup. |
59 | /// work items to the STP. | 23 | /// </summary> |
60 | /// </summary> | 24 | private readonly SmartThreadPool _stp; |
61 | private int _workItemsExecutingInStp = 0; | ||
62 | 25 | ||
63 | /// <summary> | 26 | /// <summary> |
64 | /// WorkItemsGroup start information | 27 | /// The OnIdle event |
65 | /// </summary> | 28 | /// </summary> |
66 | private WIGStartInfo _workItemsGroupStartInfo; | 29 | private event WorkItemsGroupIdleHandler _onIdle; |
67 | 30 | ||
68 | /// <summary> | 31 | /// <summary> |
69 | /// Signaled when all of the WorkItemsGroup's work item completed. | 32 | /// A flag to indicate if the Work Items Group is now suspended. |
70 | /// </summary> | 33 | /// </summary> |
71 | private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); | 34 | private bool _isSuspended; |
72 | 35 | ||
73 | /// <summary> | 36 | /// <summary> |
74 | /// A common object for all the work items that this work items group | 37 | /// Defines how many work items of this WorkItemsGroup can run at once. |
75 | /// generate so we can mark them to cancel in O(1) | 38 | /// </summary> |
76 | /// </summary> | 39 | private int _concurrency; |
77 | private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | 40 | |
78 | 41 | /// <summary> | |
79 | #endregion | 42 | /// Priority queue to hold work items before they are passed |
80 | 43 | /// to the SmartThreadPool. | |
81 | #region Construction | 44 | /// </summary> |
82 | 45 | private readonly PriorityQueue _workItemsQueue; | |
83 | public WorkItemsGroup( | 46 | |
84 | SmartThreadPool stp, | 47 | /// <summary> |
85 | int concurrency, | 48 | /// Indicate how many work items are waiting in the SmartThreadPool |
86 | WIGStartInfo wigStartInfo) | 49 | /// queue. |
50 | /// This value is used to apply the concurrency. | ||
51 | /// </summary> | ||
52 | private int _workItemsInStpQueue; | ||
53 | |||
54 | /// <summary> | ||
55 | /// Indicate how many work items are currently running in the SmartThreadPool. | ||
56 | /// This value is used with the Cancel, to calculate if we can send new | ||
57 | /// work items to the STP. | ||
58 | /// </summary> | ||
59 | private int _workItemsExecutingInStp = 0; | ||
60 | |||
61 | /// <summary> | ||
62 | /// WorkItemsGroup start information | ||
63 | /// </summary> | ||
64 | private readonly WIGStartInfo _workItemsGroupStartInfo; | ||
65 | |||
66 | /// <summary> | ||
67 | /// Signaled when all of the WorkItemsGroup's work item completed. | ||
68 | /// </summary> | ||
69 | //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); | ||
70 | private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); | ||
71 | |||
72 | /// <summary> | ||
73 | /// A common object for all the work items that this work items group | ||
74 | /// generate so we can mark them to cancel in O(1) | ||
75 | /// </summary> | ||
76 | private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | ||
77 | |||
78 | #endregion | ||
79 | |||
80 | #region Construction | ||
81 | |||
82 | public WorkItemsGroup( | ||
83 | SmartThreadPool stp, | ||
84 | int concurrency, | ||
85 | WIGStartInfo wigStartInfo) | ||
86 | { | ||
87 | if (concurrency <= 0) | ||
88 | { | ||
89 | throw new ArgumentOutOfRangeException( | ||
90 | "concurrency", | ||
91 | #if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) | ||
92 | concurrency, | ||
93 | #endif | ||
94 | "concurrency must be greater than zero"); | ||
95 | } | ||
96 | _stp = stp; | ||
97 | _concurrency = concurrency; | ||
98 | _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); | ||
99 | _workItemsQueue = new PriorityQueue(); | ||
100 | Name = "WorkItemsGroup"; | ||
101 | |||
102 | // The _workItemsInStpQueue gets the number of currently executing work items, | ||
103 | // because once a work item is executing, it cannot be cancelled. | ||
104 | _workItemsInStpQueue = _workItemsExecutingInStp; | ||
105 | |||
106 | _isSuspended = _workItemsGroupStartInfo.StartSuspended; | ||
107 | } | ||
108 | |||
109 | #endregion | ||
110 | |||
111 | #region WorkItemsGroupBase Overrides | ||
112 | |||
113 | public override int Concurrency | ||
87 | { | 114 | { |
88 | if (concurrency <= 0) | 115 | get { return _concurrency; } |
89 | { | ||
90 | throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero"); | ||
91 | } | ||
92 | _stp = stp; | ||
93 | _concurrency = concurrency; | ||
94 | _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo); | ||
95 | _workItemsQueue = new PriorityQueue(); | ||
96 | |||
97 | // The _workItemsInStpQueue gets the number of currently executing work items, | ||
98 | // because once a work item is executing, it cannot be cancelled. | ||
99 | _workItemsInStpQueue = _workItemsExecutingInStp; | ||
100 | } | ||
101 | |||
102 | #endregion | ||
103 | |||
104 | #region IWorkItemsGroup implementation | ||
105 | |||
106 | /// <summary> | ||
107 | /// Get/Set the name of the SmartThreadPool instance | ||
108 | /// </summary> | ||
109 | public string Name | ||
110 | { | ||
111 | get | ||
112 | { | ||
113 | return _name; | ||
114 | } | ||
115 | |||
116 | set | 116 | set |
117 | { | 117 | { |
118 | _name = value; | 118 | Debug.Assert(value > 0); |
119 | } | ||
120 | } | ||
121 | |||
122 | /// <summary> | ||
123 | /// Queue a work item | ||
124 | /// </summary> | ||
125 | /// <param name="callback">A callback to execute</param> | ||
126 | /// <returns>Returns a work item result</returns> | ||
127 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback) | ||
128 | { | ||
129 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback); | ||
130 | EnqueueToSTPNextWorkItem(workItem); | ||
131 | return workItem.GetWorkItemResult(); | ||
132 | } | ||
133 | 119 | ||
134 | /// <summary> | 120 | int diff = value - _concurrency; |
135 | /// Queue a work item | 121 | _concurrency = value; |
136 | /// </summary> | 122 | if (diff > 0) |
137 | /// <param name="callback">A callback to execute</param> | 123 | { |
138 | /// <param name="workItemPriority">The priority of the work item</param> | 124 | EnqueueToSTPNextNWorkItem(diff); |
139 | /// <returns>Returns a work item result</returns> | 125 | } |
140 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) | 126 | } |
141 | { | ||
142 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority); | ||
143 | EnqueueToSTPNextWorkItem(workItem); | ||
144 | return workItem.GetWorkItemResult(); | ||
145 | } | ||
146 | |||
147 | /// <summary> | ||
148 | /// Queue a work item | ||
149 | /// </summary> | ||
150 | /// <param name="workItemInfo">Work item info</param> | ||
151 | /// <param name="callback">A callback to execute</param> | ||
152 | /// <returns>Returns a work item result</returns> | ||
153 | public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) | ||
154 | { | ||
155 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback); | ||
156 | EnqueueToSTPNextWorkItem(workItem); | ||
157 | return workItem.GetWorkItemResult(); | ||
158 | } | ||
159 | |||
160 | /// <summary> | ||
161 | /// Queue a work item | ||
162 | /// </summary> | ||
163 | /// <param name="callback">A callback to execute</param> | ||
164 | /// <param name="state"> | ||
165 | /// The context object of the work item. Used for passing arguments to the work item. | ||
166 | /// </param> | ||
167 | /// <returns>Returns a work item result</returns> | ||
168 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) | ||
169 | { | ||
170 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state); | ||
171 | EnqueueToSTPNextWorkItem(workItem); | ||
172 | return workItem.GetWorkItemResult(); | ||
173 | } | ||
174 | |||
175 | /// <summary> | ||
176 | /// Queue a work item | ||
177 | /// </summary> | ||
178 | /// <param name="callback">A callback to execute</param> | ||
179 | /// <param name="state"> | ||
180 | /// The context object of the work item. Used for passing arguments to the work item. | ||
181 | /// </param> | ||
182 | /// <param name="workItemPriority">The work item priority</param> | ||
183 | /// <returns>Returns a work item result</returns> | ||
184 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) | ||
185 | { | ||
186 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority); | ||
187 | EnqueueToSTPNextWorkItem(workItem); | ||
188 | return workItem.GetWorkItemResult(); | ||
189 | } | ||
190 | |||
191 | /// <summary> | ||
192 | /// Queue a work item | ||
193 | /// </summary> | ||
194 | /// <param name="workItemInfo">Work item information</param> | ||
195 | /// <param name="callback">A callback to execute</param> | ||
196 | /// <param name="state"> | ||
197 | /// The context object of the work item. Used for passing arguments to the work item. | ||
198 | /// </param> | ||
199 | /// <returns>Returns a work item result</returns> | ||
200 | public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) | ||
201 | { | ||
202 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state); | ||
203 | EnqueueToSTPNextWorkItem(workItem); | ||
204 | return workItem.GetWorkItemResult(); | ||
205 | } | ||
206 | |||
207 | /// <summary> | ||
208 | /// Queue a work item | ||
209 | /// </summary> | ||
210 | /// <param name="callback">A callback to execute</param> | ||
211 | /// <param name="state"> | ||
212 | /// The context object of the work item. Used for passing arguments to the work item. | ||
213 | /// </param> | ||
214 | /// <param name="postExecuteWorkItemCallback"> | ||
215 | /// A delegate to call after the callback completion | ||
216 | /// </param> | ||
217 | /// <returns>Returns a work item result</returns> | ||
218 | public IWorkItemResult QueueWorkItem( | ||
219 | WorkItemCallback callback, | ||
220 | object state, | ||
221 | PostExecuteWorkItemCallback postExecuteWorkItemCallback) | ||
222 | { | ||
223 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback); | ||
224 | EnqueueToSTPNextWorkItem(workItem); | ||
225 | return workItem.GetWorkItemResult(); | ||
226 | } | ||
227 | |||
228 | /// <summary> | ||
229 | /// Queue a work item | ||
230 | /// </summary> | ||
231 | /// <param name="callback">A callback to execute</param> | ||
232 | /// <param name="state"> | ||
233 | /// The context object of the work item. Used for passing arguments to the work item. | ||
234 | /// </param> | ||
235 | /// <param name="postExecuteWorkItemCallback"> | ||
236 | /// A delegate to call after the callback completion | ||
237 | /// </param> | ||
238 | /// <param name="workItemPriority">The work item priority</param> | ||
239 | /// <returns>Returns a work item result</returns> | ||
240 | public IWorkItemResult QueueWorkItem( | ||
241 | WorkItemCallback callback, | ||
242 | object state, | ||
243 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | ||
244 | WorkItemPriority workItemPriority) | ||
245 | { | ||
246 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); | ||
247 | EnqueueToSTPNextWorkItem(workItem); | ||
248 | return workItem.GetWorkItemResult(); | ||
249 | } | ||
250 | |||
251 | /// <summary> | ||
252 | /// Queue a work item | ||
253 | /// </summary> | ||
254 | /// <param name="callback">A callback to execute</param> | ||
255 | /// <param name="state"> | ||
256 | /// The context object of the work item. Used for passing arguments to the work item. | ||
257 | /// </param> | ||
258 | /// <param name="postExecuteWorkItemCallback"> | ||
259 | /// A delegate to call after the callback completion | ||
260 | /// </param> | ||
261 | /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> | ||
262 | /// <returns>Returns a work item result</returns> | ||
263 | public IWorkItemResult QueueWorkItem( | ||
264 | WorkItemCallback callback, | ||
265 | object state, | ||
266 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | ||
267 | CallToPostExecute callToPostExecute) | ||
268 | { | ||
269 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); | ||
270 | EnqueueToSTPNextWorkItem(workItem); | ||
271 | return workItem.GetWorkItemResult(); | ||
272 | } | 127 | } |
273 | 128 | ||
274 | /// <summary> | 129 | public override int WaitingCallbacks |
275 | /// Queue a work item | ||
276 | /// </summary> | ||
277 | /// <param name="callback">A callback to execute</param> | ||
278 | /// <param name="state"> | ||
279 | /// The context object of the work item. Used for passing arguments to the work item. | ||
280 | /// </param> | ||
281 | /// <param name="postExecuteWorkItemCallback"> | ||
282 | /// A delegate to call after the callback completion | ||
283 | /// </param> | ||
284 | /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> | ||
285 | /// <param name="workItemPriority">The work item priority</param> | ||
286 | /// <returns>Returns a work item result</returns> | ||
287 | public IWorkItemResult QueueWorkItem( | ||
288 | WorkItemCallback callback, | ||
289 | object state, | ||
290 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | ||
291 | CallToPostExecute callToPostExecute, | ||
292 | WorkItemPriority workItemPriority) | ||
293 | { | 130 | { |
294 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); | 131 | get { return _workItemsQueue.Count; } |
295 | EnqueueToSTPNextWorkItem(workItem); | ||
296 | return workItem.GetWorkItemResult(); | ||
297 | } | 132 | } |
298 | 133 | ||
299 | /// <summary> | 134 | public override object[] GetStates() |
300 | /// Wait for the thread pool to be idle | ||
301 | /// </summary> | ||
302 | public void WaitForIdle() | ||
303 | { | 135 | { |
304 | WaitForIdle(Timeout.Infinite); | 136 | lock (_lock) |
137 | { | ||
138 | object[] states = new object[_workItemsQueue.Count]; | ||
139 | int i = 0; | ||
140 | foreach (WorkItem workItem in _workItemsQueue) | ||
141 | { | ||
142 | states[i] = workItem.GetWorkItemResult().State; | ||
143 | ++i; | ||
144 | } | ||
145 | return states; | ||
146 | } | ||
305 | } | 147 | } |
306 | 148 | ||
307 | /// <summary> | 149 | /// <summary> |
308 | /// Wait for the thread pool to be idle | 150 | /// WorkItemsGroup start information |
309 | /// </summary> | 151 | /// </summary> |
310 | public bool WaitForIdle(TimeSpan timeout) | 152 | public override WIGStartInfo WIGStartInfo |
311 | { | 153 | { |
312 | return WaitForIdle((int)timeout.TotalMilliseconds); | 154 | get { return _workItemsGroupStartInfo; } |
313 | } | 155 | } |
314 | 156 | ||
315 | /// <summary> | 157 | /// <summary> |
158 | /// Start the Work Items Group if it was started suspended | ||
159 | /// </summary> | ||
160 | public override void Start() | ||
161 | { | ||
162 | // If the Work Items Group already started then quit | ||
163 | if (!_isSuspended) | ||
164 | { | ||
165 | return; | ||
166 | } | ||
167 | _isSuspended = false; | ||
168 | |||
169 | EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); | ||
170 | } | ||
171 | |||
172 | public override void Cancel(bool abortExecution) | ||
173 | { | ||
174 | lock (_lock) | ||
175 | { | ||
176 | _canceledWorkItemsGroup.IsCanceled = true; | ||
177 | _workItemsQueue.Clear(); | ||
178 | _workItemsInStpQueue = 0; | ||
179 | _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | ||
180 | } | ||
181 | |||
182 | if (abortExecution) | ||
183 | { | ||
184 | _stp.CancelAbortWorkItemsGroup(this); | ||
185 | } | ||
186 | } | ||
187 | |||
188 | /// <summary> | ||
316 | /// Wait for the thread pool to be idle | 189 | /// Wait for the thread pool to be idle |
317 | /// </summary> | 190 | /// </summary> |
318 | public bool WaitForIdle(int millisecondsTimeout) | 191 | public override bool WaitForIdle(int millisecondsTimeout) |
319 | { | 192 | { |
320 | _stp.ValidateWorkItemsGroupWaitForIdle(this); | 193 | SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); |
321 | return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); | 194 | return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); |
322 | } | 195 | } |
323 | 196 | ||
324 | public int WaitingCallbacks | 197 | public override event WorkItemsGroupIdleHandler OnIdle |
325 | { | 198 | { |
326 | get | 199 | add { _onIdle += value; } |
327 | { | 200 | remove { _onIdle -= value; } |
328 | return _workItemsQueue.Count; | 201 | } |
329 | } | ||
330 | } | ||
331 | 202 | ||
332 | public event WorkItemsGroupIdleHandler OnIdle | 203 | #endregion |
333 | { | ||
334 | add | ||
335 | { | ||
336 | _onIdle += value; | ||
337 | } | ||
338 | remove | ||
339 | { | ||
340 | _onIdle -= value; | ||
341 | } | ||
342 | } | ||
343 | 204 | ||
344 | public void Cancel() | 205 | #region Private methods |
345 | { | ||
346 | lock(_lock) | ||
347 | { | ||
348 | _canceledWorkItemsGroup.IsCanceled = true; | ||
349 | _workItemsQueue.Clear(); | ||
350 | _workItemsInStpQueue = 0; | ||
351 | _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | ||
352 | } | ||
353 | } | ||
354 | 206 | ||
355 | public void Start() | 207 | private void RegisterToWorkItemCompletion(IWorkItemResult wir) |
356 | { | 208 | { |
357 | lock (this) | 209 | IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; |
358 | { | 210 | iwir.OnWorkItemStarted += OnWorkItemStartedCallback; |
359 | if (!_workItemsGroupStartInfo.StartSuspended) | 211 | iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; |
360 | { | 212 | } |
361 | return; | ||
362 | } | ||
363 | _workItemsGroupStartInfo.StartSuspended = false; | ||
364 | } | ||
365 | |||
366 | for(int i = 0; i < _concurrency; ++i) | ||
367 | { | ||
368 | EnqueueToSTPNextWorkItem(null, false); | ||
369 | } | ||
370 | } | ||
371 | |||
372 | #endregion | ||
373 | 213 | ||
374 | #region Private methods | 214 | public void OnSTPIsStarting() |
375 | 215 | { | |
376 | private void RegisterToWorkItemCompletion(IWorkItemResult wir) | 216 | if (_isSuspended) |
377 | { | ||
378 | IInternalWorkItemResult iwir = wir as IInternalWorkItemResult; | ||
379 | iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback); | ||
380 | iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback); | ||
381 | } | ||
382 | |||
383 | public void OnSTPIsStarting() | ||
384 | { | ||
385 | lock (this) | ||
386 | { | ||
387 | if (_workItemsGroupStartInfo.StartSuspended) | ||
388 | { | ||
389 | return; | ||
390 | } | ||
391 | } | ||
392 | |||
393 | for(int i = 0; i < _concurrency; ++i) | ||
394 | { | ||
395 | EnqueueToSTPNextWorkItem(null, false); | ||
396 | } | ||
397 | } | ||
398 | |||
399 | private object FireOnIdle(object state) | ||
400 | { | ||
401 | FireOnIdleImpl(_onIdle); | ||
402 | return null; | ||
403 | } | ||
404 | |||
405 | [MethodImpl(MethodImplOptions.NoInlining)] | ||
406 | private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) | ||
407 | { | ||
408 | if(null == onIdle) | ||
409 | { | 217 | { |
410 | return; | 218 | return; |
411 | } | 219 | } |
220 | |||
221 | EnqueueToSTPNextNWorkItem(_concurrency); | ||
222 | } | ||
412 | 223 | ||
413 | Delegate[] delegates = onIdle.GetInvocationList(); | 224 | public void EnqueueToSTPNextNWorkItem(int count) |
414 | foreach(WorkItemsGroupIdleHandler eh in delegates) | ||
415 | { | ||
416 | try | ||
417 | { | ||
418 | eh(this); | ||
419 | } | ||
420 | // Ignore exceptions | ||
421 | catch{} | ||
422 | } | ||
423 | } | ||
424 | |||
425 | private void OnWorkItemStartedCallback(WorkItem workItem) | ||
426 | { | 225 | { |
427 | lock(_lock) | 226 | for (int i = 0; i < count; ++i) |
428 | { | 227 | { |
429 | ++_workItemsExecutingInStp; | 228 | EnqueueToSTPNextWorkItem(null, false); |
430 | } | 229 | } |
431 | } | 230 | } |
432 | 231 | ||
433 | private void OnWorkItemCompletedCallback(WorkItem workItem) | 232 | private object FireOnIdle(object state) |
434 | { | 233 | { |
435 | EnqueueToSTPNextWorkItem(null, true); | 234 | FireOnIdleImpl(_onIdle); |
436 | } | 235 | return null; |
437 | 236 | } | |
438 | private void EnqueueToSTPNextWorkItem(WorkItem workItem) | 237 | |
238 | [MethodImpl(MethodImplOptions.NoInlining)] | ||
239 | private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) | ||
240 | { | ||
241 | if(null == onIdle) | ||
242 | { | ||
243 | return; | ||
244 | } | ||
245 | |||
246 | Delegate[] delegates = onIdle.GetInvocationList(); | ||
247 | foreach(WorkItemsGroupIdleHandler eh in delegates) | ||
248 | { | ||
249 | try | ||
250 | { | ||
251 | eh(this); | ||
252 | } | ||
253 | catch { } // Suppress exceptions | ||
254 | } | ||
255 | } | ||
256 | |||
257 | private void OnWorkItemStartedCallback(WorkItem workItem) | ||
258 | { | ||
259 | lock(_lock) | ||
260 | { | ||
261 | ++_workItemsExecutingInStp; | ||
262 | } | ||
263 | } | ||
264 | |||
265 | private void OnWorkItemCompletedCallback(WorkItem workItem) | ||
266 | { | ||
267 | EnqueueToSTPNextWorkItem(null, true); | ||
268 | } | ||
269 | |||
270 | internal override void Enqueue(WorkItem workItem) | ||
439 | { | 271 | { |
440 | EnqueueToSTPNextWorkItem(workItem, false); | 272 | EnqueueToSTPNextWorkItem(workItem); |
441 | } | 273 | } |
442 | 274 | ||
443 | private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) | 275 | private void EnqueueToSTPNextWorkItem(WorkItem workItem) |
444 | { | 276 | { |
445 | lock(_lock) | 277 | EnqueueToSTPNextWorkItem(workItem, false); |
446 | { | 278 | } |
447 | // Got here from OnWorkItemCompletedCallback() | 279 | |
448 | if (decrementWorkItemsInStpQueue) | 280 | private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) |
449 | { | 281 | { |
450 | --_workItemsInStpQueue; | 282 | lock(_lock) |
451 | 283 | { | |
452 | if(_workItemsInStpQueue < 0) | 284 | // Got here from OnWorkItemCompletedCallback() |
453 | { | 285 | if (decrementWorkItemsInStpQueue) |
454 | _workItemsInStpQueue = 0; | 286 | { |
455 | } | 287 | --_workItemsInStpQueue; |
456 | 288 | ||
457 | --_workItemsExecutingInStp; | 289 | if(_workItemsInStpQueue < 0) |
458 | 290 | { | |
459 | if(_workItemsExecutingInStp < 0) | 291 | _workItemsInStpQueue = 0; |
460 | { | 292 | } |
461 | _workItemsExecutingInStp = 0; | 293 | |
462 | } | 294 | --_workItemsExecutingInStp; |
463 | } | 295 | |
464 | 296 | if(_workItemsExecutingInStp < 0) | |
465 | // If the work item is not null then enqueue it | 297 | { |
466 | if (null != workItem) | 298 | _workItemsExecutingInStp = 0; |
467 | { | 299 | } |
468 | workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; | 300 | } |
469 | 301 | ||
470 | RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); | 302 | // If the work item is not null then enqueue it |
471 | _workItemsQueue.Enqueue(workItem); | 303 | if (null != workItem) |
472 | //_stp.IncrementWorkItemsCount(); | 304 | { |
473 | 305 | workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; | |
474 | if ((1 == _workItemsQueue.Count) && | 306 | |
475 | (0 == _workItemsInStpQueue)) | 307 | RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); |
476 | { | 308 | _workItemsQueue.Enqueue(workItem); |
477 | _stp.RegisterWorkItemsGroup(this); | 309 | //_stp.IncrementWorkItemsCount(); |
478 | Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle"); | 310 | |
311 | if ((1 == _workItemsQueue.Count) && | ||
312 | (0 == _workItemsInStpQueue)) | ||
313 | { | ||
314 | _stp.RegisterWorkItemsGroup(this); | ||
315 | IsIdle = false; | ||
479 | _isIdleWaitHandle.Reset(); | 316 | _isIdleWaitHandle.Reset(); |
480 | } | 317 | } |
481 | } | 318 | } |
482 | 319 | ||
483 | // If the work items queue of the group is empty than quit | 320 | // If the work items queue of the group is empty than quit |
484 | if (0 == _workItemsQueue.Count) | 321 | if (0 == _workItemsQueue.Count) |
485 | { | 322 | { |
486 | if (0 == _workItemsInStpQueue) | 323 | if (0 == _workItemsInStpQueue) |
487 | { | 324 | { |
488 | _stp.UnregisterWorkItemsGroup(this); | 325 | _stp.UnregisterWorkItemsGroup(this); |
489 | Trace.WriteLine("WorkItemsGroup " + Name + " is idle"); | 326 | IsIdle = true; |
490 | _isIdleWaitHandle.Set(); | 327 | _isIdleWaitHandle.Set(); |
491 | _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle)); | 328 | if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) |
492 | } | 329 | { |
493 | return; | 330 | _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); |
494 | } | 331 | } |
495 | 332 | } | |
496 | if (!_workItemsGroupStartInfo.StartSuspended) | 333 | return; |
497 | { | 334 | } |
498 | if (_workItemsInStpQueue < _concurrency) | 335 | |
499 | { | 336 | if (!_isSuspended) |
500 | WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; | 337 | { |
501 | _stp.Enqueue(nextWorkItem, true); | 338 | if (_workItemsInStpQueue < _concurrency) |
502 | ++_workItemsInStpQueue; | 339 | { |
503 | } | 340 | WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; |
504 | } | 341 | try |
505 | } | 342 | { |
506 | } | 343 | _stp.Enqueue(nextWorkItem); |
507 | 344 | } | |
508 | #endregion | 345 | catch (ObjectDisposedException e) |
346 | { | ||
347 | e.GetHashCode(); | ||
348 | // The STP has been shutdown | ||
349 | } | ||
350 | |||
351 | ++_workItemsInStpQueue; | ||
352 | } | ||
353 | } | ||
354 | } | ||
355 | } | ||
356 | |||
357 | #endregion | ||
509 | } | 358 | } |
510 | 359 | ||
511 | #endregion | 360 | #endregion |
512 | } | 361 | } |