aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
diff options
context:
space:
mode:
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsGroup.cs')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsGroup.cs361
1 files changed, 361 insertions, 0 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
new file mode 100644
index 0000000..d429bc6
--- /dev/null
+++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
@@ -0,0 +1,361 @@
1using System;
2using System.Threading;
3using System.Runtime.CompilerServices;
4using System.Diagnostics;
5
6namespace Amib.Threading.Internal
7{
8
9 #region WorkItemsGroup class
10
11 /// <summary>
12 /// Summary description for WorkItemsGroup.
13 /// </summary>
14 public class WorkItemsGroup : WorkItemsGroupBase
15 {
16 #region Private members
17
18 private readonly object _lock = new object();
19
20 /// <summary>
21 /// A reference to the SmartThreadPool instance that created this
22 /// WorkItemsGroup.
23 /// </summary>
24 private readonly SmartThreadPool _stp;
25
26 /// <summary>
27 /// The OnIdle event
28 /// </summary>
29 private event WorkItemsGroupIdleHandler _onIdle;
30
31 /// <summary>
32 /// A flag to indicate if the Work Items Group is now suspended.
33 /// </summary>
34 private bool _isSuspended;
35
36 /// <summary>
37 /// Defines how many work items of this WorkItemsGroup can run at once.
38 /// </summary>
39 private int _concurrency;
40
41 /// <summary>
42 /// Priority queue to hold work items before they are passed
43 /// to the SmartThreadPool.
44 /// </summary>
45 private readonly PriorityQueue _workItemsQueue;
46
47 /// <summary>
48 /// Indicate how many work items are waiting in the SmartThreadPool
49 /// queue.
50 /// This value is used to apply the concurrency.
51 /// </summary>
52 private int _workItemsInStpQueue;
53
54 /// <summary>
55 /// Indicate how many work items are currently running in the SmartThreadPool.
56 /// This value is used with the Cancel, to calculate if we can send new
57 /// work items to the STP.
58 /// </summary>
59 private int _workItemsExecutingInStp = 0;
60
61 /// <summary>
62 /// WorkItemsGroup start information
63 /// </summary>
64 private readonly WIGStartInfo _workItemsGroupStartInfo;
65
66 /// <summary>
67 /// Signaled when all of the WorkItemsGroup's work item completed.
68 /// </summary>
69 //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
70 private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
71
72 /// <summary>
73 /// A common object for all the work items that this work items group
74 /// generate so we can mark them to cancel in O(1)
75 /// </summary>
76 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
77
78 #endregion
79
80 #region Construction
81
82 public WorkItemsGroup(
83 SmartThreadPool stp,
84 int concurrency,
85 WIGStartInfo wigStartInfo)
86 {
87 if (concurrency <= 0)
88 {
89 throw new ArgumentOutOfRangeException(
90 "concurrency",
91#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
92 concurrency,
93#endif
94 "concurrency must be greater than zero");
95 }
96 _stp = stp;
97 _concurrency = concurrency;
98 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
99 _workItemsQueue = new PriorityQueue();
100 Name = "WorkItemsGroup";
101
102 // The _workItemsInStpQueue gets the number of currently executing work items,
103 // because once a work item is executing, it cannot be cancelled.
104 _workItemsInStpQueue = _workItemsExecutingInStp;
105
106 _isSuspended = _workItemsGroupStartInfo.StartSuspended;
107 }
108
109 #endregion
110
111 #region WorkItemsGroupBase Overrides
112
113 public override int Concurrency
114 {
115 get { return _concurrency; }
116 set
117 {
118 Debug.Assert(value > 0);
119
120 int diff = value - _concurrency;
121 _concurrency = value;
122 if (diff > 0)
123 {
124 EnqueueToSTPNextNWorkItem(diff);
125 }
126 }
127 }
128
129 public override int WaitingCallbacks
130 {
131 get { return _workItemsQueue.Count; }
132 }
133
134 public override object[] GetStates()
135 {
136 lock (_lock)
137 {
138 object[] states = new object[_workItemsQueue.Count];
139 int i = 0;
140 foreach (WorkItem workItem in _workItemsQueue)
141 {
142 states[i] = workItem.GetWorkItemResult().State;
143 ++i;
144 }
145 return states;
146 }
147 }
148
149 /// <summary>
150 /// WorkItemsGroup start information
151 /// </summary>
152 public override WIGStartInfo WIGStartInfo
153 {
154 get { return _workItemsGroupStartInfo; }
155 }
156
157 /// <summary>
158 /// Start the Work Items Group if it was started suspended
159 /// </summary>
160 public override void Start()
161 {
162 // If the Work Items Group already started then quit
163 if (!_isSuspended)
164 {
165 return;
166 }
167 _isSuspended = false;
168
169 EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
170 }
171
172 public override void Cancel(bool abortExecution)
173 {
174 lock (_lock)
175 {
176 _canceledWorkItemsGroup.IsCanceled = true;
177 _workItemsQueue.Clear();
178 _workItemsInStpQueue = 0;
179 _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
180 }
181
182 if (abortExecution)
183 {
184 _stp.CancelAbortWorkItemsGroup(this);
185 }
186 }
187
188 /// <summary>
189 /// Wait for the thread pool to be idle
190 /// </summary>
191 public override bool WaitForIdle(int millisecondsTimeout)
192 {
193 SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
194 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
195 }
196
197 public override event WorkItemsGroupIdleHandler OnIdle
198 {
199 add { _onIdle += value; }
200 remove { _onIdle -= value; }
201 }
202
203 #endregion
204
205 #region Private methods
206
207 private void RegisterToWorkItemCompletion(IWorkItemResult wir)
208 {
209 IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
210 iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
211 iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
212 }
213
214 public void OnSTPIsStarting()
215 {
216 if (_isSuspended)
217 {
218 return;
219 }
220
221 EnqueueToSTPNextNWorkItem(_concurrency);
222 }
223
224 public void EnqueueToSTPNextNWorkItem(int count)
225 {
226 for (int i = 0; i < count; ++i)
227 {
228 EnqueueToSTPNextWorkItem(null, false);
229 }
230 }
231
232 private object FireOnIdle(object state)
233 {
234 FireOnIdleImpl(_onIdle);
235 return null;
236 }
237
238 [MethodImpl(MethodImplOptions.NoInlining)]
239 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
240 {
241 if(null == onIdle)
242 {
243 return;
244 }
245
246 Delegate[] delegates = onIdle.GetInvocationList();
247 foreach(WorkItemsGroupIdleHandler eh in delegates)
248 {
249 try
250 {
251 eh(this);
252 }
253 catch { } // Suppress exceptions
254 }
255 }
256
257 private void OnWorkItemStartedCallback(WorkItem workItem)
258 {
259 lock(_lock)
260 {
261 ++_workItemsExecutingInStp;
262 }
263 }
264
265 private void OnWorkItemCompletedCallback(WorkItem workItem)
266 {
267 EnqueueToSTPNextWorkItem(null, true);
268 }
269
270 internal override void Enqueue(WorkItem workItem)
271 {
272 EnqueueToSTPNextWorkItem(workItem);
273 }
274
275 private void EnqueueToSTPNextWorkItem(WorkItem workItem)
276 {
277 EnqueueToSTPNextWorkItem(workItem, false);
278 }
279
280 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
281 {
282 lock(_lock)
283 {
284 // Got here from OnWorkItemCompletedCallback()
285 if (decrementWorkItemsInStpQueue)
286 {
287 --_workItemsInStpQueue;
288
289 if(_workItemsInStpQueue < 0)
290 {
291 _workItemsInStpQueue = 0;
292 }
293
294 --_workItemsExecutingInStp;
295
296 if(_workItemsExecutingInStp < 0)
297 {
298 _workItemsExecutingInStp = 0;
299 }
300 }
301
302 // If the work item is not null then enqueue it
303 if (null != workItem)
304 {
305 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
306
307 RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
308 _workItemsQueue.Enqueue(workItem);
309 //_stp.IncrementWorkItemsCount();
310
311 if ((1 == _workItemsQueue.Count) &&
312 (0 == _workItemsInStpQueue))
313 {
314 _stp.RegisterWorkItemsGroup(this);
315 IsIdle = false;
316 _isIdleWaitHandle.Reset();
317 }
318 }
319
320 // If the work items queue of the group is empty than quit
321 if (0 == _workItemsQueue.Count)
322 {
323 if (0 == _workItemsInStpQueue)
324 {
325 _stp.UnregisterWorkItemsGroup(this);
326 IsIdle = true;
327 _isIdleWaitHandle.Set();
328 if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
329 {
330 _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
331 }
332 }
333 return;
334 }
335
336 if (!_isSuspended)
337 {
338 if (_workItemsInStpQueue < _concurrency)
339 {
340 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
341 try
342 {
343 _stp.Enqueue(nextWorkItem);
344 }
345 catch (ObjectDisposedException e)
346 {
347 e.GetHashCode();
348 // The STP has been shutdown
349 }
350
351 ++_workItemsInStpQueue;
352 }
353 }
354 }
355 }
356
357 #endregion
358 }
359
360 #endregion
361}