aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs89
1 files changed, 41 insertions, 48 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
index a6a059d..115871e 100644
--- a/OpenSim/Framework/Monitoring/JobEngine.cs
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring
57 /// <remarks> 57 /// <remarks>
58 /// Will be null if no job is currently running. 58 /// Will be null if no job is currently running.
59 /// </remarks> 59 /// </remarks>
60 public Job CurrentJob { get; private set; } 60 private Job m_currentJob;
61 public Job CurrentJob { get { return m_currentJob;} }
61 62
62 /// <summary> 63 /// <summary>
63 /// Number of jobs waiting to be processed. 64 /// Number of jobs waiting to be processed.
@@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring
82 83
83 private CancellationTokenSource m_cancelSource; 84 private CancellationTokenSource m_cancelSource;
84 85
85 /// <summary> 86 private int m_timeout = -1;
86 /// Used to signal that we are ready to complete stop. 87
87 /// </summary> 88 private bool m_threadRunnig = false;
88 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
89 89
90 public JobEngine(string name, string loggingName) 90 public JobEngine(string name, string loggingName, int timeout = -1)
91 { 91 {
92 Name = name; 92 Name = name;
93 LoggingName = loggingName; 93 LoggingName = loggingName;
94 94 m_timeout = timeout;
95 RequestProcessTimeoutOnStop = 5000; 95 RequestProcessTimeoutOnStop = 5000;
96 } 96 }
97 97
@@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring
104 104
105 IsRunning = true; 105 IsRunning = true;
106 106
107 m_finishedProcessingAfterStop.Reset();
108
109 m_cancelSource = new CancellationTokenSource(); 107 m_cancelSource = new CancellationTokenSource();
110 108 WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
111 WorkManager.StartThread( 109 m_threadRunnig = true;
112 ProcessRequests,
113 Name,
114 ThreadPriority.Normal,
115 false,
116 true,
117 null,
118 int.MaxValue);
119 } 110 }
120 } 111 }
121 112
@@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring
131 m_log.DebugFormat("[JobEngine] Stopping {0}", Name); 122 m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
132 123
133 IsRunning = false; 124 IsRunning = false;
134 125 if(m_threadRunnig)
135 m_finishedProcessingAfterStop.Reset(); 126 {
136 if(m_jobQueue.Count <= 0)
137 m_cancelSource.Cancel(); 127 m_cancelSource.Cancel();
138 128 m_threadRunnig = false;
139 m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); 129 }
140 m_finishedProcessingAfterStop.Close();
141 } 130 }
142 finally 131 finally
143 { 132 {
144 if(m_cancelSource != null) 133 if(m_cancelSource != null)
145 m_cancelSource.Dispose(); 134 m_cancelSource.Dispose();
146 if(m_finishedProcessingAfterStop != null)
147 m_finishedProcessingAfterStop.Dispose();
148 } 135 }
149 } 136 }
150 } 137 }
@@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring
203 /// </param> 190 /// </param>
204 public bool QueueJob(Job job) 191 public bool QueueJob(Job job)
205 { 192 {
193 lock(JobLock)
194 {
195 if(!IsRunning)
196 return false;
197
198 if(!m_threadRunnig)
199 {
200 WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
201 m_threadRunnig = true;
202 }
203 }
204
206 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) 205 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
207 { 206 {
208 m_jobQueue.Add(job); 207 m_jobQueue.Add(job);
@@ -222,59 +221,53 @@ namespace OpenSim.Framework.Monitoring
222 221
223 m_warnOverMaxQueue = false; 222 m_warnOverMaxQueue = false;
224 } 223 }
225
226 return false; 224 return false;
227 } 225 }
228 } 226 }
229 227
230 private void ProcessRequests() 228 private void ProcessRequests(Object o)
231 { 229 {
232 while(IsRunning || m_jobQueue.Count > 0) 230 while(IsRunning)
233 { 231 {
234 try 232 try
235 { 233 {
236 CurrentJob = m_jobQueue.Take(m_cancelSource.Token); 234 if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token))
237 }
238 catch(ObjectDisposedException e)
239 {
240 // If we see this whilst not running then it may be due to a race where this thread checks
241 // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
242 if(IsRunning)
243 throw e;
244 else
245 { 235 {
246 m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", 236 lock(JobLock)
247 Name,m_jobQueue.Count); 237 m_threadRunnig = false;
248 break; 238 break;
249 } 239 }
250 } 240 }
241 catch(ObjectDisposedException e)
242 {
243 m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
244 Name,m_jobQueue.Count);
245 break;
246 }
251 catch(OperationCanceledException) 247 catch(OperationCanceledException)
252 { 248 {
253 break; 249 break;
254 } 250 }
255 251
256 if(LogLevel >= 1) 252 if(LogLevel >= 1)
257 m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); 253 m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name);
258 254
259 try 255 try
260 { 256 {
261 CurrentJob.Action(); 257 m_currentJob.Action();
262 } 258 }
263 catch(Exception e) 259 catch(Exception e)
264 { 260 {
265 m_log.Error( 261 m_log.Error(
266 string.Format( 262 string.Format(
267 "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); 263 "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e);
268 } 264 }
269 265
270 if(LogLevel >= 1) 266 if(LogLevel >= 1)
271 m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); 267 m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name);
272 268
273 CurrentJob = null; 269 m_currentJob = null;
274 } 270 }
275
276 Watchdog.RemoveThread(false);
277 m_finishedProcessingAfterStop.Set();
278 } 271 }
279 272
280 public class Job 273 public class Job