aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/CoreModules
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Region/CoreModules')
-rw-r--r--OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs41
-rw-r--r--OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs344
2 files changed, 29 insertions, 356 deletions
diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
index fceda80..fa23590 100644
--- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
+++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
@@ -31,6 +31,7 @@ using System.Reflection;
31 31
32using OpenSim.Framework; 32using OpenSim.Framework;
33using OpenSim.Framework.Client; 33using OpenSim.Framework.Client;
34using OpenSim.Framework.Monitoring;
34using OpenSim.Region.Framework.Interfaces; 35using OpenSim.Region.Framework.Interfaces;
35using OpenSim.Region.Framework.Scenes; 36using OpenSim.Region.Framework.Scenes;
36using OpenSim.Services.Connectors.Hypergrid; 37using OpenSim.Services.Connectors.Hypergrid;
@@ -113,7 +114,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
113 /// <summary> 114 /// <summary>
114 /// Used for processing analysis of incoming attachments in a controlled fashion. 115 /// Used for processing analysis of incoming attachments in a controlled fashion.
115 /// </summary> 116 /// </summary>
116 private HGIncomingSceneObjectEngine m_incomingSceneObjectEngine; 117 private JobEngine m_incomingSceneObjectEngine;
117 118
118 #region ISharedRegionModule 119 #region ISharedRegionModule
119 120
@@ -160,7 +161,24 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
160 scene.RegisterModuleInterface<IUserAgentVerificationModule>(this); 161 scene.RegisterModuleInterface<IUserAgentVerificationModule>(this);
161 //scene.EventManager.OnIncomingSceneObject += OnIncomingSceneObject; 162 //scene.EventManager.OnIncomingSceneObject += OnIncomingSceneObject;
162 163
163 m_incomingSceneObjectEngine = new HGIncomingSceneObjectEngine(scene.Name); 164 m_incomingSceneObjectEngine
165 = new JobEngine(
166 string.Format("HG Incoming Scene Object Engine ({0})", scene.Name),
167 "HG INCOMING SCENE OBJECT ENGINE");
168
169 StatsManager.RegisterStat(
170 new Stat(
171 "HGIncomingAttachmentsWaiting",
172 "Number of incoming attachments waiting for processing.",
173 "",
174 "",
175 "entitytransfer",
176 Name,
177 StatType.Pull,
178 MeasuresOfInterest.None,
179 stat => stat.Value = m_incomingSceneObjectEngine.JobsWaiting,
180 StatVerbosity.Debug));
181
164 m_incomingSceneObjectEngine.Start(); 182 m_incomingSceneObjectEngine.Start();
165 } 183 }
166 } 184 }
@@ -548,11 +566,11 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
548 566
549 private void RemoveIncomingSceneObjectJobs(string commonIdToRemove) 567 private void RemoveIncomingSceneObjectJobs(string commonIdToRemove)
550 { 568 {
551 List<Job> jobsToReinsert = new List<Job>(); 569 List<JobEngine.Job> jobsToReinsert = new List<JobEngine.Job>();
552 int jobsRemoved = 0; 570 int jobsRemoved = 0;
553 571
554 Job job; 572 JobEngine.Job job;
555 while ((job = m_incomingSceneObjectEngine.RemoveNextRequest()) != null) 573 while ((job = m_incomingSceneObjectEngine.RemoveNextJob()) != null)
556 { 574 {
557 if (job.CommonId != commonIdToRemove) 575 if (job.CommonId != commonIdToRemove)
558 jobsToReinsert.Add(job); 576 jobsToReinsert.Add(job);
@@ -566,8 +584,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
566 584
567 if (jobsToReinsert.Count > 0) 585 if (jobsToReinsert.Count > 0)
568 { 586 {
569 foreach (Job jobToReinsert in jobsToReinsert) 587 foreach (JobEngine.Job jobToReinsert in jobsToReinsert)
570 m_incomingSceneObjectEngine.QueueRequest(jobToReinsert); 588 m_incomingSceneObjectEngine.QueueJob(jobToReinsert);
571 } 589 }
572 } 590 }
573 591
@@ -594,10 +612,9 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
594 { 612 {
595 if (aCircuit.ServiceURLs != null && aCircuit.ServiceURLs.ContainsKey("AssetServerURI")) 613 if (aCircuit.ServiceURLs != null && aCircuit.ServiceURLs.ContainsKey("AssetServerURI"))
596 { 614 {
597 m_incomingSceneObjectEngine.QueueRequest( 615 m_incomingSceneObjectEngine.QueueJob(
598 string.Format("HG UUID Gather for attachment {0} for {1}", so.Name, aCircuit.Name), 616 string.Format("HG UUID Gather for attachment {0} for {1}", so.Name, aCircuit.Name),
599 so.OwnerID.ToString(), 617 () =>
600 o =>
601 { 618 {
602 string url = aCircuit.ServiceURLs["AssetServerURI"].ToString(); 619 string url = aCircuit.ServiceURLs["AssetServerURI"].ToString();
603 // m_log.DebugFormat( 620 // m_log.DebugFormat(
@@ -663,8 +680,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
663 // m_log.DebugFormat( 680 // m_log.DebugFormat(
664 // "[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}", 681 // "[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}",
665 // so.Name, so.OwnerID, url); 682 // so.Name, so.OwnerID, url);
666 }, 683 },
667 null); 684 so.OwnerID.ToString());
668 } 685 }
669 } 686 }
670 } 687 }
diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
deleted file mode 100644
index f62e7f4..0000000
--- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
+++ /dev/null
@@ -1,344 +0,0 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34using OpenSim.Framework.Monitoring;
35using OpenSim.Region.Framework.Scenes;
36
37namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
38{
39 public class Job
40 {
41 public string Name { get; private set; }
42 public string CommonId { get; private set; }
43 public WaitCallback Callback { get; private set; }
44 public object O { get; private set; }
45
46 public Job(string name, string commonId, WaitCallback callback, object o)
47 {
48 Name = name;
49 CommonId = commonId;
50 Callback = callback;
51 O = o;
52 }
53 }
54
55 // TODO: These kinds of classes MUST be generalized with JobEngine, etc.
56 public class HGIncomingSceneObjectEngine
57 {
58 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
59
60 public int LogLevel { get; set; }
61
62 public bool IsRunning { get; private set; }
63
64 public string Name { get; set; }
65
66 /// <summary>
67 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
68 /// </summary>
69 public int RequestProcessTimeoutOnStop { get; set; }
70
71 /// <summary>
72 /// Controls whether we need to warn in the log about exceeding the max queue size.
73 /// </summary>
74 /// <remarks>
75 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
76 /// order to avoid spamming the log with lots of warnings.
77 /// </remarks>
78 private bool m_warnOverMaxQueue = true;
79
80 private BlockingCollection<Job> m_requestQueue;
81
82 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
83
84 private Stat m_requestsWaitingStat;
85
86 private Job m_currentJob;
87
88 /// <summary>
89 /// Used to signal that we are ready to complete stop.
90 /// </summary>
91 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
92
93 public HGIncomingSceneObjectEngine(string name)
94 {
95// LogLevel = 1;
96 Name = name;
97 RequestProcessTimeoutOnStop = 5000;
98
99// MainConsole.Instance.Commands.AddCommand(
100// "Debug",
101// false,
102// "debug jobengine",
103// "debug jobengine <start|stop|status>",
104// "Start, stop or get status of the job engine.",
105// "If stopped then all jobs are processed immediately.",
106// HandleControlCommand);
107 }
108
109 public void Start()
110 {
111 lock (this)
112 {
113 if (IsRunning)
114 return;
115
116 IsRunning = true;
117
118 m_finishedProcessingAfterStop.Reset();
119
120 m_requestQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
121
122 m_requestsWaitingStat =
123 new Stat(
124 "HGIncomingAttachmentsWaiting",
125 "Number of incoming attachments waiting for processing.",
126 "",
127 "",
128 "entitytransfer",
129 Name,
130 StatType.Pull,
131 MeasuresOfInterest.None,
132 stat => stat.Value = m_requestQueue.Count,
133 StatVerbosity.Debug);
134
135 StatsManager.RegisterStat(m_requestsWaitingStat);
136
137 WorkManager.StartThread(
138 ProcessRequests,
139 string.Format("HG Incoming Scene Object Engine Thread ({0})", Name),
140 ThreadPriority.Normal,
141 false,
142 true,
143 null,
144 int.MaxValue);
145 }
146 }
147
148 public void Stop()
149 {
150 lock (this)
151 {
152 try
153 {
154 if (!IsRunning)
155 return;
156
157 IsRunning = false;
158
159 int requestsLeft = m_requestQueue.Count;
160
161 if (requestsLeft <= 0)
162 {
163 m_cancelSource.Cancel();
164 }
165 else
166 {
167 m_log.InfoFormat("[HG INCOMING SCENE OBJECT ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
168
169 while (requestsLeft > 0)
170 {
171 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
172 {
173 // After timeout no events have been written
174 if (requestsLeft == m_requestQueue.Count)
175 {
176 m_log.WarnFormat(
177 "[HG INCOMING SCENE OBJECT ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
178 RequestProcessTimeoutOnStop, requestsLeft);
179
180 break;
181 }
182 }
183
184 requestsLeft = m_requestQueue.Count;
185 }
186 }
187 }
188 finally
189 {
190 m_cancelSource.Dispose();
191 StatsManager.DeregisterStat(m_requestsWaitingStat);
192 m_requestsWaitingStat = null;
193 m_requestQueue = null;
194 }
195 }
196 }
197
198 public Job RemoveNextRequest()
199 {
200 Job nextRequest;
201 m_requestQueue.TryTake(out nextRequest);
202
203 return nextRequest;
204 }
205
206 public bool QueueRequest(string name, string commonId, WaitCallback req, object o)
207 {
208 return QueueRequest(new Job(name, commonId, req, o));
209 }
210
211 public bool QueueRequest(Job job)
212 {
213 if (LogLevel >= 1)
214 m_log.DebugFormat(
215 "[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}, common ID {1}", job.Name, job.CommonId);
216
217 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
218 {
219 // m_log.DebugFormat(
220 // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
221 // categories, client.AgentID, m_udpServer.Scene.Name);
222
223 m_requestQueue.Add(job);
224
225 if (!m_warnOverMaxQueue)
226 m_warnOverMaxQueue = true;
227
228 return true;
229 }
230 else
231 {
232 if (m_warnOverMaxQueue)
233 {
234 // m_log.WarnFormat(
235 // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
236 // client.AgentID, m_udpServer.Scene.Name);
237
238 m_log.WarnFormat("[HG INCOMING SCENE OBJECT ENGINE]: Request queue at maximum capacity, not recording job");
239
240 m_warnOverMaxQueue = false;
241 }
242
243 return false;
244 }
245 }
246
247 private void ProcessRequests()
248 {
249 try
250 {
251 while (IsRunning || m_requestQueue.Count > 0)
252 {
253 m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
254
255 // QueueEmpty callback = req.Client.OnQueueEmpty;
256 //
257 // if (callback != null)
258 // {
259 // try
260 // {
261 // callback(req.Categories);
262 // }
263 // catch (Exception e)
264 // {
265 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
266 // }
267 // }
268
269 if (LogLevel >= 1)
270 m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processing job {0}", m_currentJob.Name);
271
272 try
273 {
274 m_currentJob.Callback.Invoke(m_currentJob.O);
275 }
276 catch (Exception e)
277 {
278 m_log.Error(
279 string.Format(
280 "[HG INCOMING SCENE OBJECT ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
281 }
282
283 if (LogLevel >= 1)
284 m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processed job {0}", m_currentJob.Name);
285
286 m_currentJob = null;
287 }
288 }
289 catch (OperationCanceledException)
290 {
291 }
292
293 m_finishedProcessingAfterStop.Set();
294 }
295
296// private void HandleControlCommand(string module, string[] args)
297// {
298// // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
299// // return;
300//
301// if (args.Length < 3)
302// {
303// MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|loglevel>");
304// return;
305// }
306//
307// string subCommand = args[2];
308//
309// if (subCommand == "stop")
310// {
311// Stop();
312// MainConsole.Instance.OutputFormat("Stopped job engine.");
313// }
314// else if (subCommand == "start")
315// {
316// Start();
317// MainConsole.Instance.OutputFormat("Started job engine.");
318// }
319// else if (subCommand == "status")
320// {
321// MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
322// MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
323// MainConsole.Instance.OutputFormat(
324// "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
325// MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
326// }
327//
328// else if (subCommand == "loglevel")
329// {
330// // int logLevel;
331// int logLevel = int.Parse(args[3]);
332// // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
333// // {
334// LogLevel = logLevel;
335// MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel);
336// // }
337// }
338// else
339// {
340// MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
341// }
342// }
343 }
344}