diff options
Diffstat (limited to '')
-rw-r--r-- | OpenSim/Framework/Monitoring/JobEngine.cs | 106 |
1 files changed, 45 insertions, 61 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index 75ad75d..7709f62 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs | |||
@@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring | |||
40 | 40 | ||
41 | public int LogLevel { get; set; } | 41 | public int LogLevel { get; set; } |
42 | 42 | ||
43 | private object JobLock = new object(); | ||
44 | |||
43 | public string Name { get; private set; } | 45 | public string Name { get; private set; } |
44 | 46 | ||
45 | public string LoggingName { get; private set; } | 47 | public string LoggingName { get; private set; } |
@@ -95,7 +97,7 @@ namespace OpenSim.Framework.Monitoring | |||
95 | 97 | ||
96 | public void Start() | 98 | public void Start() |
97 | { | 99 | { |
98 | lock (this) | 100 | lock (JobLock) |
99 | { | 101 | { |
100 | if (IsRunning) | 102 | if (IsRunning) |
101 | return; | 103 | return; |
@@ -119,43 +121,22 @@ namespace OpenSim.Framework.Monitoring | |||
119 | 121 | ||
120 | public void Stop() | 122 | public void Stop() |
121 | { | 123 | { |
122 | lock (this) | 124 | lock (JobLock) |
123 | { | 125 | { |
124 | try | 126 | try |
125 | { | 127 | { |
126 | if (!IsRunning) | 128 | if (!IsRunning) |
127 | return; | 129 | return; |
128 | 130 | ||
129 | IsRunning = false; | 131 | m_log.DebugFormat("[JobEngine] Stopping {0}", Name); |
130 | 132 | ||
131 | int requestsLeft = m_jobQueue.Count; | 133 | IsRunning = false; |
132 | 134 | ||
133 | if (requestsLeft <= 0) | 135 | m_finishedProcessingAfterStop.Reset(); |
134 | { | 136 | if(m_jobQueue.Count <= 0) |
135 | m_cancelSource.Cancel(); | 137 | m_cancelSource.Cancel(); |
136 | } | 138 | |
137 | else | 139 | m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); |
138 | { | ||
139 | m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); | ||
140 | |||
141 | while (requestsLeft > 0) | ||
142 | { | ||
143 | if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) | ||
144 | { | ||
145 | // After timeout no events have been written | ||
146 | if (requestsLeft == m_jobQueue.Count) | ||
147 | { | ||
148 | m_log.WarnFormat( | ||
149 | "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", | ||
150 | LoggingName, RequestProcessTimeoutOnStop, requestsLeft); | ||
151 | |||
152 | break; | ||
153 | } | ||
154 | } | ||
155 | |||
156 | requestsLeft = m_jobQueue.Count; | ||
157 | } | ||
158 | } | ||
159 | } | 140 | } |
160 | finally | 141 | finally |
161 | { | 142 | { |
@@ -244,48 +225,51 @@ namespace OpenSim.Framework.Monitoring | |||
244 | 225 | ||
245 | private void ProcessRequests() | 226 | private void ProcessRequests() |
246 | { | 227 | { |
247 | try | 228 | while(IsRunning || m_jobQueue.Count > 0) |
248 | { | 229 | { |
249 | while (IsRunning || m_jobQueue.Count > 0) | 230 | try |
250 | { | 231 | { |
251 | try | 232 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); |
252 | { | 233 | } |
253 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); | 234 | catch(ObjectDisposedException e) |
254 | } | 235 | { |
255 | catch (ObjectDisposedException e) | 236 | // If we see this whilst not running then it may be due to a race where this thread checks |
237 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | ||
238 | if(IsRunning) | ||
239 | throw e; | ||
240 | else | ||
256 | { | 241 | { |
257 | // If we see this whilst not running then it may be due to a race where this thread checks | 242 | m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", |
258 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | 243 | Name,m_jobQueue.Count); |
259 | if (IsRunning) | 244 | break; |
260 | throw e; | ||
261 | else | ||
262 | break; | ||
263 | } | 245 | } |
246 | } | ||
247 | catch(OperationCanceledException) | ||
248 | { | ||
249 | break; | ||
250 | } | ||
264 | 251 | ||
265 | if (LogLevel >= 1) | 252 | if(LogLevel >= 1) |
266 | m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); | 253 | m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); |
267 | 254 | ||
268 | try | 255 | try |
269 | { | 256 | { |
270 | CurrentJob.Action(); | 257 | CurrentJob.Action(); |
271 | } | 258 | } |
272 | catch (Exception e) | 259 | catch(Exception e) |
273 | { | 260 | { |
274 | m_log.Error( | 261 | m_log.Error( |
275 | string.Format( | 262 | string.Format( |
276 | "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); | 263 | "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); |
277 | } | 264 | } |
278 | 265 | ||
279 | if (LogLevel >= 1) | 266 | if(LogLevel >= 1) |
280 | m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); | 267 | m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); |
281 | 268 | ||
282 | CurrentJob = null; | 269 | CurrentJob = null; |
283 | } | ||
284 | } | ||
285 | catch (OperationCanceledException) | ||
286 | { | ||
287 | } | 270 | } |
288 | 271 | ||
272 | Watchdog.RemoveThread(false); | ||
289 | m_finishedProcessingAfterStop.Set(); | 273 | m_finishedProcessingAfterStop.Set(); |
290 | } | 274 | } |
291 | 275 | ||