aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim')
-rw-r--r--OpenSim/Framework/Monitoring/JobEngine.cs292
-rw-r--r--OpenSim/Framework/Monitoring/Watchdog.cs18
-rw-r--r--OpenSim/Region/Framework/Scenes/ScenePresence.cs2
3 files changed, 311 insertions, 1 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
new file mode 100644
index 0000000..b401a5c
--- /dev/null
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -0,0 +1,292 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34
35namespace OpenSim.Framework.Monitoring
36{
37 public class Job
38 {
39 public string Name;
40 public WaitCallback Callback;
41 public object O;
42
43 public Job(string name, WaitCallback callback, object o)
44 {
45 Name = name;
46 Callback = callback;
47 O = o;
48 }
49 }
50
51 public class JobEngine
52 {
53 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
54
55 public bool IsRunning { get; private set; }
56
57 /// <summary>
58 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
59 /// </summary>
60 public int RequestProcessTimeoutOnStop { get; set; }
61
62 /// <summary>
63 /// Controls whether we need to warn in the log about exceeding the max queue size.
64 /// </summary>
65 /// <remarks>
66 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
67 /// order to avoid spamming the log with lots of warnings.
68 /// </remarks>
69 private bool m_warnOverMaxQueue = true;
70
71 private BlockingCollection<Job> m_requestQueue;
72
73 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
74
75 private Stat m_requestsWaitingStat;
76
77 private Job m_currentJob;
78
79 /// <summary>
80 /// Used to signal that we are ready to complete stop.
81 /// </summary>
82 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
83
84 public JobEngine()
85 {
86 RequestProcessTimeoutOnStop = 5000;
87
88 MainConsole.Instance.Commands.AddCommand(
89 "Debug",
90 false,
91 "debug jobengine",
92 "debug jobengine <start|stop|status>",
93 "Start, stop or get status of the job engine.",
94 "If stopped then all jobs are processed immediately.",
95 HandleControlCommand);
96 }
97
98 public void Start()
99 {
100 lock (this)
101 {
102 if (IsRunning)
103 return;
104
105 IsRunning = true;
106
107 m_finishedProcessingAfterStop.Reset();
108
109 m_requestQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
110
111 m_requestsWaitingStat =
112 new Stat(
113 "JobsWaiting",
114 "Number of jobs waiting for processing.",
115 "",
116 "",
117 "server",
118 "jobengine",
119 StatType.Pull,
120 MeasuresOfInterest.None,
121 stat => stat.Value = m_requestQueue.Count,
122 StatVerbosity.Debug);
123
124 StatsManager.RegisterStat(m_requestsWaitingStat);
125
126 Watchdog.StartThread(
127 ProcessRequests,
128 "JobEngineThread",
129 ThreadPriority.Normal,
130 false,
131 true,
132 null,
133 int.MaxValue);
134 }
135 }
136
137 public void Stop()
138 {
139 lock (this)
140 {
141 try
142 {
143 if (!IsRunning)
144 return;
145
146 IsRunning = false;
147
148 int requestsLeft = m_requestQueue.Count;
149
150 if (requestsLeft <= 0)
151 {
152 m_cancelSource.Cancel();
153 }
154 else
155 {
156 m_log.InfoFormat("[JOB ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
157
158 while (requestsLeft > 0)
159 {
160 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
161 {
162 // After timeout no events have been written
163 if (requestsLeft == m_requestQueue.Count)
164 {
165 m_log.WarnFormat(
166 "[JOB ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
167 RequestProcessTimeoutOnStop, requestsLeft);
168
169 break;
170 }
171 }
172
173 requestsLeft = m_requestQueue.Count;
174 }
175 }
176 }
177 finally
178 {
179 m_cancelSource.Dispose();
180 StatsManager.DeregisterStat(m_requestsWaitingStat);
181 m_requestsWaitingStat = null;
182 m_requestQueue = null;
183 }
184 }
185 }
186
187 public bool QueueRequest(string name, WaitCallback req, object o)
188 {
189 m_log.DebugFormat("[JOB ENGINE]: Queued job {0}", name);
190
191 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
192 {
193 // m_log.DebugFormat(
194 // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
195 // categories, client.AgentID, m_udpServer.Scene.Name);
196
197 m_requestQueue.Add(new Job(name, req, o));
198
199 if (!m_warnOverMaxQueue)
200 m_warnOverMaxQueue = true;
201
202 return true;
203 }
204 else
205 {
206 if (m_warnOverMaxQueue)
207 {
208// m_log.WarnFormat(
209// "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
210// client.AgentID, m_udpServer.Scene.Name);
211
212 m_log.WarnFormat("[JOB ENGINE]: Request queue at maximum capacity, not recording job");
213
214 m_warnOverMaxQueue = false;
215 }
216
217 return false;
218 }
219 }
220
221 private void ProcessRequests()
222 {
223 try
224 {
225 while (IsRunning || m_requestQueue.Count > 0)
226 {
227 m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
228
229 // QueueEmpty callback = req.Client.OnQueueEmpty;
230 //
231 // if (callback != null)
232 // {
233 // try
234 // {
235 // callback(req.Categories);
236 // }
237 // catch (Exception e)
238 // {
239 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
240 // }
241 // }
242
243 m_log.DebugFormat("[JOB ENGINE]: Processing job {0}", m_currentJob.Name);
244 m_currentJob.Callback.Invoke(m_currentJob.O);
245 m_log.DebugFormat("[JOB ENGINE]: Processed job {0}", m_currentJob.Name);
246 m_currentJob = null;
247 }
248 }
249 catch (OperationCanceledException)
250 {
251 }
252
253 m_finishedProcessingAfterStop.Set();
254 }
255
256 private void HandleControlCommand(string module, string[] args)
257 {
258// if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
259// return;
260
261 if (args.Length != 3)
262 {
263 MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status>");
264 return;
265 }
266
267 string subCommand = args[2];
268
269 if (subCommand == "stop")
270 {
271 Stop();
272 MainConsole.Instance.OutputFormat("Stopped job engine.");
273 }
274 else if (subCommand == "start")
275 {
276 Start();
277 MainConsole.Instance.OutputFormat("Started job engine.");
278 }
279 else if (subCommand == "status")
280 {
281 MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
282 MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
283 MainConsole.Instance.OutputFormat(
284 "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
285 }
286 else
287 {
288 MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
289 }
290 }
291 }
292} \ No newline at end of file
diff --git a/OpenSim/Framework/Monitoring/Watchdog.cs b/OpenSim/Framework/Monitoring/Watchdog.cs
index 0fcb195..b00aa5c 100644
--- a/OpenSim/Framework/Monitoring/Watchdog.cs
+++ b/OpenSim/Framework/Monitoring/Watchdog.cs
@@ -133,6 +133,8 @@ namespace OpenSim.Framework.Monitoring
133 /// /summary> 133 /// /summary>
134 public static event Action<ThreadWatchdogInfo> OnWatchdogTimeout; 134 public static event Action<ThreadWatchdogInfo> OnWatchdogTimeout;
135 135
136 private static JobEngine m_jobEngine;
137
136 /// <summary> 138 /// <summary>
137 /// Is this watchdog active? 139 /// Is this watchdog active?
138 /// </summary> 140 /// </summary>
@@ -173,6 +175,7 @@ namespace OpenSim.Framework.Monitoring
173 175
174 static Watchdog() 176 static Watchdog()
175 { 177 {
178 m_jobEngine = new JobEngine();
176 m_threads = new Dictionary<int, ThreadWatchdogInfo>(); 179 m_threads = new Dictionary<int, ThreadWatchdogInfo>();
177 m_watchdogTimer = new System.Timers.Timer(WATCHDOG_INTERVAL_MS); 180 m_watchdogTimer = new System.Timers.Timer(WATCHDOG_INTERVAL_MS);
178 m_watchdogTimer.AutoReset = false; 181 m_watchdogTimer.AutoReset = false;
@@ -450,5 +453,20 @@ namespace OpenSim.Framework.Monitoring
450 453
451 m_watchdogTimer.Start(); 454 m_watchdogTimer.Start();
452 } 455 }
456
457 public static void RunWhenPossible(string jobType, WaitCallback callback, string name, object obj, bool log = false)
458 {
459 if (Util.FireAndForgetMethod == FireAndForgetMethod.RegressionTest)
460 {
461 Culture.SetCurrentCulture();
462 callback(obj);
463 return;
464 }
465
466 if (m_jobEngine.IsRunning)
467 m_jobEngine.QueueRequest(name, callback, obj);
468 else
469 RunInThread(callback, name, obj, log);
470 }
453 } 471 }
454} \ No newline at end of file 472} \ No newline at end of file
diff --git a/OpenSim/Region/Framework/Scenes/ScenePresence.cs b/OpenSim/Region/Framework/Scenes/ScenePresence.cs
index 87063c6..2718785 100644
--- a/OpenSim/Region/Framework/Scenes/ScenePresence.cs
+++ b/OpenSim/Region/Framework/Scenes/ScenePresence.cs
@@ -3362,7 +3362,7 @@ namespace OpenSim.Region.Framework.Scenes
3362 SentInitialDataToClient = true; 3362 SentInitialDataToClient = true;
3363 3363
3364 // Send all scene object to the new client 3364 // Send all scene object to the new client
3365 Watchdog.RunInThread(delegate 3365 Watchdog.RunWhenPossible("SendInitialDataToClient", delegate
3366 { 3366 {
3367// m_log.DebugFormat( 3367// m_log.DebugFormat(
3368// "[SCENE PRESENCE]: Sending initial data to {0} agent {1} in {2}, tp flags {3}", 3368// "[SCENE PRESENCE]: Sending initial data to {0} agent {1} in {2}, tp flags {3}",