aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Parallel.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Parallel.cs')
-rw-r--r--OpenSim/Framework/Parallel.cs207
1 files changed, 207 insertions, 0 deletions
diff --git a/OpenSim/Framework/Parallel.cs b/OpenSim/Framework/Parallel.cs
new file mode 100644
index 0000000..74537ba
--- /dev/null
+++ b/OpenSim/Framework/Parallel.cs
@@ -0,0 +1,207 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Generic;
30using System.Threading;
31
32namespace OpenSim.Framework
33{
34 /// <summary>
35 /// Provides helper methods for parallelizing loops
36 /// </summary>
37 public static class Parallel
38 {
39 private static readonly int processorCount = System.Environment.ProcessorCount;
40
41 /// <summary>
42 /// Executes a for loop in which iterations may run in parallel
43 /// </summary>
44 /// <param name="fromInclusive">The loop will be started at this index</param>
45 /// <param name="toExclusive">The loop will be terminated before this index is reached</param>
46 /// <param name="body">Method body to run for each iteration of the loop</param>
47 public static void For(int fromInclusive, int toExclusive, Action<int> body)
48 {
49 For(processorCount, fromInclusive, toExclusive, body);
50 }
51
52 /// <summary>
53 /// Executes a for loop in which iterations may run in parallel
54 /// </summary>
55 /// <param name="threadCount">The number of concurrent execution threads to run</param>
56 /// <param name="fromInclusive">The loop will be started at this index</param>
57 /// <param name="toExclusive">The loop will be terminated before this index is reached</param>
58 /// <param name="body">Method body to run for each iteration of the loop</param>
59 public static void For(int threadCount, int fromInclusive, int toExclusive, Action<int> body)
60 {
61 int counter = threadCount;
62 AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
63 Exception exception = null;
64
65 --fromInclusive;
66
67 for (int i = 0; i < threadCount; i++)
68 {
69 ThreadPool.QueueUserWorkItem(
70 delegate(object o)
71 {
72 int threadIndex = (int)o;
73
74 while (exception == null)
75 {
76 int currentIndex = Interlocked.Increment(ref fromInclusive);
77
78 if (currentIndex >= toExclusive)
79 break;
80
81 try { body(currentIndex); }
82 catch (Exception ex) { exception = ex; break; }
83 }
84
85 if (Interlocked.Decrement(ref counter) == 0)
86 threadFinishEvent.Set();
87 }, i
88 );
89 }
90
91 threadFinishEvent.WaitOne();
92
93 if (exception != null)
94 throw new Exception(exception.Message, exception);
95 }
96
97 /// <summary>
98 /// Executes a foreach loop in which iterations may run in parallel
99 /// </summary>
100 /// <typeparam name="T">Object type that the collection wraps</typeparam>
101 /// <param name="enumerable">An enumerable collection to iterate over</param>
102 /// <param name="body">Method body to run for each object in the collection</param>
103 public static void ForEach<T>(IEnumerable<T> enumerable, Action<T> body)
104 {
105 ForEach<T>(processorCount, enumerable, body);
106 }
107
108 /// <summary>
109 /// Executes a foreach loop in which iterations may run in parallel
110 /// </summary>
111 /// <typeparam name="T">Object type that the collection wraps</typeparam>
112 /// <param name="threadCount">The number of concurrent execution threads to run</param>
113 /// <param name="enumerable">An enumerable collection to iterate over</param>
114 /// <param name="body">Method body to run for each object in the collection</param>
115 public static void ForEach<T>(int threadCount, IEnumerable<T> enumerable, Action<T> body)
116 {
117 int counter = threadCount;
118 AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
119 IEnumerator<T> enumerator = enumerable.GetEnumerator();
120 Exception exception = null;
121
122 for (int i = 0; i < threadCount; i++)
123 {
124 ThreadPool.QueueUserWorkItem(
125 delegate(object o)
126 {
127 int threadIndex = (int)o;
128
129 while (exception == null)
130 {
131 T entry;
132
133 lock (enumerator)
134 {
135 if (!enumerator.MoveNext())
136 break;
137 entry = (T)enumerator.Current; // Explicit typecast for Mono's sake
138 }
139
140 try { body(entry); }
141 catch (Exception ex) { exception = ex; break; }
142 }
143
144 if (Interlocked.Decrement(ref counter) == 0)
145 threadFinishEvent.Set();
146 }, i
147 );
148 }
149
150 threadFinishEvent.WaitOne();
151
152 if (exception != null)
153 throw new Exception(exception.Message, exception);
154 }
155
156 /// <summary>
157 /// Executes a series of tasks in parallel
158 /// </summary>
159 /// <param name="actions">A series of method bodies to execute</param>
160 public static void Invoke(params Action[] actions)
161 {
162 Invoke(processorCount, actions);
163 }
164
165 /// <summary>
166 /// Executes a series of tasks in parallel
167 /// </summary>
168 /// <param name="threadCount">The number of concurrent execution threads to run</param>
169 /// <param name="actions">A series of method bodies to execute</param>
170 public static void Invoke(int threadCount, params Action[] actions)
171 {
172 int counter = threadCount;
173 AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
174 int index = -1;
175 Exception exception = null;
176
177 for (int i = 0; i < threadCount; i++)
178 {
179 ThreadPool.QueueUserWorkItem(
180 delegate(object o)
181 {
182 int threadIndex = (int)o;
183
184 while (exception == null)
185 {
186 int currentIndex = Interlocked.Increment(ref index);
187
188 if (currentIndex >= actions.Length)
189 break;
190
191 try { actions[currentIndex](); }
192 catch (Exception ex) { exception = ex; break; }
193 }
194
195 if (Interlocked.Decrement(ref counter) == 0)
196 threadFinishEvent.Set();
197 }, i
198 );
199 }
200
201 threadFinishEvent.WaitOne();
202
203 if (exception != null)
204 throw new Exception(exception.Message, exception);
205 }
206 }
207}