aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
diff options
context:
space:
mode:
authorDiva Canto2013-05-06 09:18:17 -0700
committerDiva Canto2013-05-06 09:18:17 -0700
commita81ddf3d7097a2e0959080ae7291357435b0bd5b (patch)
tree688ceb63aa41d8eb9d3af27bd57d39cbd38ca250 /ThirdParty/SmartThreadPool/WorkItemsQueue.cs
parentMinor reordering of operations on NewUserConnection. The agent circuit needs ... (diff)
parentMerge branch 'master' into bulletsim4 (diff)
downloadopensim-SC_OLD-a81ddf3d7097a2e0959080ae7291357435b0bd5b.zip
opensim-SC_OLD-a81ddf3d7097a2e0959080ae7291357435b0bd5b.tar.gz
opensim-SC_OLD-a81ddf3d7097a2e0959080ae7291357435b0bd5b.tar.bz2
opensim-SC_OLD-a81ddf3d7097a2e0959080ae7291357435b0bd5b.tar.xz
Merge branch 'master' of ssh://opensimulator.org/var/git/opensim
Diffstat (limited to 'ThirdParty/SmartThreadPool/WorkItemsQueue.cs')
-rw-r--r--ThirdParty/SmartThreadPool/WorkItemsQueue.cs973
1 files changed, 509 insertions, 464 deletions
diff --git a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
index af5af07..e0bc916 100644
--- a/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
+++ b/ThirdParty/SmartThreadPool/WorkItemsQueue.cs
@@ -1,109 +1,151 @@
1// Ami Bar
2// amibar@gmail.com
3
4using System; 1using System;
2using System.Collections.Generic;
5using System.Threading; 3using System.Threading;
6 4
7namespace Amib.Threading.Internal 5namespace Amib.Threading.Internal
8{ 6{
9 #region WorkItemsQueue class 7 #region WorkItemsQueue class
10 8
11 /// <summary> 9 /// <summary>
12 /// WorkItemsQueue class. 10 /// WorkItemsQueue class.
13 /// </summary> 11 /// </summary>
14 public class WorkItemsQueue : IDisposable 12 public class WorkItemsQueue : IDisposable
15 { 13 {
16 #region Member variables 14 #region Member variables
17 15
18 /// <summary> 16 /// <summary>
19 /// Waiters queue (implemented as stack). 17 /// Waiters queue (implemented as stack).
20 /// </summary> 18 /// </summary>
21 private WaiterEntry _headWaiterEntry = new WaiterEntry(); 19 private readonly WaiterEntry _headWaiterEntry = new WaiterEntry();
22 20
23 /// <summary> 21 /// <summary>
24 /// Waiters count 22 /// Waiters count
25 /// </summary> 23 /// </summary>
26 private int _waitersCount = 0; 24 private int _waitersCount = 0;
27 25
28 /// <summary> 26 /// <summary>
29 /// Work items queue 27 /// Work items queue
30 /// </summary> 28 /// </summary>
31 private PriorityQueue _workItems = new PriorityQueue(); 29 private readonly PriorityQueue _workItems = new PriorityQueue();
32 30
33 /// <summary> 31 /// <summary>
34 /// Indicate that work items are allowed to be queued 32 /// Indicate that work items are allowed to be queued
35 /// </summary> 33 /// </summary>
36 private bool _isWorkItemsQueueActive = true; 34 private bool _isWorkItemsQueueActive = true;
37 35
38 /// <summary>
39 /// Each thread in the thread pool keeps its own waiter entry.
40 /// </summary>
41 [ThreadStatic]
42 private static WaiterEntry _waiterEntry;
43 36
44 /// <summary> 37#if (WINDOWS_PHONE)
45 /// A flag that indicates if the WorkItemsQueue has been disposed. 38 private static readonly Dictionary<int, WaiterEntry> _waiterEntries = new Dictionary<int, WaiterEntry>();
46 /// </summary> 39#elif (_WINDOWS_CE)
47 private bool _isDisposed = false; 40 private static LocalDataStoreSlot _waiterEntrySlot = Thread.AllocateDataSlot();
41#else
48 42
49 #endregion 43 [ThreadStatic]
44 private static WaiterEntry _waiterEntry;
45#endif
50 46
51 #region Public properties
52 47
53 /// <summary> 48 /// <summary>
54 /// Returns the current number of work items in the queue 49 /// Each thread in the thread pool keeps its own waiter entry.
55 /// </summary> 50 /// </summary>
56 public int Count 51 private static WaiterEntry CurrentWaiterEntry
57 { 52 {
53#if (WINDOWS_PHONE)
58 get 54 get
59 { 55 {
60 lock(this) 56 lock (_waiterEntries)
61 { 57 {
62 ValidateNotDisposed(); 58 WaiterEntry waiterEntry;
63 return _workItems.Count; 59 if (_waiterEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out waiterEntry))
60 {
61 return waiterEntry;
62 }
64 } 63 }
64 return null;
65 } 65 }
66 } 66 set
67
68 /// <summary>
69 /// Returns the current number of waiters
70 /// </summary>
71 public int WaitersCount
72 {
73 get
74 { 67 {
75 lock(this) 68 lock (_waiterEntries)
76 { 69 {
77 ValidateNotDisposed(); 70 _waiterEntries[Thread.CurrentThread.ManagedThreadId] = value;
78 return _waitersCount;
79 } 71 }
80 } 72 }
73#elif (_WINDOWS_CE)
74 get
75 {
76 return Thread.GetData(_waiterEntrySlot) as WaiterEntry;
77 }
78 set
79 {
80 Thread.SetData(_waiterEntrySlot, value);
81 }
82#else
83 get
84 {
85 return _waiterEntry;
86 }
87 set
88 {
89 _waiterEntry = value;
90 }
91#endif
81 } 92 }
82 93
94 /// <summary>
95 /// A flag that indicates if the WorkItemsQueue has been disposed.
96 /// </summary>
97 private bool _isDisposed = false;
83 98
84 #endregion 99 #endregion
85 100
86 #region Public methods 101 #region Public properties
87 102
88 /// <summary> 103 /// <summary>
89 /// Enqueue a work item to the queue. 104 /// Returns the current number of work items in the queue
90 /// </summary> 105 /// </summary>
91 public bool EnqueueWorkItem(WorkItem workItem) 106 public int Count
92 { 107 {
93 // A work item cannot be null, since null is used in the 108 get
94 // WaitForWorkItem() method to indicate timeout or cancel 109 {
95 if (null == workItem) 110 return _workItems.Count;
96 { 111 }
97 throw new ArgumentNullException("workItem" , "workItem cannot be null"); 112 }
98 } 113
114 /// <summary>
115 /// Returns the current number of waiters
116 /// </summary>
117 public int WaitersCount
118 {
119 get
120 {
121 return _waitersCount;
122 }
123 }
99 124
100 bool enqueue = true;
101 125
102 // First check if there is a waiter waiting for work item. During 126 #endregion
103 // the check, timed out waiters are ignored. If there is no 127
104 // waiter then the work item is queued. 128 #region Public methods
105 lock(this) 129
106 { 130 /// <summary>
131 /// Enqueue a work item to the queue.
132 /// </summary>
133 public bool EnqueueWorkItem(WorkItem workItem)
134 {
135 // A work item cannot be null, since null is used in the
136 // WaitForWorkItem() method to indicate timeout or cancel
137 if (null == workItem)
138 {
139 throw new ArgumentNullException("workItem" , "workItem cannot be null");
140 }
141
142 bool enqueue = true;
143
144 // First check if there is a waiter waiting for work item. During
145 // the check, timed out waiters are ignored. If there is no
146 // waiter then the work item is queued.
147 lock(this)
148 {
107 ValidateNotDisposed(); 149 ValidateNotDisposed();
108 150
109 if (!_isWorkItemsQueueActive) 151 if (!_isWorkItemsQueueActive)
@@ -111,56 +153,55 @@ namespace Amib.Threading.Internal
111 return false; 153 return false;
112 } 154 }
113 155
114 while(_waitersCount > 0) 156 while(_waitersCount > 0)
115 { 157 {
116 // Dequeue a waiter. 158 // Dequeue a waiter.
117 WaiterEntry waiterEntry = PopWaiter(); 159 WaiterEntry waiterEntry = PopWaiter();
118 160
119 // Signal the waiter. On success break the loop 161 // Signal the waiter. On success break the loop
120 if (waiterEntry.Signal(workItem)) 162 if (waiterEntry.Signal(workItem))
121 { 163 {
122 enqueue = false; 164 enqueue = false;
123 break; 165 break;
124 } 166 }
125 } 167 }
126 168
127 if (enqueue) 169 if (enqueue)
128 { 170 {
129 // Enqueue the work item 171 // Enqueue the work item
130 _workItems.Enqueue(workItem); 172 _workItems.Enqueue(workItem);
131 } 173 }
132 } 174 }
133 return true; 175 return true;
134 } 176 }
135 177
136 178
137 /// <summary> 179 /// <summary>
138 /// Waits for a work item or exits on timeout or cancel 180 /// Waits for a work item or exits on timeout or cancel
139 /// </summary> 181 /// </summary>
140 /// <param name="millisecondsTimeout">Timeout in milliseconds</param> 182 /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
141 /// <param name="cancelEvent">Cancel wait handle</param> 183 /// <param name="cancelEvent">Cancel wait handle</param>
142 /// <returns>Returns true if the resource was granted</returns> 184 /// <returns>Returns true if the resource was granted</returns>
143 public WorkItem DequeueWorkItem( 185 public WorkItem DequeueWorkItem(
144 int millisecondsTimeout, 186 int millisecondsTimeout,
145 WaitHandle cancelEvent) 187 WaitHandle cancelEvent)
146 { 188 {
147 /// This method cause the caller to wait for a work item. 189 // This method cause the caller to wait for a work item.
148 /// If there is at least one waiting work item then the 190 // If there is at least one waiting work item then the
149 /// method returns immidiately with true. 191 // method returns immidiately with it.
150 /// 192 //
151 /// If there are no waiting work items then the caller 193 // If there are no waiting work items then the caller
152 /// is queued between other waiters for a work item to arrive. 194 // is queued between other waiters for a work item to arrive.
153 /// 195 //
154 /// If a work item didn't come within millisecondsTimeout or 196 // If a work item didn't come within millisecondsTimeout or
155 /// the user canceled the wait by signaling the cancelEvent 197 // the user canceled the wait by signaling the cancelEvent
156 /// then the method returns false to indicate that the caller 198 // then the method returns null to indicate that the caller
157 /// didn't get a work item. 199 // didn't get a work item.
158 200
159 WaiterEntry waiterEntry = null; 201 WaiterEntry waiterEntry;
160 WorkItem workItem = null; 202 WorkItem workItem = null;
161 203 lock (this)
162 lock(this) 204 {
163 {
164 ValidateNotDisposed(); 205 ValidateNotDisposed();
165 206
166 // If there are waiting work items then take one and return. 207 // If there are waiting work items then take one and return.
@@ -169,80 +210,79 @@ namespace Amib.Threading.Internal
169 workItem = _workItems.Dequeue() as WorkItem; 210 workItem = _workItems.Dequeue() as WorkItem;
170 return workItem; 211 return workItem;
171 } 212 }
172 // No waiting work items ...
173 else
174 {
175 // Get the wait entry for the waiters queue
176 waiterEntry = GetThreadWaiterEntry();
177
178 // Put the waiter with the other waiters
179 PushWaiter(waiterEntry);
180 }
181 }
182
183 // Prepare array of wait handle for the WaitHandle.WaitAny()
184 WaitHandle [] waitHandles = new WaitHandle [] {
185 waiterEntry.WaitHandle,
186 cancelEvent };
187
188 // Wait for an available resource, cancel event, or timeout.
189 213
190 // During the wait we are supposes to exit the synchronization 214 // No waiting work items ...
191 // domain. (Placing true as the third argument of the WaitAny()) 215
192 // It just doesn't work, I don't know why, so I have lock(this) 216 // Get the waiter entry for the waiters queue
193 // statments insted of one. 217 waiterEntry = GetThreadWaiterEntry();
194 218
195 int index = WaitHandle.WaitAny( 219 // Put the waiter with the other waiters
196 waitHandles, 220 PushWaiter(waiterEntry);
197 millisecondsTimeout, 221 }
198 true); 222
199 223 // Prepare array of wait handle for the WaitHandle.WaitAny()
200 lock(this) 224 WaitHandle [] waitHandles = new WaitHandle[] {
201 { 225 waiterEntry.WaitHandle,
202 // success is true if it got a work item. 226 cancelEvent };
203 bool success = (0 == index); 227
204 228 // Wait for an available resource, cancel event, or timeout.
205 // The timeout variable is used only for readability. 229
206 // (We treat cancel as timeout) 230 // During the wait we are supposes to exit the synchronization
207 bool timeout = !success; 231 // domain. (Placing true as the third argument of the WaitAny())
208 232 // It just doesn't work, I don't know why, so I have two lock(this)
209 // On timeout update the waiterEntry that it is timed out 233 // statments instead of one.
210 if (timeout) 234
211 { 235 int index = STPEventWaitHandle.WaitAny(
212 // The Timeout() fails if the waiter has already been signaled 236 waitHandles,
213 timeout = waiterEntry.Timeout(); 237 millisecondsTimeout,
214 238 true);
215 // On timeout remove the waiter from the queue. 239
216 // Note that the complexity is O(1). 240 lock(this)
217 if(timeout) 241 {
218 { 242 // success is true if it got a work item.
219 RemoveWaiter(waiterEntry, false); 243 bool success = (0 == index);
220 } 244
221 245 // The timeout variable is used only for readability.
222 // Again readability 246 // (We treat cancel as timeout)
223 success = !timeout; 247 bool timeout = !success;
224 } 248
225 249 // On timeout update the waiterEntry that it is timed out
226 // On success return the work item 250 if (timeout)
227 if (success) 251 {
228 { 252 // The Timeout() fails if the waiter has already been signaled
229 workItem = waiterEntry.WorkItem; 253 timeout = waiterEntry.Timeout();
230 254
231 if (null == workItem) 255 // On timeout remove the waiter from the queue.
232 { 256 // Note that the complexity is O(1).
233 workItem = _workItems.Dequeue() as WorkItem; 257 if(timeout)
234 } 258 {
235 } 259 RemoveWaiter(waiterEntry, false);
236 } 260 }
237 // On failure return null. 261
238 return workItem; 262 // Again readability
239 } 263 success = !timeout;
264 }
265
266 // On success return the work item
267 if (success)
268 {
269 workItem = waiterEntry.WorkItem;
270
271 if (null == workItem)
272 {
273 workItem = _workItems.Dequeue() as WorkItem;
274 }
275 }
276 }
277 // On failure return null.
278 return workItem;
279 }
240 280
241 /// <summary> 281 /// <summary>
242 /// Cleanup the work items queue, hence no more work 282 /// Cleanup the work items queue, hence no more work
243 /// items are allowed to be queue 283 /// items are allowed to be queue
244 /// </summary> 284 /// </summary>
245 protected virtual void Cleanup() 285 private void Cleanup()
246 { 286 {
247 lock(this) 287 lock(this)
248 { 288 {
@@ -271,301 +311,312 @@ namespace Amib.Threading.Internal
271 // Tell the waiters that they were timed out. 311 // Tell the waiters that they were timed out.
272 // It won't signal them to exit, but to ignore their 312 // It won't signal them to exit, but to ignore their
273 // next work item. 313 // next work item.
274 while(_waitersCount > 0) 314 while(_waitersCount > 0)
275 { 315 {
276 WaiterEntry waiterEntry = PopWaiter(); 316 WaiterEntry waiterEntry = PopWaiter();
277 waiterEntry.Timeout(); 317 waiterEntry.Timeout();
278 } 318 }
279 } 319 }
280 } 320 }
281 321
282 #endregion 322 public object[] GetStates()
283
284 #region Private methods
285
286 /// <summary>
287 /// Returns the WaiterEntry of the current thread
288 /// </summary>
289 /// <returns></returns>
290 /// In order to avoid creation and destuction of WaiterEntry
291 /// objects each thread has its own WaiterEntry object.
292 private WaiterEntry GetThreadWaiterEntry()
293 { 323 {
294 if (null == _waiterEntry) 324 lock (this)
295 { 325 {
296 _waiterEntry = new WaiterEntry(); 326 object[] states = new object[_workItems.Count];
327 int i = 0;
328 foreach (WorkItem workItem in _workItems)
329 {
330 states[i] = workItem.GetWorkItemResult().State;
331 ++i;
332 }
333 return states;
297 } 334 }
298 _waiterEntry.Reset();
299 return _waiterEntry;
300 } 335 }
301 336
302 #region Waiters stack methods 337 #endregion
303 338
304 /// <summary> 339 #region Private methods
305 /// Push a new waiter into the waiter's stack 340
306 /// </summary> 341 /// <summary>
307 /// <param name="newWaiterEntry">A waiter to put in the stack</param> 342 /// Returns the WaiterEntry of the current thread
308 public void PushWaiter(WaiterEntry newWaiterEntry) 343 /// </summary>
309 { 344 /// <returns></returns>
310 // Remove the waiter if it is already in the stack and 345 /// In order to avoid creation and destuction of WaiterEntry
311 // update waiter's count as needed 346 /// objects each thread has its own WaiterEntry object.
312 RemoveWaiter(newWaiterEntry, false); 347 private static WaiterEntry GetThreadWaiterEntry()
313 348 {
314 // If the stack is empty then newWaiterEntry is the new head of the stack 349 if (null == CurrentWaiterEntry)
315 if (null == _headWaiterEntry._nextWaiterEntry) 350 {
316 { 351 CurrentWaiterEntry = new WaiterEntry();
317 _headWaiterEntry._nextWaiterEntry = newWaiterEntry; 352 }
318 newWaiterEntry._prevWaiterEntry = _headWaiterEntry; 353 CurrentWaiterEntry.Reset();
319 354 return CurrentWaiterEntry;
320 } 355 }
321 // If the stack is not empty then put newWaiterEntry as the new head 356
322 // of the stack. 357 #region Waiters stack methods
323 else 358
324 { 359 /// <summary>
325 // Save the old first waiter entry 360 /// Push a new waiter into the waiter's stack
326 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; 361 /// </summary>
362 /// <param name="newWaiterEntry">A waiter to put in the stack</param>
363 public void PushWaiter(WaiterEntry newWaiterEntry)
364 {
365 // Remove the waiter if it is already in the stack and
366 // update waiter's count as needed
367 RemoveWaiter(newWaiterEntry, false);
368
369 // If the stack is empty then newWaiterEntry is the new head of the stack
370 if (null == _headWaiterEntry._nextWaiterEntry)
371 {
372 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
373 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
374
375 }
376 // If the stack is not empty then put newWaiterEntry as the new head
377 // of the stack.
378 else
379 {
380 // Save the old first waiter entry
381 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
327 382
328 // Update the links 383 // Update the links
329 _headWaiterEntry._nextWaiterEntry = newWaiterEntry; 384 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
330 newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry; 385 newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
331 newWaiterEntry._prevWaiterEntry = _headWaiterEntry; 386 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
332 oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry; 387 oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
333 } 388 }
334 389
335 // Increment the number of waiters 390 // Increment the number of waiters
336 ++_waitersCount; 391 ++_waitersCount;
337 } 392 }
338 393
339 /// <summary> 394 /// <summary>
340 /// Pop a waiter from the waiter's stack 395 /// Pop a waiter from the waiter's stack
341 /// </summary> 396 /// </summary>
342 /// <returns>Returns the first waiter in the stack</returns> 397 /// <returns>Returns the first waiter in the stack</returns>
343 private WaiterEntry PopWaiter() 398 private WaiterEntry PopWaiter()
344 { 399 {
345 // Store the current stack head 400 // Store the current stack head
346 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; 401 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
347 402
348 // Store the new stack head 403 // Store the new stack head
349 WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry; 404 WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
350 405
351 // Update the old stack head list links and decrement the number 406 // Update the old stack head list links and decrement the number
352 // waiters. 407 // waiters.
353 RemoveWaiter(oldFirstWaiterEntry, true); 408 RemoveWaiter(oldFirstWaiterEntry, true);
354 409
355 // Update the new stack head 410 // Update the new stack head
356 _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry; 411 _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
357 if (null != newHeadWaiterEntry) 412 if (null != newHeadWaiterEntry)
358 { 413 {
359 newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry; 414 newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
360 } 415 }
361 416
362 // Return the old stack head 417 // Return the old stack head
363 return oldFirstWaiterEntry; 418 return oldFirstWaiterEntry;
364 } 419 }
365 420
366 /// <summary> 421 /// <summary>
367 /// Remove a waiter from the stack 422 /// Remove a waiter from the stack
368 /// </summary> 423 /// </summary>
369 /// <param name="waiterEntry">A waiter entry to remove</param> 424 /// <param name="waiterEntry">A waiter entry to remove</param>
370 /// <param name="popDecrement">If true the waiter count is always decremented</param> 425 /// <param name="popDecrement">If true the waiter count is always decremented</param>
371 private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement) 426 private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
372 { 427 {
373 // Store the prev entry in the list 428 // Store the prev entry in the list
374 WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry; 429 WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
375 430
376 // Store the next entry in the list 431 // Store the next entry in the list
377 WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry; 432 WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
378 433
379 // A flag to indicate if we need to decrement the waiters count. 434 // A flag to indicate if we need to decrement the waiters count.
380 // If we got here from PopWaiter then we must decrement. 435 // If we got here from PopWaiter then we must decrement.
381 // If we got here from PushWaiter then we decrement only if 436 // If we got here from PushWaiter then we decrement only if
382 // the waiter was already in the stack. 437 // the waiter was already in the stack.
383 bool decrementCounter = popDecrement; 438 bool decrementCounter = popDecrement;
384 439
385 // Null the waiter's entry links 440 // Null the waiter's entry links
386 waiterEntry._prevWaiterEntry = null; 441 waiterEntry._prevWaiterEntry = null;
387 waiterEntry._nextWaiterEntry = null; 442 waiterEntry._nextWaiterEntry = null;
388 443
389 // If the waiter entry had a prev link then update it. 444 // If the waiter entry had a prev link then update it.
390 // It also means that the waiter is already in the list and we 445 // It also means that the waiter is already in the list and we
391 // need to decrement the waiters count. 446 // need to decrement the waiters count.
392 if (null != prevWaiterEntry) 447 if (null != prevWaiterEntry)
393 { 448 {
394 prevWaiterEntry._nextWaiterEntry = nextWaiterEntry; 449 prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
395 decrementCounter = true; 450 decrementCounter = true;
396 } 451 }
397 452
398 // If the waiter entry had a next link then update it. 453 // If the waiter entry had a next link then update it.
399 // It also means that the waiter is already in the list and we 454 // It also means that the waiter is already in the list and we
400 // need to decrement the waiters count. 455 // need to decrement the waiters count.
401 if (null != nextWaiterEntry) 456 if (null != nextWaiterEntry)
402 { 457 {
403 nextWaiterEntry._prevWaiterEntry = prevWaiterEntry; 458 nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
404 decrementCounter = true; 459 decrementCounter = true;
405 } 460 }
406 461
407 // Decrement the waiters count if needed 462 // Decrement the waiters count if needed
408 if (decrementCounter) 463 if (decrementCounter)
409 { 464 {
410 --_waitersCount; 465 --_waitersCount;
411 } 466 }
412 } 467 }
413 468
414 #endregion 469 #endregion
415 470
416 #endregion 471 #endregion
417 472
418 #region WaiterEntry class 473 #region WaiterEntry class
419 474
420 // A waiter entry in the _waiters queue. 475 // A waiter entry in the _waiters queue.
421 public class WaiterEntry : IDisposable 476 public sealed class WaiterEntry : IDisposable
422 { 477 {
423 #region Member variables 478 #region Member variables
424 479
425 /// <summary> 480 /// <summary>
426 /// Event to signal the waiter that it got the work item. 481 /// Event to signal the waiter that it got the work item.
427 /// </summary> 482 /// </summary>
428 private AutoResetEvent _waitHandle = new AutoResetEvent(false); 483 //private AutoResetEvent _waitHandle = new AutoResetEvent(false);
429 484 private AutoResetEvent _waitHandle = EventWaitHandleFactory.CreateAutoResetEvent();
430 /// <summary> 485
431 /// Flag to know if this waiter already quited from the queue 486 /// <summary>
432 /// because of a timeout. 487 /// Flag to know if this waiter already quited from the queue
433 /// </summary> 488 /// because of a timeout.
434 private bool _isTimedout = false; 489 /// </summary>
435 490 private bool _isTimedout = false;
436 /// <summary> 491
437 /// Flag to know if the waiter was signaled and got a work item. 492 /// <summary>
438 /// </summary> 493 /// Flag to know if the waiter was signaled and got a work item.
439 private bool _isSignaled = false; 494 /// </summary>
495 private bool _isSignaled = false;
440 496
441 /// <summary> 497 /// <summary>
442 /// A work item that passed directly to the waiter withou going 498 /// A work item that passed directly to the waiter withou going
443 /// through the queue 499 /// through the queue
444 /// </summary> 500 /// </summary>
445 private WorkItem _workItem = null; 501 private WorkItem _workItem = null;
446 502
447 private bool _isDisposed = false; 503 private bool _isDisposed = false;
448 504
449 // Linked list members 505 // Linked list members
450 internal WaiterEntry _nextWaiterEntry = null; 506 internal WaiterEntry _nextWaiterEntry = null;
451 internal WaiterEntry _prevWaiterEntry = null; 507 internal WaiterEntry _prevWaiterEntry = null;
452 508
453 #endregion 509 #endregion
454 510
455 #region Construction 511 #region Construction
456 512
457 public WaiterEntry() 513 public WaiterEntry()
458 { 514 {
459 Reset(); 515 Reset();
460 } 516 }
461 517
462 #endregion 518 #endregion
463 519
464 #region Public methods 520 #region Public methods
465 521
466 public WaitHandle WaitHandle 522 public WaitHandle WaitHandle
467 { 523 {
468 get { return _waitHandle; } 524 get { return _waitHandle; }
469 } 525 }
470 526
471 public WorkItem WorkItem 527 public WorkItem WorkItem
472 { 528 {
473 get 529 get
474 { 530 {
475 lock(this) 531 return _workItem;
476 { 532 }
477 return _workItem; 533 }
478 } 534
479 } 535 /// <summary>
480 } 536 /// Signal the waiter that it got a work item.
481 537 /// </summary>
482 /// <summary> 538 /// <returns>Return true on success</returns>
483 /// Signal the waiter that it got a work item. 539 /// The method fails if Timeout() preceded its call
484 /// </summary> 540 public bool Signal(WorkItem workItem)
485 /// <returns>Return true on success</returns> 541 {
486 /// The method fails if Timeout() preceded its call 542 lock(this)
487 public bool Signal(WorkItem workItem) 543 {
488 { 544 if (!_isTimedout)
489 lock(this) 545 {
490 { 546 _workItem = workItem;
491 if (!_isTimedout) 547 _isSignaled = true;
492 { 548 _waitHandle.Set();
493 _workItem = workItem; 549 return true;
494 _isSignaled = true; 550 }
495 _waitHandle.Set(); 551 }
496 return true; 552 return false;
497 } 553 }
498 } 554
499 return false; 555 /// <summary>
500 } 556 /// Mark the wait entry that it has been timed out
501 557 /// </summary>
502 /// <summary> 558 /// <returns>Return true on success</returns>
503 /// Mark the wait entry that it has been timed out 559 /// The method fails if Signal() preceded its call
504 /// </summary> 560 public bool Timeout()
505 /// <returns>Return true on success</returns> 561 {
506 /// The method fails if Signal() preceded its call 562 lock(this)
507 public bool Timeout() 563 {
508 { 564 // Time out can happen only if the waiter wasn't marked as
509 lock(this) 565 // signaled
566 if (!_isSignaled)
567 {
568 // We don't remove the waiter from the queue, the DequeueWorkItem
569 // method skips _waiters that were timed out.
570 _isTimedout = true;
571 return true;
572 }
573 }
574 return false;
575 }
576
577 /// <summary>
578 /// Reset the wait entry so it can be used again
579 /// </summary>
580 public void Reset()
581 {
582 _workItem = null;
583 _isTimedout = false;
584 _isSignaled = false;
585 _waitHandle.Reset();
586 }
587
588 /// <summary>
589 /// Free resources
590 /// </summary>
591 public void Close()
592 {
593 if (null != _waitHandle)
594 {
595 _waitHandle.Close();
596 _waitHandle = null;
597 }
598 }
599
600 #endregion
601
602 #region IDisposable Members
603
604 public void Dispose()
605 {
606 lock (this)
510 { 607 {
511 // Time out can happen only if the waiter wasn't marked as 608 if (!_isDisposed)
512 // signaled
513 if (!_isSignaled)
514 { 609 {
515 // We don't remove the waiter from the queue, the DequeueWorkItem 610 Close();
516 // method skips _waiters that were timed out.
517 _isTimedout = true;
518 return true;
519 } 611 }
520 }
521 return false;
522 }
523
524 /// <summary>
525 /// Reset the wait entry so it can be used again
526 /// </summary>
527 public void Reset()
528 {
529 _workItem = null;
530 _isTimedout = false;
531 _isSignaled = false;
532 _waitHandle.Reset();
533 }
534
535 /// <summary>
536 /// Free resources
537 /// </summary>
538 public void Close()
539 {
540 if (null != _waitHandle)
541 {
542 _waitHandle.Close();
543 _waitHandle = null;
544 }
545 }
546
547 #endregion
548
549 #region IDisposable Members
550
551 public void Dispose()
552 {
553 if (!_isDisposed)
554 {
555 Close();
556 _isDisposed = true; 612 _isDisposed = true;
557 } 613 }
558 } 614 }
559 615
560 ~WaiterEntry() 616 #endregion
561 { 617 }
562 Dispose();
563 }
564 618
565 #endregion 619 #endregion
566 }
567
568 #endregion
569 620
570 #region IDisposable Members 621 #region IDisposable Members
571 622
@@ -574,14 +625,8 @@ namespace Amib.Threading.Internal
574 if (!_isDisposed) 625 if (!_isDisposed)
575 { 626 {
576 Cleanup(); 627 Cleanup();
577 _isDisposed = true;
578 GC.SuppressFinalize(this);
579 } 628 }
580 } 629 _isDisposed = true;
581
582 ~WorkItemsQueue()
583 {
584 Cleanup();
585 } 630 }
586 631
587 private void ValidateNotDisposed() 632 private void ValidateNotDisposed()
@@ -595,6 +640,6 @@ namespace Amib.Threading.Internal
595 #endregion 640 #endregion
596 } 641 }
597 642
598 #endregion 643 #endregion
599} 644}
600 645