diff options
Diffstat (limited to '')
-rw-r--r-- | OpenSim/Framework/Monitoring/JobEngine.cs | 89 | ||||
-rw-r--r-- | OpenSim/Framework/Monitoring/WorkManager.cs | 10 | ||||
-rw-r--r-- | OpenSim/Framework/Util.cs | 10 | ||||
-rw-r--r-- | OpenSim/Framework/WebUtil.cs | 20 |
4 files changed, 64 insertions, 65 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index a6a059d..115871e 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs | |||
@@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring | |||
57 | /// <remarks> | 57 | /// <remarks> |
58 | /// Will be null if no job is currently running. | 58 | /// Will be null if no job is currently running. |
59 | /// </remarks> | 59 | /// </remarks> |
60 | public Job CurrentJob { get; private set; } | 60 | private Job m_currentJob; |
61 | public Job CurrentJob { get { return m_currentJob;} } | ||
61 | 62 | ||
62 | /// <summary> | 63 | /// <summary> |
63 | /// Number of jobs waiting to be processed. | 64 | /// Number of jobs waiting to be processed. |
@@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring | |||
82 | 83 | ||
83 | private CancellationTokenSource m_cancelSource; | 84 | private CancellationTokenSource m_cancelSource; |
84 | 85 | ||
85 | /// <summary> | 86 | private int m_timeout = -1; |
86 | /// Used to signal that we are ready to complete stop. | 87 | |
87 | /// </summary> | 88 | private bool m_threadRunnig = false; |
88 | private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); | ||
89 | 89 | ||
90 | public JobEngine(string name, string loggingName) | 90 | public JobEngine(string name, string loggingName, int timeout = -1) |
91 | { | 91 | { |
92 | Name = name; | 92 | Name = name; |
93 | LoggingName = loggingName; | 93 | LoggingName = loggingName; |
94 | 94 | m_timeout = timeout; | |
95 | RequestProcessTimeoutOnStop = 5000; | 95 | RequestProcessTimeoutOnStop = 5000; |
96 | } | 96 | } |
97 | 97 | ||
@@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring | |||
104 | 104 | ||
105 | IsRunning = true; | 105 | IsRunning = true; |
106 | 106 | ||
107 | m_finishedProcessingAfterStop.Reset(); | ||
108 | |||
109 | m_cancelSource = new CancellationTokenSource(); | 107 | m_cancelSource = new CancellationTokenSource(); |
110 | 108 | WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); | |
111 | WorkManager.StartThread( | 109 | m_threadRunnig = true; |
112 | ProcessRequests, | ||
113 | Name, | ||
114 | ThreadPriority.Normal, | ||
115 | false, | ||
116 | true, | ||
117 | null, | ||
118 | int.MaxValue); | ||
119 | } | 110 | } |
120 | } | 111 | } |
121 | 112 | ||
@@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring | |||
131 | m_log.DebugFormat("[JobEngine] Stopping {0}", Name); | 122 | m_log.DebugFormat("[JobEngine] Stopping {0}", Name); |
132 | 123 | ||
133 | IsRunning = false; | 124 | IsRunning = false; |
134 | 125 | if(m_threadRunnig) | |
135 | m_finishedProcessingAfterStop.Reset(); | 126 | { |
136 | if(m_jobQueue.Count <= 0) | ||
137 | m_cancelSource.Cancel(); | 127 | m_cancelSource.Cancel(); |
138 | 128 | m_threadRunnig = false; | |
139 | m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); | 129 | } |
140 | m_finishedProcessingAfterStop.Close(); | ||
141 | } | 130 | } |
142 | finally | 131 | finally |
143 | { | 132 | { |
144 | if(m_cancelSource != null) | 133 | if(m_cancelSource != null) |
145 | m_cancelSource.Dispose(); | 134 | m_cancelSource.Dispose(); |
146 | if(m_finishedProcessingAfterStop != null) | ||
147 | m_finishedProcessingAfterStop.Dispose(); | ||
148 | } | 135 | } |
149 | } | 136 | } |
150 | } | 137 | } |
@@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring | |||
203 | /// </param> | 190 | /// </param> |
204 | public bool QueueJob(Job job) | 191 | public bool QueueJob(Job job) |
205 | { | 192 | { |
193 | lock(JobLock) | ||
194 | { | ||
195 | if(!IsRunning) | ||
196 | return false; | ||
197 | |||
198 | if(!m_threadRunnig) | ||
199 | { | ||
200 | WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); | ||
201 | m_threadRunnig = true; | ||
202 | } | ||
203 | } | ||
204 | |||
206 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) | 205 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) |
207 | { | 206 | { |
208 | m_jobQueue.Add(job); | 207 | m_jobQueue.Add(job); |
@@ -222,59 +221,53 @@ namespace OpenSim.Framework.Monitoring | |||
222 | 221 | ||
223 | m_warnOverMaxQueue = false; | 222 | m_warnOverMaxQueue = false; |
224 | } | 223 | } |
225 | |||
226 | return false; | 224 | return false; |
227 | } | 225 | } |
228 | } | 226 | } |
229 | 227 | ||
230 | private void ProcessRequests() | 228 | private void ProcessRequests(Object o) |
231 | { | 229 | { |
232 | while(IsRunning || m_jobQueue.Count > 0) | 230 | while(IsRunning) |
233 | { | 231 | { |
234 | try | 232 | try |
235 | { | 233 | { |
236 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); | 234 | if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token)) |
237 | } | ||
238 | catch(ObjectDisposedException e) | ||
239 | { | ||
240 | // If we see this whilst not running then it may be due to a race where this thread checks | ||
241 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | ||
242 | if(IsRunning) | ||
243 | throw e; | ||
244 | else | ||
245 | { | 235 | { |
246 | m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", | 236 | lock(JobLock) |
247 | Name,m_jobQueue.Count); | 237 | m_threadRunnig = false; |
248 | break; | 238 | break; |
249 | } | 239 | } |
250 | } | 240 | } |
241 | catch(ObjectDisposedException e) | ||
242 | { | ||
243 | m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", | ||
244 | Name,m_jobQueue.Count); | ||
245 | break; | ||
246 | } | ||
251 | catch(OperationCanceledException) | 247 | catch(OperationCanceledException) |
252 | { | 248 | { |
253 | break; | 249 | break; |
254 | } | 250 | } |
255 | 251 | ||
256 | if(LogLevel >= 1) | 252 | if(LogLevel >= 1) |
257 | m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); | 253 | m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name); |
258 | 254 | ||
259 | try | 255 | try |
260 | { | 256 | { |
261 | CurrentJob.Action(); | 257 | m_currentJob.Action(); |
262 | } | 258 | } |
263 | catch(Exception e) | 259 | catch(Exception e) |
264 | { | 260 | { |
265 | m_log.Error( | 261 | m_log.Error( |
266 | string.Format( | 262 | string.Format( |
267 | "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); | 263 | "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e); |
268 | } | 264 | } |
269 | 265 | ||
270 | if(LogLevel >= 1) | 266 | if(LogLevel >= 1) |
271 | m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); | 267 | m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name); |
272 | 268 | ||
273 | CurrentJob = null; | 269 | m_currentJob = null; |
274 | } | 270 | } |
275 | |||
276 | Watchdog.RemoveThread(false); | ||
277 | m_finishedProcessingAfterStop.Set(); | ||
278 | } | 271 | } |
279 | 272 | ||
280 | public class Job | 273 | public class Job |
diff --git a/OpenSim/Framework/Monitoring/WorkManager.cs b/OpenSim/Framework/Monitoring/WorkManager.cs index 9d52f71..5d9b185 100644 --- a/OpenSim/Framework/Monitoring/WorkManager.cs +++ b/OpenSim/Framework/Monitoring/WorkManager.cs | |||
@@ -57,7 +57,7 @@ namespace OpenSim.Framework.Monitoring | |||
57 | 57 | ||
58 | static WorkManager() | 58 | static WorkManager() |
59 | { | 59 | { |
60 | JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE"); | 60 | JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE", 30000); |
61 | 61 | ||
62 | StatsManager.RegisterStat( | 62 | StatsManager.RegisterStat( |
63 | new Stat( | 63 | new Stat( |
@@ -182,9 +182,9 @@ namespace OpenSim.Framework.Monitoring | |||
182 | /// <param name="callback"></param> | 182 | /// <param name="callback"></param> |
183 | /// <param name="obj"></param> | 183 | /// <param name="obj"></param> |
184 | /// <param name="name">The name of the job. This is used in monitoring and debugging.</param> | 184 | /// <param name="name">The name of the job. This is used in monitoring and debugging.</param> |
185 | public static void RunInThreadPool(System.Threading.WaitCallback callback, object obj, string name) | 185 | public static void RunInThreadPool(System.Threading.WaitCallback callback, object obj, string name, bool timeout = true) |
186 | { | 186 | { |
187 | Util.FireAndForget(callback, obj, name); | 187 | Util.FireAndForget(callback, obj, name, timeout); |
188 | } | 188 | } |
189 | 189 | ||
190 | /// <summary> | 190 | /// <summary> |
@@ -231,10 +231,8 @@ namespace OpenSim.Framework.Monitoring | |||
231 | JobEngine.QueueJob(name, () => callback(obj)); | 231 | JobEngine.QueueJob(name, () => callback(obj)); |
232 | else if (canRunInThisThread) | 232 | else if (canRunInThisThread) |
233 | callback(obj); | 233 | callback(obj); |
234 | else if (mustNotTimeout) | ||
235 | RunInThread(callback, obj, name, log); | ||
236 | else | 234 | else |
237 | Util.FireAndForget(callback, obj, name); | 235 | Util.FireAndForget(callback, obj, name, !mustNotTimeout); |
238 | } | 236 | } |
239 | 237 | ||
240 | private static void HandleControlCommand(string module, string[] args) | 238 | private static void HandleControlCommand(string module, string[] args) |
diff --git a/OpenSim/Framework/Util.cs b/OpenSim/Framework/Util.cs index f52a84c..9a1e348 100644 --- a/OpenSim/Framework/Util.cs +++ b/OpenSim/Framework/Util.cs | |||
@@ -2492,8 +2492,9 @@ namespace OpenSim.Framework | |||
2492 | public bool Running { get; set; } | 2492 | public bool Running { get; set; } |
2493 | public bool Aborted { get; set; } | 2493 | public bool Aborted { get; set; } |
2494 | private int started; | 2494 | private int started; |
2495 | public bool DoTimeout; | ||
2495 | 2496 | ||
2496 | public ThreadInfo(long threadFuncNum, string context) | 2497 | public ThreadInfo(long threadFuncNum, string context, bool dotimeout = true) |
2497 | { | 2498 | { |
2498 | ThreadFuncNum = threadFuncNum; | 2499 | ThreadFuncNum = threadFuncNum; |
2499 | this.context = context; | 2500 | this.context = context; |
@@ -2501,6 +2502,7 @@ namespace OpenSim.Framework | |||
2501 | Thread = null; | 2502 | Thread = null; |
2502 | Running = false; | 2503 | Running = false; |
2503 | Aborted = false; | 2504 | Aborted = false; |
2505 | DoTimeout = dotimeout; | ||
2504 | } | 2506 | } |
2505 | 2507 | ||
2506 | public void Started() | 2508 | public void Started() |
@@ -2571,7 +2573,7 @@ namespace OpenSim.Framework | |||
2571 | foreach (KeyValuePair<long, ThreadInfo> entry in activeThreads) | 2573 | foreach (KeyValuePair<long, ThreadInfo> entry in activeThreads) |
2572 | { | 2574 | { |
2573 | ThreadInfo t = entry.Value; | 2575 | ThreadInfo t = entry.Value; |
2574 | if (t.Running && !t.Aborted && (t.Elapsed() >= THREAD_TIMEOUT)) | 2576 | if (t.DoTimeout && t.Running && !t.Aborted && (t.Elapsed() >= THREAD_TIMEOUT)) |
2575 | { | 2577 | { |
2576 | m_log.WarnFormat("Timeout in threadfunc {0} ({1}) {2}", t.ThreadFuncNum, t.Thread.Name, t.GetStackTrace()); | 2578 | m_log.WarnFormat("Timeout in threadfunc {0} ({1}) {2}", t.ThreadFuncNum, t.Thread.Name, t.GetStackTrace()); |
2577 | t.Abort(); | 2579 | t.Abort(); |
@@ -2612,7 +2614,7 @@ namespace OpenSim.Framework | |||
2612 | FireAndForget(callback, obj, null); | 2614 | FireAndForget(callback, obj, null); |
2613 | } | 2615 | } |
2614 | 2616 | ||
2615 | public static void FireAndForget(System.Threading.WaitCallback callback, object obj, string context) | 2617 | public static void FireAndForget(System.Threading.WaitCallback callback, object obj, string context, bool dotimeout = true) |
2616 | { | 2618 | { |
2617 | Interlocked.Increment(ref numTotalThreadFuncsCalled); | 2619 | Interlocked.Increment(ref numTotalThreadFuncsCalled); |
2618 | 2620 | ||
@@ -2634,7 +2636,7 @@ namespace OpenSim.Framework | |||
2634 | bool loggingEnabled = LogThreadPool > 0; | 2636 | bool loggingEnabled = LogThreadPool > 0; |
2635 | 2637 | ||
2636 | long threadFuncNum = Interlocked.Increment(ref nextThreadFuncNum); | 2638 | long threadFuncNum = Interlocked.Increment(ref nextThreadFuncNum); |
2637 | ThreadInfo threadInfo = new ThreadInfo(threadFuncNum, context); | 2639 | ThreadInfo threadInfo = new ThreadInfo(threadFuncNum, context, dotimeout); |
2638 | 2640 | ||
2639 | if (FireAndForgetMethod == FireAndForgetMethod.RegressionTest) | 2641 | if (FireAndForgetMethod == FireAndForgetMethod.RegressionTest) |
2640 | { | 2642 | { |
diff --git a/OpenSim/Framework/WebUtil.cs b/OpenSim/Framework/WebUtil.cs index 7b085d0..48078ad 100644 --- a/OpenSim/Framework/WebUtil.cs +++ b/OpenSim/Framework/WebUtil.cs | |||
@@ -1262,18 +1262,24 @@ namespace OpenSim.Framework | |||
1262 | { | 1262 | { |
1263 | if (hwr.StatusCode == HttpStatusCode.NotFound) | 1263 | if (hwr.StatusCode == HttpStatusCode.NotFound) |
1264 | return deserial; | 1264 | return deserial; |
1265 | |||
1265 | if (hwr.StatusCode == HttpStatusCode.Unauthorized) | 1266 | if (hwr.StatusCode == HttpStatusCode.Unauthorized) |
1266 | { | 1267 | { |
1267 | m_log.Error(string.Format( | 1268 | m_log.ErrorFormat("[SynchronousRestObjectRequester]: Web request {0} requires authentication", |
1268 | "[SynchronousRestObjectRequester]: Web request {0} requires authentication ", | 1269 | requestUrl); |
1269 | requestUrl)); | 1270 | } |
1270 | return deserial; | 1271 | else |
1272 | { | ||
1273 | m_log.WarnFormat("[SynchronousRestObjectRequester]: Web request {0} returned error: {1}", | ||
1274 | requestUrl, hwr.StatusCode); | ||
1271 | } | 1275 | } |
1272 | } | 1276 | } |
1273 | else | 1277 | else |
1274 | m_log.Error(string.Format( | 1278 | m_log.ErrorFormat( |
1275 | "[SynchronousRestObjectRequester]: WebException for {0} {1} {2} ", | 1279 | "[SynchronousRestObjectRequester]: WebException for {0} {1} {2} {3}", |
1276 | verb, requestUrl, typeof(TResponse).ToString()), e); | 1280 | verb, requestUrl, typeof(TResponse).ToString(), e.Message); |
1281 | |||
1282 | return deserial; | ||
1277 | } | 1283 | } |
1278 | } | 1284 | } |
1279 | catch (System.InvalidOperationException) | 1285 | catch (System.InvalidOperationException) |