aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/CoreModules/Framework/EntityTransfer
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Region/CoreModules/Framework/EntityTransfer')
-rw-r--r--OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs327
1 files changed, 327 insertions, 0 deletions
diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
new file mode 100644
index 0000000..5cf2e1f
--- /dev/null
+++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
@@ -0,0 +1,327 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34using OpenSim.Framework.Monitoring;
35using OpenSim.Region.Framework.Scenes;
36
37namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
38{
39 public class Job
40 {
41 public string Name;
42 public WaitCallback Callback;
43 public object O;
44
45 public Job(string name, WaitCallback callback, object o)
46 {
47 Name = name;
48 Callback = callback;
49 O = o;
50 }
51 }
52
53 // TODO: These kinds of classes MUST be generalized with JobEngine, etc.
54 public class HGIncomingSceneObjectEngine
55 {
56 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
57
58 public int LogLevel { get; set; }
59
60 public bool IsRunning { get; private set; }
61
62 public string Name { get; set; }
63
64 /// <summary>
65 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
66 /// </summary>
67 public int RequestProcessTimeoutOnStop { get; set; }
68
69 /// <summary>
70 /// Controls whether we need to warn in the log about exceeding the max queue size.
71 /// </summary>
72 /// <remarks>
73 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
74 /// order to avoid spamming the log with lots of warnings.
75 /// </remarks>
76 private bool m_warnOverMaxQueue = true;
77
78 private BlockingCollection<Job> m_requestQueue;
79
80 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
81
82 private Stat m_requestsWaitingStat;
83
84 private Job m_currentJob;
85
86 /// <summary>
87 /// Used to signal that we are ready to complete stop.
88 /// </summary>
89 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
90
91 public HGIncomingSceneObjectEngine(string name)
92 {
93 Name = name;
94 RequestProcessTimeoutOnStop = 5000;
95
96// MainConsole.Instance.Commands.AddCommand(
97// "Debug",
98// false,
99// "debug jobengine",
100// "debug jobengine <start|stop|status>",
101// "Start, stop or get status of the job engine.",
102// "If stopped then all jobs are processed immediately.",
103// HandleControlCommand);
104 }
105
106 public void Start()
107 {
108 lock (this)
109 {
110 if (IsRunning)
111 return;
112
113 IsRunning = true;
114
115 m_finishedProcessingAfterStop.Reset();
116
117 m_requestQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
118
119 m_requestsWaitingStat =
120 new Stat(
121 "HGIncomingAttachmentsWaiting",
122 "Number of incoming attachments waiting for processing.",
123 "",
124 "",
125 "entitytransfer",
126 Name,
127 StatType.Pull,
128 MeasuresOfInterest.None,
129 stat => stat.Value = m_requestQueue.Count,
130 StatVerbosity.Debug);
131
132 StatsManager.RegisterStat(m_requestsWaitingStat);
133
134 Watchdog.StartThread(
135 ProcessRequests,
136 "HG Incoming Scene Object Engine Thread",
137 ThreadPriority.Normal,
138 false,
139 true,
140 null,
141 int.MaxValue);
142 }
143 }
144
145 public void Stop()
146 {
147 lock (this)
148 {
149 try
150 {
151 if (!IsRunning)
152 return;
153
154 IsRunning = false;
155
156 int requestsLeft = m_requestQueue.Count;
157
158 if (requestsLeft <= 0)
159 {
160 m_cancelSource.Cancel();
161 }
162 else
163 {
164 m_log.InfoFormat("[HG INCOMING SCENE OBJECT ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
165
166 while (requestsLeft > 0)
167 {
168 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
169 {
170 // After timeout no events have been written
171 if (requestsLeft == m_requestQueue.Count)
172 {
173 m_log.WarnFormat(
174 "[HG INCOMING SCENE OBJECT ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
175 RequestProcessTimeoutOnStop, requestsLeft);
176
177 break;
178 }
179 }
180
181 requestsLeft = m_requestQueue.Count;
182 }
183 }
184 }
185 finally
186 {
187 m_cancelSource.Dispose();
188 StatsManager.DeregisterStat(m_requestsWaitingStat);
189 m_requestsWaitingStat = null;
190 m_requestQueue = null;
191 }
192 }
193 }
194
195 public bool QueueRequest(string name, WaitCallback req, object o)
196 {
197 if (LogLevel >= 1)
198 m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}", name);
199
200 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
201 {
202 // m_log.DebugFormat(
203 // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
204 // categories, client.AgentID, m_udpServer.Scene.Name);
205
206 m_requestQueue.Add(new Job(name, req, o));
207
208 if (!m_warnOverMaxQueue)
209 m_warnOverMaxQueue = true;
210
211 return true;
212 }
213 else
214 {
215 if (m_warnOverMaxQueue)
216 {
217 // m_log.WarnFormat(
218 // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
219 // client.AgentID, m_udpServer.Scene.Name);
220
221 m_log.WarnFormat("[HG INCOMING SCENE OBJECT ENGINE]: Request queue at maximum capacity, not recording job");
222
223 m_warnOverMaxQueue = false;
224 }
225
226 return false;
227 }
228 }
229
230 private void ProcessRequests()
231 {
232 try
233 {
234 while (IsRunning || m_requestQueue.Count > 0)
235 {
236 m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
237
238 // QueueEmpty callback = req.Client.OnQueueEmpty;
239 //
240 // if (callback != null)
241 // {
242 // try
243 // {
244 // callback(req.Categories);
245 // }
246 // catch (Exception e)
247 // {
248 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
249 // }
250 // }
251
252 if (LogLevel >= 1)
253 m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processing job {0}", m_currentJob.Name);
254
255 try
256 {
257 m_currentJob.Callback.Invoke(m_currentJob.O);
258 }
259 catch (Exception e)
260 {
261 m_log.Error(
262 string.Format(
263 "[HG INCOMING SCENE OBJECT ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
264 }
265
266 if (LogLevel >= 1)
267 m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processed job {0}", m_currentJob.Name);
268
269 m_currentJob = null;
270 }
271 }
272 catch (OperationCanceledException)
273 {
274 }
275
276 m_finishedProcessingAfterStop.Set();
277 }
278
279// private void HandleControlCommand(string module, string[] args)
280// {
281// // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
282// // return;
283//
284// if (args.Length < 3)
285// {
286// MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|loglevel>");
287// return;
288// }
289//
290// string subCommand = args[2];
291//
292// if (subCommand == "stop")
293// {
294// Stop();
295// MainConsole.Instance.OutputFormat("Stopped job engine.");
296// }
297// else if (subCommand == "start")
298// {
299// Start();
300// MainConsole.Instance.OutputFormat("Started job engine.");
301// }
302// else if (subCommand == "status")
303// {
304// MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
305// MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
306// MainConsole.Instance.OutputFormat(
307// "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
308// MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
309// }
310//
311// else if (subCommand == "loglevel")
312// {
313// // int logLevel;
314// int logLevel = int.Parse(args[3]);
315// // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
316// // {
317// LogLevel = logLevel;
318// MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel);
319// // }
320// }
321// else
322// {
323// MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
324// }
325// }
326 }
327}