aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Monitoring/JobEngine.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs')
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs320
1 files changed, 320 insertions, 0 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
new file mode 100644
index 0000000..5925867
--- /dev/null
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -0,0 +1,320 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34
35namespace OpenSim.Framework.Monitoring
36{
37 public class Job
38 {
39 public string Name;
40 public WaitCallback Callback;
41 public object O;
42
43 public Job(string name, WaitCallback callback, object o)
44 {
45 Name = name;
46 Callback = callback;
47 O = o;
48 }
49 }
50
51 public class JobEngine
52 {
53 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
54
55 public int LogLevel { get; set; }
56
57 public bool IsRunning { get; private set; }
58
59 /// <summary>
60 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
61 /// </summary>
62 public int RequestProcessTimeoutOnStop { get; set; }
63
64 /// <summary>
65 /// Controls whether we need to warn in the log about exceeding the max queue size.
66 /// </summary>
67 /// <remarks>
68 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
69 /// order to avoid spamming the log with lots of warnings.
70 /// </remarks>
71 private bool m_warnOverMaxQueue = true;
72
73 private BlockingCollection<Job> m_requestQueue;
74
75 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
76
77 private Stat m_requestsWaitingStat;
78
79 private Job m_currentJob;
80
81 /// <summary>
82 /// Used to signal that we are ready to complete stop.
83 /// </summary>
84 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
85
86 public JobEngine()
87 {
88 RequestProcessTimeoutOnStop = 5000;
89
90 MainConsole.Instance.Commands.AddCommand(
91 "Debug",
92 false,
93 "debug jobengine",
94 "debug jobengine <start|stop|status|log>",
95 "Start, stop, get status or set logging level of the job engine.",
96 "If stopped then all outstanding jobs are processed immediately.",
97 HandleControlCommand);
98 }
99
100 public void Start()
101 {
102 lock (this)
103 {
104 if (IsRunning)
105 return;
106
107 IsRunning = true;
108
109 m_finishedProcessingAfterStop.Reset();
110
111 m_requestQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
112
113 m_requestsWaitingStat =
114 new Stat(
115 "JobsWaiting",
116 "Number of jobs waiting for processing.",
117 "",
118 "",
119 "server",
120 "jobengine",
121 StatType.Pull,
122 MeasuresOfInterest.None,
123 stat => stat.Value = m_requestQueue.Count,
124 StatVerbosity.Debug);
125
126 StatsManager.RegisterStat(m_requestsWaitingStat);
127
128 WorkManager.StartThread(
129 ProcessRequests,
130 "JobEngineThread",
131 ThreadPriority.Normal,
132 false,
133 true,
134 null,
135 int.MaxValue);
136 }
137 }
138
139 public void Stop()
140 {
141 lock (this)
142 {
143 try
144 {
145 if (!IsRunning)
146 return;
147
148 IsRunning = false;
149
150 int requestsLeft = m_requestQueue.Count;
151
152 if (requestsLeft <= 0)
153 {
154 m_cancelSource.Cancel();
155 }
156 else
157 {
158 m_log.InfoFormat("[JOB ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
159
160 while (requestsLeft > 0)
161 {
162 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
163 {
164 // After timeout no events have been written
165 if (requestsLeft == m_requestQueue.Count)
166 {
167 m_log.WarnFormat(
168 "[JOB ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
169 RequestProcessTimeoutOnStop, requestsLeft);
170
171 break;
172 }
173 }
174
175 requestsLeft = m_requestQueue.Count;
176 }
177 }
178 }
179 finally
180 {
181 m_cancelSource.Dispose();
182 StatsManager.DeregisterStat(m_requestsWaitingStat);
183 m_requestsWaitingStat = null;
184 m_requestQueue = null;
185 }
186 }
187 }
188
189 public bool QueueRequest(string name, WaitCallback req, object o)
190 {
191 if (LogLevel >= 1)
192 m_log.DebugFormat("[JOB ENGINE]: Queued job {0}", name);
193
194 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
195 {
196 // m_log.DebugFormat(
197 // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
198 // categories, client.AgentID, m_udpServer.Scene.Name);
199
200 m_requestQueue.Add(new Job(name, req, o));
201
202 if (!m_warnOverMaxQueue)
203 m_warnOverMaxQueue = true;
204
205 return true;
206 }
207 else
208 {
209 if (m_warnOverMaxQueue)
210 {
211// m_log.WarnFormat(
212// "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
213// client.AgentID, m_udpServer.Scene.Name);
214
215 m_log.WarnFormat("[JOB ENGINE]: Request queue at maximum capacity, not recording job");
216
217 m_warnOverMaxQueue = false;
218 }
219
220 return false;
221 }
222 }
223
224 private void ProcessRequests()
225 {
226 try
227 {
228 while (IsRunning || m_requestQueue.Count > 0)
229 {
230 m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
231
232 // QueueEmpty callback = req.Client.OnQueueEmpty;
233 //
234 // if (callback != null)
235 // {
236 // try
237 // {
238 // callback(req.Categories);
239 // }
240 // catch (Exception e)
241 // {
242 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
243 // }
244 // }
245
246 if (LogLevel >= 1)
247 m_log.DebugFormat("[JOB ENGINE]: Processing job {0}", m_currentJob.Name);
248
249 try
250 {
251 m_currentJob.Callback.Invoke(m_currentJob.O);
252 }
253 catch (Exception e)
254 {
255 m_log.Error(
256 string.Format(
257 "[JOB ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
258 }
259
260 if (LogLevel >= 1)
261 m_log.DebugFormat("[JOB ENGINE]: Processed job {0}", m_currentJob.Name);
262
263 m_currentJob = null;
264 }
265 }
266 catch (OperationCanceledException)
267 {
268 }
269
270 m_finishedProcessingAfterStop.Set();
271 }
272
273 private void HandleControlCommand(string module, string[] args)
274 {
275// if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
276// return;
277
278 if (args.Length < 3)
279 {
280 MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|log>");
281 return;
282 }
283
284 string subCommand = args[2];
285
286 if (subCommand == "stop")
287 {
288 Stop();
289 MainConsole.Instance.OutputFormat("Stopped job engine.");
290 }
291 else if (subCommand == "start")
292 {
293 Start();
294 MainConsole.Instance.OutputFormat("Started job engine.");
295 }
296 else if (subCommand == "status")
297 {
298 MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
299 MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
300 MainConsole.Instance.OutputFormat(
301 "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
302 MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
303 }
304 else if (subCommand == "log")
305 {
306// int logLevel;
307 int logLevel = int.Parse(args[3]);
308// if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
309// {
310 LogLevel = logLevel;
311 MainConsole.Instance.OutputFormat("Set debug log level to {0}", LogLevel);
312// }
313 }
314 else
315 {
316 MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
317 }
318 }
319 }
320}