diff options
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
-rw-r--r-- | ThirdParty/SmartThreadPool/WorkItemsGroup.cs | 361 |
1 files changed, 361 insertions, 0 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs new file mode 100644 index 0000000..d429bc6 --- /dev/null +++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs | |||
@@ -0,0 +1,361 @@ | |||
1 | using System; | ||
2 | using System.Threading; | ||
3 | using System.Runtime.CompilerServices; | ||
4 | using System.Diagnostics; | ||
5 | |||
6 | namespace Amib.Threading.Internal | ||
7 | { | ||
8 | |||
9 | #region WorkItemsGroup class | ||
10 | |||
11 | /// <summary> | ||
12 | /// Summary description for WorkItemsGroup. | ||
13 | /// </summary> | ||
14 | public class WorkItemsGroup : WorkItemsGroupBase | ||
15 | { | ||
16 | #region Private members | ||
17 | |||
18 | private readonly object _lock = new object(); | ||
19 | |||
20 | /// <summary> | ||
21 | /// A reference to the SmartThreadPool instance that created this | ||
22 | /// WorkItemsGroup. | ||
23 | /// </summary> | ||
24 | private readonly SmartThreadPool _stp; | ||
25 | |||
26 | /// <summary> | ||
27 | /// The OnIdle event | ||
28 | /// </summary> | ||
29 | private event WorkItemsGroupIdleHandler _onIdle; | ||
30 | |||
31 | /// <summary> | ||
32 | /// A flag to indicate if the Work Items Group is now suspended. | ||
33 | /// </summary> | ||
34 | private bool _isSuspended; | ||
35 | |||
36 | /// <summary> | ||
37 | /// Defines how many work items of this WorkItemsGroup can run at once. | ||
38 | /// </summary> | ||
39 | private int _concurrency; | ||
40 | |||
41 | /// <summary> | ||
42 | /// Priority queue to hold work items before they are passed | ||
43 | /// to the SmartThreadPool. | ||
44 | /// </summary> | ||
45 | private readonly PriorityQueue _workItemsQueue; | ||
46 | |||
47 | /// <summary> | ||
48 | /// Indicate how many work items are waiting in the SmartThreadPool | ||
49 | /// queue. | ||
50 | /// This value is used to apply the concurrency. | ||
51 | /// </summary> | ||
52 | private int _workItemsInStpQueue; | ||
53 | |||
54 | /// <summary> | ||
55 | /// Indicate how many work items are currently running in the SmartThreadPool. | ||
56 | /// This value is used with the Cancel, to calculate if we can send new | ||
57 | /// work items to the STP. | ||
58 | /// </summary> | ||
59 | private int _workItemsExecutingInStp = 0; | ||
60 | |||
61 | /// <summary> | ||
62 | /// WorkItemsGroup start information | ||
63 | /// </summary> | ||
64 | private readonly WIGStartInfo _workItemsGroupStartInfo; | ||
65 | |||
66 | /// <summary> | ||
67 | /// Signaled when all of the WorkItemsGroup's work item completed. | ||
68 | /// </summary> | ||
69 | //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); | ||
70 | private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); | ||
71 | |||
72 | /// <summary> | ||
73 | /// A common object for all the work items that this work items group | ||
74 | /// generate so we can mark them to cancel in O(1) | ||
75 | /// </summary> | ||
76 | private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | ||
77 | |||
78 | #endregion | ||
79 | |||
80 | #region Construction | ||
81 | |||
82 | public WorkItemsGroup( | ||
83 | SmartThreadPool stp, | ||
84 | int concurrency, | ||
85 | WIGStartInfo wigStartInfo) | ||
86 | { | ||
87 | if (concurrency <= 0) | ||
88 | { | ||
89 | throw new ArgumentOutOfRangeException( | ||
90 | "concurrency", | ||
91 | #if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) | ||
92 | concurrency, | ||
93 | #endif | ||
94 | "concurrency must be greater than zero"); | ||
95 | } | ||
96 | _stp = stp; | ||
97 | _concurrency = concurrency; | ||
98 | _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); | ||
99 | _workItemsQueue = new PriorityQueue(); | ||
100 | Name = "WorkItemsGroup"; | ||
101 | |||
102 | // The _workItemsInStpQueue gets the number of currently executing work items, | ||
103 | // because once a work item is executing, it cannot be cancelled. | ||
104 | _workItemsInStpQueue = _workItemsExecutingInStp; | ||
105 | |||
106 | _isSuspended = _workItemsGroupStartInfo.StartSuspended; | ||
107 | } | ||
108 | |||
109 | #endregion | ||
110 | |||
111 | #region WorkItemsGroupBase Overrides | ||
112 | |||
113 | public override int Concurrency | ||
114 | { | ||
115 | get { return _concurrency; } | ||
116 | set | ||
117 | { | ||
118 | Debug.Assert(value > 0); | ||
119 | |||
120 | int diff = value - _concurrency; | ||
121 | _concurrency = value; | ||
122 | if (diff > 0) | ||
123 | { | ||
124 | EnqueueToSTPNextNWorkItem(diff); | ||
125 | } | ||
126 | } | ||
127 | } | ||
128 | |||
129 | public override int WaitingCallbacks | ||
130 | { | ||
131 | get { return _workItemsQueue.Count; } | ||
132 | } | ||
133 | |||
134 | public override object[] GetStates() | ||
135 | { | ||
136 | lock (_lock) | ||
137 | { | ||
138 | object[] states = new object[_workItemsQueue.Count]; | ||
139 | int i = 0; | ||
140 | foreach (WorkItem workItem in _workItemsQueue) | ||
141 | { | ||
142 | states[i] = workItem.GetWorkItemResult().State; | ||
143 | ++i; | ||
144 | } | ||
145 | return states; | ||
146 | } | ||
147 | } | ||
148 | |||
149 | /// <summary> | ||
150 | /// WorkItemsGroup start information | ||
151 | /// </summary> | ||
152 | public override WIGStartInfo WIGStartInfo | ||
153 | { | ||
154 | get { return _workItemsGroupStartInfo; } | ||
155 | } | ||
156 | |||
157 | /// <summary> | ||
158 | /// Start the Work Items Group if it was started suspended | ||
159 | /// </summary> | ||
160 | public override void Start() | ||
161 | { | ||
162 | // If the Work Items Group already started then quit | ||
163 | if (!_isSuspended) | ||
164 | { | ||
165 | return; | ||
166 | } | ||
167 | _isSuspended = false; | ||
168 | |||
169 | EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); | ||
170 | } | ||
171 | |||
172 | public override void Cancel(bool abortExecution) | ||
173 | { | ||
174 | lock (_lock) | ||
175 | { | ||
176 | _canceledWorkItemsGroup.IsCanceled = true; | ||
177 | _workItemsQueue.Clear(); | ||
178 | _workItemsInStpQueue = 0; | ||
179 | _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); | ||
180 | } | ||
181 | |||
182 | if (abortExecution) | ||
183 | { | ||
184 | _stp.CancelAbortWorkItemsGroup(this); | ||
185 | } | ||
186 | } | ||
187 | |||
188 | /// <summary> | ||
189 | /// Wait for the thread pool to be idle | ||
190 | /// </summary> | ||
191 | public override bool WaitForIdle(int millisecondsTimeout) | ||
192 | { | ||
193 | SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); | ||
194 | return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); | ||
195 | } | ||
196 | |||
197 | public override event WorkItemsGroupIdleHandler OnIdle | ||
198 | { | ||
199 | add { _onIdle += value; } | ||
200 | remove { _onIdle -= value; } | ||
201 | } | ||
202 | |||
203 | #endregion | ||
204 | |||
205 | #region Private methods | ||
206 | |||
207 | private void RegisterToWorkItemCompletion(IWorkItemResult wir) | ||
208 | { | ||
209 | IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; | ||
210 | iwir.OnWorkItemStarted += OnWorkItemStartedCallback; | ||
211 | iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; | ||
212 | } | ||
213 | |||
214 | public void OnSTPIsStarting() | ||
215 | { | ||
216 | if (_isSuspended) | ||
217 | { | ||
218 | return; | ||
219 | } | ||
220 | |||
221 | EnqueueToSTPNextNWorkItem(_concurrency); | ||
222 | } | ||
223 | |||
224 | public void EnqueueToSTPNextNWorkItem(int count) | ||
225 | { | ||
226 | for (int i = 0; i < count; ++i) | ||
227 | { | ||
228 | EnqueueToSTPNextWorkItem(null, false); | ||
229 | } | ||
230 | } | ||
231 | |||
232 | private object FireOnIdle(object state) | ||
233 | { | ||
234 | FireOnIdleImpl(_onIdle); | ||
235 | return null; | ||
236 | } | ||
237 | |||
238 | [MethodImpl(MethodImplOptions.NoInlining)] | ||
239 | private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) | ||
240 | { | ||
241 | if(null == onIdle) | ||
242 | { | ||
243 | return; | ||
244 | } | ||
245 | |||
246 | Delegate[] delegates = onIdle.GetInvocationList(); | ||
247 | foreach(WorkItemsGroupIdleHandler eh in delegates) | ||
248 | { | ||
249 | try | ||
250 | { | ||
251 | eh(this); | ||
252 | } | ||
253 | catch { } // Suppress exceptions | ||
254 | } | ||
255 | } | ||
256 | |||
257 | private void OnWorkItemStartedCallback(WorkItem workItem) | ||
258 | { | ||
259 | lock(_lock) | ||
260 | { | ||
261 | ++_workItemsExecutingInStp; | ||
262 | } | ||
263 | } | ||
264 | |||
265 | private void OnWorkItemCompletedCallback(WorkItem workItem) | ||
266 | { | ||
267 | EnqueueToSTPNextWorkItem(null, true); | ||
268 | } | ||
269 | |||
270 | internal override void Enqueue(WorkItem workItem) | ||
271 | { | ||
272 | EnqueueToSTPNextWorkItem(workItem); | ||
273 | } | ||
274 | |||
275 | private void EnqueueToSTPNextWorkItem(WorkItem workItem) | ||
276 | { | ||
277 | EnqueueToSTPNextWorkItem(workItem, false); | ||
278 | } | ||
279 | |||
280 | private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) | ||
281 | { | ||
282 | lock(_lock) | ||
283 | { | ||
284 | // Got here from OnWorkItemCompletedCallback() | ||
285 | if (decrementWorkItemsInStpQueue) | ||
286 | { | ||
287 | --_workItemsInStpQueue; | ||
288 | |||
289 | if(_workItemsInStpQueue < 0) | ||
290 | { | ||
291 | _workItemsInStpQueue = 0; | ||
292 | } | ||
293 | |||
294 | --_workItemsExecutingInStp; | ||
295 | |||
296 | if(_workItemsExecutingInStp < 0) | ||
297 | { | ||
298 | _workItemsExecutingInStp = 0; | ||
299 | } | ||
300 | } | ||
301 | |||
302 | // If the work item is not null then enqueue it | ||
303 | if (null != workItem) | ||
304 | { | ||
305 | workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; | ||
306 | |||
307 | RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); | ||
308 | _workItemsQueue.Enqueue(workItem); | ||
309 | //_stp.IncrementWorkItemsCount(); | ||
310 | |||
311 | if ((1 == _workItemsQueue.Count) && | ||
312 | (0 == _workItemsInStpQueue)) | ||
313 | { | ||
314 | _stp.RegisterWorkItemsGroup(this); | ||
315 | IsIdle = false; | ||
316 | _isIdleWaitHandle.Reset(); | ||
317 | } | ||
318 | } | ||
319 | |||
320 | // If the work items queue of the group is empty than quit | ||
321 | if (0 == _workItemsQueue.Count) | ||
322 | { | ||
323 | if (0 == _workItemsInStpQueue) | ||
324 | { | ||
325 | _stp.UnregisterWorkItemsGroup(this); | ||
326 | IsIdle = true; | ||
327 | _isIdleWaitHandle.Set(); | ||
328 | if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) | ||
329 | { | ||
330 | _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); | ||
331 | } | ||
332 | } | ||
333 | return; | ||
334 | } | ||
335 | |||
336 | if (!_isSuspended) | ||
337 | { | ||
338 | if (_workItemsInStpQueue < _concurrency) | ||
339 | { | ||
340 | WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; | ||
341 | try | ||
342 | { | ||
343 | _stp.Enqueue(nextWorkItem); | ||
344 | } | ||
345 | catch (ObjectDisposedException e) | ||
346 | { | ||
347 | e.GetHashCode(); | ||
348 | // The STP has been shutdown | ||
349 | } | ||
350 | |||
351 | ++_workItemsInStpQueue; | ||
352 | } | ||
353 | } | ||
354 | } | ||
355 | } | ||
356 | |||
357 | #endregion | ||
358 | } | ||
359 | |||
360 | #endregion | ||
361 | } | ||