aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs106
-rw-r--r--OpenSim/Framework/Monitoring/Watchdog.cs50
2 files changed, 73 insertions, 83 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
index 75ad75d..7709f62 100644
--- a/OpenSim/Framework/Monitoring/JobEngine.cs
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring
40 40
41 public int LogLevel { get; set; } 41 public int LogLevel { get; set; }
42 42
43 private object JobLock = new object();
44
43 public string Name { get; private set; } 45 public string Name { get; private set; }
44 46
45 public string LoggingName { get; private set; } 47 public string LoggingName { get; private set; }
@@ -95,7 +97,7 @@ namespace OpenSim.Framework.Monitoring
95 97
96 public void Start() 98 public void Start()
97 { 99 {
98 lock (this) 100 lock (JobLock)
99 { 101 {
100 if (IsRunning) 102 if (IsRunning)
101 return; 103 return;
@@ -119,43 +121,22 @@ namespace OpenSim.Framework.Monitoring
119 121
120 public void Stop() 122 public void Stop()
121 { 123 {
122 lock (this) 124 lock (JobLock)
123 { 125 {
124 try 126 try
125 { 127 {
126 if (!IsRunning) 128 if (!IsRunning)
127 return; 129 return;
128 130
129 IsRunning = false; 131 m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
130 132
131 int requestsLeft = m_jobQueue.Count; 133 IsRunning = false;
132 134
133 if (requestsLeft <= 0) 135 m_finishedProcessingAfterStop.Reset();
134 { 136 if(m_jobQueue.Count <= 0)
135 m_cancelSource.Cancel(); 137 m_cancelSource.Cancel();
136 } 138
137 else 139 m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop);
138 {
139 m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);
140
141 while (requestsLeft > 0)
142 {
143 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
144 {
145 // After timeout no events have been written
146 if (requestsLeft == m_jobQueue.Count)
147 {
148 m_log.WarnFormat(
149 "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests",
150 LoggingName, RequestProcessTimeoutOnStop, requestsLeft);
151
152 break;
153 }
154 }
155
156 requestsLeft = m_jobQueue.Count;
157 }
158 }
159 } 140 }
160 finally 141 finally
161 { 142 {
@@ -244,48 +225,51 @@ namespace OpenSim.Framework.Monitoring
244 225
245 private void ProcessRequests() 226 private void ProcessRequests()
246 { 227 {
247 try 228 while(IsRunning || m_jobQueue.Count > 0)
248 { 229 {
249 while (IsRunning || m_jobQueue.Count > 0) 230 try
250 { 231 {
251 try 232 CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
252 { 233 }
253 CurrentJob = m_jobQueue.Take(m_cancelSource.Token); 234 catch(ObjectDisposedException e)
254 } 235 {
255 catch (ObjectDisposedException e) 236 // If we see this whilst not running then it may be due to a race where this thread checks
237 // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
238 if(IsRunning)
239 throw e;
240 else
256 { 241 {
257 // If we see this whilst not running then it may be due to a race where this thread checks 242 m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
258 // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. 243 Name,m_jobQueue.Count);
259 if (IsRunning) 244 break;
260 throw e;
261 else
262 break;
263 } 245 }
246 }
247 catch(OperationCanceledException)
248 {
249 break;
250 }
264 251
265 if (LogLevel >= 1) 252 if(LogLevel >= 1)
266 m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); 253 m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name);
267 254
268 try 255 try
269 { 256 {
270 CurrentJob.Action(); 257 CurrentJob.Action();
271 } 258 }
272 catch (Exception e) 259 catch(Exception e)
273 { 260 {
274 m_log.Error( 261 m_log.Error(
275 string.Format( 262 string.Format(
276 "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); 263 "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e);
277 } 264 }
278 265
279 if (LogLevel >= 1) 266 if(LogLevel >= 1)
280 m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); 267 m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name);
281 268
282 CurrentJob = null; 269 CurrentJob = null;
283 }
284 }
285 catch (OperationCanceledException)
286 {
287 } 270 }
288 271
272 Watchdog.RemoveThread(false);
289 m_finishedProcessingAfterStop.Set(); 273 m_finishedProcessingAfterStop.Set();
290 } 274 }
291 275
diff --git a/OpenSim/Framework/Monitoring/Watchdog.cs b/OpenSim/Framework/Monitoring/Watchdog.cs
index 4485a9c..83f8e01 100644
--- a/OpenSim/Framework/Monitoring/Watchdog.cs
+++ b/OpenSim/Framework/Monitoring/Watchdog.cs
@@ -333,39 +333,45 @@ namespace OpenSim.Framework.Monitoring
333 { 333 {
334 List<ThreadWatchdogInfo> callbackInfos = null; 334 List<ThreadWatchdogInfo> callbackInfos = null;
335 335
336 // get a copy since we may change m_threads
337 List<ThreadWatchdogInfo> threadsInfo;
336 lock (m_threads) 338 lock (m_threads)
339 threadsInfo = m_threads.Values.ToList();
340
341 foreach (ThreadWatchdogInfo threadInfo in threadsInfo)
337 { 342 {
338 // get a copy since we may change m_threads 343 lock (m_threads)
339 List<ThreadWatchdogInfo> threadsInfo = m_threads.Values.ToList();
340 foreach (ThreadWatchdogInfo threadInfo in threadsInfo)
341 { 344 {
342 if (threadInfo.Thread.ThreadState == ThreadState.Stopped) 345 if(!m_threads.ContainsValue(threadInfo))
343 { 346 continue;
344 RemoveThread(threadInfo.Thread.ManagedThreadId); 347 }
345 348
346 if (callbackInfos == null) 349 if(threadInfo.Thread.ThreadState == ThreadState.Stopped)
347 callbackInfos = new List<ThreadWatchdogInfo>(); 350 {
351 RemoveThread(threadInfo.Thread.ManagedThreadId);
348 352
349 callbackInfos.Add(threadInfo); 353 if(callbackInfos == null)
350 } 354 callbackInfos = new List<ThreadWatchdogInfo>();
351 else if (!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout)
352 {
353 threadInfo.IsTimedOut = true;
354 355
355 if (threadInfo.AlarmIfTimeout) 356 callbackInfos.Add(threadInfo);
356 { 357 }
357 if (callbackInfos == null) 358 else if(!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout)
358 callbackInfos = new List<ThreadWatchdogInfo>(); 359 {
360 threadInfo.IsTimedOut = true;
361
362 if(threadInfo.AlarmIfTimeout)
363 {
364 if(callbackInfos == null)
365 callbackInfos = new List<ThreadWatchdogInfo>();
359 366
360 // Send a copy of the watchdog info to prevent race conditions where the watchdog 367 // Send a copy of the watchdog info to prevent race conditions where the watchdog
361 // thread updates the monitoring info after an alarm has been sent out. 368 // thread updates the monitoring info after an alarm has been sent out.
362 callbackInfos.Add(new ThreadWatchdogInfo(threadInfo)); 369 callbackInfos.Add(new ThreadWatchdogInfo(threadInfo));
363 }
364 } 370 }
365 } 371 }
366 } 372 }
367 373
368 if (callbackInfos != null) 374 if(callbackInfos != null)
369 foreach (ThreadWatchdogInfo callbackInfo in callbackInfos) 375 foreach (ThreadWatchdogInfo callbackInfo in callbackInfos)
370 callback(callbackInfo); 376 callback(callbackInfo);
371 } 377 }