From 8e1e8a0920a9e94305619e9afb8e053b4daefb89 Mon Sep 17 00:00:00 2001
From: Justin Clark-Casey (justincc)
Date: Mon, 12 Jan 2015 20:56:37 +0000
Subject: Make the performance controlling job processing threads introduced in
conference code use a generic JobEngine class rather than 4 slightly
different copy/pasted versions.
---
.../UDP/IncomingPacketAsyncHandlingEngine.cs | 328 --------------------
.../Region/ClientStack/Linden/UDP/LLClientView.cs | 5 +-
.../Region/ClientStack/Linden/UDP/LLUDPClient.cs | 2 +-
.../Region/ClientStack/Linden/UDP/LLUDPServer.cs | 52 +++-
.../ClientStack/Linden/UDP/LLUDPServerCommands.cs | 46 +++
.../Linden/UDP/OutgoingQueueRefillEngine.cs | 288 -----------------
.../EntityTransfer/HGEntityTransferModule.cs | 41 ++-
.../EntityTransfer/HGIncomingSceneObjectEngine.cs | 344 ---------------------
8 files changed, 121 insertions(+), 985 deletions(-)
delete mode 100644 OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs
delete mode 100644 OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs
delete mode 100644 OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
(limited to 'OpenSim/Region')
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs
deleted file mode 100644
index 6f40b24..0000000
--- a/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright (c) Contributors, http://opensimulator.org/
- * See CONTRIBUTORS.TXT for a full list of copyright holders.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of the OpenSimulator Project nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Reflection;
-using System.Threading;
-using log4net;
-using OpenSim.Framework;
-using OpenSim.Framework.Monitoring;
-using OpenSim.Region.Framework.Scenes;
-
-namespace OpenSim.Region.ClientStack.LindenUDP
-{
- public class Job
- {
- public string Name;
- public WaitCallback Callback;
- public object O;
-
- public Job(string name, WaitCallback callback, object o)
- {
- Name = name;
- Callback = callback;
- O = o;
- }
- }
-
- // TODO: These kinds of classes MUST be generalized with JobEngine, etc.
- public class IncomingPacketAsyncHandlingEngine
- {
- private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- public int LogLevel { get; set; }
-
- public bool IsRunning { get; private set; }
-
- ///
- /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
- ///
- public int RequestProcessTimeoutOnStop { get; set; }
-
- ///
- /// Controls whether we need to warn in the log about exceeding the max queue size.
- ///
- ///
- /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
- /// order to avoid spamming the log with lots of warnings.
- ///
- private bool m_warnOverMaxQueue = true;
-
- private BlockingCollection m_requestQueue;
-
- private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
-
- private LLUDPServer m_udpServer;
-
- private Stat m_requestsWaitingStat;
-
- private Job m_currentJob;
-
- ///
- /// Used to signal that we are ready to complete stop.
- ///
- private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
-
- public IncomingPacketAsyncHandlingEngine(LLUDPServer server)
- {
- //LogLevel = 1;
- m_udpServer = server;
- RequestProcessTimeoutOnStop = 5000;
-
- // MainConsole.Instance.Commands.AddCommand(
- // "Debug",
- // false,
- // "debug jobengine",
- // "debug jobengine ",
- // "Start, stop or get status of the job engine.",
- // "If stopped then all jobs are processed immediately.",
- // HandleControlCommand);
- }
-
- public void Start()
- {
- lock (this)
- {
- if (IsRunning)
- return;
-
- IsRunning = true;
-
- m_finishedProcessingAfterStop.Reset();
-
- m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000);
-
- m_requestsWaitingStat =
- new Stat(
- "IncomingPacketAsyncRequestsWaiting",
- "Number of incoming packets waiting for async processing in engine.",
- "",
- "",
- "clientstack",
- m_udpServer.Scene.Name,
- StatType.Pull,
- MeasuresOfInterest.None,
- stat => stat.Value = m_requestQueue.Count,
- StatVerbosity.Debug);
-
- StatsManager.RegisterStat(m_requestsWaitingStat);
-
- WorkManager.StartThread(
- ProcessRequests,
- string.Format("Incoming Packet Async Handling Engine Thread ({0})", m_udpServer.Scene.Name),
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
- }
- }
-
- public void Stop()
- {
- lock (this)
- {
- try
- {
- if (!IsRunning)
- return;
-
- IsRunning = false;
-
- int requestsLeft = m_requestQueue.Count;
-
- if (requestsLeft <= 0)
- {
- m_cancelSource.Cancel();
- }
- else
- {
- m_log.InfoFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
-
- while (requestsLeft > 0)
- {
- if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
- {
- // After timeout no events have been written
- if (requestsLeft == m_requestQueue.Count)
- {
- m_log.WarnFormat(
- "[INCOMING PACKET ASYNC HANDLING ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
- RequestProcessTimeoutOnStop, requestsLeft);
-
- break;
- }
- }
-
- requestsLeft = m_requestQueue.Count;
- }
- }
- }
- finally
- {
- m_cancelSource.Dispose();
- StatsManager.DeregisterStat(m_requestsWaitingStat);
- m_requestsWaitingStat = null;
- m_requestQueue = null;
- }
- }
- }
-
- public bool QueueRequest(string name, WaitCallback req, object o)
- {
- if (LogLevel >= 1)
- m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Queued job {0}", name);
-
- if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
- {
- // m_log.DebugFormat(
- // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
- // categories, client.AgentID, m_udpServer.Scene.Name);
-
- m_requestQueue.Add(new Job(name, req, o));
-
- if (!m_warnOverMaxQueue)
- m_warnOverMaxQueue = true;
-
- return true;
- }
- else
- {
- if (m_warnOverMaxQueue)
- {
- // m_log.WarnFormat(
- // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
- // client.AgentID, m_udpServer.Scene.Name);
-
- m_log.WarnFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Request queue at maximum capacity, not recording job");
-
- m_warnOverMaxQueue = false;
- }
-
- return false;
- }
- }
-
- private void ProcessRequests()
- {
- try
- {
- while (IsRunning || m_requestQueue.Count > 0)
- {
- m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
-
- // QueueEmpty callback = req.Client.OnQueueEmpty;
- //
- // if (callback != null)
- // {
- // try
- // {
- // callback(req.Categories);
- // }
- // catch (Exception e)
- // {
- // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
- // }
- // }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processing job {0}", m_currentJob.Name);
-
- try
- {
- m_currentJob.Callback.Invoke(m_currentJob.O);
- }
- catch (Exception e)
- {
- m_log.Error(
- string.Format(
- "[INCOMING PACKET ASYNC HANDLING ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
- }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processed job {0}", m_currentJob.Name);
-
- m_currentJob = null;
- }
- }
- catch (OperationCanceledException)
- {
- }
-
- m_finishedProcessingAfterStop.Set();
- }
-
- // private void HandleControlCommand(string module, string[] args)
- // {
- // // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
- // // return;
- //
- // if (args.Length < 3)
- // {
- // MainConsole.Instance.Output("Usage: debug jobengine ");
- // return;
- // }
- //
- // string subCommand = args[2];
- //
- // if (subCommand == "stop")
- // {
- // Stop();
- // MainConsole.Instance.OutputFormat("Stopped job engine.");
- // }
- // else if (subCommand == "start")
- // {
- // Start();
- // MainConsole.Instance.OutputFormat("Started job engine.");
- // }
- // else if (subCommand == "status")
- // {
- // MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
- // MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
- // MainConsole.Instance.OutputFormat(
- // "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
- // MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
- // }
- //
- // else if (subCommand == "loglevel")
- // {
- // // int logLevel;
- // int logLevel = int.Parse(args[3]);
- // // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
- // // {
- // LogLevel = logLevel;
- // MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel);
- // // }
- // }
- // else
- // {
- // MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
- // }
- // }
- }
-}
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs
index 5da0ca1..bb4f8a7 100644
--- a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs
+++ b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs
@@ -724,10 +724,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
object obj = new AsyncPacketProcess(this, pprocessor.method, packet);
if (pprocessor.InEngine)
- m_udpServer.IpahEngine.QueueRequest(
- packet.Type.ToString(),
- ProcessSpecificPacketAsync,
- obj);
+ m_udpServer.IpahEngine.QueueJob(packet.Type.ToString(), () => ProcessSpecificPacketAsync(obj));
else
Util.FireAndForget(ProcessSpecificPacketAsync, obj, packet.Type.ToString());
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
index de91856..ce6e3ee 100644
--- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
+++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
@@ -736,7 +736,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
else
{
- m_udpServer.OqrEngine.QueueRequest(this, categories);
+ m_udpServer.OqrEngine.QueueJob(AgentID.ToString(), () => FireQueueEmpty(categories));
}
}
else
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs
index 2f97516..7bd16e6 100644
--- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs
@@ -367,14 +367,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// Queue some low priority but potentially high volume async requests so that they don't overwhelm available
/// threadpool threads.
///
- public IncomingPacketAsyncHandlingEngine IpahEngine { get; private set; }
+ public JobEngine IpahEngine { get; private set; }
///
- /// Experimental facility to run queue empty processing within a controlled number of threads rather than
- /// requiring massive numbers of short-lived threads from the threadpool when there are a high number of
- /// connections.
+ /// Run queue empty processing within a single persistent thread.
///
- public OutgoingQueueRefillEngine OqrEngine { get; private set; }
+ ///
+ /// This is the alternative to having every
+ /// connection schedule its own job in the threadpool which causes performance problems when there are many
+ /// connections.
+ ///
+ public JobEngine OqrEngine { get; private set; }
public LLUDPServer(
IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port,
@@ -459,9 +462,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (usePools)
EnablePools();
-
- IpahEngine = new IncomingPacketAsyncHandlingEngine(this);
- OqrEngine = new OutgoingQueueRefillEngine(this);
}
public void Start()
@@ -633,6 +633,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP
Scene = (Scene)scene;
m_location = new Location(Scene.RegionInfo.RegionHandle);
+
+ IpahEngine
+ = new JobEngine(
+ string.Format("Incoming Packet Async Handling Engine ({0})", Scene.Name),
+ "INCOMING PACKET ASYNC HANDLING ENGINE");
+
+ OqrEngine
+ = new JobEngine(
+ string.Format("Outgoing Queue Refill Engine ({0})", Scene.Name),
+ "OUTGOING QUEUE REFILL ENGINE");
StatsManager.RegisterStat(
new Stat(
@@ -713,6 +723,32 @@ namespace OpenSim.Region.ClientStack.LindenUDP
MeasuresOfInterest.AverageChangeOverTime,
stat => stat.Value = GetTotalQueuedOutgoingPackets(),
StatVerbosity.Info));
+
+ StatsManager.RegisterStat(
+ new Stat(
+ "IncomingPacketAsyncRequestsWaiting",
+ "Number of incoming packets waiting for async processing in engine.",
+ "",
+ "",
+ "clientstack",
+ Scene.Name,
+ StatType.Pull,
+ MeasuresOfInterest.None,
+ stat => stat.Value = IpahEngine.JobsWaiting,
+ StatVerbosity.Debug));
+
+ StatsManager.RegisterStat(
+ new Stat(
+ "OQRERequestsWaiting",
+ "Number of outgong queue refill requests waiting for processing.",
+ "",
+ "",
+ "clientstack",
+ Scene.Name,
+ StatType.Pull,
+ MeasuresOfInterest.None,
+ stat => stat.Value = OqrEngine.JobsWaiting,
+ StatVerbosity.Debug));
// We delay enabling pool stats to AddScene() instead of Initialize() so that we can distinguish pool stats by
// scene name
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs
index e0398d5..17a394d 100644
--- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs
+++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs
@@ -186,6 +186,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP
"debug lludp toggle agentupdate",
"Toggle whether agentupdate packets are processed or simply discarded.",
HandleAgentUpdateCommand);
+
+ MainConsole.Instance.Commands.AddCommand(
+ "Debug",
+ false,
+ "debug lludp oqre",
+ "debug lludp oqre ",
+ "Start, stop or get status of OutgoingQueueRefillEngine.",
+ "If stopped then refill requests are processed directly via the threadpool.",
+ HandleOqreCommand);
}
private void HandleShowServerThrottlesCommand(string module, string[] args)
@@ -758,5 +767,42 @@ namespace OpenSim.Region.ClientStack.LindenUDP
MainConsole.Instance.OutputFormat(
"Packet debug level for new clients is {0}", m_udpServer.DefaultClientPacketDebugLevel);
}
+
+ private void HandleOqreCommand(string module, string[] args)
+ {
+ if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
+ return;
+
+ if (args.Length != 4)
+ {
+ MainConsole.Instance.Output("Usage: debug lludp oqre ");
+ return;
+ }
+
+ string subCommand = args[3];
+
+ if (subCommand == "stop")
+ {
+ m_udpServer.OqrEngine.Stop();
+ MainConsole.Instance.OutputFormat("Stopped OQRE for {0}", m_udpServer.Scene.Name);
+ }
+ else if (subCommand == "start")
+ {
+ m_udpServer.OqrEngine.Start();
+ MainConsole.Instance.OutputFormat("Started OQRE for {0}", m_udpServer.Scene.Name);
+ }
+ else if (subCommand == "status")
+ {
+ MainConsole.Instance.OutputFormat("OQRE in {0}", m_udpServer.Scene.Name);
+ MainConsole.Instance.OutputFormat("Running: {0}", m_udpServer.OqrEngine.IsRunning);
+ MainConsole.Instance.OutputFormat(
+ "Requests waiting: {0}",
+ m_udpServer.OqrEngine.IsRunning ? m_udpServer.OqrEngine.JobsWaiting.ToString() : "n/a");
+ }
+ else
+ {
+ MainConsole.Instance.OutputFormat("Unrecognized OQRE subcommand {0}", subCommand);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs
deleted file mode 100644
index 1e915c3..0000000
--- a/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Copyright (c) Contributors, http://opensimulator.org/
- * See CONTRIBUTORS.TXT for a full list of copyright holders.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of the OpenSimulator Project nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Reflection;
-using System.Threading;
-using log4net;
-using OpenSim.Framework;
-using OpenSim.Framework.Monitoring;
-using OpenSim.Region.Framework.Scenes;
-
-namespace OpenSim.Region.ClientStack.LindenUDP
-{
- public struct RefillRequest
- {
- public LLUDPClient Client;
- public ThrottleOutPacketTypeFlags Categories;
-
- public RefillRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories)
- {
- Client = client;
- Categories = categories;
- }
- }
-
- public class OutgoingQueueRefillEngine
- {
- private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- public bool IsRunning { get; private set; }
-
- ///
- /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
- ///
- public int RequestProcessTimeoutOnStop { get; set; }
-
- ///
- /// Controls whether we need to warn in the log about exceeding the max queue size.
- ///
- ///
- /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
- /// order to avoid spamming the log with lots of warnings.
- ///
- private bool m_warnOverMaxQueue = true;
-
- private BlockingCollection m_requestQueue;
-
- private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
-
- private LLUDPServer m_udpServer;
-
- private Stat m_oqreRequestsWaitingStat;
-
- ///
- /// Used to signal that we are ready to complete stop.
- ///
- private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
-
- public OutgoingQueueRefillEngine(LLUDPServer server)
- {
- RequestProcessTimeoutOnStop = 5000;
- m_udpServer = server;
-
- MainConsole.Instance.Commands.AddCommand(
- "Debug",
- false,
- "debug lludp oqre",
- "debug lludp oqre ",
- "Start, stop or get status of OutgoingQueueRefillEngine.",
- "If stopped then refill requests are processed directly via the threadpool.",
- HandleOqreCommand);
- }
-
- public void Start()
- {
- lock (this)
- {
- if (IsRunning)
- return;
-
- IsRunning = true;
-
- m_finishedProcessingAfterStop.Reset();
-
- m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000);
-
- m_oqreRequestsWaitingStat =
- new Stat(
- "OQRERequestsWaiting",
- "Number of outgong queue refill requests waiting for processing.",
- "",
- "",
- "clientstack",
- m_udpServer.Scene.Name,
- StatType.Pull,
- MeasuresOfInterest.None,
- stat => stat.Value = m_requestQueue.Count,
- StatVerbosity.Debug);
-
- StatsManager.RegisterStat(m_oqreRequestsWaitingStat);
-
- WorkManager.StartThread(
- ProcessRequests,
- String.Format("OutgoingQueueRefillEngineThread ({0})", m_udpServer.Scene.Name),
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
- }
- }
-
- public void Stop()
- {
- lock (this)
- {
- try
- {
- if (!IsRunning)
- return;
-
- IsRunning = false;
-
- int requestsLeft = m_requestQueue.Count;
-
- if (requestsLeft <= 0)
- {
- m_cancelSource.Cancel();
- }
- else
- {
- m_log.InfoFormat("[OUTGOING QUEUE REFILL ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
-
- while (requestsLeft > 0)
- {
- if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
- {
- // After timeout no events have been written
- if (requestsLeft == m_requestQueue.Count)
- {
- m_log.WarnFormat(
- "[OUTGOING QUEUE REFILL ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
- RequestProcessTimeoutOnStop, requestsLeft);
-
- break;
- }
- }
-
- requestsLeft = m_requestQueue.Count;
- }
- }
- }
- finally
- {
- m_cancelSource.Dispose();
- StatsManager.DeregisterStat(m_oqreRequestsWaitingStat);
- m_oqreRequestsWaitingStat = null;
- m_requestQueue = null;
- }
- }
- }
-
- public bool QueueRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories)
- {
- if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
- {
-// m_log.DebugFormat(
-// "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
-// categories, client.AgentID, m_udpServer.Scene.Name);
-
- m_requestQueue.Add(new RefillRequest(client, categories));
-
- if (!m_warnOverMaxQueue)
- m_warnOverMaxQueue = true;
-
- return true;
- }
- else
- {
- if (m_warnOverMaxQueue)
- {
- m_log.WarnFormat(
- "[OUTGOING QUEUE REFILL ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
- client.AgentID, m_udpServer.Scene.Name);
-
- m_warnOverMaxQueue = false;
- }
-
- return false;
- }
- }
-
- private void ProcessRequests()
- {
- Thread.CurrentThread.Priority = ThreadPriority.Highest;
-
- try
- {
- while (IsRunning || m_requestQueue.Count > 0)
- {
- RefillRequest req = m_requestQueue.Take(m_cancelSource.Token);
-
- // QueueEmpty callback = req.Client.OnQueueEmpty;
- //
- // if (callback != null)
- // {
- // try
- // {
- // callback(req.Categories);
- // }
- // catch (Exception e)
- // {
- // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
- // }
- // }
-
- req.Client.FireQueueEmpty(req.Categories);
- }
- }
- catch (OperationCanceledException)
- {
- }
-
- m_finishedProcessingAfterStop.Set();
- }
-
- private void HandleOqreCommand(string module, string[] args)
- {
- if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
- return;
-
- if (args.Length != 4)
- {
- MainConsole.Instance.Output("Usage: debug lludp oqre ");
- return;
- }
-
- string subCommand = args[3];
-
- if (subCommand == "stop")
- {
- Stop();
- MainConsole.Instance.OutputFormat("Stopped OQRE for {0}", m_udpServer.Scene.Name);
- }
- else if (subCommand == "start")
- {
- Start();
- MainConsole.Instance.OutputFormat("Started OQRE for {0}", m_udpServer.Scene.Name);
- }
- else if (subCommand == "status")
- {
- MainConsole.Instance.OutputFormat("OQRE in {0}", m_udpServer.Scene.Name);
- MainConsole.Instance.OutputFormat("Running: {0}", IsRunning);
- MainConsole.Instance.OutputFormat(
- "Requests waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
- }
- else
- {
- MainConsole.Instance.OutputFormat("Unrecognized OQRE subcommand {0}", subCommand);
- }
- }
- }
-}
\ No newline at end of file
diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
index fceda80..fa23590 100644
--- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
+++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
@@ -31,6 +31,7 @@ using System.Reflection;
using OpenSim.Framework;
using OpenSim.Framework.Client;
+using OpenSim.Framework.Monitoring;
using OpenSim.Region.Framework.Interfaces;
using OpenSim.Region.Framework.Scenes;
using OpenSim.Services.Connectors.Hypergrid;
@@ -113,7 +114,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
///
/// Used for processing analysis of incoming attachments in a controlled fashion.
///
- private HGIncomingSceneObjectEngine m_incomingSceneObjectEngine;
+ private JobEngine m_incomingSceneObjectEngine;
#region ISharedRegionModule
@@ -160,7 +161,24 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
scene.RegisterModuleInterface(this);
//scene.EventManager.OnIncomingSceneObject += OnIncomingSceneObject;
- m_incomingSceneObjectEngine = new HGIncomingSceneObjectEngine(scene.Name);
+ m_incomingSceneObjectEngine
+ = new JobEngine(
+ string.Format("HG Incoming Scene Object Engine ({0})", scene.Name),
+ "HG INCOMING SCENE OBJECT ENGINE");
+
+ StatsManager.RegisterStat(
+ new Stat(
+ "HGIncomingAttachmentsWaiting",
+ "Number of incoming attachments waiting for processing.",
+ "",
+ "",
+ "entitytransfer",
+ Name,
+ StatType.Pull,
+ MeasuresOfInterest.None,
+ stat => stat.Value = m_incomingSceneObjectEngine.JobsWaiting,
+ StatVerbosity.Debug));
+
m_incomingSceneObjectEngine.Start();
}
}
@@ -548,11 +566,11 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
private void RemoveIncomingSceneObjectJobs(string commonIdToRemove)
{
- List jobsToReinsert = new List();
+ List jobsToReinsert = new List();
int jobsRemoved = 0;
- Job job;
- while ((job = m_incomingSceneObjectEngine.RemoveNextRequest()) != null)
+ JobEngine.Job job;
+ while ((job = m_incomingSceneObjectEngine.RemoveNextJob()) != null)
{
if (job.CommonId != commonIdToRemove)
jobsToReinsert.Add(job);
@@ -566,8 +584,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
if (jobsToReinsert.Count > 0)
{
- foreach (Job jobToReinsert in jobsToReinsert)
- m_incomingSceneObjectEngine.QueueRequest(jobToReinsert);
+ foreach (JobEngine.Job jobToReinsert in jobsToReinsert)
+ m_incomingSceneObjectEngine.QueueJob(jobToReinsert);
}
}
@@ -594,10 +612,9 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
{
if (aCircuit.ServiceURLs != null && aCircuit.ServiceURLs.ContainsKey("AssetServerURI"))
{
- m_incomingSceneObjectEngine.QueueRequest(
+ m_incomingSceneObjectEngine.QueueJob(
string.Format("HG UUID Gather for attachment {0} for {1}", so.Name, aCircuit.Name),
- so.OwnerID.ToString(),
- o =>
+ () =>
{
string url = aCircuit.ServiceURLs["AssetServerURI"].ToString();
// m_log.DebugFormat(
@@ -663,8 +680,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
// m_log.DebugFormat(
// "[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}",
// so.Name, so.OwnerID, url);
- },
- null);
+ },
+ so.OwnerID.ToString());
}
}
}
diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
deleted file mode 100644
index f62e7f4..0000000
--- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Copyright (c) Contributors, http://opensimulator.org/
- * See CONTRIBUTORS.TXT for a full list of copyright holders.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of the OpenSimulator Project nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Reflection;
-using System.Threading;
-using log4net;
-using OpenSim.Framework;
-using OpenSim.Framework.Monitoring;
-using OpenSim.Region.Framework.Scenes;
-
-namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
-{
- public class Job
- {
- public string Name { get; private set; }
- public string CommonId { get; private set; }
- public WaitCallback Callback { get; private set; }
- public object O { get; private set; }
-
- public Job(string name, string commonId, WaitCallback callback, object o)
- {
- Name = name;
- CommonId = commonId;
- Callback = callback;
- O = o;
- }
- }
-
- // TODO: These kinds of classes MUST be generalized with JobEngine, etc.
- public class HGIncomingSceneObjectEngine
- {
- private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- public int LogLevel { get; set; }
-
- public bool IsRunning { get; private set; }
-
- public string Name { get; set; }
-
- ///
- /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
- ///
- public int RequestProcessTimeoutOnStop { get; set; }
-
- ///
- /// Controls whether we need to warn in the log about exceeding the max queue size.
- ///
- ///
- /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
- /// order to avoid spamming the log with lots of warnings.
- ///
- private bool m_warnOverMaxQueue = true;
-
- private BlockingCollection m_requestQueue;
-
- private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
-
- private Stat m_requestsWaitingStat;
-
- private Job m_currentJob;
-
- ///
- /// Used to signal that we are ready to complete stop.
- ///
- private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
-
- public HGIncomingSceneObjectEngine(string name)
- {
-// LogLevel = 1;
- Name = name;
- RequestProcessTimeoutOnStop = 5000;
-
-// MainConsole.Instance.Commands.AddCommand(
-// "Debug",
-// false,
-// "debug jobengine",
-// "debug jobengine ",
-// "Start, stop or get status of the job engine.",
-// "If stopped then all jobs are processed immediately.",
-// HandleControlCommand);
- }
-
- public void Start()
- {
- lock (this)
- {
- if (IsRunning)
- return;
-
- IsRunning = true;
-
- m_finishedProcessingAfterStop.Reset();
-
- m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000);
-
- m_requestsWaitingStat =
- new Stat(
- "HGIncomingAttachmentsWaiting",
- "Number of incoming attachments waiting for processing.",
- "",
- "",
- "entitytransfer",
- Name,
- StatType.Pull,
- MeasuresOfInterest.None,
- stat => stat.Value = m_requestQueue.Count,
- StatVerbosity.Debug);
-
- StatsManager.RegisterStat(m_requestsWaitingStat);
-
- WorkManager.StartThread(
- ProcessRequests,
- string.Format("HG Incoming Scene Object Engine Thread ({0})", Name),
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
- }
- }
-
- public void Stop()
- {
- lock (this)
- {
- try
- {
- if (!IsRunning)
- return;
-
- IsRunning = false;
-
- int requestsLeft = m_requestQueue.Count;
-
- if (requestsLeft <= 0)
- {
- m_cancelSource.Cancel();
- }
- else
- {
- m_log.InfoFormat("[HG INCOMING SCENE OBJECT ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
-
- while (requestsLeft > 0)
- {
- if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
- {
- // After timeout no events have been written
- if (requestsLeft == m_requestQueue.Count)
- {
- m_log.WarnFormat(
- "[HG INCOMING SCENE OBJECT ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
- RequestProcessTimeoutOnStop, requestsLeft);
-
- break;
- }
- }
-
- requestsLeft = m_requestQueue.Count;
- }
- }
- }
- finally
- {
- m_cancelSource.Dispose();
- StatsManager.DeregisterStat(m_requestsWaitingStat);
- m_requestsWaitingStat = null;
- m_requestQueue = null;
- }
- }
- }
-
- public Job RemoveNextRequest()
- {
- Job nextRequest;
- m_requestQueue.TryTake(out nextRequest);
-
- return nextRequest;
- }
-
- public bool QueueRequest(string name, string commonId, WaitCallback req, object o)
- {
- return QueueRequest(new Job(name, commonId, req, o));
- }
-
- public bool QueueRequest(Job job)
- {
- if (LogLevel >= 1)
- m_log.DebugFormat(
- "[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}, common ID {1}", job.Name, job.CommonId);
-
- if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
- {
- // m_log.DebugFormat(
- // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
- // categories, client.AgentID, m_udpServer.Scene.Name);
-
- m_requestQueue.Add(job);
-
- if (!m_warnOverMaxQueue)
- m_warnOverMaxQueue = true;
-
- return true;
- }
- else
- {
- if (m_warnOverMaxQueue)
- {
- // m_log.WarnFormat(
- // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
- // client.AgentID, m_udpServer.Scene.Name);
-
- m_log.WarnFormat("[HG INCOMING SCENE OBJECT ENGINE]: Request queue at maximum capacity, not recording job");
-
- m_warnOverMaxQueue = false;
- }
-
- return false;
- }
- }
-
- private void ProcessRequests()
- {
- try
- {
- while (IsRunning || m_requestQueue.Count > 0)
- {
- m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
-
- // QueueEmpty callback = req.Client.OnQueueEmpty;
- //
- // if (callback != null)
- // {
- // try
- // {
- // callback(req.Categories);
- // }
- // catch (Exception e)
- // {
- // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
- // }
- // }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processing job {0}", m_currentJob.Name);
-
- try
- {
- m_currentJob.Callback.Invoke(m_currentJob.O);
- }
- catch (Exception e)
- {
- m_log.Error(
- string.Format(
- "[HG INCOMING SCENE OBJECT ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
- }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processed job {0}", m_currentJob.Name);
-
- m_currentJob = null;
- }
- }
- catch (OperationCanceledException)
- {
- }
-
- m_finishedProcessingAfterStop.Set();
- }
-
-// private void HandleControlCommand(string module, string[] args)
-// {
-// // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
-// // return;
-//
-// if (args.Length < 3)
-// {
-// MainConsole.Instance.Output("Usage: debug jobengine ");
-// return;
-// }
-//
-// string subCommand = args[2];
-//
-// if (subCommand == "stop")
-// {
-// Stop();
-// MainConsole.Instance.OutputFormat("Stopped job engine.");
-// }
-// else if (subCommand == "start")
-// {
-// Start();
-// MainConsole.Instance.OutputFormat("Started job engine.");
-// }
-// else if (subCommand == "status")
-// {
-// MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
-// MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
-// MainConsole.Instance.OutputFormat(
-// "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
-// MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
-// }
-//
-// else if (subCommand == "loglevel")
-// {
-// // int logLevel;
-// int logLevel = int.Parse(args[3]);
-// // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
-// // {
-// LogLevel = logLevel;
-// MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel);
-// // }
-// }
-// else
-// {
-// MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
-// }
-// }
- }
-}
--
cgit v1.1