From 8e1e8a0920a9e94305619e9afb8e053b4daefb89 Mon Sep 17 00:00:00 2001 From: Justin Clark-Casey (justincc) Date: Mon, 12 Jan 2015 20:56:37 +0000 Subject: Make the performance controlling job processing threads introduced in conference code use a generic JobEngine class rather than 4 slightly different copy/pasted versions. --- .../UDP/IncomingPacketAsyncHandlingEngine.cs | 328 -------------------- .../Region/ClientStack/Linden/UDP/LLClientView.cs | 5 +- .../Region/ClientStack/Linden/UDP/LLUDPClient.cs | 2 +- .../Region/ClientStack/Linden/UDP/LLUDPServer.cs | 52 +++- .../ClientStack/Linden/UDP/LLUDPServerCommands.cs | 46 +++ .../Linden/UDP/OutgoingQueueRefillEngine.cs | 288 ----------------- .../EntityTransfer/HGEntityTransferModule.cs | 41 ++- .../EntityTransfer/HGIncomingSceneObjectEngine.cs | 344 --------------------- 8 files changed, 121 insertions(+), 985 deletions(-) delete mode 100644 OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs delete mode 100644 OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs delete mode 100644 OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs (limited to 'OpenSim/Region') diff --git a/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs deleted file mode 100644 index 6f40b24..0000000 --- a/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Copyright (c) Contributors, http://opensimulator.org/ - * See CONTRIBUTORS.TXT for a full list of copyright holders. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of the OpenSimulator Project nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -using System; -using System.Collections.Concurrent; -using System.Reflection; -using System.Threading; -using log4net; -using OpenSim.Framework; -using OpenSim.Framework.Monitoring; -using OpenSim.Region.Framework.Scenes; - -namespace OpenSim.Region.ClientStack.LindenUDP -{ - public class Job - { - public string Name; - public WaitCallback Callback; - public object O; - - public Job(string name, WaitCallback callback, object o) - { - Name = name; - Callback = callback; - O = o; - } - } - - // TODO: These kinds of classes MUST be generalized with JobEngine, etc. - public class IncomingPacketAsyncHandlingEngine - { - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - public int LogLevel { get; set; } - - public bool IsRunning { get; private set; } - - /// - /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. - /// - public int RequestProcessTimeoutOnStop { get; set; } - - /// - /// Controls whether we need to warn in the log about exceeding the max queue size. - /// - /// - /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in - /// order to avoid spamming the log with lots of warnings. - /// - private bool m_warnOverMaxQueue = true; - - private BlockingCollection m_requestQueue; - - private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); - - private LLUDPServer m_udpServer; - - private Stat m_requestsWaitingStat; - - private Job m_currentJob; - - /// - /// Used to signal that we are ready to complete stop. - /// - private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); - - public IncomingPacketAsyncHandlingEngine(LLUDPServer server) - { - //LogLevel = 1; - m_udpServer = server; - RequestProcessTimeoutOnStop = 5000; - - // MainConsole.Instance.Commands.AddCommand( - // "Debug", - // false, - // "debug jobengine", - // "debug jobengine ", - // "Start, stop or get status of the job engine.", - // "If stopped then all jobs are processed immediately.", - // HandleControlCommand); - } - - public void Start() - { - lock (this) - { - if (IsRunning) - return; - - IsRunning = true; - - m_finishedProcessingAfterStop.Reset(); - - m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); - - m_requestsWaitingStat = - new Stat( - "IncomingPacketAsyncRequestsWaiting", - "Number of incoming packets waiting for async processing in engine.", - "", - "", - "clientstack", - m_udpServer.Scene.Name, - StatType.Pull, - MeasuresOfInterest.None, - stat => stat.Value = m_requestQueue.Count, - StatVerbosity.Debug); - - StatsManager.RegisterStat(m_requestsWaitingStat); - - WorkManager.StartThread( - ProcessRequests, - string.Format("Incoming Packet Async Handling Engine Thread ({0})", m_udpServer.Scene.Name), - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); - } - } - - public void Stop() - { - lock (this) - { - try - { - if (!IsRunning) - return; - - IsRunning = false; - - int requestsLeft = m_requestQueue.Count; - - if (requestsLeft <= 0) - { - m_cancelSource.Cancel(); - } - else - { - m_log.InfoFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Waiting to write {0} events after stop.", requestsLeft); - - while (requestsLeft > 0) - { - if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) - { - // After timeout no events have been written - if (requestsLeft == m_requestQueue.Count) - { - m_log.WarnFormat( - "[INCOMING PACKET ASYNC HANDLING ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", - RequestProcessTimeoutOnStop, requestsLeft); - - break; - } - } - - requestsLeft = m_requestQueue.Count; - } - } - } - finally - { - m_cancelSource.Dispose(); - StatsManager.DeregisterStat(m_requestsWaitingStat); - m_requestsWaitingStat = null; - m_requestQueue = null; - } - } - } - - public bool QueueRequest(string name, WaitCallback req, object o) - { - if (LogLevel >= 1) - m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Queued job {0}", name); - - if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) - { - // m_log.DebugFormat( - // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", - // categories, client.AgentID, m_udpServer.Scene.Name); - - m_requestQueue.Add(new Job(name, req, o)); - - if (!m_warnOverMaxQueue) - m_warnOverMaxQueue = true; - - return true; - } - else - { - if (m_warnOverMaxQueue) - { - // m_log.WarnFormat( - // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", - // client.AgentID, m_udpServer.Scene.Name); - - m_log.WarnFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Request queue at maximum capacity, not recording job"); - - m_warnOverMaxQueue = false; - } - - return false; - } - } - - private void ProcessRequests() - { - try - { - while (IsRunning || m_requestQueue.Count > 0) - { - m_currentJob = m_requestQueue.Take(m_cancelSource.Token); - - // QueueEmpty callback = req.Client.OnQueueEmpty; - // - // if (callback != null) - // { - // try - // { - // callback(req.Categories); - // } - // catch (Exception e) - // { - // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); - // } - // } - - if (LogLevel >= 1) - m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processing job {0}", m_currentJob.Name); - - try - { - m_currentJob.Callback.Invoke(m_currentJob.O); - } - catch (Exception e) - { - m_log.Error( - string.Format( - "[INCOMING PACKET ASYNC HANDLING ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e); - } - - if (LogLevel >= 1) - m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processed job {0}", m_currentJob.Name); - - m_currentJob = null; - } - } - catch (OperationCanceledException) - { - } - - m_finishedProcessingAfterStop.Set(); - } - - // private void HandleControlCommand(string module, string[] args) - // { - // // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) - // // return; - // - // if (args.Length < 3) - // { - // MainConsole.Instance.Output("Usage: debug jobengine "); - // return; - // } - // - // string subCommand = args[2]; - // - // if (subCommand == "stop") - // { - // Stop(); - // MainConsole.Instance.OutputFormat("Stopped job engine."); - // } - // else if (subCommand == "start") - // { - // Start(); - // MainConsole.Instance.OutputFormat("Started job engine."); - // } - // else if (subCommand == "status") - // { - // MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning); - // MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none"); - // MainConsole.Instance.OutputFormat( - // "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); - // MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel); - // } - // - // else if (subCommand == "loglevel") - // { - // // int logLevel; - // int logLevel = int.Parse(args[3]); - // // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel)) - // // { - // LogLevel = logLevel; - // MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel); - // // } - // } - // else - // { - // MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand); - // } - // } - } -} diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs index 5da0ca1..bb4f8a7 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs @@ -724,10 +724,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP object obj = new AsyncPacketProcess(this, pprocessor.method, packet); if (pprocessor.InEngine) - m_udpServer.IpahEngine.QueueRequest( - packet.Type.ToString(), - ProcessSpecificPacketAsync, - obj); + m_udpServer.IpahEngine.QueueJob(packet.Type.ToString(), () => ProcessSpecificPacketAsync(obj)); else Util.FireAndForget(ProcessSpecificPacketAsync, obj, packet.Type.ToString()); diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs index de91856..ce6e3ee 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs @@ -736,7 +736,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } else { - m_udpServer.OqrEngine.QueueRequest(this, categories); + m_udpServer.OqrEngine.QueueJob(AgentID.ToString(), () => FireQueueEmpty(categories)); } } else diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs index 2f97516..7bd16e6 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs @@ -367,14 +367,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Queue some low priority but potentially high volume async requests so that they don't overwhelm available /// threadpool threads. /// - public IncomingPacketAsyncHandlingEngine IpahEngine { get; private set; } + public JobEngine IpahEngine { get; private set; } /// - /// Experimental facility to run queue empty processing within a controlled number of threads rather than - /// requiring massive numbers of short-lived threads from the threadpool when there are a high number of - /// connections. + /// Run queue empty processing within a single persistent thread. /// - public OutgoingQueueRefillEngine OqrEngine { get; private set; } + /// + /// This is the alternative to having every + /// connection schedule its own job in the threadpool which causes performance problems when there are many + /// connections. + /// + public JobEngine OqrEngine { get; private set; } public LLUDPServer( IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, @@ -459,9 +462,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (usePools) EnablePools(); - - IpahEngine = new IncomingPacketAsyncHandlingEngine(this); - OqrEngine = new OutgoingQueueRefillEngine(this); } public void Start() @@ -633,6 +633,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP Scene = (Scene)scene; m_location = new Location(Scene.RegionInfo.RegionHandle); + + IpahEngine + = new JobEngine( + string.Format("Incoming Packet Async Handling Engine ({0})", Scene.Name), + "INCOMING PACKET ASYNC HANDLING ENGINE"); + + OqrEngine + = new JobEngine( + string.Format("Outgoing Queue Refill Engine ({0})", Scene.Name), + "OUTGOING QUEUE REFILL ENGINE"); StatsManager.RegisterStat( new Stat( @@ -713,6 +723,32 @@ namespace OpenSim.Region.ClientStack.LindenUDP MeasuresOfInterest.AverageChangeOverTime, stat => stat.Value = GetTotalQueuedOutgoingPackets(), StatVerbosity.Info)); + + StatsManager.RegisterStat( + new Stat( + "IncomingPacketAsyncRequestsWaiting", + "Number of incoming packets waiting for async processing in engine.", + "", + "", + "clientstack", + Scene.Name, + StatType.Pull, + MeasuresOfInterest.None, + stat => stat.Value = IpahEngine.JobsWaiting, + StatVerbosity.Debug)); + + StatsManager.RegisterStat( + new Stat( + "OQRERequestsWaiting", + "Number of outgong queue refill requests waiting for processing.", + "", + "", + "clientstack", + Scene.Name, + StatType.Pull, + MeasuresOfInterest.None, + stat => stat.Value = OqrEngine.JobsWaiting, + StatVerbosity.Debug)); // We delay enabling pool stats to AddScene() instead of Initialize() so that we can distinguish pool stats by // scene name diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs index e0398d5..17a394d 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServerCommands.cs @@ -186,6 +186,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP "debug lludp toggle agentupdate", "Toggle whether agentupdate packets are processed or simply discarded.", HandleAgentUpdateCommand); + + MainConsole.Instance.Commands.AddCommand( + "Debug", + false, + "debug lludp oqre", + "debug lludp oqre ", + "Start, stop or get status of OutgoingQueueRefillEngine.", + "If stopped then refill requests are processed directly via the threadpool.", + HandleOqreCommand); } private void HandleShowServerThrottlesCommand(string module, string[] args) @@ -758,5 +767,42 @@ namespace OpenSim.Region.ClientStack.LindenUDP MainConsole.Instance.OutputFormat( "Packet debug level for new clients is {0}", m_udpServer.DefaultClientPacketDebugLevel); } + + private void HandleOqreCommand(string module, string[] args) + { + if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) + return; + + if (args.Length != 4) + { + MainConsole.Instance.Output("Usage: debug lludp oqre "); + return; + } + + string subCommand = args[3]; + + if (subCommand == "stop") + { + m_udpServer.OqrEngine.Stop(); + MainConsole.Instance.OutputFormat("Stopped OQRE for {0}", m_udpServer.Scene.Name); + } + else if (subCommand == "start") + { + m_udpServer.OqrEngine.Start(); + MainConsole.Instance.OutputFormat("Started OQRE for {0}", m_udpServer.Scene.Name); + } + else if (subCommand == "status") + { + MainConsole.Instance.OutputFormat("OQRE in {0}", m_udpServer.Scene.Name); + MainConsole.Instance.OutputFormat("Running: {0}", m_udpServer.OqrEngine.IsRunning); + MainConsole.Instance.OutputFormat( + "Requests waiting: {0}", + m_udpServer.OqrEngine.IsRunning ? m_udpServer.OqrEngine.JobsWaiting.ToString() : "n/a"); + } + else + { + MainConsole.Instance.OutputFormat("Unrecognized OQRE subcommand {0}", subCommand); + } + } } } \ No newline at end of file diff --git a/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs deleted file mode 100644 index 1e915c3..0000000 --- a/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Copyright (c) Contributors, http://opensimulator.org/ - * See CONTRIBUTORS.TXT for a full list of copyright holders. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of the OpenSimulator Project nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -using System; -using System.Collections.Concurrent; -using System.Reflection; -using System.Threading; -using log4net; -using OpenSim.Framework; -using OpenSim.Framework.Monitoring; -using OpenSim.Region.Framework.Scenes; - -namespace OpenSim.Region.ClientStack.LindenUDP -{ - public struct RefillRequest - { - public LLUDPClient Client; - public ThrottleOutPacketTypeFlags Categories; - - public RefillRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories) - { - Client = client; - Categories = categories; - } - } - - public class OutgoingQueueRefillEngine - { - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - public bool IsRunning { get; private set; } - - /// - /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. - /// - public int RequestProcessTimeoutOnStop { get; set; } - - /// - /// Controls whether we need to warn in the log about exceeding the max queue size. - /// - /// - /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in - /// order to avoid spamming the log with lots of warnings. - /// - private bool m_warnOverMaxQueue = true; - - private BlockingCollection m_requestQueue; - - private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); - - private LLUDPServer m_udpServer; - - private Stat m_oqreRequestsWaitingStat; - - /// - /// Used to signal that we are ready to complete stop. - /// - private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); - - public OutgoingQueueRefillEngine(LLUDPServer server) - { - RequestProcessTimeoutOnStop = 5000; - m_udpServer = server; - - MainConsole.Instance.Commands.AddCommand( - "Debug", - false, - "debug lludp oqre", - "debug lludp oqre ", - "Start, stop or get status of OutgoingQueueRefillEngine.", - "If stopped then refill requests are processed directly via the threadpool.", - HandleOqreCommand); - } - - public void Start() - { - lock (this) - { - if (IsRunning) - return; - - IsRunning = true; - - m_finishedProcessingAfterStop.Reset(); - - m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); - - m_oqreRequestsWaitingStat = - new Stat( - "OQRERequestsWaiting", - "Number of outgong queue refill requests waiting for processing.", - "", - "", - "clientstack", - m_udpServer.Scene.Name, - StatType.Pull, - MeasuresOfInterest.None, - stat => stat.Value = m_requestQueue.Count, - StatVerbosity.Debug); - - StatsManager.RegisterStat(m_oqreRequestsWaitingStat); - - WorkManager.StartThread( - ProcessRequests, - String.Format("OutgoingQueueRefillEngineThread ({0})", m_udpServer.Scene.Name), - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); - } - } - - public void Stop() - { - lock (this) - { - try - { - if (!IsRunning) - return; - - IsRunning = false; - - int requestsLeft = m_requestQueue.Count; - - if (requestsLeft <= 0) - { - m_cancelSource.Cancel(); - } - else - { - m_log.InfoFormat("[OUTGOING QUEUE REFILL ENGINE]: Waiting to write {0} events after stop.", requestsLeft); - - while (requestsLeft > 0) - { - if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) - { - // After timeout no events have been written - if (requestsLeft == m_requestQueue.Count) - { - m_log.WarnFormat( - "[OUTGOING QUEUE REFILL ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", - RequestProcessTimeoutOnStop, requestsLeft); - - break; - } - } - - requestsLeft = m_requestQueue.Count; - } - } - } - finally - { - m_cancelSource.Dispose(); - StatsManager.DeregisterStat(m_oqreRequestsWaitingStat); - m_oqreRequestsWaitingStat = null; - m_requestQueue = null; - } - } - } - - public bool QueueRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories) - { - if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) - { -// m_log.DebugFormat( -// "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", -// categories, client.AgentID, m_udpServer.Scene.Name); - - m_requestQueue.Add(new RefillRequest(client, categories)); - - if (!m_warnOverMaxQueue) - m_warnOverMaxQueue = true; - - return true; - } - else - { - if (m_warnOverMaxQueue) - { - m_log.WarnFormat( - "[OUTGOING QUEUE REFILL ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", - client.AgentID, m_udpServer.Scene.Name); - - m_warnOverMaxQueue = false; - } - - return false; - } - } - - private void ProcessRequests() - { - Thread.CurrentThread.Priority = ThreadPriority.Highest; - - try - { - while (IsRunning || m_requestQueue.Count > 0) - { - RefillRequest req = m_requestQueue.Take(m_cancelSource.Token); - - // QueueEmpty callback = req.Client.OnQueueEmpty; - // - // if (callback != null) - // { - // try - // { - // callback(req.Categories); - // } - // catch (Exception e) - // { - // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); - // } - // } - - req.Client.FireQueueEmpty(req.Categories); - } - } - catch (OperationCanceledException) - { - } - - m_finishedProcessingAfterStop.Set(); - } - - private void HandleOqreCommand(string module, string[] args) - { - if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) - return; - - if (args.Length != 4) - { - MainConsole.Instance.Output("Usage: debug lludp oqre "); - return; - } - - string subCommand = args[3]; - - if (subCommand == "stop") - { - Stop(); - MainConsole.Instance.OutputFormat("Stopped OQRE for {0}", m_udpServer.Scene.Name); - } - else if (subCommand == "start") - { - Start(); - MainConsole.Instance.OutputFormat("Started OQRE for {0}", m_udpServer.Scene.Name); - } - else if (subCommand == "status") - { - MainConsole.Instance.OutputFormat("OQRE in {0}", m_udpServer.Scene.Name); - MainConsole.Instance.OutputFormat("Running: {0}", IsRunning); - MainConsole.Instance.OutputFormat( - "Requests waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); - } - else - { - MainConsole.Instance.OutputFormat("Unrecognized OQRE subcommand {0}", subCommand); - } - } - } -} \ No newline at end of file diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs index fceda80..fa23590 100644 --- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs +++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs @@ -31,6 +31,7 @@ using System.Reflection; using OpenSim.Framework; using OpenSim.Framework.Client; +using OpenSim.Framework.Monitoring; using OpenSim.Region.Framework.Interfaces; using OpenSim.Region.Framework.Scenes; using OpenSim.Services.Connectors.Hypergrid; @@ -113,7 +114,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer /// /// Used for processing analysis of incoming attachments in a controlled fashion. /// - private HGIncomingSceneObjectEngine m_incomingSceneObjectEngine; + private JobEngine m_incomingSceneObjectEngine; #region ISharedRegionModule @@ -160,7 +161,24 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer scene.RegisterModuleInterface(this); //scene.EventManager.OnIncomingSceneObject += OnIncomingSceneObject; - m_incomingSceneObjectEngine = new HGIncomingSceneObjectEngine(scene.Name); + m_incomingSceneObjectEngine + = new JobEngine( + string.Format("HG Incoming Scene Object Engine ({0})", scene.Name), + "HG INCOMING SCENE OBJECT ENGINE"); + + StatsManager.RegisterStat( + new Stat( + "HGIncomingAttachmentsWaiting", + "Number of incoming attachments waiting for processing.", + "", + "", + "entitytransfer", + Name, + StatType.Pull, + MeasuresOfInterest.None, + stat => stat.Value = m_incomingSceneObjectEngine.JobsWaiting, + StatVerbosity.Debug)); + m_incomingSceneObjectEngine.Start(); } } @@ -548,11 +566,11 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer private void RemoveIncomingSceneObjectJobs(string commonIdToRemove) { - List jobsToReinsert = new List(); + List jobsToReinsert = new List(); int jobsRemoved = 0; - Job job; - while ((job = m_incomingSceneObjectEngine.RemoveNextRequest()) != null) + JobEngine.Job job; + while ((job = m_incomingSceneObjectEngine.RemoveNextJob()) != null) { if (job.CommonId != commonIdToRemove) jobsToReinsert.Add(job); @@ -566,8 +584,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer if (jobsToReinsert.Count > 0) { - foreach (Job jobToReinsert in jobsToReinsert) - m_incomingSceneObjectEngine.QueueRequest(jobToReinsert); + foreach (JobEngine.Job jobToReinsert in jobsToReinsert) + m_incomingSceneObjectEngine.QueueJob(jobToReinsert); } } @@ -594,10 +612,9 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer { if (aCircuit.ServiceURLs != null && aCircuit.ServiceURLs.ContainsKey("AssetServerURI")) { - m_incomingSceneObjectEngine.QueueRequest( + m_incomingSceneObjectEngine.QueueJob( string.Format("HG UUID Gather for attachment {0} for {1}", so.Name, aCircuit.Name), - so.OwnerID.ToString(), - o => + () => { string url = aCircuit.ServiceURLs["AssetServerURI"].ToString(); // m_log.DebugFormat( @@ -663,8 +680,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer // m_log.DebugFormat( // "[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}", // so.Name, so.OwnerID, url); - }, - null); + }, + so.OwnerID.ToString()); } } } diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs deleted file mode 100644 index f62e7f4..0000000 --- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Copyright (c) Contributors, http://opensimulator.org/ - * See CONTRIBUTORS.TXT for a full list of copyright holders. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of the OpenSimulator Project nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -using System; -using System.Collections.Concurrent; -using System.Reflection; -using System.Threading; -using log4net; -using OpenSim.Framework; -using OpenSim.Framework.Monitoring; -using OpenSim.Region.Framework.Scenes; - -namespace OpenSim.Region.CoreModules.Framework.EntityTransfer -{ - public class Job - { - public string Name { get; private set; } - public string CommonId { get; private set; } - public WaitCallback Callback { get; private set; } - public object O { get; private set; } - - public Job(string name, string commonId, WaitCallback callback, object o) - { - Name = name; - CommonId = commonId; - Callback = callback; - O = o; - } - } - - // TODO: These kinds of classes MUST be generalized with JobEngine, etc. - public class HGIncomingSceneObjectEngine - { - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - public int LogLevel { get; set; } - - public bool IsRunning { get; private set; } - - public string Name { get; set; } - - /// - /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. - /// - public int RequestProcessTimeoutOnStop { get; set; } - - /// - /// Controls whether we need to warn in the log about exceeding the max queue size. - /// - /// - /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in - /// order to avoid spamming the log with lots of warnings. - /// - private bool m_warnOverMaxQueue = true; - - private BlockingCollection m_requestQueue; - - private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); - - private Stat m_requestsWaitingStat; - - private Job m_currentJob; - - /// - /// Used to signal that we are ready to complete stop. - /// - private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); - - public HGIncomingSceneObjectEngine(string name) - { -// LogLevel = 1; - Name = name; - RequestProcessTimeoutOnStop = 5000; - -// MainConsole.Instance.Commands.AddCommand( -// "Debug", -// false, -// "debug jobengine", -// "debug jobengine ", -// "Start, stop or get status of the job engine.", -// "If stopped then all jobs are processed immediately.", -// HandleControlCommand); - } - - public void Start() - { - lock (this) - { - if (IsRunning) - return; - - IsRunning = true; - - m_finishedProcessingAfterStop.Reset(); - - m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); - - m_requestsWaitingStat = - new Stat( - "HGIncomingAttachmentsWaiting", - "Number of incoming attachments waiting for processing.", - "", - "", - "entitytransfer", - Name, - StatType.Pull, - MeasuresOfInterest.None, - stat => stat.Value = m_requestQueue.Count, - StatVerbosity.Debug); - - StatsManager.RegisterStat(m_requestsWaitingStat); - - WorkManager.StartThread( - ProcessRequests, - string.Format("HG Incoming Scene Object Engine Thread ({0})", Name), - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); - } - } - - public void Stop() - { - lock (this) - { - try - { - if (!IsRunning) - return; - - IsRunning = false; - - int requestsLeft = m_requestQueue.Count; - - if (requestsLeft <= 0) - { - m_cancelSource.Cancel(); - } - else - { - m_log.InfoFormat("[HG INCOMING SCENE OBJECT ENGINE]: Waiting to write {0} events after stop.", requestsLeft); - - while (requestsLeft > 0) - { - if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) - { - // After timeout no events have been written - if (requestsLeft == m_requestQueue.Count) - { - m_log.WarnFormat( - "[HG INCOMING SCENE OBJECT ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", - RequestProcessTimeoutOnStop, requestsLeft); - - break; - } - } - - requestsLeft = m_requestQueue.Count; - } - } - } - finally - { - m_cancelSource.Dispose(); - StatsManager.DeregisterStat(m_requestsWaitingStat); - m_requestsWaitingStat = null; - m_requestQueue = null; - } - } - } - - public Job RemoveNextRequest() - { - Job nextRequest; - m_requestQueue.TryTake(out nextRequest); - - return nextRequest; - } - - public bool QueueRequest(string name, string commonId, WaitCallback req, object o) - { - return QueueRequest(new Job(name, commonId, req, o)); - } - - public bool QueueRequest(Job job) - { - if (LogLevel >= 1) - m_log.DebugFormat( - "[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}, common ID {1}", job.Name, job.CommonId); - - if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) - { - // m_log.DebugFormat( - // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", - // categories, client.AgentID, m_udpServer.Scene.Name); - - m_requestQueue.Add(job); - - if (!m_warnOverMaxQueue) - m_warnOverMaxQueue = true; - - return true; - } - else - { - if (m_warnOverMaxQueue) - { - // m_log.WarnFormat( - // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", - // client.AgentID, m_udpServer.Scene.Name); - - m_log.WarnFormat("[HG INCOMING SCENE OBJECT ENGINE]: Request queue at maximum capacity, not recording job"); - - m_warnOverMaxQueue = false; - } - - return false; - } - } - - private void ProcessRequests() - { - try - { - while (IsRunning || m_requestQueue.Count > 0) - { - m_currentJob = m_requestQueue.Take(m_cancelSource.Token); - - // QueueEmpty callback = req.Client.OnQueueEmpty; - // - // if (callback != null) - // { - // try - // { - // callback(req.Categories); - // } - // catch (Exception e) - // { - // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); - // } - // } - - if (LogLevel >= 1) - m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processing job {0}", m_currentJob.Name); - - try - { - m_currentJob.Callback.Invoke(m_currentJob.O); - } - catch (Exception e) - { - m_log.Error( - string.Format( - "[HG INCOMING SCENE OBJECT ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e); - } - - if (LogLevel >= 1) - m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processed job {0}", m_currentJob.Name); - - m_currentJob = null; - } - } - catch (OperationCanceledException) - { - } - - m_finishedProcessingAfterStop.Set(); - } - -// private void HandleControlCommand(string module, string[] args) -// { -// // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) -// // return; -// -// if (args.Length < 3) -// { -// MainConsole.Instance.Output("Usage: debug jobengine "); -// return; -// } -// -// string subCommand = args[2]; -// -// if (subCommand == "stop") -// { -// Stop(); -// MainConsole.Instance.OutputFormat("Stopped job engine."); -// } -// else if (subCommand == "start") -// { -// Start(); -// MainConsole.Instance.OutputFormat("Started job engine."); -// } -// else if (subCommand == "status") -// { -// MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning); -// MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none"); -// MainConsole.Instance.OutputFormat( -// "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); -// MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel); -// } -// -// else if (subCommand == "loglevel") -// { -// // int logLevel; -// int logLevel = int.Parse(args[3]); -// // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel)) -// // { -// LogLevel = logLevel; -// MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel); -// // } -// } -// else -// { -// MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand); -// } -// } - } -} -- cgit v1.1