aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Monitoring/JobEngine.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs')
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs292
1 files changed, 292 insertions, 0 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
new file mode 100644
index 0000000..b401a5c
--- /dev/null
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -0,0 +1,292 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34
35namespace OpenSim.Framework.Monitoring
36{
37 public class Job
38 {
39 public string Name;
40 public WaitCallback Callback;
41 public object O;
42
43 public Job(string name, WaitCallback callback, object o)
44 {
45 Name = name;
46 Callback = callback;
47 O = o;
48 }
49 }
50
51 public class JobEngine
52 {
53 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
54
55 public bool IsRunning { get; private set; }
56
57 /// <summary>
58 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
59 /// </summary>
60 public int RequestProcessTimeoutOnStop { get; set; }
61
62 /// <summary>
63 /// Controls whether we need to warn in the log about exceeding the max queue size.
64 /// </summary>
65 /// <remarks>
66 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
67 /// order to avoid spamming the log with lots of warnings.
68 /// </remarks>
69 private bool m_warnOverMaxQueue = true;
70
71 private BlockingCollection<Job> m_requestQueue;
72
73 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
74
75 private Stat m_requestsWaitingStat;
76
77 private Job m_currentJob;
78
79 /// <summary>
80 /// Used to signal that we are ready to complete stop.
81 /// </summary>
82 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
83
84 public JobEngine()
85 {
86 RequestProcessTimeoutOnStop = 5000;
87
88 MainConsole.Instance.Commands.AddCommand(
89 "Debug",
90 false,
91 "debug jobengine",
92 "debug jobengine <start|stop|status>",
93 "Start, stop or get status of the job engine.",
94 "If stopped then all jobs are processed immediately.",
95 HandleControlCommand);
96 }
97
98 public void Start()
99 {
100 lock (this)
101 {
102 if (IsRunning)
103 return;
104
105 IsRunning = true;
106
107 m_finishedProcessingAfterStop.Reset();
108
109 m_requestQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
110
111 m_requestsWaitingStat =
112 new Stat(
113 "JobsWaiting",
114 "Number of jobs waiting for processing.",
115 "",
116 "",
117 "server",
118 "jobengine",
119 StatType.Pull,
120 MeasuresOfInterest.None,
121 stat => stat.Value = m_requestQueue.Count,
122 StatVerbosity.Debug);
123
124 StatsManager.RegisterStat(m_requestsWaitingStat);
125
126 Watchdog.StartThread(
127 ProcessRequests,
128 "JobEngineThread",
129 ThreadPriority.Normal,
130 false,
131 true,
132 null,
133 int.MaxValue);
134 }
135 }
136
137 public void Stop()
138 {
139 lock (this)
140 {
141 try
142 {
143 if (!IsRunning)
144 return;
145
146 IsRunning = false;
147
148 int requestsLeft = m_requestQueue.Count;
149
150 if (requestsLeft <= 0)
151 {
152 m_cancelSource.Cancel();
153 }
154 else
155 {
156 m_log.InfoFormat("[JOB ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
157
158 while (requestsLeft > 0)
159 {
160 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
161 {
162 // After timeout no events have been written
163 if (requestsLeft == m_requestQueue.Count)
164 {
165 m_log.WarnFormat(
166 "[JOB ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
167 RequestProcessTimeoutOnStop, requestsLeft);
168
169 break;
170 }
171 }
172
173 requestsLeft = m_requestQueue.Count;
174 }
175 }
176 }
177 finally
178 {
179 m_cancelSource.Dispose();
180 StatsManager.DeregisterStat(m_requestsWaitingStat);
181 m_requestsWaitingStat = null;
182 m_requestQueue = null;
183 }
184 }
185 }
186
187 public bool QueueRequest(string name, WaitCallback req, object o)
188 {
189 m_log.DebugFormat("[JOB ENGINE]: Queued job {0}", name);
190
191 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
192 {
193 // m_log.DebugFormat(
194 // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
195 // categories, client.AgentID, m_udpServer.Scene.Name);
196
197 m_requestQueue.Add(new Job(name, req, o));
198
199 if (!m_warnOverMaxQueue)
200 m_warnOverMaxQueue = true;
201
202 return true;
203 }
204 else
205 {
206 if (m_warnOverMaxQueue)
207 {
208// m_log.WarnFormat(
209// "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
210// client.AgentID, m_udpServer.Scene.Name);
211
212 m_log.WarnFormat("[JOB ENGINE]: Request queue at maximum capacity, not recording job");
213
214 m_warnOverMaxQueue = false;
215 }
216
217 return false;
218 }
219 }
220
221 private void ProcessRequests()
222 {
223 try
224 {
225 while (IsRunning || m_requestQueue.Count > 0)
226 {
227 m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
228
229 // QueueEmpty callback = req.Client.OnQueueEmpty;
230 //
231 // if (callback != null)
232 // {
233 // try
234 // {
235 // callback(req.Categories);
236 // }
237 // catch (Exception e)
238 // {
239 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
240 // }
241 // }
242
243 m_log.DebugFormat("[JOB ENGINE]: Processing job {0}", m_currentJob.Name);
244 m_currentJob.Callback.Invoke(m_currentJob.O);
245 m_log.DebugFormat("[JOB ENGINE]: Processed job {0}", m_currentJob.Name);
246 m_currentJob = null;
247 }
248 }
249 catch (OperationCanceledException)
250 {
251 }
252
253 m_finishedProcessingAfterStop.Set();
254 }
255
256 private void HandleControlCommand(string module, string[] args)
257 {
258// if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
259// return;
260
261 if (args.Length != 3)
262 {
263 MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status>");
264 return;
265 }
266
267 string subCommand = args[2];
268
269 if (subCommand == "stop")
270 {
271 Stop();
272 MainConsole.Instance.OutputFormat("Stopped job engine.");
273 }
274 else if (subCommand == "start")
275 {
276 Start();
277 MainConsole.Instance.OutputFormat("Started job engine.");
278 }
279 else if (subCommand == "status")
280 {
281 MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
282 MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
283 MainConsole.Instance.OutputFormat(
284 "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
285 }
286 else
287 {
288 MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
289 }
290 }
291 }
292} \ No newline at end of file