aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsGroup.cs722
1 files changed, 361 insertions, 361 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
index 67dcbdd..d9d34ac 100644
--- a/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
+++ b/ThirdParty/SmartThreadPool/WorkItemsGroup.cs
@@ -1,361 +1,361 @@
1using System; 1using System;
2using System.Threading; 2using System.Threading;
3using System.Runtime.CompilerServices; 3using System.Runtime.CompilerServices;
4using System.Diagnostics; 4using System.Diagnostics;
5 5
6namespace Amib.Threading.Internal 6namespace Amib.Threading.Internal
7{ 7{
8 8
9 #region WorkItemsGroup class 9 #region WorkItemsGroup class
10 10
11 /// <summary> 11 /// <summary>
12 /// Summary description for WorkItemsGroup. 12 /// Summary description for WorkItemsGroup.
13 /// </summary> 13 /// </summary>
14 public class WorkItemsGroup : WorkItemsGroupBase 14 public class WorkItemsGroup : WorkItemsGroupBase
15 { 15 {
16 #region Private members 16 #region Private members
17 17
18 private readonly object _lock = new object(); 18 private readonly object _lock = new object();
19 19
20 /// <summary> 20 /// <summary>
21 /// A reference to the SmartThreadPool instance that created this 21 /// A reference to the SmartThreadPool instance that created this
22 /// WorkItemsGroup. 22 /// WorkItemsGroup.
23 /// </summary> 23 /// </summary>
24 private readonly SmartThreadPool _stp; 24 private readonly SmartThreadPool _stp;
25 25
26 /// <summary> 26 /// <summary>
27 /// The OnIdle event 27 /// The OnIdle event
28 /// </summary> 28 /// </summary>
29 private event WorkItemsGroupIdleHandler _onIdle; 29 private event WorkItemsGroupIdleHandler _onIdle;
30 30
31 /// <summary> 31 /// <summary>
32 /// A flag to indicate if the Work Items Group is now suspended. 32 /// A flag to indicate if the Work Items Group is now suspended.
33 /// </summary> 33 /// </summary>
34 private bool _isSuspended; 34 private bool _isSuspended;
35 35
36 /// <summary> 36 /// <summary>
37 /// Defines how many work items of this WorkItemsGroup can run at once. 37 /// Defines how many work items of this WorkItemsGroup can run at once.
38 /// </summary> 38 /// </summary>
39 private int _concurrency; 39 private int _concurrency;
40 40
41 /// <summary> 41 /// <summary>
42 /// Priority queue to hold work items before they are passed 42 /// Priority queue to hold work items before they are passed
43 /// to the SmartThreadPool. 43 /// to the SmartThreadPool.
44 /// </summary> 44 /// </summary>
45 private readonly PriorityQueue _workItemsQueue; 45 private readonly PriorityQueue _workItemsQueue;
46 46
47 /// <summary> 47 /// <summary>
48 /// Indicate how many work items are waiting in the SmartThreadPool 48 /// Indicate how many work items are waiting in the SmartThreadPool
49 /// queue. 49 /// queue.
50 /// This value is used to apply the concurrency. 50 /// This value is used to apply the concurrency.
51 /// </summary> 51 /// </summary>
52 private int _workItemsInStpQueue; 52 private int _workItemsInStpQueue;
53 53
54 /// <summary> 54 /// <summary>
55 /// Indicate how many work items are currently running in the SmartThreadPool. 55 /// Indicate how many work items are currently running in the SmartThreadPool.
56 /// This value is used with the Cancel, to calculate if we can send new 56 /// This value is used with the Cancel, to calculate if we can send new
57 /// work items to the STP. 57 /// work items to the STP.
58 /// </summary> 58 /// </summary>
59 private int _workItemsExecutingInStp = 0; 59 private int _workItemsExecutingInStp = 0;
60 60
61 /// <summary> 61 /// <summary>
62 /// WorkItemsGroup start information 62 /// WorkItemsGroup start information
63 /// </summary> 63 /// </summary>
64 private readonly WIGStartInfo _workItemsGroupStartInfo; 64 private readonly WIGStartInfo _workItemsGroupStartInfo;
65 65
66 /// <summary> 66 /// <summary>
67 /// Signaled when all of the WorkItemsGroup's work item completed. 67 /// Signaled when all of the WorkItemsGroup's work item completed.
68 /// </summary> 68 /// </summary>
69 //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); 69 //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
70 private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); 70 private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
71 71
72 /// <summary> 72 /// <summary>
73 /// A common object for all the work items that this work items group 73 /// A common object for all the work items that this work items group
74 /// generate so we can mark them to cancel in O(1) 74 /// generate so we can mark them to cancel in O(1)
75 /// </summary> 75 /// </summary>
76 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); 76 private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
77 77
78 #endregion 78 #endregion
79 79
80 #region Construction 80 #region Construction
81 81
82 public WorkItemsGroup( 82 public WorkItemsGroup(
83 SmartThreadPool stp, 83 SmartThreadPool stp,
84 int concurrency, 84 int concurrency,
85 WIGStartInfo wigStartInfo) 85 WIGStartInfo wigStartInfo)
86 { 86 {
87 if (concurrency <= 0) 87 if (concurrency <= 0)
88 { 88 {
89 throw new ArgumentOutOfRangeException( 89 throw new ArgumentOutOfRangeException(
90 "concurrency", 90 "concurrency",
91#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE) 91#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
92 concurrency, 92 concurrency,
93#endif 93#endif
94 "concurrency must be greater than zero"); 94 "concurrency must be greater than zero");
95 } 95 }
96 _stp = stp; 96 _stp = stp;
97 _concurrency = concurrency; 97 _concurrency = concurrency;
98 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly(); 98 _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
99 _workItemsQueue = new PriorityQueue(); 99 _workItemsQueue = new PriorityQueue();
100 Name = "WorkItemsGroup"; 100 Name = "WorkItemsGroup";
101 101
102 // The _workItemsInStpQueue gets the number of currently executing work items, 102 // The _workItemsInStpQueue gets the number of currently executing work items,
103 // because once a work item is executing, it cannot be cancelled. 103 // because once a work item is executing, it cannot be cancelled.
104 _workItemsInStpQueue = _workItemsExecutingInStp; 104 _workItemsInStpQueue = _workItemsExecutingInStp;
105 105
106 _isSuspended = _workItemsGroupStartInfo.StartSuspended; 106 _isSuspended = _workItemsGroupStartInfo.StartSuspended;
107 } 107 }
108 108
109 #endregion 109 #endregion
110 110
111 #region WorkItemsGroupBase Overrides 111 #region WorkItemsGroupBase Overrides
112 112
113 public override int Concurrency 113 public override int Concurrency
114 { 114 {
115 get { return _concurrency; } 115 get { return _concurrency; }
116 set 116 set
117 { 117 {
118 Debug.Assert(value > 0); 118 Debug.Assert(value > 0);
119 119
120 int diff = value - _concurrency; 120 int diff = value - _concurrency;
121 _concurrency = value; 121 _concurrency = value;
122 if (diff > 0) 122 if (diff > 0)
123 { 123 {
124 EnqueueToSTPNextNWorkItem(diff); 124 EnqueueToSTPNextNWorkItem(diff);
125 } 125 }
126 } 126 }
127 } 127 }
128 128
129 public override int WaitingCallbacks 129 public override int WaitingCallbacks
130 { 130 {
131 get { return _workItemsQueue.Count; } 131 get { return _workItemsQueue.Count; }
132 } 132 }
133 133
134 public override object[] GetStates() 134 public override object[] GetStates()
135 { 135 {
136 lock (_lock) 136 lock (_lock)
137 { 137 {
138 object[] states = new object[_workItemsQueue.Count]; 138 object[] states = new object[_workItemsQueue.Count];
139 int i = 0; 139 int i = 0;
140 foreach (WorkItem workItem in _workItemsQueue) 140 foreach (WorkItem workItem in _workItemsQueue)
141 { 141 {
142 states[i] = workItem.GetWorkItemResult().State; 142 states[i] = workItem.GetWorkItemResult().State;
143 ++i; 143 ++i;
144 } 144 }
145 return states; 145 return states;
146 } 146 }
147 } 147 }
148 148
149 /// <summary> 149 /// <summary>
150 /// WorkItemsGroup start information 150 /// WorkItemsGroup start information
151 /// </summary> 151 /// </summary>
152 public override WIGStartInfo WIGStartInfo 152 public override WIGStartInfo WIGStartInfo
153 { 153 {
154 get { return _workItemsGroupStartInfo; } 154 get { return _workItemsGroupStartInfo; }
155 } 155 }
156 156
157 /// <summary> 157 /// <summary>
158 /// Start the Work Items Group if it was started suspended 158 /// Start the Work Items Group if it was started suspended
159 /// </summary> 159 /// </summary>
160 public override void Start() 160 public override void Start()
161 { 161 {
162 // If the Work Items Group already started then quit 162 // If the Work Items Group already started then quit
163 if (!_isSuspended) 163 if (!_isSuspended)
164 { 164 {
165 return; 165 return;
166 } 166 }
167 _isSuspended = false; 167 _isSuspended = false;
168 168
169 EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency)); 169 EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
170 } 170 }
171 171
172 public override void Cancel(bool abortExecution) 172 public override void Cancel(bool abortExecution)
173 { 173 {
174 lock (_lock) 174 lock (_lock)
175 { 175 {
176 _canceledWorkItemsGroup.IsCanceled = true; 176 _canceledWorkItemsGroup.IsCanceled = true;
177 _workItemsQueue.Clear(); 177 _workItemsQueue.Clear();
178 _workItemsInStpQueue = 0; 178 _workItemsInStpQueue = 0;
179 _canceledWorkItemsGroup = new CanceledWorkItemsGroup(); 179 _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
180 } 180 }
181 181
182 if (abortExecution) 182 if (abortExecution)
183 { 183 {
184 _stp.CancelAbortWorkItemsGroup(this); 184 _stp.CancelAbortWorkItemsGroup(this);
185 } 185 }
186 } 186 }
187 187
188 /// <summary> 188 /// <summary>
189 /// Wait for the thread pool to be idle 189 /// Wait for the thread pool to be idle
190 /// </summary> 190 /// </summary>
191 public override bool WaitForIdle(int millisecondsTimeout) 191 public override bool WaitForIdle(int millisecondsTimeout)
192 { 192 {
193 SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this); 193 SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
194 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); 194 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
195 } 195 }
196 196
197 public override event WorkItemsGroupIdleHandler OnIdle 197 public override event WorkItemsGroupIdleHandler OnIdle
198 { 198 {
199 add { _onIdle += value; } 199 add { _onIdle += value; }
200 remove { _onIdle -= value; } 200 remove { _onIdle -= value; }
201 } 201 }
202 202
203 #endregion 203 #endregion
204 204
205 #region Private methods 205 #region Private methods
206 206
207 private void RegisterToWorkItemCompletion(IWorkItemResult wir) 207 private void RegisterToWorkItemCompletion(IWorkItemResult wir)
208 { 208 {
209 IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir; 209 IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
210 iwir.OnWorkItemStarted += OnWorkItemStartedCallback; 210 iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
211 iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback; 211 iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
212 } 212 }
213 213
214 public void OnSTPIsStarting() 214 public void OnSTPIsStarting()
215 { 215 {
216 if (_isSuspended) 216 if (_isSuspended)
217 { 217 {
218 return; 218 return;
219 } 219 }
220 220
221 EnqueueToSTPNextNWorkItem(_concurrency); 221 EnqueueToSTPNextNWorkItem(_concurrency);
222 } 222 }
223 223
224 public void EnqueueToSTPNextNWorkItem(int count) 224 public void EnqueueToSTPNextNWorkItem(int count)
225 { 225 {
226 for (int i = 0; i < count; ++i) 226 for (int i = 0; i < count; ++i)
227 { 227 {
228 EnqueueToSTPNextWorkItem(null, false); 228 EnqueueToSTPNextWorkItem(null, false);
229 } 229 }
230 } 230 }
231 231
232 private object FireOnIdle(object state) 232 private object FireOnIdle(object state)
233 { 233 {
234 FireOnIdleImpl(_onIdle); 234 FireOnIdleImpl(_onIdle);
235 return null; 235 return null;
236 } 236 }
237 237
238 [MethodImpl(MethodImplOptions.NoInlining)] 238 [MethodImpl(MethodImplOptions.NoInlining)]
239 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle) 239 private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
240 { 240 {
241 if(null == onIdle) 241 if(null == onIdle)
242 { 242 {
243 return; 243 return;
244 } 244 }
245 245
246 Delegate[] delegates = onIdle.GetInvocationList(); 246 Delegate[] delegates = onIdle.GetInvocationList();
247 foreach(WorkItemsGroupIdleHandler eh in delegates) 247 foreach(WorkItemsGroupIdleHandler eh in delegates)
248 { 248 {
249 try 249 try
250 { 250 {
251 eh(this); 251 eh(this);
252 } 252 }
253 catch { } // Suppress exceptions 253 catch { } // Suppress exceptions
254 } 254 }
255 } 255 }
256 256
257 private void OnWorkItemStartedCallback(WorkItem workItem) 257 private void OnWorkItemStartedCallback(WorkItem workItem)
258 { 258 {
259 lock(_lock) 259 lock(_lock)
260 { 260 {
261 ++_workItemsExecutingInStp; 261 ++_workItemsExecutingInStp;
262 } 262 }
263 } 263 }
264 264
265 private void OnWorkItemCompletedCallback(WorkItem workItem) 265 private void OnWorkItemCompletedCallback(WorkItem workItem)
266 { 266 {
267 EnqueueToSTPNextWorkItem(null, true); 267 EnqueueToSTPNextWorkItem(null, true);
268 } 268 }
269 269
270 internal override void Enqueue(WorkItem workItem) 270 internal override void Enqueue(WorkItem workItem)
271 { 271 {
272 EnqueueToSTPNextWorkItem(workItem); 272 EnqueueToSTPNextWorkItem(workItem);
273 } 273 }
274 274
275 private void EnqueueToSTPNextWorkItem(WorkItem workItem) 275 private void EnqueueToSTPNextWorkItem(WorkItem workItem)
276 { 276 {
277 EnqueueToSTPNextWorkItem(workItem, false); 277 EnqueueToSTPNextWorkItem(workItem, false);
278 } 278 }
279 279
280 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue) 280 private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
281 { 281 {
282 lock(_lock) 282 lock(_lock)
283 { 283 {
284 // Got here from OnWorkItemCompletedCallback() 284 // Got here from OnWorkItemCompletedCallback()
285 if (decrementWorkItemsInStpQueue) 285 if (decrementWorkItemsInStpQueue)
286 { 286 {
287 --_workItemsInStpQueue; 287 --_workItemsInStpQueue;
288 288
289 if(_workItemsInStpQueue < 0) 289 if(_workItemsInStpQueue < 0)
290 { 290 {
291 _workItemsInStpQueue = 0; 291 _workItemsInStpQueue = 0;
292 } 292 }
293 293
294 --_workItemsExecutingInStp; 294 --_workItemsExecutingInStp;
295 295
296 if(_workItemsExecutingInStp < 0) 296 if(_workItemsExecutingInStp < 0)
297 { 297 {
298 _workItemsExecutingInStp = 0; 298 _workItemsExecutingInStp = 0;
299 } 299 }
300 } 300 }
301 301
302 // If the work item is not null then enqueue it 302 // If the work item is not null then enqueue it
303 if (null != workItem) 303 if (null != workItem)
304 { 304 {
305 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup; 305 workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
306 306
307 RegisterToWorkItemCompletion(workItem.GetWorkItemResult()); 307 RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
308 _workItemsQueue.Enqueue(workItem); 308 _workItemsQueue.Enqueue(workItem);
309 //_stp.IncrementWorkItemsCount(); 309 //_stp.IncrementWorkItemsCount();
310 310
311 if ((1 == _workItemsQueue.Count) && 311 if ((1 == _workItemsQueue.Count) &&
312 (0 == _workItemsInStpQueue)) 312 (0 == _workItemsInStpQueue))
313 { 313 {
314 _stp.RegisterWorkItemsGroup(this); 314 _stp.RegisterWorkItemsGroup(this);
315 IsIdle = false; 315 IsIdle = false;
316 _isIdleWaitHandle.Reset(); 316 _isIdleWaitHandle.Reset();
317 } 317 }
318 } 318 }
319 319
320 // If the work items queue of the group is empty than quit 320 // If the work items queue of the group is empty than quit
321 if (0 == _workItemsQueue.Count) 321 if (0 == _workItemsQueue.Count)
322 { 322 {
323 if (0 == _workItemsInStpQueue) 323 if (0 == _workItemsInStpQueue)
324 { 324 {
325 _stp.UnregisterWorkItemsGroup(this); 325 _stp.UnregisterWorkItemsGroup(this);
326 IsIdle = true; 326 IsIdle = true;
327 _isIdleWaitHandle.Set(); 327 _isIdleWaitHandle.Set();
328 if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0) 328 if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
329 { 329 {
330 _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle)); 330 _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
331 } 331 }
332 } 332 }
333 return; 333 return;
334 } 334 }
335 335
336 if (!_isSuspended) 336 if (!_isSuspended)
337 { 337 {
338 if (_workItemsInStpQueue < _concurrency) 338 if (_workItemsInStpQueue < _concurrency)
339 { 339 {
340 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem; 340 WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
341 try 341 try
342 { 342 {
343 _stp.Enqueue(nextWorkItem); 343 _stp.Enqueue(nextWorkItem);
344 } 344 }
345 catch (ObjectDisposedException e) 345 catch (ObjectDisposedException e)
346 { 346 {
347 e.GetHashCode(); 347 e.GetHashCode();
348 // The STP has been shutdown 348 // The STP has been shutdown
349 } 349 }
350 350
351 ++_workItemsInStpQueue; 351 ++_workItemsInStpQueue;
352 } 352 }
353 } 353 }
354 } 354 }
355 } 355 }
356 356
357 #endregion 357 #endregion
358 } 358 }
359 359
360 #endregion 360 #endregion
361} 361}