aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework')
-rw-r--r--OpenSim/Framework/Parallel.cs207
-rw-r--r--OpenSim/Framework/ThreadTracker.cs127
-rw-r--r--OpenSim/Framework/Util.cs36
3 files changed, 248 insertions, 122 deletions
diff --git a/OpenSim/Framework/Parallel.cs b/OpenSim/Framework/Parallel.cs
new file mode 100644
index 0000000..74537ba
--- /dev/null
+++ b/OpenSim/Framework/Parallel.cs
@@ -0,0 +1,207 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Generic;
30using System.Threading;
31
32namespace OpenSim.Framework
33{
34 /// <summary>
35 /// Provides helper methods for parallelizing loops
36 /// </summary>
37 public static class Parallel
38 {
39 private static readonly int processorCount = System.Environment.ProcessorCount;
40
41 /// <summary>
42 /// Executes a for loop in which iterations may run in parallel
43 /// </summary>
44 /// <param name="fromInclusive">The loop will be started at this index</param>
45 /// <param name="toExclusive">The loop will be terminated before this index is reached</param>
46 /// <param name="body">Method body to run for each iteration of the loop</param>
47 public static void For(int fromInclusive, int toExclusive, Action<int> body)
48 {
49 For(processorCount, fromInclusive, toExclusive, body);
50 }
51
52 /// <summary>
53 /// Executes a for loop in which iterations may run in parallel
54 /// </summary>
55 /// <param name="threadCount">The number of concurrent execution threads to run</param>
56 /// <param name="fromInclusive">The loop will be started at this index</param>
57 /// <param name="toExclusive">The loop will be terminated before this index is reached</param>
58 /// <param name="body">Method body to run for each iteration of the loop</param>
59 public static void For(int threadCount, int fromInclusive, int toExclusive, Action<int> body)
60 {
61 int counter = threadCount;
62 AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
63 Exception exception = null;
64
65 --fromInclusive;
66
67 for (int i = 0; i < threadCount; i++)
68 {
69 ThreadPool.QueueUserWorkItem(
70 delegate(object o)
71 {
72 int threadIndex = (int)o;
73
74 while (exception == null)
75 {
76 int currentIndex = Interlocked.Increment(ref fromInclusive);
77
78 if (currentIndex >= toExclusive)
79 break;
80
81 try { body(currentIndex); }
82 catch (Exception ex) { exception = ex; break; }
83 }
84
85 if (Interlocked.Decrement(ref counter) == 0)
86 threadFinishEvent.Set();
87 }, i
88 );
89 }
90
91 threadFinishEvent.WaitOne();
92
93 if (exception != null)
94 throw new Exception(exception.Message, exception);
95 }
96
97 /// <summary>
98 /// Executes a foreach loop in which iterations may run in parallel
99 /// </summary>
100 /// <typeparam name="T">Object type that the collection wraps</typeparam>
101 /// <param name="enumerable">An enumerable collection to iterate over</param>
102 /// <param name="body">Method body to run for each object in the collection</param>
103 public static void ForEach<T>(IEnumerable<T> enumerable, Action<T> body)
104 {
105 ForEach<T>(processorCount, enumerable, body);
106 }
107
108 /// <summary>
109 /// Executes a foreach loop in which iterations may run in parallel
110 /// </summary>
111 /// <typeparam name="T">Object type that the collection wraps</typeparam>
112 /// <param name="threadCount">The number of concurrent execution threads to run</param>
113 /// <param name="enumerable">An enumerable collection to iterate over</param>
114 /// <param name="body">Method body to run for each object in the collection</param>
115 public static void ForEach<T>(int threadCount, IEnumerable<T> enumerable, Action<T> body)
116 {
117 int counter = threadCount;
118 AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
119 IEnumerator<T> enumerator = enumerable.GetEnumerator();
120 Exception exception = null;
121
122 for (int i = 0; i < threadCount; i++)
123 {
124 ThreadPool.QueueUserWorkItem(
125 delegate(object o)
126 {
127 int threadIndex = (int)o;
128
129 while (exception == null)
130 {
131 T entry;
132
133 lock (enumerator)
134 {
135 if (!enumerator.MoveNext())
136 break;
137 entry = (T)enumerator.Current; // Explicit typecast for Mono's sake
138 }
139
140 try { body(entry); }
141 catch (Exception ex) { exception = ex; break; }
142 }
143
144 if (Interlocked.Decrement(ref counter) == 0)
145 threadFinishEvent.Set();
146 }, i
147 );
148 }
149
150 threadFinishEvent.WaitOne();
151
152 if (exception != null)
153 throw new Exception(exception.Message, exception);
154 }
155
156 /// <summary>
157 /// Executes a series of tasks in parallel
158 /// </summary>
159 /// <param name="actions">A series of method bodies to execute</param>
160 public static void Invoke(params Action[] actions)
161 {
162 Invoke(processorCount, actions);
163 }
164
165 /// <summary>
166 /// Executes a series of tasks in parallel
167 /// </summary>
168 /// <param name="threadCount">The number of concurrent execution threads to run</param>
169 /// <param name="actions">A series of method bodies to execute</param>
170 public static void Invoke(int threadCount, params Action[] actions)
171 {
172 int counter = threadCount;
173 AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
174 int index = -1;
175 Exception exception = null;
176
177 for (int i = 0; i < threadCount; i++)
178 {
179 ThreadPool.QueueUserWorkItem(
180 delegate(object o)
181 {
182 int threadIndex = (int)o;
183
184 while (exception == null)
185 {
186 int currentIndex = Interlocked.Increment(ref index);
187
188 if (currentIndex >= actions.Length)
189 break;
190
191 try { actions[currentIndex](); }
192 catch (Exception ex) { exception = ex; break; }
193 }
194
195 if (Interlocked.Decrement(ref counter) == 0)
196 threadFinishEvent.Set();
197 }, i
198 );
199 }
200
201 threadFinishEvent.WaitOne();
202
203 if (exception != null)
204 throw new Exception(exception.Message, exception);
205 }
206 }
207}
diff --git a/OpenSim/Framework/ThreadTracker.cs b/OpenSim/Framework/ThreadTracker.cs
index d3a239d..b68d9b3 100644
--- a/OpenSim/Framework/ThreadTracker.cs
+++ b/OpenSim/Framework/ThreadTracker.cs
@@ -26,138 +26,21 @@
26 */ 26 */
27 27
28using System; 28using System;
29using System.Collections;
30using System.Collections.Generic; 29using System.Collections.Generic;
31using System.Reflection; 30using System.Reflection;
32using System.Threading; 31using System.Diagnostics;
33using log4net; 32using log4net;
34 33
35namespace OpenSim.Framework 34namespace OpenSim.Framework
36{ 35{
37 public static class ThreadTracker 36 public static class ThreadTracker
38 { 37 {
39 private static readonly ILog m_log 38 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
40 = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
41
42 private static readonly long ThreadTimeout = 30 * 10000000;
43 public static List<ThreadTrackerItem> m_Threads;
44 public static Thread ThreadTrackerThread;
45 39
46 static ThreadTracker() 40 public static ProcessThreadCollection GetThreads()
47 { 41 {
48#if DEBUG 42 Process thisProc = Process.GetCurrentProcess();
49 m_Threads = new List<ThreadTrackerItem>(); 43 return thisProc.Threads;
50 ThreadTrackerThread = new Thread(ThreadTrackerThreadLoop);
51 ThreadTrackerThread.Name = "ThreadTrackerThread";
52 ThreadTrackerThread.IsBackground = true;
53 ThreadTrackerThread.Priority = ThreadPriority.BelowNormal;
54 ThreadTrackerThread.Start();
55 Add(ThreadTrackerThread);
56#endif
57 } 44 }
58
59 private static void ThreadTrackerThreadLoop()
60 {
61 try
62 {
63 while (true)
64 {
65 Thread.Sleep(5000);
66 CleanUp();
67 }
68 }
69 catch (Exception e)
70 {
71 m_log.ErrorFormat(
72 "[THREAD TRACKER]: Thread tracker cleanup thread terminating with exception. Please report this error. Exception is {0}",
73 e);
74 }
75 }
76
77 public static void Add(Thread thread)
78 {
79#if DEBUG
80 if (thread != null)
81 {
82 lock (m_Threads)
83 {
84 ThreadTrackerItem tti = new ThreadTrackerItem();
85 tti.Thread = thread;
86 tti.LastSeenActive = DateTime.Now.Ticks;
87 m_Threads.Add(tti);
88 }
89 }
90#endif
91 }
92
93 public static void Remove(Thread thread)
94 {
95#if DEBUG
96 lock (m_Threads)
97 {
98 foreach (ThreadTrackerItem tti in new ArrayList(m_Threads))
99 {
100 if (tti.Thread == thread)
101 m_Threads.Remove(tti);
102 }
103 }
104#endif
105 }
106
107 public static void CleanUp()
108 {
109 lock (m_Threads)
110 {
111 foreach (ThreadTrackerItem tti in new ArrayList(m_Threads))
112 {
113 try
114 {
115
116
117 if (tti.Thread.IsAlive)
118 {
119 // Its active
120 tti.LastSeenActive = DateTime.Now.Ticks;
121 }
122 else
123 {
124 // Its not active -- if its expired then remove it
125 if (tti.LastSeenActive + ThreadTimeout < DateTime.Now.Ticks)
126 m_Threads.Remove(tti);
127 }
128 }
129 catch (NullReferenceException)
130 {
131 m_Threads.Remove(tti);
132 }
133 }
134 }
135 }
136
137 public static List<Thread> GetThreads()
138 {
139 if (m_Threads == null)
140 return null;
141
142 List<Thread> threads = new List<Thread>();
143 lock (m_Threads)
144 {
145 foreach (ThreadTrackerItem tti in new ArrayList(m_Threads))
146 {
147 threads.Add(tti.Thread);
148 }
149 }
150 return threads;
151 }
152
153 #region Nested type: ThreadTrackerItem
154
155 public class ThreadTrackerItem
156 {
157 public long LastSeenActive;
158 public Thread Thread;
159 }
160
161 #endregion
162 } 45 }
163} 46}
diff --git a/OpenSim/Framework/Util.cs b/OpenSim/Framework/Util.cs
index 0851d26..189fa38 100644
--- a/OpenSim/Framework/Util.cs
+++ b/OpenSim/Framework/Util.cs
@@ -1231,6 +1231,42 @@ namespace OpenSim.Framework
1231 return (ipaddr1 != null) ? "http://" + ipaddr1.ToString() + ":" + port1 : uri; 1231 return (ipaddr1 != null) ? "http://" + ipaddr1.ToString() + ":" + port1 : uri;
1232 } 1232 }
1233 1233
1234 public static byte[] StringToBytes256(string str)
1235 {
1236 if (String.IsNullOrEmpty(str)) { return Utils.EmptyBytes; }
1237 if (str.Length > 254) str = str.Remove(254);
1238 if (!str.EndsWith("\0")) { str += "\0"; }
1239
1240 // Because this is UTF-8 encoding and not ASCII, it's possible we
1241 // might have gotten an oversized array even after the string trim
1242 byte[] data = UTF8.GetBytes(str);
1243 if (data.Length > 256)
1244 {
1245 Array.Resize<byte>(ref data, 256);
1246 data[255] = 0;
1247 }
1248
1249 return data;
1250 }
1251
1252 public static byte[] StringToBytes1024(string str)
1253 {
1254 if (String.IsNullOrEmpty(str)) { return Utils.EmptyBytes; }
1255 if (str.Length > 1023) str = str.Remove(1023);
1256 if (!str.EndsWith("\0")) { str += "\0"; }
1257
1258 // Because this is UTF-8 encoding and not ASCII, it's possible we
1259 // might have gotten an oversized array even after the string trim
1260 byte[] data = UTF8.GetBytes(str);
1261 if (data.Length > 1024)
1262 {
1263 Array.Resize<byte>(ref data, 1024);
1264 data[1023] = 0;
1265 }
1266
1267 return data;
1268 }
1269
1234 #region FireAndForget Threading Pattern 1270 #region FireAndForget Threading Pattern
1235 1271
1236 public static void FireAndForget(System.Threading.WaitCallback callback) 1272 public static void FireAndForget(System.Threading.WaitCallback callback)