diff options
author | Justin Clark-Casey (justincc) | 2013-05-01 19:01:43 +0100 |
---|---|---|
committer | Justin Clark-Casey (justincc) | 2013-05-01 19:01:43 +0100 |
commit | 206fb306a7820cf593570e35ddfa8e7c5a10e449 (patch) | |
tree | 0ef0fdf42ddc0b63224af52b62b0bad42f62e352 /ThirdParty/SmartThreadPool/WorkItemsGroup.cs | |
parent | Fix CAPS to work like they should - do not send caps to the viewer if they're... (diff) | |
download | opensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.zip opensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.gz opensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.bz2 opensim-SC_OLD-206fb306a7820cf593570e35ddfa8e7c5a10e449.tar.xz |
Update SmartThreadPool to latest version 2.2.3 with a major and minor change.
SmartThreadPool code comes from http://www.codeproject.com/Articles/7933/Smart-Thread-Pool
This version implements thread abort (via WorkItem.Cancel(true)), threadpool naming, max thread stack, etc. so we no longer need to manually patch those.
However, two changes have been made to stock 2.2.3.
Major change: WorkItem.Cancel(bool abortExecution) in our version does not succeed if the work item was in progress and thread abort was not specified.
This is to match previous behaviour where we handle co-operative termination via another mechanism rather than checking WorkItem.IsCanceled.
Minor change: Did not add STP's StopWatch implementation as this is only used WinCE and Silverlight and causes a build clash with System.Diagnostics.StopWatch
The reason for updating is to see if this improves http://opensimulator.org/mantis/view.php?id=6557 and http://opensimulator.org/mantis/view.php?id=6586
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
-rw-r--r-- | ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 873 |
1 files changed, 361 insertions, 512 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs index 01ac8dd..67dcbdd 100644 --- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs +++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs | |||
@@ -1,512 +1,361 @@ | |||
1 | // Ami Bar | 1 | using System; |
2 | // amibar@gmail.com | 2 | using System.Threading; |
3 | 3 | using System.Runtime.CompilerServices; | |
4 | using System; | 4 | using System.Diagnostics; |
5 | using System.Threading; | 5 | |
6 | using System.Runtime.CompilerServices; | 6 | namespace Amib.Threading.Internal |
7 | using System.Diagnostics; | 7 | { |
8 | 8 | ||
9 | namespace Amib.Threading.Internal | 9 | #region WorkItemsGroup class |
10 | { | 10 | |
11 | #region WorkItemsGroup class | 11 | /// <summary> |
12 | 12 | /// Summary description for WorkItemsGroup. | |
13 | /// <summary> | 13 | /// </summary> |
14 | /// Summary description for WorkItemsGroup. | 14 | public class WorkItemsGroup : WorkItemsGroupBase |
15 | /// </summary> | 15 | { |
16 | public class WorkItemsGroup : IWorkItemsGroup | 16 | #region Private members |
17 | { | 17 | |
18 | #region Private members | 18 | private readonly object _lock = new object(); |
19 | 19 | ||
20 | private object _lock = new object(); | 20 | /// <summary> |
21 | /// <summary> | 21 | /// A reference to the SmartThreadPool instance that created this |
22 | /// Contains the name of this instance of SmartThreadPool. | 22 | /// WorkItemsGroup. |
23 | /// Can be changed by the user. | 23 | /// </summary> |
24 | /// </summary> | 24 | private readonly SmartThreadPool _stp; |
25 | private string _name = "WorkItemsGroup"; | 25 | |
26 | 26 | /// <summary> | |
27 | /// <summary> | 27 | /// The OnIdle event |
28 | /// A reference to the SmartThreadPool instance that created this | 28 | /// </summary> |
29 | /// WorkItemsGroup. | 29 | private event WorkItemsGroupIdleHandler _onIdle; |
30 | /// </summary> | 30 | |
31 | private SmartThreadPool _stp; | 31 | /// <summary> |
32 | 32 | /// A flag to indicate if the Work Items Group is now suspended. | |
33 | /// <summary> | 33 | /// </summary> |
34 | /// The OnIdle event | 34 | private bool _isSuspended; |
35 | /// </summary> | 35 | |
36 | private event WorkItemsGroupIdleHandler _onIdle; | 36 | /// <summary> |
37 | 37 | /// Defines how many work items of this WorkItemsGroup can run at once. | |
38 | /// <summary> | 38 | /// </summary> |
39 | /// Defines how many work items of this WorkItemsGroup can run at once. | 39 | private int _concurrency; |
40 | /// </summary> | 40 | |
41 | private int _concurrency; | 41 | /// <summary> |
42 | 42 | /// Priority queue to hold work items before they are passed | |
43 | /// <summary> | 43 | /// to the SmartThreadPool. |
44 | /// Priority queue to hold work items before they are passed | 44 | /// </summary> |
45 | /// to the SmartThreadPool. | 45 | private readonly PriorityQueue _workItemsQueue; |
46 | /// </summary> | 46 | |
47 | private PriorityQueue _workItemsQueue; | 47 | /// <summary> |
48 | 48 | /// Indicate how many work items are waiting in the SmartThreadPool | |
49 | /// <summary> | 49 | /// queue. |
50 | /// Indicate how many work items are waiting in the SmartThreadPool | 50 | /// This value is used to apply the concurrency. |
51 | /// queue. | 51 | /// </summary> |
52 | /// This value is used to apply the concurrency. | 52 | private int _workItemsInStpQueue; |
53 | /// </summary> | 53 | |
54 | private int _workItemsInStpQueue; | 54 | /// <summary> |
55 | 55 | /// Indicate how many work items are currently running in the SmartThreadPool. | |
56 | /// <summary> | 56 | /// This value is used with the Cancel, to calculate if we can send new |
57 | /// Indicate how many work items are currently running in the SmartThreadPool. | 57 | /// work items to the STP. |
58 | /// This value is used with the Cancel, to calculate if we can send new | 58 | /// </summary> |
59 | /// work items to the STP. | 59 | private int _workItemsExecutingInStp = 0; |
60 | /// </summary> | 60 | |
61 | private int _workItemsExecutingInStp = 0; | 61 | /// <summary> |
62 | 62 | /// WorkItemsGroup start information | |
63 | /// <summary> | 63 | /// </summary> |
64 | /// WorkItemsGroup start information | 64 | private readonly WIGStartInfo _workItemsGroupStartInfo; |
65 | /// </summary> | 65 | |
66 | private WIGStartInfo _workItemsGroupStartInfo; | 66 | /// <summary> |
67 | 67 | /// Signaled when all of the WorkItemsGroup's work item completed. | |
68 | /// <summary> | 68 | /// </summary> |
69 | /// Signaled when all of the WorkItemsGroup's work item completed. | 69 | //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); |
70 | /// </summary> | 70 | private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); |
71 | private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); | 71 | |
72 | 72 | /// <summary> | |
73 | /// <summary> | 73 | /// A common object for all the work items that this work items group |
74 | /// A common object for all the work items that this work items group | 74 | /// generate so we can mark them to cancel in O(1) |
75 | /// generate so we can mark them to cancel in O(1) | 75 | /// </summary> |
76 | /// </summary> | 76 | private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); |
77 | private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | 77 | |
78 | 78 | #endregion | |
79 | #endregion | 79 | |
80 | 80 | #region Construction | |
81 | #region Construction | 81 | |
82 | 82 | public WorkItemsGroup( | |
83 | public WorkItemsGroup( | 83 | SmartThreadPool stp, |
84 | SmartThreadPool stp, | 84 | int concurrency, |
85 | int concurrency, | 85 | WIGStartInfo wigStartInfo) |
86 | WIGStartInfo wigStartInfo) | 86 | { |
87 | { | 87 | if (concurrency <= 0) |
88 | if (concurrency <= 0) | 88 | { |
89 | { | 89 | throw new ArgumentOutOfRangeException( |
90 | throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero"); | 90 | "concurrency", |
91 | } | 91 | #if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) |
92 | _stp = stp; | 92 | concurrency, |
93 | _concurrency = concurrency; | 93 | #endif |
94 | _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo); | 94 | "concurrency must be greater than zero"); |
95 | _workItemsQueue = new PriorityQueue(); | 95 | } |
96 | 96 | _stp = stp; | |
97 | // The _workItemsInStpQueue gets the number of currently executing work items, | 97 | _concurrency = concurrency; |
98 | // because once a work item is executing, it cannot be cancelled. | 98 | _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); |
99 | _workItemsInStpQueue = _workItemsExecutingInStp; | 99 | _workItemsQueue = new PriorityQueue(); |
100 | } | 100 | Name = "WorkItemsGroup"; |
101 | 101 | ||
102 | #endregion | 102 | // The _workItemsInStpQueue gets the number of currently executing work items, |
103 | 103 | // because once a work item is executing, it cannot be cancelled. | |
104 | #region IWorkItemsGroup implementation | 104 | _workItemsInStpQueue = _workItemsExecutingInStp; |
105 | 105 | ||
106 | /// <summary> | 106 | _isSuspended = _workItemsGroupStartInfo.StartSuspended; |
107 | /// Get/Set the name of the SmartThreadPool instance | 107 | } |
108 | /// </summary> | 108 | |
109 | public string Name | 109 | #endregion |
110 | { | 110 | |
111 | get | 111 | #region WorkItemsGroupBase Overrides |
112 | { | 112 | |
113 | return _name; | 113 | public override int Concurrency |
114 | } | 114 | { |
115 | 115 | get { return _concurrency; } | |
116 | set | 116 | set |
117 | { | 117 | { |
118 | _name = value; | 118 | Debug.Assert(value > 0); |
119 | } | 119 | |
120 | } | 120 | int diff = value - _concurrency; |
121 | 121 | _concurrency = value; | |
122 | /// <summary> | 122 | if (diff > 0) |
123 | /// Queue a work item | 123 | { |
124 | /// </summary> | 124 | EnqueueToSTPNextNWorkItem(diff); |
125 | /// <param name="callback">A callback to execute</param> | 125 | } |
126 | /// <returns>Returns a work item result</returns> | 126 | } |
127 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback) | 127 | } |
128 | { | 128 | |
129 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback); | 129 | public override int WaitingCallbacks |
130 | EnqueueToSTPNextWorkItem(workItem); | 130 | { |
131 | return workItem.GetWorkItemResult(); | 131 | get { return _workItemsQueue.Count; } |
132 | } | 132 | } |
133 | 133 | ||
134 | /// <summary> | 134 | public override object[] GetStates() |
135 | /// Queue a work item | 135 | { |
136 | /// </summary> | 136 | lock (_lock) |
137 | /// <param name="callback">A callback to execute</param> | 137 | { |
138 | /// <param name="workItemPriority">The priority of the work item</param> | 138 | object[] states = new object[_workItemsQueue.Count]; |
139 | /// <returns>Returns a work item result</returns> | 139 | int i = 0; |
140 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority) | 140 | foreach (WorkItem workItem in _workItemsQueue) |
141 | { | 141 | { |
142 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority); | 142 | states[i] = workItem.GetWorkItemResult().State; |
143 | EnqueueToSTPNextWorkItem(workItem); | 143 | ++i; |
144 | return workItem.GetWorkItemResult(); | 144 | } |
145 | } | 145 | return states; |
146 | 146 | } | |
147 | /// <summary> | 147 | } |
148 | /// Queue a work item | 148 | |
149 | /// </summary> | 149 | /// <summary> |
150 | /// <param name="workItemInfo">Work item info</param> | 150 | /// WorkItemsGroup start information |
151 | /// <param name="callback">A callback to execute</param> | 151 | /// </summary> |
152 | /// <returns>Returns a work item result</returns> | 152 | public override WIGStartInfo WIGStartInfo |
153 | public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback) | 153 | { |
154 | { | 154 | get { return _workItemsGroupStartInfo; } |
155 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback); | 155 | } |
156 | EnqueueToSTPNextWorkItem(workItem); | 156 | |
157 | return workItem.GetWorkItemResult(); | 157 | /// <summary> |
158 | } | 158 | /// Start the Work Items Group if it was started suspended |
159 | 159 | /// </summary> | |
160 | /// <summary> | 160 | public override void Start() |
161 | /// Queue a work item | 161 | { |
162 | /// </summary> | 162 | // If the Work Items Group already started then quit |
163 | /// <param name="callback">A callback to execute</param> | 163 | if (!_isSuspended) |
164 | /// <param name="state"> | 164 | { |
165 | /// The context object of the work item. Used for passing arguments to the work item. | 165 | return; |
166 | /// </param> | 166 | } |
167 | /// <returns>Returns a work item result</returns> | 167 | _isSuspended = false; |
168 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state) | 168 | |
169 | { | 169 | EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); |
170 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state); | 170 | } |
171 | EnqueueToSTPNextWorkItem(workItem); | 171 | |
172 | return workItem.GetWorkItemResult(); | 172 | public override void Cancel(bool abortExecution) |
173 | } | 173 | { |
174 | 174 | lock (_lock) | |
175 | /// <summary> | 175 | { |
176 | /// Queue a work item | 176 | _canceledWorkItemsGroup.IsCanceled = true; |
177 | /// </summary> | 177 | _workItemsQueue.Clear(); |
178 | /// <param name="callback">A callback to execute</param> | 178 | _workItemsInStpQueue = 0; |
179 | /// <param name="state"> | 179 | _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); |
180 | /// The context object of the work item. Used for passing arguments to the work item. | 180 | } |
181 | /// </param> | 181 | |
182 | /// <param name="workItemPriority">The work item priority</param> | 182 | if (abortExecution) |
183 | /// <returns>Returns a work item result</returns> | 183 | { |
184 | public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority) | 184 | _stp.CancelAbortWorkItemsGroup(this); |
185 | { | 185 | } |
186 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority); | 186 | } |
187 | EnqueueToSTPNextWorkItem(workItem); | 187 | |
188 | return workItem.GetWorkItemResult(); | 188 | /// <summary> |
189 | } | 189 | /// Wait for the thread pool to be idle |
190 | 190 | /// </summary> | |
191 | /// <summary> | 191 | public override bool WaitForIdle(int millisecondsTimeout) |
192 | /// Queue a work item | 192 | { |
193 | /// </summary> | 193 | SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); |
194 | /// <param name="workItemInfo">Work item information</param> | 194 | return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); |
195 | /// <param name="callback">A callback to execute</param> | 195 | } |
196 | /// <param name="state"> | 196 | |
197 | /// The context object of the work item. Used for passing arguments to the work item. | 197 | public override event WorkItemsGroupIdleHandler OnIdle |
198 | /// </param> | 198 | { |
199 | /// <returns>Returns a work item result</returns> | 199 | add { _onIdle += value; } |
200 | public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state) | 200 | remove { _onIdle -= value; } |
201 | { | 201 | } |
202 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state); | 202 | |
203 | EnqueueToSTPNextWorkItem(workItem); | 203 | #endregion |
204 | return workItem.GetWorkItemResult(); | 204 | |
205 | } | 205 | #region Private methods |
206 | 206 | ||
207 | /// <summary> | 207 | private void RegisterToWorkItemCompletion(IWorkItemResult wir) |
208 | /// Queue a work item | 208 | { |
209 | /// </summary> | 209 | IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; |
210 | /// <param name="callback">A callback to execute</param> | 210 | iwir.OnWorkItemStarted += OnWorkItemStartedCallback; |
211 | /// <param name="state"> | 211 | iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; |
212 | /// The context object of the work item. Used for passing arguments to the work item. | 212 | } |
213 | /// </param> | 213 | |
214 | /// <param name="postExecuteWorkItemCallback"> | 214 | public void OnSTPIsStarting() |
215 | /// A delegate to call after the callback completion | 215 | { |
216 | /// </param> | 216 | if (_isSuspended) |
217 | /// <returns>Returns a work item result</returns> | 217 | { |
218 | public IWorkItemResult QueueWorkItem( | 218 | return; |
219 | WorkItemCallback callback, | 219 | } |
220 | object state, | 220 | |
221 | PostExecuteWorkItemCallback postExecuteWorkItemCallback) | 221 | EnqueueToSTPNextNWorkItem(_concurrency); |
222 | { | 222 | } |
223 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback); | 223 | |
224 | EnqueueToSTPNextWorkItem(workItem); | 224 | public void EnqueueToSTPNextNWorkItem(int count) |
225 | return workItem.GetWorkItemResult(); | 225 | { |
226 | } | 226 | for (int i = 0; i < count; ++i) |
227 | 227 | { | |
228 | /// <summary> | 228 | EnqueueToSTPNextWorkItem(null, false); |
229 | /// Queue a work item | 229 | } |
230 | /// </summary> | 230 | } |
231 | /// <param name="callback">A callback to execute</param> | 231 | |
232 | /// <param name="state"> | 232 | private object FireOnIdle(object state) |
233 | /// The context object of the work item. Used for passing arguments to the work item. | 233 | { |
234 | /// </param> | 234 | FireOnIdleImpl(_onIdle); |
235 | /// <param name="postExecuteWorkItemCallback"> | 235 | return null; |
236 | /// A delegate to call after the callback completion | 236 | } |
237 | /// </param> | 237 | |
238 | /// <param name="workItemPriority">The work item priority</param> | 238 | [MethodImpl(MethodImplOptions.NoInlining)] |
239 | /// <returns>Returns a work item result</returns> | 239 | private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) |
240 | public IWorkItemResult QueueWorkItem( | 240 | { |
241 | WorkItemCallback callback, | 241 | if(null == onIdle) |
242 | object state, | 242 | { |
243 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | 243 | return; |
244 | WorkItemPriority workItemPriority) | 244 | } |
245 | { | 245 | |
246 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority); | 246 | Delegate[] delegates = onIdle.GetInvocationList(); |
247 | EnqueueToSTPNextWorkItem(workItem); | 247 | foreach(WorkItemsGroupIdleHandler eh in delegates) |
248 | return workItem.GetWorkItemResult(); | 248 | { |
249 | } | 249 | try |
250 | 250 | { | |
251 | /// <summary> | 251 | eh(this); |
252 | /// Queue a work item | 252 | } |
253 | /// </summary> | 253 | catch { } // Suppress exceptions |
254 | /// <param name="callback">A callback to execute</param> | 254 | } |
255 | /// <param name="state"> | 255 | } |
256 | /// The context object of the work item. Used for passing arguments to the work item. | 256 | |
257 | /// </param> | 257 | private void OnWorkItemStartedCallback(WorkItem workItem) |
258 | /// <param name="postExecuteWorkItemCallback"> | 258 | { |
259 | /// A delegate to call after the callback completion | 259 | lock(_lock) |
260 | /// </param> | 260 | { |
261 | /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> | 261 | ++_workItemsExecutingInStp; |
262 | /// <returns>Returns a work item result</returns> | 262 | } |
263 | public IWorkItemResult QueueWorkItem( | 263 | } |
264 | WorkItemCallback callback, | 264 | |
265 | object state, | 265 | private void OnWorkItemCompletedCallback(WorkItem workItem) |
266 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | 266 | { |
267 | CallToPostExecute callToPostExecute) | 267 | EnqueueToSTPNextWorkItem(null, true); |
268 | { | 268 | } |
269 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute); | 269 | |
270 | EnqueueToSTPNextWorkItem(workItem); | 270 | internal override void Enqueue(WorkItem workItem) |
271 | return workItem.GetWorkItemResult(); | 271 | { |
272 | } | 272 | EnqueueToSTPNextWorkItem(workItem); |
273 | 273 | } | |
274 | /// <summary> | 274 | |
275 | /// Queue a work item | 275 | private void EnqueueToSTPNextWorkItem(WorkItem workItem) |
276 | /// </summary> | 276 | { |
277 | /// <param name="callback">A callback to execute</param> | 277 | EnqueueToSTPNextWorkItem(workItem, false); |
278 | /// <param name="state"> | 278 | } |
279 | /// The context object of the work item. Used for passing arguments to the work item. | 279 | |
280 | /// </param> | 280 | private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) |
281 | /// <param name="postExecuteWorkItemCallback"> | 281 | { |
282 | /// A delegate to call after the callback completion | 282 | lock(_lock) |
283 | /// </param> | 283 | { |
284 | /// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param> | 284 | // Got here from OnWorkItemCompletedCallback() |
285 | /// <param name="workItemPriority">The work item priority</param> | 285 | if (decrementWorkItemsInStpQueue) |
286 | /// <returns>Returns a work item result</returns> | 286 | { |
287 | public IWorkItemResult QueueWorkItem( | 287 | --_workItemsInStpQueue; |
288 | WorkItemCallback callback, | 288 | |
289 | object state, | 289 | if(_workItemsInStpQueue < 0) |
290 | PostExecuteWorkItemCallback postExecuteWorkItemCallback, | 290 | { |
291 | CallToPostExecute callToPostExecute, | 291 | _workItemsInStpQueue = 0; |
292 | WorkItemPriority workItemPriority) | 292 | } |
293 | { | 293 | |
294 | WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority); | 294 | --_workItemsExecutingInStp; |
295 | EnqueueToSTPNextWorkItem(workItem); | 295 | |
296 | return workItem.GetWorkItemResult(); | 296 | if(_workItemsExecutingInStp < 0) |
297 | } | 297 | { |
298 | 298 | _workItemsExecutingInStp = 0; | |
299 | /// <summary> | 299 | } |
300 | /// Wait for the thread pool to be idle | 300 | } |
301 | /// </summary> | 301 | |
302 | public void WaitForIdle() | 302 | // If the work item is not null then enqueue it |
303 | { | 303 | if (null != workItem) |
304 | WaitForIdle(Timeout.Infinite); | 304 | { |
305 | } | 305 | workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; |
306 | 306 | ||
307 | /// <summary> | 307 | RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); |
308 | /// Wait for the thread pool to be idle | 308 | _workItemsQueue.Enqueue(workItem); |
309 | /// </summary> | 309 | //_stp.IncrementWorkItemsCount(); |
310 | public bool WaitForIdle(TimeSpan timeout) | 310 | |
311 | { | 311 | if ((1 == _workItemsQueue.Count) && |
312 | return WaitForIdle((int)timeout.TotalMilliseconds); | 312 | (0 == _workItemsInStpQueue)) |
313 | } | 313 | { |
314 | 314 | _stp.RegisterWorkItemsGroup(this); | |
315 | /// <summary> | 315 | IsIdle = false; |
316 | /// Wait for the thread pool to be idle | 316 | _isIdleWaitHandle.Reset(); |
317 | /// </summary> | 317 | } |
318 | public bool WaitForIdle(int millisecondsTimeout) | 318 | } |
319 | { | 319 | |
320 | _stp.ValidateWorkItemsGroupWaitForIdle(this); | 320 | // If the work items queue of the group is empty than quit |
321 | return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false); | 321 | if (0 == _workItemsQueue.Count) |
322 | } | 322 | { |
323 | 323 | if (0 == _workItemsInStpQueue) | |
324 | public int WaitingCallbacks | 324 | { |
325 | { | 325 | _stp.UnregisterWorkItemsGroup(this); |
326 | get | 326 | IsIdle = true; |
327 | { | 327 | _isIdleWaitHandle.Set(); |
328 | return _workItemsQueue.Count; | 328 | if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) |
329 | } | 329 | { |
330 | } | 330 | _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); |
331 | 331 | } | |
332 | public event WorkItemsGroupIdleHandler OnIdle | 332 | } |
333 | { | 333 | return; |
334 | add | 334 | } |
335 | { | 335 | |
336 | _onIdle += value; | 336 | if (!_isSuspended) |
337 | } | 337 | { |
338 | remove | 338 | if (_workItemsInStpQueue < _concurrency) |
339 | { | 339 | { |
340 | _onIdle -= value; | 340 | WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; |
341 | } | 341 | try |
342 | } | 342 | { |
343 | 343 | _stp.Enqueue(nextWorkItem); | |
344 | public void Cancel() | 344 | } |
345 | { | 345 | catch (ObjectDisposedException e) |
346 | lock(_lock) | 346 | { |
347 | { | 347 | e.GetHashCode(); |
348 | _canceledWorkItemsGroup.IsCanceled = true; | 348 | // The STP has been shutdown |
349 | _workItemsQueue.Clear(); | 349 | } |
350 | _workItemsInStpQueue = 0; | 350 | |
351 | _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | 351 | ++_workItemsInStpQueue; |
352 | } | 352 | } |
353 | } | 353 | } |
354 | 354 | } | |
355 | public void Start() | 355 | } |
356 | { | 356 | |
357 | lock (this) | 357 | #endregion |
358 | { | 358 | } |
359 | if (!_workItemsGroupStartInfo.StartSuspended) | 359 | |
360 | { | 360 | #endregion |
361 | return; | 361 | } |
362 | } | ||
363 | _workItemsGroupStartInfo.StartSuspended = false; | ||
364 | } | ||
365 | |||
366 | for(int i = 0; i < _concurrency; ++i) | ||
367 | { | ||
368 | EnqueueToSTPNextWorkItem(null, false); | ||
369 | } | ||
370 | } | ||
371 | |||
372 | #endregion | ||
373 | |||
374 | #region Private methods | ||
375 | |||
376 | private void RegisterToWorkItemCompletion(IWorkItemResult wir) | ||
377 | { | ||
378 | IInternalWorkItemResult iwir = wir as IInternalWorkItemResult; | ||
379 | iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback); | ||
380 | iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback); | ||
381 | } | ||
382 | |||
383 | public void OnSTPIsStarting() | ||
384 | { | ||
385 | lock (this) | ||
386 | { | ||
387 | if (_workItemsGroupStartInfo.StartSuspended) | ||
388 | { | ||
389 | return; | ||
390 | } | ||
391 | } | ||
392 | |||
393 | for(int i = 0; i < _concurrency; ++i) | ||
394 | { | ||
395 | EnqueueToSTPNextWorkItem(null, false); | ||
396 | } | ||
397 | } | ||
398 | |||
399 | private object FireOnIdle(object state) | ||
400 | { | ||
401 | FireOnIdleImpl(_onIdle); | ||
402 | return null; | ||
403 | } | ||
404 | |||
405 | [MethodImpl(MethodImplOptions.NoInlining)] | ||
406 | private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) | ||
407 | { | ||
408 | if(null == onIdle) | ||
409 | { | ||
410 | return; | ||
411 | } | ||
412 | |||
413 | Delegate[] delegates = onIdle.GetInvocationList(); | ||
414 | foreach(WorkItemsGroupIdleHandler eh in delegates) | ||
415 | { | ||
416 | try | ||
417 | { | ||
418 | eh(this); | ||
419 | } | ||
420 | // Ignore exceptions | ||
421 | catch{} | ||
422 | } | ||
423 | } | ||
424 | |||
425 | private void OnWorkItemStartedCallback(WorkItem workItem) | ||
426 | { | ||
427 | lock(_lock) | ||
428 | { | ||
429 | ++_workItemsExecutingInStp; | ||
430 | } | ||
431 | } | ||
432 | |||
433 | private void OnWorkItemCompletedCallback(WorkItem workItem) | ||
434 | { | ||
435 | EnqueueToSTPNextWorkItem(null, true); | ||
436 | } | ||
437 | |||
438 | private void EnqueueToSTPNextWorkItem(WorkItem workItem) | ||
439 | { | ||
440 | EnqueueToSTPNextWorkItem(workItem, false); | ||
441 | } | ||
442 | |||
443 | private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) | ||
444 | { | ||
445 | lock(_lock) | ||
446 | { | ||
447 | // Got here from OnWorkItemCompletedCallback() | ||
448 | if (decrementWorkItemsInStpQueue) | ||
449 | { | ||
450 | --_workItemsInStpQueue; | ||
451 | |||
452 | if(_workItemsInStpQueue < 0) | ||
453 | { | ||
454 | _workItemsInStpQueue = 0; | ||
455 | } | ||
456 | |||
457 | --_workItemsExecutingInStp; | ||
458 | |||
459 | if(_workItemsExecutingInStp < 0) | ||
460 | { | ||
461 | _workItemsExecutingInStp = 0; | ||
462 | } | ||
463 | } | ||
464 | |||
465 | // If the work item is not null then enqueue it | ||
466 | if (null != workItem) | ||
467 | { | ||
468 | workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; | ||
469 | |||
470 | RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); | ||
471 | _workItemsQueue.Enqueue(workItem); | ||
472 | //_stp.IncrementWorkItemsCount(); | ||
473 | |||
474 | if ((1 == _workItemsQueue.Count) && | ||
475 | (0 == _workItemsInStpQueue)) | ||
476 | { | ||
477 | _stp.RegisterWorkItemsGroup(this); | ||
478 | Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle"); | ||
479 | _isIdleWaitHandle.Reset(); | ||
480 | } | ||
481 | } | ||
482 | |||
483 | // If the work items queue of the group is empty than quit | ||
484 | if (0 == _workItemsQueue.Count) | ||
485 | { | ||
486 | if (0 == _workItemsInStpQueue) | ||
487 | { | ||
488 | _stp.UnregisterWorkItemsGroup(this); | ||
489 | Trace.WriteLine("WorkItemsGroup " + Name + " is idle"); | ||
490 | _isIdleWaitHandle.Set(); | ||
491 | _stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle)); | ||
492 | } | ||
493 | return; | ||
494 | } | ||
495 | |||
496 | if (!_workItemsGroupStartInfo.StartSuspended) | ||
497 | { | ||
498 | if (_workItemsInStpQueue < _concurrency) | ||
499 | { | ||
500 | WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; | ||
501 | _stp.Enqueue(nextWorkItem, true); | ||
502 | ++_workItemsInStpQueue; | ||
503 | } | ||
504 | } | ||
505 | } | ||
506 | } | ||
507 | |||
508 | #endregion | ||
509 | } | ||
510 | |||
511 | #endregion | ||
512 | } | ||