aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Monitoring/JobEngine.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs')
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs329
1 files changed, 329 insertions, 0 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
new file mode 100644
index 0000000..44f5d9a
--- /dev/null
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -0,0 +1,329 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34
35namespace OpenSim.Framework.Monitoring
36{
37 public class JobEngine
38 {
39 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
40
41 public int LogLevel { get; set; }
42
43 public string Name { get; private set; }
44
45 public string LoggingName { get; private set; }
46
47 /// <summary>
48 /// Is this engine running?
49 /// </summary>
50 public bool IsRunning { get; private set; }
51
52 /// <summary>
53 /// The current job that the engine is running.
54 /// </summary>
55 /// <remarks>
56 /// Will be null if no job is currently running.
57 /// </remarks>
58 public Job CurrentJob { get; private set; }
59
60 /// <summary>
61 /// Number of jobs waiting to be processed.
62 /// </summary>
63 public int JobsWaiting { get { return m_jobQueue.Count; } }
64
65 /// <summary>
66 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
67 /// </summary>
68 public int RequestProcessTimeoutOnStop { get; set; }
69
70 /// <summary>
71 /// Controls whether we need to warn in the log about exceeding the max queue size.
72 /// </summary>
73 /// <remarks>
74 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
75 /// order to avoid spamming the log with lots of warnings.
76 /// </remarks>
77 private bool m_warnOverMaxQueue = true;
78
79 private BlockingCollection<Job> m_jobQueue;
80
81 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
82
83 /// <summary>
84 /// Used to signal that we are ready to complete stop.
85 /// </summary>
86 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
87
88 public JobEngine(string name, string loggingName)
89 {
90 Name = name;
91 LoggingName = loggingName;
92
93 RequestProcessTimeoutOnStop = 5000;
94 }
95
96 public void Start()
97 {
98 lock (this)
99 {
100 if (IsRunning)
101 return;
102
103 IsRunning = true;
104
105 m_finishedProcessingAfterStop.Reset();
106
107 m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
108
109 WorkManager.StartThread(
110 ProcessRequests,
111 Name,
112 ThreadPriority.Normal,
113 false,
114 true,
115 null,
116 int.MaxValue);
117 }
118 }
119
120 public void Stop()
121 {
122 lock (this)
123 {
124 try
125 {
126 if (!IsRunning)
127 return;
128
129 IsRunning = false;
130
131 int requestsLeft = m_jobQueue.Count;
132
133 if (requestsLeft <= 0)
134 {
135 m_cancelSource.Cancel();
136 }
137 else
138 {
139 m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);
140
141 while (requestsLeft > 0)
142 {
143 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
144 {
145 // After timeout no events have been written
146 if (requestsLeft == m_jobQueue.Count)
147 {
148 m_log.WarnFormat(
149 "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests",
150 LoggingName, RequestProcessTimeoutOnStop, requestsLeft);
151
152 break;
153 }
154 }
155
156 requestsLeft = m_jobQueue.Count;
157 }
158 }
159 }
160 finally
161 {
162 m_cancelSource.Dispose();
163 m_jobQueue = null;
164 }
165 }
166 }
167
168 /// <summary>
169 /// Make a job.
170 /// </summary>
171 /// <remarks>
172 /// We provide this method to replace the constructor so that we can later pool job objects if necessary to
173 /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway.
174 /// </remarks>
175 /// <returns></returns>
176 /// <param name="name">Name.</param>
177 /// <param name="action">Action.</param>
178 /// <param name="commonId">Common identifier.</param>
179 public static Job MakeJob(string name, Action action, string commonId = null)
180 {
181 return Job.MakeJob(name, action, commonId);
182 }
183
184 /// <summary>
185 /// Remove the next job queued for processing.
186 /// </summary>
187 /// <remarks>
188 /// Returns null if there is no next job.
189 /// Will not remove a job currently being performed.
190 /// </remarks>
191 public Job RemoveNextJob()
192 {
193 Job nextJob;
194 m_jobQueue.TryTake(out nextJob);
195
196 return nextJob;
197 }
198
199 /// <summary>
200 /// Queue the job for processing.
201 /// </summary>
202 /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
203 /// <param name="name">Name of job. This appears on the console and in logging.</param>
204 /// <param name="action">Action to perform.</param>
205 /// <param name="commonId">
206 /// Common identifier for a set of jobs. This is allows a set of jobs to be removed
207 /// if required (e.g. all jobs for a given agent. Optional.
208 /// </param>
209 public bool QueueJob(string name, Action action, string commonId = null)
210 {
211 return QueueJob(MakeJob(name, action, commonId));
212 }
213
214 /// <summary>
215 /// Queue the job for processing.
216 /// </summary>
217 /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
218 /// <param name="job">The job</param>
219 /// </param>
220 public bool QueueJob(Job job)
221 {
222 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
223 {
224 m_jobQueue.Add(job);
225
226 if (!m_warnOverMaxQueue)
227 m_warnOverMaxQueue = true;
228
229 return true;
230 }
231 else
232 {
233 if (m_warnOverMaxQueue)
234 {
235 m_log.WarnFormat(
236 "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}",
237 LoggingName, job.Name, Name);
238
239 m_warnOverMaxQueue = false;
240 }
241
242 return false;
243 }
244 }
245
246 private void ProcessRequests()
247 {
248 try
249 {
250 while (IsRunning || m_jobQueue.Count > 0)
251 {
252 CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
253
254 if (LogLevel >= 1)
255 m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name);
256
257 try
258 {
259 CurrentJob.Action();
260 }
261 catch (Exception e)
262 {
263 m_log.Error(
264 string.Format(
265 "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e);
266 }
267
268 if (LogLevel >= 1)
269 m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name);
270
271 CurrentJob = null;
272 }
273 }
274 catch (OperationCanceledException)
275 {
276 }
277
278 m_finishedProcessingAfterStop.Set();
279 }
280
281 public class Job
282 {
283 /// <summary>
284 /// Name of the job.
285 /// </summary>
286 /// <remarks>
287 /// This appears on console and debug output.
288 /// </remarks>
289 public string Name { get; private set; }
290
291 /// <summary>
292 /// Common ID for this job.
293 /// </summary>
294 /// <remarks>
295 /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required.
296 /// Can be null if this is not required.
297 /// </remarks>
298 public string CommonId { get; private set; }
299
300 /// <summary>
301 /// Action to perform when this job is processed.
302 /// </summary>
303 public Action Action { get; private set; }
304
305 private Job(string name, string commonId, Action action)
306 {
307 Name = name;
308 CommonId = commonId;
309 Action = action;
310 }
311
312 /// <summary>
313 /// Make a job. It needs to be separately queued.
314 /// </summary>
315 /// <remarks>
316 /// We provide this method to replace the constructor so that we can pool job objects if necessary to
317 /// to reduce memory churn. Normally one would directly call JobEngine.QueueJob() with parameters anyway.
318 /// </remarks>
319 /// <returns></returns>
320 /// <param name="name">Name.</param>
321 /// <param name="action">Action.</param>
322 /// <param name="commonId">Common identifier.</param>
323 public static Job MakeJob(string name, Action action, string commonId = null)
324 {
325 return new Job(name, commonId, action);
326 }
327 }
328 }
329} \ No newline at end of file