aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Monitoring/JobEngine.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs341
1 files changed, 341 insertions, 0 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
new file mode 100644
index 0000000..6db9a67
--- /dev/null
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -0,0 +1,341 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34
35namespace OpenSim.Framework.Monitoring
36{
37 public class JobEngine
38 {
39 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
40
41 public int LogLevel { get; set; }
42
43 public string Name { get; private set; }
44
45 public string LoggingName { get; private set; }
46
47 /// <summary>
48 /// Is this engine running?
49 /// </summary>
50 public bool IsRunning { get; private set; }
51
52 /// <summary>
53 /// The current job that the engine is running.
54 /// </summary>
55 /// <remarks>
56 /// Will be null if no job is currently running.
57 /// </remarks>
58 public Job CurrentJob { get; private set; }
59
60 /// <summary>
61 /// Number of jobs waiting to be processed.
62 /// </summary>
63 public int JobsWaiting { get { return m_jobQueue.Count; } }
64
65 /// <summary>
66 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
67 /// </summary>
68 public int RequestProcessTimeoutOnStop { get; set; }
69
70 /// <summary>
71 /// Controls whether we need to warn in the log about exceeding the max queue size.
72 /// </summary>
73 /// <remarks>
74 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
75 /// order to avoid spamming the log with lots of warnings.
76 /// </remarks>
77 private bool m_warnOverMaxQueue = true;
78
79 private BlockingCollection<Job> m_jobQueue;
80
81 private CancellationTokenSource m_cancelSource;
82
83 /// <summary>
84 /// Used to signal that we are ready to complete stop.
85 /// </summary>
86 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
87
88 public JobEngine(string name, string loggingName)
89 {
90 Name = name;
91 LoggingName = loggingName;
92
93 RequestProcessTimeoutOnStop = 5000;
94 }
95
96 public void Start()
97 {
98 lock (this)
99 {
100 if (IsRunning)
101 return;
102
103 IsRunning = true;
104
105 m_finishedProcessingAfterStop.Reset();
106
107 m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
108 m_cancelSource = new CancellationTokenSource();
109
110 WorkManager.StartThread(
111 ProcessRequests,
112 Name,
113 ThreadPriority.Normal,
114 false,
115 true,
116 null,
117 int.MaxValue);
118 }
119 }
120
121 public void Stop()
122 {
123 lock (this)
124 {
125 try
126 {
127 if (!IsRunning)
128 return;
129
130 IsRunning = false;
131
132 int requestsLeft = m_jobQueue.Count;
133
134 if (requestsLeft <= 0)
135 {
136 m_cancelSource.Cancel();
137 }
138 else
139 {
140 m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);
141
142 while (requestsLeft > 0)
143 {
144 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
145 {
146 // After timeout no events have been written
147 if (requestsLeft == m_jobQueue.Count)
148 {
149 m_log.WarnFormat(
150 "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests",
151 LoggingName, RequestProcessTimeoutOnStop, requestsLeft);
152
153 break;
154 }
155 }
156
157 requestsLeft = m_jobQueue.Count;
158 }
159 }
160 }
161 finally
162 {
163 m_cancelSource.Dispose();
164 }
165 }
166 }
167
168 /// <summary>
169 /// Make a job.
170 /// </summary>
171 /// <remarks>
172 /// We provide this method to replace the constructor so that we can later pool job objects if necessary to
173 /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway.
174 /// </remarks>
175 /// <returns></returns>
176 /// <param name="name">Name.</param>
177 /// <param name="action">Action.</param>
178 /// <param name="commonId">Common identifier.</param>
179 public static Job MakeJob(string name, Action action, string commonId = null)
180 {
181 return Job.MakeJob(name, action, commonId);
182 }
183
184 /// <summary>
185 /// Remove the next job queued for processing.
186 /// </summary>
187 /// <remarks>
188 /// Returns null if there is no next job.
189 /// Will not remove a job currently being performed.
190 /// </remarks>
191 public Job RemoveNextJob()
192 {
193 Job nextJob;
194 m_jobQueue.TryTake(out nextJob);
195
196 return nextJob;
197 }
198
199 /// <summary>
200 /// Queue the job for processing.
201 /// </summary>
202 /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
203 /// <param name="name">Name of job. This appears on the console and in logging.</param>
204 /// <param name="action">Action to perform.</param>
205 /// <param name="commonId">
206 /// Common identifier for a set of jobs. This is allows a set of jobs to be removed
207 /// if required (e.g. all jobs for a given agent. Optional.
208 /// </param>
209 public bool QueueJob(string name, Action action, string commonId = null)
210 {
211 return QueueJob(MakeJob(name, action, commonId));
212 }
213
214 /// <summary>
215 /// Queue the job for processing.
216 /// </summary>
217 /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
218 /// <param name="job">The job</param>
219 /// </param>
220 public bool QueueJob(Job job)
221 {
222 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
223 {
224 m_jobQueue.Add(job);
225
226 if (!m_warnOverMaxQueue)
227 m_warnOverMaxQueue = true;
228
229 return true;
230 }
231 else
232 {
233 if (m_warnOverMaxQueue)
234 {
235 m_log.WarnFormat(
236 "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}",
237 LoggingName, job.Name, Name);
238
239 m_warnOverMaxQueue = false;
240 }
241
242 return false;
243 }
244 }
245
246 private void ProcessRequests()
247 {
248 try
249 {
250 while (IsRunning || m_jobQueue.Count > 0)
251 {
252 try
253 {
254 CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
255 }
256 catch (ObjectDisposedException e)
257 {
258 // If we see this whilst not running then it may be due to a race where this thread checks
259 // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
260 if (IsRunning)
261 throw e;
262 else
263 break;
264 }
265
266 if (LogLevel >= 1)
267 m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name);
268
269 try
270 {
271 CurrentJob.Action();
272 }
273 catch (Exception e)
274 {
275 m_log.Error(
276 string.Format(
277 "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e);
278 }
279
280 if (LogLevel >= 1)
281 m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name);
282
283 CurrentJob = null;
284 }
285 }
286 catch (OperationCanceledException)
287 {
288 }
289
290 m_finishedProcessingAfterStop.Set();
291 }
292
293 public class Job
294 {
295 /// <summary>
296 /// Name of the job.
297 /// </summary>
298 /// <remarks>
299 /// This appears on console and debug output.
300 /// </remarks>
301 public string Name { get; private set; }
302
303 /// <summary>
304 /// Common ID for this job.
305 /// </summary>
306 /// <remarks>
307 /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required.
308 /// Can be null if this is not required.
309 /// </remarks>
310 public string CommonId { get; private set; }
311
312 /// <summary>
313 /// Action to perform when this job is processed.
314 /// </summary>
315 public Action Action { get; private set; }
316
317 private Job(string name, string commonId, Action action)
318 {
319 Name = name;
320 CommonId = commonId;
321 Action = action;
322 }
323
324 /// <summary>
325 /// Make a job. It needs to be separately queued.
326 /// </summary>
327 /// <remarks>
328 /// We provide this method to replace the constructor so that we can pool job objects if necessary to
329 /// to reduce memory churn. Normally one would directly call JobEngine.QueueJob() with parameters anyway.
330 /// </remarks>
331 /// <returns></returns>
332 /// <param name="name">Name.</param>
333 /// <param name="action">Action.</param>
334 /// <param name="commonId">Common identifier.</param>
335 public static Job MakeJob(string name, Action action, string commonId = null)
336 {
337 return new Job(name, commonId, action);
338 }
339 }
340 }
341} \ No newline at end of file