diff options
-rw-r--r-- | OpenSim/Framework/Monitoring/JobEngine.cs | 89 |
1 files changed, 41 insertions, 48 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index a6a059d..115871e 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs | |||
@@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring | |||
57 | /// <remarks> | 57 | /// <remarks> |
58 | /// Will be null if no job is currently running. | 58 | /// Will be null if no job is currently running. |
59 | /// </remarks> | 59 | /// </remarks> |
60 | public Job CurrentJob { get; private set; } | 60 | private Job m_currentJob; |
61 | public Job CurrentJob { get { return m_currentJob;} } | ||
61 | 62 | ||
62 | /// <summary> | 63 | /// <summary> |
63 | /// Number of jobs waiting to be processed. | 64 | /// Number of jobs waiting to be processed. |
@@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring | |||
82 | 83 | ||
83 | private CancellationTokenSource m_cancelSource; | 84 | private CancellationTokenSource m_cancelSource; |
84 | 85 | ||
85 | /// <summary> | 86 | private int m_timeout = -1; |
86 | /// Used to signal that we are ready to complete stop. | 87 | |
87 | /// </summary> | 88 | private bool m_threadRunnig = false; |
88 | private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); | ||
89 | 89 | ||
90 | public JobEngine(string name, string loggingName) | 90 | public JobEngine(string name, string loggingName, int timeout = -1) |
91 | { | 91 | { |
92 | Name = name; | 92 | Name = name; |
93 | LoggingName = loggingName; | 93 | LoggingName = loggingName; |
94 | 94 | m_timeout = timeout; | |
95 | RequestProcessTimeoutOnStop = 5000; | 95 | RequestProcessTimeoutOnStop = 5000; |
96 | } | 96 | } |
97 | 97 | ||
@@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring | |||
104 | 104 | ||
105 | IsRunning = true; | 105 | IsRunning = true; |
106 | 106 | ||
107 | m_finishedProcessingAfterStop.Reset(); | ||
108 | |||
109 | m_cancelSource = new CancellationTokenSource(); | 107 | m_cancelSource = new CancellationTokenSource(); |
110 | 108 | WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); | |
111 | WorkManager.StartThread( | 109 | m_threadRunnig = true; |
112 | ProcessRequests, | ||
113 | Name, | ||
114 | ThreadPriority.Normal, | ||
115 | false, | ||
116 | true, | ||
117 | null, | ||
118 | int.MaxValue); | ||
119 | } | 110 | } |
120 | } | 111 | } |
121 | 112 | ||
@@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring | |||
131 | m_log.DebugFormat("[JobEngine] Stopping {0}", Name); | 122 | m_log.DebugFormat("[JobEngine] Stopping {0}", Name); |
132 | 123 | ||
133 | IsRunning = false; | 124 | IsRunning = false; |
134 | 125 | if(m_threadRunnig) | |
135 | m_finishedProcessingAfterStop.Reset(); | 126 | { |
136 | if(m_jobQueue.Count <= 0) | ||
137 | m_cancelSource.Cancel(); | 127 | m_cancelSource.Cancel(); |
138 | 128 | m_threadRunnig = false; | |
139 | m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); | 129 | } |
140 | m_finishedProcessingAfterStop.Close(); | ||
141 | } | 130 | } |
142 | finally | 131 | finally |
143 | { | 132 | { |
144 | if(m_cancelSource != null) | 133 | if(m_cancelSource != null) |
145 | m_cancelSource.Dispose(); | 134 | m_cancelSource.Dispose(); |
146 | if(m_finishedProcessingAfterStop != null) | ||
147 | m_finishedProcessingAfterStop.Dispose(); | ||
148 | } | 135 | } |
149 | } | 136 | } |
150 | } | 137 | } |
@@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring | |||
203 | /// </param> | 190 | /// </param> |
204 | public bool QueueJob(Job job) | 191 | public bool QueueJob(Job job) |
205 | { | 192 | { |
193 | lock(JobLock) | ||
194 | { | ||
195 | if(!IsRunning) | ||
196 | return false; | ||
197 | |||
198 | if(!m_threadRunnig) | ||
199 | { | ||
200 | WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); | ||
201 | m_threadRunnig = true; | ||
202 | } | ||
203 | } | ||
204 | |||
206 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) | 205 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) |
207 | { | 206 | { |
208 | m_jobQueue.Add(job); | 207 | m_jobQueue.Add(job); |
@@ -222,59 +221,53 @@ namespace OpenSim.Framework.Monitoring | |||
222 | 221 | ||
223 | m_warnOverMaxQueue = false; | 222 | m_warnOverMaxQueue = false; |
224 | } | 223 | } |
225 | |||
226 | return false; | 224 | return false; |
227 | } | 225 | } |
228 | } | 226 | } |
229 | 227 | ||
230 | private void ProcessRequests() | 228 | private void ProcessRequests(Object o) |
231 | { | 229 | { |
232 | while(IsRunning || m_jobQueue.Count > 0) | 230 | while(IsRunning) |
233 | { | 231 | { |
234 | try | 232 | try |
235 | { | 233 | { |
236 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); | 234 | if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token)) |
237 | } | ||
238 | catch(ObjectDisposedException e) | ||
239 | { | ||
240 | // If we see this whilst not running then it may be due to a race where this thread checks | ||
241 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | ||
242 | if(IsRunning) | ||
243 | throw e; | ||
244 | else | ||
245 | { | 235 | { |
246 | m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", | 236 | lock(JobLock) |
247 | Name,m_jobQueue.Count); | 237 | m_threadRunnig = false; |
248 | break; | 238 | break; |
249 | } | 239 | } |
250 | } | 240 | } |
241 | catch(ObjectDisposedException e) | ||
242 | { | ||
243 | m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", | ||
244 | Name,m_jobQueue.Count); | ||
245 | break; | ||
246 | } | ||
251 | catch(OperationCanceledException) | 247 | catch(OperationCanceledException) |
252 | { | 248 | { |
253 | break; | 249 | break; |
254 | } | 250 | } |
255 | 251 | ||
256 | if(LogLevel >= 1) | 252 | if(LogLevel >= 1) |
257 | m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); | 253 | m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name); |
258 | 254 | ||
259 | try | 255 | try |
260 | { | 256 | { |
261 | CurrentJob.Action(); | 257 | m_currentJob.Action(); |
262 | } | 258 | } |
263 | catch(Exception e) | 259 | catch(Exception e) |
264 | { | 260 | { |
265 | m_log.Error( | 261 | m_log.Error( |
266 | string.Format( | 262 | string.Format( |
267 | "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); | 263 | "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e); |
268 | } | 264 | } |
269 | 265 | ||
270 | if(LogLevel >= 1) | 266 | if(LogLevel >= 1) |
271 | m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); | 267 | m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name); |
272 | 268 | ||
273 | CurrentJob = null; | 269 | m_currentJob = null; |
274 | } | 270 | } |
275 | |||
276 | Watchdog.RemoveThread(false); | ||
277 | m_finishedProcessingAfterStop.Set(); | ||
278 | } | 271 | } |
279 | 272 | ||
280 | public class Job | 273 | public class Job |