diff options
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
-rw-r--r-- | ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 873 |
1 files changed, 361 insertions, 512 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs index 01ac8dd..67dcbdd 100644 --- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs +++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs | |||
@@ -1,512 +1,361 @@ | |||
1 | // Ami Bar | 1 | using System; |
2 | // amibar@gmail.com | 2 | using System.Threading; |
3 | 3 | using System.Runtime.CompilerServices; | |
4 | using System; | 4 | using System.Diagnostics; |
5 | using System.Threading; | 5 | |
6 | using System.Runtime.CompilerServices; | 6 | namespace Amib.Threading.Internal |
7 | using System.Diagnostics; | 7 | { |
8 | 8 | ||
9 | namespace Amib.Threading.Internal | 9 | #region WorkItemsGroup class |
10 | { | 10 | |
11 | #region WorkItemsGroup class | 11 | /// <summary> |
12 | 12 | /// Summary description for WorkItemsGroup. | |
13 | /// <summary> | 13 | /// </summary> |
14 | /// Summary description for WorkItemsGroup. | 14 | public class WorkItemsGroup : WorkItemsGroupBase |
15 | /// </summary> | 15 | { |
16 | public class WorkItemsGroup : IWorkItemsGroup | 16 | #region Private members |
17 | { | 17 | |
18 | #region Private members | 18 | private readonly object _lock = new object(); |
19 | 19 | ||
20 | private object _lock = new object(); | 20 | /// <summary> |
21 | /// <summary> | 21 | /// A reference to the SmartThreadPool instance that created this |
22 | /// Contains the name of this instance of SmartThreadPool. | 22 | /// WorkItemsGroup. |
23 | /// Can be changed by the user. | 23 | /// </summary> |
24 | /// </summary> | 24 | private readonly SmartThreadPool _stp; |
25 | private string _name = "WorkItemsGroup"; | 25 | |
26 | 26 | /// <summary> | |
27 | /// <summary> | 27 | /// The OnIdle event |
28 | /// A reference to the SmartThreadPool instance that created this | 28 | /// </summary> |
29 | /// WorkItemsGroup. | 29 | private event WorkItemsGroupIdleHandler _onIdle; |
30 | /// </summary> | 30 | |
31 | private SmartThreadPool _stp; | 31 | /// <summary> |
32 | 32 | /// A flag to indicate if the Work Items Group is now suspended. | |
33 | /// <summary> | 33 | /// </summary> |
34 | /// The OnIdle event | 34 | private bool _isSuspended; |
35 | /// </summary> | 35 | |
36 | private event WorkItemsGroupIdleHandler _onIdle; | 36 | /// <summary> |
37 | 37 | /// Defines how many work items of this WorkItemsGroup can run at once. | |
38 | /// <summary> | 38 | /// </summary> |
39 | /// Defines how many work items of this WorkItemsGroup can run at once. | 39 | private int _concurrency; |
40 | /// </summary> | 40 | |
41 | private int _concurrency; | 41 | /// <summary> |
42 | 42 | /// Priority queue to hold work items before they are passed | |
43 | /// <summary> | 43 | /// to the SmartThreadPool. |
44 | /// Priority queue to hold work items before they are passed | 44 | /// </summary> |
45 | /// to the SmartThreadPool. | 45 | private readonly PriorityQueue _workItemsQueue; |
46 | /// </summary> | 46 | |
47 | private PriorityQueue _workItemsQueue; | 47 | /// <summary> |
48 | 48 | /// Indicate how many work items are waiting in the SmartThreadPool | |
49 | /// <summary> | 49 | /// queue. |
50 | /// Indicate how many work items are waiting in the SmartThreadPool | 50 | /// This value is used to apply the concurrency. |
51 | /// queue. | 51 | /// </summary> |
52 | /// This value is used to apply the concurrency. | 52 | private int _workItemsInStpQueue; |
53 | /// </summary> | 53 | |
54 | private int _workItemsInStpQueue; | 54 | /// <summary> |
55 | 55 | /// Indicate how many work items are currently running in the SmartThreadPool. | |
56 | /// <summary> | 56 | /// This value is used with the Cancel, to calculate if we can send new |
57 | /// Indicate how many work items are currently running in the SmartThreadPool. | 57 | /// work items to the STP. |
58 | /// This value is used with the Cancel, to calculate if we can send new | 58 | /// </summary> |
59 | /// work items to the STP. | 59 | private int _workItemsExecutingInStp = 0; |
60 | /// </summary> | 60 | |
61 | private int _workItemsExecutingInStp = 0; | 61 | /// <summary> |
62 | 62 | /// WorkItemsGroup start information | |
63 | /// <summary> | 63 | /// </summary> |
64 | /// WorkItemsGroup start information | 64 | private readonly WIGStartInfo _workItemsGroupStartInfo; |
65 | /// </summary> | 65 | |
66 | private WIGStartInfo _workItemsGroupStartInfo; | 66 | /// <summary> |
67 | 67 | /// Signaled when all of the WorkItemsGroup's work item completed. | |
68 | /// <summary> | 68 | /// </summary> |
69 | /// Signaled when all of the WorkItemsGroup's work item completed. | 69 | //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); |
70 | /// </summary> | 70 | private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); |
71 | private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); | 71 | |
72 | 72 | /// <summary> | |
73 | /// <summary> | 73 | /// A common object for all the work items that this work items group |
74 | /// A common object for all the work items that this work items group | 74 | /// generate so we can mark them to cancel in O(1) |
75 | /// generate so we can mark them to cancel in O(1) | 75 | /// </summary> |
76 | /// </summary> | 76 | private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); |
77 | private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | 77 | |
78 | 78 | #endregion | |
79 | #endregion | 79 | |
80 | 80 | #region Construction | |
81 | #region Construction | 81 | |
82 | 82 | public WorkItemsGroup( | |
83 | public WorkItemsGroup( | 83 | SmartThreadPool stp, |
84 | SmartThreadPool stp, | 84 | int concurrency, |
85 | int concurrency, | 85 | WIGStartInfo wigStartInfo) |
86 | WIGStartInfo wigStartInfo) | 86 | { |
87 | { | 87 | if (concurrency <= 0) |
88 | if (concurrency <= 0) | 88 | { |
89 | { | 89 | throw new ArgumentOutOfRangeException( |
90 | throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero"); | 90 | "concurrency", |
91 | } | 91 | #if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) |
92 | _stp = stp; | 92 | concurrency, |
93 | _concurrency = concurrency; | 93 | #endif |
94 | _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo); | 94 | "concurrency must be greater than zero"); |
95 | _workItemsQueue = new PriorityQueue(); | 95 | } |
96 | 96 | _stp = stp; | |
97 | // The _workItemsInStpQueue gets the number of currently executing work items, | 97 | _concurrency = concurrency; |
98 | // because once a work item is executing, it cannot be cancelled. | 98 | _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); |
99 | _workItemsInStpQueue = _workItemsExecutingInStp; | 99 | _workItemsQueue = new PriorityQueue(); |
100 | } | 100 | Name = "WorkItemsGroup"; |
101 | 101 | ||
102 | #endregion | 102 | // The _workItemsInStpQueue gets the number of currently executing work items, |
103 | 103 | // because once a work item is executing, it cannot be cancelled. | |
104 | #region IWorkItemsGroup implementation | 104 | _workItemsInStpQueue = _workItemsExecutingInStp; |
105 | 105 | ||
106 | /// <summary> | 106 | _isSuspended = _workItemsGroupStartInfo.StartSuspended; |
107 | /// Get/Set the name of the SmartThreadPool instance | 107 | } |
108 | /// </summary> | 108 | |
109 | public string Name | 109 | #endregion |
110 | { | 110 | |
111 | get | 111 | #region WorkItemsGroupBase Overrides |
112 | { | 112 | |
113 | return _name; | 113 | public override int Concurrency |
114 | } | 114 | { |
115 | 115 | get { return _concurrency; } | |
116 | set | 116 | set |
117 | { | 117 | { |
118 | _name = value; | 118 | Debug.Assert(value > 0); |
119 | } | 119 | |
120 | } | 120 | int diff = value - _concurrency; |
121 | 121 | _concurrency = value; | |
122 | /// <summary> | 122 | if (diff > 0) |
123 | /// Queue a work item | 123 | { |
124 | /// </summary> | 124 | EnqueueToSTPNextNWorkItem(diff); |
125 | /// <param name="callback">A callback to execute</param> | 125 | } |
126 | /// <returns>Returns a work item result</returns> | 126 | } |
127 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback) | 127 | } |
128 | { | 128 | |
129 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback); | 129 | public override int WaitingCallbacks |
130 | EnqueueToSTPNextWorkItem(workItem); | 130 | { |
131 | return workItem.GetWorkItemResult(); | 131 | get { return _workItemsQueue.Count; } |
132 | } | 132 | } |
133 | 133 | ||
134 | /// <summary> | 134 | public override object[] GetStates() |
135 | /// Queue a work item | 135 | { |
136 | /// </summary> | 136 | lock (_lock) |
137 | /// <param name="callback">A callback to execute</param> | 137 | { |
138 | /// <param name="workItemPriority">The priority of the work item</param> | 138 | object[] states = new object[_workItemsQueue.Count]; |
139 | /// <returns>Returns a work item result</returns> | 139 | int i = 0; |
140 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) | 140 | foreach (WorkItem workItem in _workItemsQueue) |
141 | { | 141 | { |
142 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority); | 142 | states[i] = workItem.GetWorkItemResult().State; |
143 | EnqueueToSTPNextWorkItem(workItem); | 143 | ++i; |
144 | return workItem.GetWorkItemResult(); | 144 | } |
145 | } | 145 | return states; |
146 | 146 | } | |
147 | /// <summary> | 147 | } |
148 | /// Queue a work item | 148 | |
149 | /// </summary> | 149 | /// <summary> |
150 | /// <param name="workItemInfo">Work item info</param> | 150 | /// WorkItemsGroup start information |
151 | /// <param name="callback">A callback to execute</param> | 151 | /// </summary> |
152 | /// <returns>Returns a work item result</returns> | 152 | public override WIGStartInfo WIGStartInfo |
153 | public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) | 153 | { |
154 | { | 154 | get { return _workItemsGroupStartInfo; } |
155 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback); | 155 | } |
156 | EnqueueToSTPNextWorkItem(workItem); | 156 | |
157 | return workItem.GetWorkItemResult(); | 157 | /// <summary> |
158 | } | 158 | /// Start the Work Items Group if it was started suspended |
159 | 159 | /// </summary> | |
160 | /// <summary> | 160 | public override void Start() |
161 | /// Queue a work item | 161 | { |
162 | /// </summary> | 162 | // If the Work Items Group already started then quit |
163 | /// <param name="callback">A callback to execute</param> | 163 | if (!_isSuspended) |
164 | /// <param name="state"> | 164 | { |
165 | /// The context object of the work item. Used for passing arguments to the work item. | 165 | return; |
166 | /// </param> | 166 | } |
167 | /// <returns>Returns a work item result</returns> | 167 | _isSuspended = false; |
168 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) | 168 | |
169 | { | 169 | EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); |
170 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state); | 170 | } |
171 | EnqueueToSTPNextWorkItem(workItem); | 171 | |
172 | return workItem.GetWorkItemResult(); | 172 | public override void Cancel(bool abortExecution) |
173 | } | 173 | { |
174 | 174 | lock (_lock) | |
175 | /// <summary> | 175 | { |
176 | /// Queue a work item | 176 | _canceledWorkItemsGroup.IsCanceled = true; |
177 | /// </summary> | 177 | _workItemsQueue.Clear(); |
178 | /// <param name="callback">A callback to execute</param> | 178 | _workItemsInStpQueue = 0; |
179 | /// <param name="state"> | 179 | _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); |
180 | /// The context object of the work item. Used for passing arguments to the work item. | 180 | } |
181 | /// </param> | 181 | |
182 | /// <param name="workItemPriority">The work item priority</param> | 182 | if (abortExecution) |
183 | /// <returns>Returns a work item result</returns> | 183 | { |
184 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) | 184 | _stp.CancelAbortWorkItemsGroup(this); |
185 | { | 185 | } |
186 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority); | 186 | } |
187 | EnqueueToSTPNextWorkItem(workItem); | 187 | |
188 | return workItem.GetWorkItemResult(); | 188 | /// <summary> |
189 | } | 189 | /// Wait for the thread pool to be idle |
190 | 190 | /// </summary> | |
191 | /// <summary> | 191 | public override bool WaitForIdle(int millisecondsTimeout) |
192 | /// Queue a work item | 192 | { |
193 | /// </summary> | 193 | SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); |
194 | /// <param name="workItemInfo">Work item information</param> | 194 | return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); |
195 | /// <param name="callback">A callback to execute</param> | 195 | } |
196 | /// <param name="state"> | 196 | |
197 | /// The context object of the work item. Used for passing arguments to the work item. | 197 | public override event WorkItemsGroupIdleHandler OnIdle |
198 | /// </param> | 198 | { |
199 | /// <returns>Returns a work item result</returns> | 199 | add { _onIdle += value; } |
200 | public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) | 200 | remove { _onIdle -= value; } |
201 | { | 201 | } |
202 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state); | 202 | |
203 | EnqueueToSTPNextWorkItem(workItem); | 203 | #endregion |
204 | return workItem.GetWorkItemResult(); | 204 | |
205 | } | 205 | #region Private methods |
206 | 206 | ||
207 | /// <summary> | 207 | private void RegisterToWorkItemCompletion(IWorkItemResult wir) |
208 | /// Queue a work item | 208 | { |
209 | /// </summary> | 209 | IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; |
210 | /// <param name="callback">A callback to execute</param> | 210 | iwir.OnWorkItemStarted += OnWorkItemStartedCallback; |
211 | /// <param name="state"> | 211 | iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; |
212 | /// The context object of the work item. Used for passing arguments to the work item. | 212 | } |
213 | /// </param> | 213 | |
214 | /// <param name="postExecuteWorkItemCallback"> | 214 | public void OnSTPIsStarting() |
215 | /// A delegate to call after the callback completion | 215 | { |
216 | /// </param> | 216 | if (_isSuspended) |
217 | /// <returns>Returns a work item result</returns> | 217 | { |
218 | public IWorkItemResult QueueWorkItem( | 218 | return; |
219 | WorkItemCallback callback, | 219 | } |
220 | object state, | 220 | |
221 | PostExecuteWorkItemCallback postExecuteWorkItemCallback) | 221 | EnqueueToSTPNextNWorkItem(_concurrency); |
222 | { | 222 | } |
223 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback); | 223 | |
224 | EnqueueToSTPNextWorkItem(workItem); | 224 | public void EnqueueToSTPNextNWorkItem(int count) |
225 | return workItem.GetWorkItemResult(); | 225 | { |
226 | } | 226 | for (int i = 0; i < count; ++i) |
227 | 227 | { | |
228 | /// <summary> | 228 | EnqueueToSTPNextWorkItem(null, false); |
229 | /// Queue a work item | 229 | } |
230 | /// </summary> | 230 | } |
231 | /// <param name="callback">A callback to execute</param> | 231 | |
232 | /// <param name="state"> | 232 | private object FireOnIdle(object state) |
233 | /// The context object of the work item. Used for passing arguments to the work item. | 233 | { |
234 | /// </param> | 234 | FireOnIdleImpl(_onIdle); |
235 | /// <param name="postExecuteWorkItemCallback"> | 235 | return null; |
236 | /// A delegate to call after the callback completion | 236 | } |
237 | /// </param> | 237 | |
238 | /// <param name="workItemPriority">The work item priority</param> | 238 | [MethodImpl(MethodImplOptions.NoInlining)] |
239 | /// <returns>Returns a work item result</returns> | 239 | private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) |
240 | public IWorkItemResult QueueWorkItem( | 240 | { |
241 | WorkItemCallback callback, | 241 | if(null == onIdle) |
242 | object state, | 242 | { |
243 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | 243 | return; |
244 | WorkItemPriority workItemPriority) | 244 | } |
245 | { | 245 | |
246 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); | 246 | Delegate[] delegates = onIdle.GetInvocationList(); |
247 | EnqueueToSTPNextWorkItem(workItem); | 247 | foreach(WorkItemsGroupIdleHandler eh in delegates) |
248 | return workItem.GetWorkItemResult(); | 248 | { |
249 | } | 249 | try |
250 | 250 | { | |
251 | /// <summary> | 251 | eh(this); |
252 | /// Queue a work item | 252 | } |
253 | /// </summary> | 253 | catch { } // Suppress exceptions |
254 | /// <param name="callback">A callback to execute</param> | 254 | } |
255 | /// <param name="state"> | 255 | } |
256 | /// The context object of the work item. Used for passing arguments to the work item. | 256 | |
257 | /// </param> | 257 | private void OnWorkItemStartedCallback(WorkItem workItem) |
258 | /// <param name="postExecuteWorkItemCallback"> | 258 | { |
259 | /// A delegate to call after the callback completion | 259 | lock(_lock) |
260 | /// </param> | 260 | { |
261 | /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> | 261 | ++_workItemsExecutingInStp; |
262 | /// <returns>Returns a work item result</returns> | 262 | } |
263 | public IWorkItemResult QueueWorkItem( | 263 | } |
264 | WorkItemCallback callback, | 264 | |
265 | object state, | 265 | private void OnWorkItemCompletedCallback(WorkItem workItem) |
266 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | 266 | { |
267 | CallToPostExecute callToPostExecute) | 267 | EnqueueToSTPNextWorkItem(null, true); |
268 | { | 268 | } |
269 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); | 269 | |
270 | EnqueueToSTPNextWorkItem(workItem); | 270 | internal override void Enqueue(WorkItem workItem) |
271 | return workItem.GetWorkItemResult(); | 271 | { |
272 | } | 272 | EnqueueToSTPNextWorkItem(workItem); |
273 | 273 | } | |
274 | /// <summary> | 274 | |
275 | /// Queue a work item | 275 | private void EnqueueToSTPNextWorkItem(WorkItem workItem) |
276 | /// </summary> | 276 | { |
277 | /// <param name="callback">A callback to execute</param> | 277 | EnqueueToSTPNextWorkItem(workItem, false); |
278 | /// <param name="state"> | 278 | } |
279 | /// The context object of the work item. Used for passing arguments to the work item. | 279 | |
280 | /// </param> | 280 | private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) |
281 | /// <param name="postExecuteWorkItemCallback"> | 281 | { |
282 | /// A delegate to call after the callback completion | 282 | lock(_lock) |
283 | /// </param> | 283 | { |
284 | /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> | 284 | // Got here from OnWorkItemCompletedCallback() |
285 | /// <param name="workItemPriority">The work item priority</param> | 285 | if (decrementWorkItemsInStpQueue) |
286 | /// <returns>Returns a work item result</returns> | 286 | { |
287 | public IWorkItemResult QueueWorkItem( | 287 | --_workItemsInStpQueue; |
288 | WorkItemCallback callback, | 288 | |
289 | object state, | 289 | if(_workItemsInStpQueue < 0) |
290 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | 290 | { |
291 | CallToPostExecute callToPostExecute, | 291 | _workItemsInStpQueue = 0; |
292 | WorkItemPriority workItemPriority) | 292 | } |
293 | { | 293 | |
294 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); | 294 | --_workItemsExecutingInStp; |
295 | EnqueueToSTPNextWorkItem(workItem); | 295 | |
296 | return workItem.GetWorkItemResult(); | 296 | if(_workItemsExecutingInStp < 0) |
297 | } | 297 | { |
298 | 298 | _workItemsExecutingInStp = 0; | |
299 | /// <summary> | 299 | } |
300 | /// Wait for the thread pool to be idle | 300 | } |
301 | /// </summary> | 301 | |
302 | public void WaitForIdle() | 302 | // If the work item is not null then enqueue it |
303 | { | 303 | if (null != workItem) |
304 | WaitForIdle(Timeout.Infinite); | 304 | { |
305 | } | 305 | workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; |
306 | 306 | ||
307 | /// <summary> | 307 | RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); |
308 | /// Wait for the thread pool to be idle | 308 | _workItemsQueue.Enqueue(workItem); |
309 | /// </summary> | 309 | //_stp.IncrementWorkItemsCount(); |
310 | public bool WaitForIdle(TimeSpan timeout) | 310 | |
311 | { | 311 | if ((1 == _workItemsQueue.Count) && |
312 | return WaitForIdle((int)timeout.TotalMilliseconds); | 312 | (0 == _workItemsInStpQueue)) |
313 | } | 313 | { |
314 | 314 | _stp.RegisterWorkItemsGroup(this); | |
315 | /// <summary> | 315 | IsIdle = false; |
316 | /// Wait for the thread pool to be idle | 316 | _isIdleWaitHandle.Reset(); |
317 | /// </summary> | 317 | } |
318 | public bool WaitForIdle(int millisecondsTimeout) | 318 | } |
319 | { | 319 | |
320 | _stp.ValidateWorkItemsGroupWaitForIdle(this); | 320 | // If the work items queue of the group is empty than quit |
321 | return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); | 321 | if (0 == _workItemsQueue.Count) |
322 | } | 322 | { |
323 | 323 | if (0 == _workItemsInStpQueue) | |
324 | public int WaitingCallbacks | 324 | { |
325 | { | 325 | _stp.UnregisterWorkItemsGroup(this); |
326 | get | 326 | IsIdle = true; |
327 | { | 327 | _isIdleWaitHandle.Set(); |
328 | return _workItemsQueue.Count; | 328 | if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) |
329 | } | 329 | { |
330 | } | 330 | _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); |
331 | 331 | } | |
332 | public event WorkItemsGroupIdleHandler OnIdle | 332 | } |
333 | { | 333 | return; |
334 | add | 334 | } |
335 | { | 335 | |
336 | _onIdle += value; | 336 | if (!_isSuspended) |
337 | } | 337 | { |
338 | remove | 338 | if (_workItemsInStpQueue < _concurrency) |
339 | { | 339 | { |
340 | _onIdle -= value; | 340 | WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; |
341 | } | 341 | try |
342 | } | 342 | { |
343 | 343 | _stp.Enqueue(nextWorkItem); | |
344 | public void Cancel() | 344 | } |
345 | { | 345 | catch (ObjectDisposedException e) |
346 | lock(_lock) | 346 | { |
347 | { | 347 | e.GetHashCode(); |
348 | _canceledWorkItemsGroup.IsCanceled = true; | 348 | // The STP has been shutdown |
349 | _workItemsQueue.Clear(); | 349 | } |
350 | _workItemsInStpQueue = 0; | 350 | |
351 | _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | 351 | ++_workItemsInStpQueue; |
352 | } | 352 | } |
353 | } | 353 | } |
354 | 354 | } | |
355 | public void Start() | 355 | } |
356 | { | 356 | |
357 | lock (this) | 357 | #endregion |
358 | { | 358 | } |
359 | if (!_workItemsGroupStartInfo.StartSuspended) | 359 | |
360 | { | 360 | #endregion |
361 | return; | 361 | } |
362 | } | ||
363 | _workItemsGroupStartInfo.StartSuspended = false; | ||
364 | } | ||
365 | |||
366 | for(int i = 0; i < _concurrency; ++i) | ||
367 | { | ||
368 | EnqueueToSTPNextWorkItem(null, false); | ||
369 | } | ||
370 | } | ||
371 | |||
372 | #endregion | ||
373 | |||
374 | #region Private methods | ||
375 | |||
376 | private void RegisterToWorkItemCompletion(IWorkItemResult wir) | ||
377 | { | ||
378 | IInternalWorkItemResult iwir = wir as IInternalWorkItemResult; | ||
379 | iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback); | ||
380 | iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback); | ||
381 | } | ||
382 | |||
383 | public void OnSTPIsStarting() | ||
384 | { | ||
385 | lock (this) | ||
386 | { | ||
387 | if (_workItemsGroupStartInfo.StartSuspended) | ||
388 | { | ||
389 | return; | ||
390 | } | ||
391 | } | ||
392 | |||
393 | for(int i = 0; i < _concurrency; ++i) | ||
394 | { | ||
395 | EnqueueToSTPNextWorkItem(null, false); | ||
396 | } | ||
397 | } | ||
398 | |||
399 | private object FireOnIdle(object state) | ||
400 | { | ||
401 | FireOnIdleImpl(_onIdle); | ||
402 | return null; | ||
403 | } | ||
404 | |||
405 | [MethodImpl(MethodImplOptions.NoInlining)] | ||
406 | private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) | ||
407 | { | ||
408 | if(null == onIdle) | ||
409 | { | ||
410 | return; | ||
411 | } | ||
412 | |||
413 | Delegate[] delegates = onIdle.GetInvocationList(); | ||
414 | foreach(WorkItemsGroupIdleHandler eh in delegates) | ||
415 | { | ||
416 | try | ||
417 | { | ||
418 | eh(this); | ||
419 | } | ||
420 | // Ignore exceptions | ||
421 | catch{} | ||
422 | } | ||
423 | } | ||
424 | |||
425 | private void OnWorkItemStartedCallback(WorkItem workItem) | ||
426 | { | ||
427 | lock(_lock) | ||
428 | { | ||
429 | ++_workItemsExecutingInStp; | ||
430 | } | ||
431 | } | ||
432 | |||
433 | private void OnWorkItemCompletedCallback(WorkItem workItem) | ||
434 | { | ||
435 | EnqueueToSTPNextWorkItem(null, true); | ||
436 | } | ||
437 | |||
438 | private void EnqueueToSTPNextWorkItem(WorkItem workItem) | ||
439 | { | ||
440 | EnqueueToSTPNextWorkItem(workItem, false); | ||
441 | } | ||
442 | |||
443 | private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) | ||
444 | { | ||
445 | lock(_lock) | ||
446 | { | ||
447 | // Got here from OnWorkItemCompletedCallback() | ||
448 | if (decrementWorkItemsInStpQueue) | ||
449 | { | ||
450 | --_workItemsInStpQueue; | ||
451 | |||
452 | if(_workItemsInStpQueue < 0) | ||
453 | { | ||
454 | _workItemsInStpQueue = 0; | ||
455 | } | ||
456 | |||
457 | --_workItemsExecutingInStp; | ||
458 | |||
459 | if(_workItemsExecutingInStp < 0) | ||
460 | { | ||
461 | _workItemsExecutingInStp = 0; | ||
462 | } | ||
463 | } | ||
464 | |||
465 | // If the work item is not null then enqueue it | ||
466 | if (null != workItem) | ||
467 | { | ||
468 | workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; | ||
469 | |||
470 | RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); | ||
471 | _workItemsQueue.Enqueue(workItem); | ||
472 | //_stp.IncrementWorkItemsCount(); | ||
473 | |||
474 | if ((1 == _workItemsQueue.Count) && | ||
475 | (0 == _workItemsInStpQueue)) | ||
476 | { | ||
477 | _stp.RegisterWorkItemsGroup(this); | ||
478 | Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle"); | ||
479 | _isIdleWaitHandle.Reset(); | ||
480 | } | ||
481 | } | ||
482 | |||
483 | // If the work items queue of the group is empty than quit | ||
484 | if (0 == _workItemsQueue.Count) | ||
485 | { | ||
486 | if (0 == _workItemsInStpQueue) | ||
487 | { | ||
488 | _stp.UnregisterWorkItemsGroup(this); | ||
489 | Trace.WriteLine("WorkItemsGroup " + Name + " is idle"); | ||
490 | _isIdleWaitHandle.Set(); | ||
491 | _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle)); | ||
492 | } | ||
493 | return; | ||
494 | } | ||
495 | |||
496 | if (!_workItemsGroupStartInfo.StartSuspended) | ||
497 | { | ||
498 | if (_workItemsInStpQueue < _concurrency) | ||
499 | { | ||
500 | WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; | ||
501 | _stp.Enqueue(nextWorkItem, true); | ||
502 | ++_workItemsInStpQueue; | ||
503 | } | ||
504 | } | ||
505 | } | ||
506 | } | ||
507 | |||
508 | #endregion | ||
509 | } | ||
510 | |||
511 | #endregion | ||
512 | } | ||