aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Monitoring/JobEngine.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs')
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs162
1 files changed, 71 insertions, 91 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
index 6db9a67..115871e 100644
--- a/OpenSim/Framework/Monitoring/JobEngine.cs
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring
40 40
41 public int LogLevel { get; set; } 41 public int LogLevel { get; set; }
42 42
43 private object JobLock = new object();
44
43 public string Name { get; private set; } 45 public string Name { get; private set; }
44 46
45 public string LoggingName { get; private set; } 47 public string LoggingName { get; private set; }
@@ -47,7 +49,7 @@ namespace OpenSim.Framework.Monitoring
47 /// <summary> 49 /// <summary>
48 /// Is this engine running? 50 /// Is this engine running?
49 /// </summary> 51 /// </summary>
50 public bool IsRunning { get; private set; } 52 public bool IsRunning { get; private set; }
51 53
52 /// <summary> 54 /// <summary>
53 /// The current job that the engine is running. 55 /// The current job that the engine is running.
@@ -55,7 +57,8 @@ namespace OpenSim.Framework.Monitoring
55 /// <remarks> 57 /// <remarks>
56 /// Will be null if no job is currently running. 58 /// Will be null if no job is currently running.
57 /// </remarks> 59 /// </remarks>
58 public Job CurrentJob { get; private set; } 60 private Job m_currentJob;
61 public Job CurrentJob { get { return m_currentJob;} }
59 62
60 /// <summary> 63 /// <summary>
61 /// Number of jobs waiting to be processed. 64 /// Number of jobs waiting to be processed.
@@ -71,96 +74,64 @@ namespace OpenSim.Framework.Monitoring
71 /// Controls whether we need to warn in the log about exceeding the max queue size. 74 /// Controls whether we need to warn in the log about exceeding the max queue size.
72 /// </summary> 75 /// </summary>
73 /// <remarks> 76 /// <remarks>
74 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in 77 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
75 /// order to avoid spamming the log with lots of warnings. 78 /// order to avoid spamming the log with lots of warnings.
76 /// </remarks> 79 /// </remarks>
77 private bool m_warnOverMaxQueue = true; 80 private bool m_warnOverMaxQueue = true;
78 81
79 private BlockingCollection<Job> m_jobQueue; 82 private BlockingCollection<Job> m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
80 83
81 private CancellationTokenSource m_cancelSource; 84 private CancellationTokenSource m_cancelSource;
82 85
83 /// <summary> 86 private int m_timeout = -1;
84 /// Used to signal that we are ready to complete stop.
85 /// </summary>
86 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
87 87
88 public JobEngine(string name, string loggingName) 88 private bool m_threadRunnig = false;
89
90 public JobEngine(string name, string loggingName, int timeout = -1)
89 { 91 {
90 Name = name; 92 Name = name;
91 LoggingName = loggingName; 93 LoggingName = loggingName;
92 94 m_timeout = timeout;
93 RequestProcessTimeoutOnStop = 5000; 95 RequestProcessTimeoutOnStop = 5000;
94 } 96 }
95 97
96 public void Start() 98 public void Start()
97 { 99 {
98 lock (this) 100 lock (JobLock)
99 { 101 {
100 if (IsRunning) 102 if (IsRunning)
101 return; 103 return;
102 104
103 IsRunning = true; 105 IsRunning = true;
104 106
105 m_finishedProcessingAfterStop.Reset();
106
107 m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
108 m_cancelSource = new CancellationTokenSource(); 107 m_cancelSource = new CancellationTokenSource();
109 108 WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
110 WorkManager.StartThread( 109 m_threadRunnig = true;
111 ProcessRequests,
112 Name,
113 ThreadPriority.Normal,
114 false,
115 true,
116 null,
117 int.MaxValue);
118 } 110 }
119 } 111 }
120 112
121 public void Stop() 113 public void Stop()
122 { 114 {
123 lock (this) 115 lock (JobLock)
124 { 116 {
125 try 117 try
126 { 118 {
127 if (!IsRunning) 119 if (!IsRunning)
128 return; 120 return;
129 121
130 IsRunning = false; 122 m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
131 123
132 int requestsLeft = m_jobQueue.Count; 124 IsRunning = false;
133 125 if(m_threadRunnig)
134 if (requestsLeft <= 0)
135 { 126 {
136 m_cancelSource.Cancel(); 127 m_cancelSource.Cancel();
137 } 128 m_threadRunnig = false;
138 else
139 {
140 m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);
141
142 while (requestsLeft > 0)
143 {
144 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
145 {
146 // After timeout no events have been written
147 if (requestsLeft == m_jobQueue.Count)
148 {
149 m_log.WarnFormat(
150 "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests",
151 LoggingName, RequestProcessTimeoutOnStop, requestsLeft);
152
153 break;
154 }
155 }
156
157 requestsLeft = m_jobQueue.Count;
158 }
159 } 129 }
160 } 130 }
161 finally 131 finally
162 { 132 {
163 m_cancelSource.Dispose(); 133 if(m_cancelSource != null)
134 m_cancelSource.Dispose();
164 } 135 }
165 } 136 }
166 } 137 }
@@ -169,7 +140,7 @@ namespace OpenSim.Framework.Monitoring
169 /// Make a job. 140 /// Make a job.
170 /// </summary> 141 /// </summary>
171 /// <remarks> 142 /// <remarks>
172 /// We provide this method to replace the constructor so that we can later pool job objects if necessary to 143 /// We provide this method to replace the constructor so that we can later pool job objects if necessary to
173 /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway. 144 /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway.
174 /// </remarks> 145 /// </remarks>
175 /// <returns></returns> 146 /// <returns></returns>
@@ -219,6 +190,18 @@ namespace OpenSim.Framework.Monitoring
219 /// </param> 190 /// </param>
220 public bool QueueJob(Job job) 191 public bool QueueJob(Job job)
221 { 192 {
193 lock(JobLock)
194 {
195 if(!IsRunning)
196 return false;
197
198 if(!m_threadRunnig)
199 {
200 WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
201 m_threadRunnig = true;
202 }
203 }
204
222 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) 205 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
223 { 206 {
224 m_jobQueue.Add(job); 207 m_jobQueue.Add(job);
@@ -238,56 +221,53 @@ namespace OpenSim.Framework.Monitoring
238 221
239 m_warnOverMaxQueue = false; 222 m_warnOverMaxQueue = false;
240 } 223 }
241
242 return false; 224 return false;
243 } 225 }
244 } 226 }
245 227
246 private void ProcessRequests() 228 private void ProcessRequests(Object o)
247 { 229 {
248 try 230 while(IsRunning)
249 { 231 {
250 while (IsRunning || m_jobQueue.Count > 0) 232 try
251 { 233 {
252 try 234 if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token))
253 { 235 {
254 CurrentJob = m_jobQueue.Take(m_cancelSource.Token); 236 lock(JobLock)
255 } 237 m_threadRunnig = false;
256 catch (ObjectDisposedException e) 238 break;
257 {
258 // If we see this whilst not running then it may be due to a race where this thread checks
259 // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
260 if (IsRunning)
261 throw e;
262 else
263 break;
264 } 239 }
240 }
241 catch(ObjectDisposedException e)
242 {
243 m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
244 Name,m_jobQueue.Count);
245 break;
246 }
247 catch(OperationCanceledException)
248 {
249 break;
250 }
265 251
266 if (LogLevel >= 1) 252 if(LogLevel >= 1)
267 m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); 253 m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name);
268 254
269 try 255 try
270 { 256 {
271 CurrentJob.Action(); 257 m_currentJob.Action();
272 } 258 }
273 catch (Exception e) 259 catch(Exception e)
274 { 260 {
275 m_log.Error( 261 m_log.Error(
276 string.Format( 262 string.Format(
277 "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); 263 "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e);
278 } 264 }
279 265
280 if (LogLevel >= 1) 266 if(LogLevel >= 1)
281 m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); 267 m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name);
282 268
283 CurrentJob = null; 269 m_currentJob = null;
284 }
285 }
286 catch (OperationCanceledException)
287 {
288 } 270 }
289
290 m_finishedProcessingAfterStop.Set();
291 } 271 }
292 272
293 public class Job 273 public class Job
@@ -320,7 +300,7 @@ namespace OpenSim.Framework.Monitoring
320 CommonId = commonId; 300 CommonId = commonId;
321 Action = action; 301 Action = action;
322 } 302 }
323 303
324 /// <summary> 304 /// <summary>
325 /// Make a job. It needs to be separately queued. 305 /// Make a job. It needs to be separately queued.
326 /// </summary> 306 /// </summary>
@@ -338,4 +318,4 @@ namespace OpenSim.Framework.Monitoring
338 } 318 }
339 } 319 }
340 } 320 }
341} \ No newline at end of file 321}