diff options
author | UbitUmarov | 2015-11-27 13:28:10 +0000 |
---|---|---|
committer | UbitUmarov | 2015-11-27 13:28:10 +0000 |
commit | baf8e762a61dfebc370aae9c67bbcc79eaafba36 (patch) | |
tree | 947f1823998cbe077afb24a5a5b736e4c7903121 | |
parent | change threading on GetTexture and getMesh and WebFetch Modules. (diff) | |
download | opensim-SC_OLD-baf8e762a61dfebc370aae9c67bbcc79eaafba36.zip opensim-SC_OLD-baf8e762a61dfebc370aae9c67bbcc79eaafba36.tar.gz opensim-SC_OLD-baf8e762a61dfebc370aae9c67bbcc79eaafba36.tar.bz2 opensim-SC_OLD-baf8e762a61dfebc370aae9c67bbcc79eaafba36.tar.xz |
change JobEngine stop code and add a extra check for thread removed on watchdog timeout checks
-rw-r--r-- | OpenSim/Framework/Monitoring/JobEngine.cs | 106 | ||||
-rw-r--r-- | OpenSim/Framework/Monitoring/Watchdog.cs | 50 |
2 files changed, 73 insertions, 83 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index 75ad75d..7709f62 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs | |||
@@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring | |||
40 | 40 | ||
41 | public int LogLevel { get; set; } | 41 | public int LogLevel { get; set; } |
42 | 42 | ||
43 | private object JobLock = new object(); | ||
44 | |||
43 | public string Name { get; private set; } | 45 | public string Name { get; private set; } |
44 | 46 | ||
45 | public string LoggingName { get; private set; } | 47 | public string LoggingName { get; private set; } |
@@ -95,7 +97,7 @@ namespace OpenSim.Framework.Monitoring | |||
95 | 97 | ||
96 | public void Start() | 98 | public void Start() |
97 | { | 99 | { |
98 | lock (this) | 100 | lock (JobLock) |
99 | { | 101 | { |
100 | if (IsRunning) | 102 | if (IsRunning) |
101 | return; | 103 | return; |
@@ -119,43 +121,22 @@ namespace OpenSim.Framework.Monitoring | |||
119 | 121 | ||
120 | public void Stop() | 122 | public void Stop() |
121 | { | 123 | { |
122 | lock (this) | 124 | lock (JobLock) |
123 | { | 125 | { |
124 | try | 126 | try |
125 | { | 127 | { |
126 | if (!IsRunning) | 128 | if (!IsRunning) |
127 | return; | 129 | return; |
128 | 130 | ||
129 | IsRunning = false; | 131 | m_log.DebugFormat("[JobEngine] Stopping {0}", Name); |
130 | 132 | ||
131 | int requestsLeft = m_jobQueue.Count; | 133 | IsRunning = false; |
132 | 134 | ||
133 | if (requestsLeft <= 0) | 135 | m_finishedProcessingAfterStop.Reset(); |
134 | { | 136 | if(m_jobQueue.Count <= 0) |
135 | m_cancelSource.Cancel(); | 137 | m_cancelSource.Cancel(); |
136 | } | 138 | |
137 | else | 139 | m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); |
138 | { | ||
139 | m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); | ||
140 | |||
141 | while (requestsLeft > 0) | ||
142 | { | ||
143 | if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) | ||
144 | { | ||
145 | // After timeout no events have been written | ||
146 | if (requestsLeft == m_jobQueue.Count) | ||
147 | { | ||
148 | m_log.WarnFormat( | ||
149 | "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", | ||
150 | LoggingName, RequestProcessTimeoutOnStop, requestsLeft); | ||
151 | |||
152 | break; | ||
153 | } | ||
154 | } | ||
155 | |||
156 | requestsLeft = m_jobQueue.Count; | ||
157 | } | ||
158 | } | ||
159 | } | 140 | } |
160 | finally | 141 | finally |
161 | { | 142 | { |
@@ -244,48 +225,51 @@ namespace OpenSim.Framework.Monitoring | |||
244 | 225 | ||
245 | private void ProcessRequests() | 226 | private void ProcessRequests() |
246 | { | 227 | { |
247 | try | 228 | while(IsRunning || m_jobQueue.Count > 0) |
248 | { | 229 | { |
249 | while (IsRunning || m_jobQueue.Count > 0) | 230 | try |
250 | { | 231 | { |
251 | try | 232 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); |
252 | { | 233 | } |
253 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); | 234 | catch(ObjectDisposedException e) |
254 | } | 235 | { |
255 | catch (ObjectDisposedException e) | 236 | // If we see this whilst not running then it may be due to a race where this thread checks |
237 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | ||
238 | if(IsRunning) | ||
239 | throw e; | ||
240 | else | ||
256 | { | 241 | { |
257 | // If we see this whilst not running then it may be due to a race where this thread checks | 242 | m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", |
258 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | 243 | Name,m_jobQueue.Count); |
259 | if (IsRunning) | 244 | break; |
260 | throw e; | ||
261 | else | ||
262 | break; | ||
263 | } | 245 | } |
246 | } | ||
247 | catch(OperationCanceledException) | ||
248 | { | ||
249 | break; | ||
250 | } | ||
264 | 251 | ||
265 | if (LogLevel >= 1) | 252 | if(LogLevel >= 1) |
266 | m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); | 253 | m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); |
267 | 254 | ||
268 | try | 255 | try |
269 | { | 256 | { |
270 | CurrentJob.Action(); | 257 | CurrentJob.Action(); |
271 | } | 258 | } |
272 | catch (Exception e) | 259 | catch(Exception e) |
273 | { | 260 | { |
274 | m_log.Error( | 261 | m_log.Error( |
275 | string.Format( | 262 | string.Format( |
276 | "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); | 263 | "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); |
277 | } | 264 | } |
278 | 265 | ||
279 | if (LogLevel >= 1) | 266 | if(LogLevel >= 1) |
280 | m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); | 267 | m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); |
281 | 268 | ||
282 | CurrentJob = null; | 269 | CurrentJob = null; |
283 | } | ||
284 | } | ||
285 | catch (OperationCanceledException) | ||
286 | { | ||
287 | } | 270 | } |
288 | 271 | ||
272 | Watchdog.RemoveThread(false); | ||
289 | m_finishedProcessingAfterStop.Set(); | 273 | m_finishedProcessingAfterStop.Set(); |
290 | } | 274 | } |
291 | 275 | ||
diff --git a/OpenSim/Framework/Monitoring/Watchdog.cs b/OpenSim/Framework/Monitoring/Watchdog.cs index 4485a9c..83f8e01 100644 --- a/OpenSim/Framework/Monitoring/Watchdog.cs +++ b/OpenSim/Framework/Monitoring/Watchdog.cs | |||
@@ -333,39 +333,45 @@ namespace OpenSim.Framework.Monitoring | |||
333 | { | 333 | { |
334 | List<ThreadWatchdogInfo> callbackInfos = null; | 334 | List<ThreadWatchdogInfo> callbackInfos = null; |
335 | 335 | ||
336 | // get a copy since we may change m_threads | ||
337 | List<ThreadWatchdogInfo> threadsInfo; | ||
336 | lock (m_threads) | 338 | lock (m_threads) |
339 | threadsInfo = m_threads.Values.ToList(); | ||
340 | |||
341 | foreach (ThreadWatchdogInfo threadInfo in threadsInfo) | ||
337 | { | 342 | { |
338 | // get a copy since we may change m_threads | 343 | lock (m_threads) |
339 | List<ThreadWatchdogInfo> threadsInfo = m_threads.Values.ToList(); | ||
340 | foreach (ThreadWatchdogInfo threadInfo in threadsInfo) | ||
341 | { | 344 | { |
342 | if (threadInfo.Thread.ThreadState == ThreadState.Stopped) | 345 | if(!m_threads.ContainsValue(threadInfo)) |
343 | { | 346 | continue; |
344 | RemoveThread(threadInfo.Thread.ManagedThreadId); | 347 | } |
345 | 348 | ||
346 | if (callbackInfos == null) | 349 | if(threadInfo.Thread.ThreadState == ThreadState.Stopped) |
347 | callbackInfos = new List<ThreadWatchdogInfo>(); | 350 | { |
351 | RemoveThread(threadInfo.Thread.ManagedThreadId); | ||
348 | 352 | ||
349 | callbackInfos.Add(threadInfo); | 353 | if(callbackInfos == null) |
350 | } | 354 | callbackInfos = new List<ThreadWatchdogInfo>(); |
351 | else if (!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout) | ||
352 | { | ||
353 | threadInfo.IsTimedOut = true; | ||
354 | 355 | ||
355 | if (threadInfo.AlarmIfTimeout) | 356 | callbackInfos.Add(threadInfo); |
356 | { | 357 | } |
357 | if (callbackInfos == null) | 358 | else if(!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout) |
358 | callbackInfos = new List<ThreadWatchdogInfo>(); | 359 | { |
360 | threadInfo.IsTimedOut = true; | ||
361 | |||
362 | if(threadInfo.AlarmIfTimeout) | ||
363 | { | ||
364 | if(callbackInfos == null) | ||
365 | callbackInfos = new List<ThreadWatchdogInfo>(); | ||
359 | 366 | ||
360 | // Send a copy of the watchdog info to prevent race conditions where the watchdog | 367 | // Send a copy of the watchdog info to prevent race conditions where the watchdog |
361 | // thread updates the monitoring info after an alarm has been sent out. | 368 | // thread updates the monitoring info after an alarm has been sent out. |
362 | callbackInfos.Add(new ThreadWatchdogInfo(threadInfo)); | 369 | callbackInfos.Add(new ThreadWatchdogInfo(threadInfo)); |
363 | } | ||
364 | } | 370 | } |
365 | } | 371 | } |
366 | } | 372 | } |
367 | 373 | ||
368 | if (callbackInfos != null) | 374 | if(callbackInfos != null) |
369 | foreach (ThreadWatchdogInfo callbackInfo in callbackInfos) | 375 | foreach (ThreadWatchdogInfo callbackInfo in callbackInfos) |
370 | callback(callbackInfo); | 376 | callback(callbackInfo); |
371 | } | 377 | } |