From d6c9705a3b9d4c9bbe0e23f6d8d3793c07bf53ea Mon Sep 17 00:00:00 2001 From: Justin Clark-Casey (justincc) Date: Wed, 5 Nov 2014 19:02:26 +0000 Subject: Add incoming packet async handling engine to queue some inbound udp async requests. This is to reduce the potential for overload of the threadpool if there are many simultaneous requets in high concurrency situations. Currently only applied to AvatarProperties and GenericMessage requests. --- .../UDP/IncomingPacketAsyncHandlingEngine.cs | 328 +++++++++++++++++++++ .../Region/ClientStack/Linden/UDP/LLClientView.cs | 67 ++++- .../Region/ClientStack/Linden/UDP/LLUDPServer.cs | 9 + .../EntityTransfer/HGIncomingSceneObjectEngine.cs | 2 +- 4 files changed, 394 insertions(+), 12 deletions(-) create mode 100644 OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs (limited to 'OpenSim') diff --git a/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs new file mode 100644 index 0000000..874ddae --- /dev/null +++ b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs @@ -0,0 +1,328 @@ +/* + * Copyright (c) Contributors, http://opensimulator.org/ + * See CONTRIBUTORS.TXT for a full list of copyright holders. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the OpenSimulator Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Threading; +using log4net; +using OpenSim.Framework; +using OpenSim.Framework.Monitoring; +using OpenSim.Region.Framework.Scenes; + +namespace OpenSim.Region.ClientStack.LindenUDP +{ + public class Job + { + public string Name; + public WaitCallback Callback; + public object O; + + public Job(string name, WaitCallback callback, object o) + { + Name = name; + Callback = callback; + O = o; + } + } + + // TODO: These kinds of classes MUST be generalized with JobEngine, etc. + public class IncomingPacketAsyncHandlingEngine + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + public int LogLevel { get; set; } + + public bool IsRunning { get; private set; } + + /// + /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. + /// + public int RequestProcessTimeoutOnStop { get; set; } + + /// + /// Controls whether we need to warn in the log about exceeding the max queue size. + /// + /// + /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in + /// order to avoid spamming the log with lots of warnings. + /// + private bool m_warnOverMaxQueue = true; + + private BlockingCollection m_requestQueue; + + private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); + + private LLUDPServer m_udpServer; + + private Stat m_requestsWaitingStat; + + private Job m_currentJob; + + /// + /// Used to signal that we are ready to complete stop. + /// + private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); + + public IncomingPacketAsyncHandlingEngine(LLUDPServer server) + { + //LogLevel = 1; + m_udpServer = server; + RequestProcessTimeoutOnStop = 5000; + + // MainConsole.Instance.Commands.AddCommand( + // "Debug", + // false, + // "debug jobengine", + // "debug jobengine ", + // "Start, stop or get status of the job engine.", + // "If stopped then all jobs are processed immediately.", + // HandleControlCommand); + } + + public void Start() + { + lock (this) + { + if (IsRunning) + return; + + IsRunning = true; + + m_finishedProcessingAfterStop.Reset(); + + m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); + + m_requestsWaitingStat = + new Stat( + "IncomingPacketAsyncRequestsWaiting", + "Number of incoming packets waiting for async processing in engine.", + "", + "", + "clientstack", + m_udpServer.Scene.Name, + StatType.Pull, + MeasuresOfInterest.None, + stat => stat.Value = m_requestQueue.Count, + StatVerbosity.Debug); + + StatsManager.RegisterStat(m_requestsWaitingStat); + + Watchdog.StartThread( + ProcessRequests, + string.Format("Incoming Packet Async Handling Engine Thread ({0})", m_udpServer.Scene.Name), + ThreadPriority.Normal, + false, + true, + null, + int.MaxValue); + } + } + + public void Stop() + { + lock (this) + { + try + { + if (!IsRunning) + return; + + IsRunning = false; + + int requestsLeft = m_requestQueue.Count; + + if (requestsLeft <= 0) + { + m_cancelSource.Cancel(); + } + else + { + m_log.InfoFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Waiting to write {0} events after stop.", requestsLeft); + + while (requestsLeft > 0) + { + if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) + { + // After timeout no events have been written + if (requestsLeft == m_requestQueue.Count) + { + m_log.WarnFormat( + "[INCOMING PACKET ASYNC HANDLING ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", + RequestProcessTimeoutOnStop, requestsLeft); + + break; + } + } + + requestsLeft = m_requestQueue.Count; + } + } + } + finally + { + m_cancelSource.Dispose(); + StatsManager.DeregisterStat(m_requestsWaitingStat); + m_requestsWaitingStat = null; + m_requestQueue = null; + } + } + } + + public bool QueueRequest(string name, WaitCallback req, object o) + { + if (LogLevel >= 1) + m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Queued job {0}", name); + + if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) + { + // m_log.DebugFormat( + // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", + // categories, client.AgentID, m_udpServer.Scene.Name); + + m_requestQueue.Add(new Job(name, req, o)); + + if (!m_warnOverMaxQueue) + m_warnOverMaxQueue = true; + + return true; + } + else + { + if (m_warnOverMaxQueue) + { + // m_log.WarnFormat( + // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", + // client.AgentID, m_udpServer.Scene.Name); + + m_log.WarnFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Request queue at maximum capacity, not recording job"); + + m_warnOverMaxQueue = false; + } + + return false; + } + } + + private void ProcessRequests() + { + try + { + while (IsRunning || m_requestQueue.Count > 0) + { + m_currentJob = m_requestQueue.Take(m_cancelSource.Token); + + // QueueEmpty callback = req.Client.OnQueueEmpty; + // + // if (callback != null) + // { + // try + // { + // callback(req.Categories); + // } + // catch (Exception e) + // { + // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); + // } + // } + + if (LogLevel >= 1) + m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processing job {0}", m_currentJob.Name); + + try + { + m_currentJob.Callback.Invoke(m_currentJob.O); + } + catch (Exception e) + { + m_log.Error( + string.Format( + "[INCOMING PACKET ASYNC HANDLING ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e); + } + + if (LogLevel >= 1) + m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processed job {0}", m_currentJob.Name); + + m_currentJob = null; + } + } + catch (OperationCanceledException) + { + } + + m_finishedProcessingAfterStop.Set(); + } + + // private void HandleControlCommand(string module, string[] args) + // { + // // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) + // // return; + // + // if (args.Length < 3) + // { + // MainConsole.Instance.Output("Usage: debug jobengine "); + // return; + // } + // + // string subCommand = args[2]; + // + // if (subCommand == "stop") + // { + // Stop(); + // MainConsole.Instance.OutputFormat("Stopped job engine."); + // } + // else if (subCommand == "start") + // { + // Start(); + // MainConsole.Instance.OutputFormat("Started job engine."); + // } + // else if (subCommand == "status") + // { + // MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning); + // MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none"); + // MainConsole.Instance.OutputFormat( + // "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); + // MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel); + // } + // + // else if (subCommand == "loglevel") + // { + // // int logLevel; + // int logLevel = int.Parse(args[3]); + // // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel)) + // // { + // LogLevel = logLevel; + // MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel); + // // } + // } + // else + // { + // MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand); + // } + // } + } +} diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs index 516327c..85f9d68 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs @@ -648,12 +648,36 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// true if the handler was added. This is currently always the case. public bool AddLocalPacketHandler(PacketType packetType, PacketMethod handler, bool doAsync) { + return AddLocalPacketHandler(packetType, handler, doAsync, false); + } + + /// + /// Add a handler for the given packet type. + /// + /// + /// + /// + /// If true, when the packet is received handle it on a different thread. Whether this is given direct to + /// a threadpool thread or placed in a queue depends on the inEngine parameter. + /// + /// + /// If async is false then this parameter is ignored. + /// If async is true and inEngine is false, then the packet is sent directly to a + /// threadpool thread. + /// If async is true and inEngine is true, then the packet is sent to the IncomingPacketAsyncHandlingEngine. + /// This may result in slower handling but reduces the risk of overloading the simulator when there are many + /// simultaneous async requests. + /// + /// true if the handler was added. This is currently always the case. + public bool AddLocalPacketHandler(PacketType packetType, PacketMethod handler, bool doAsync, bool inEngine) + { bool result = false; lock (m_packetHandlers) { if (!m_packetHandlers.ContainsKey(packetType)) { - m_packetHandlers.Add(packetType, new PacketProcessor() { method = handler, Async = doAsync }); + m_packetHandlers.Add( + packetType, new PacketProcessor() { method = handler, Async = doAsync, InEngine = inEngine }); result = true; } } @@ -688,21 +712,29 @@ namespace OpenSim.Region.ClientStack.LindenUDP PacketProcessor pprocessor; if (m_packetHandlers.TryGetValue(packet.Type, out pprocessor)) { + ClientInfo cinfo = UDPClient.GetClientInfo(); + //there is a local handler for this packet type if (pprocessor.Async) { - ClientInfo cinfo = UDPClient.GetClientInfo(); if (!cinfo.AsyncRequests.ContainsKey(packet.Type.ToString())) cinfo.AsyncRequests[packet.Type.ToString()] = 0; cinfo.AsyncRequests[packet.Type.ToString()]++; object obj = new AsyncPacketProcess(this, pprocessor.method, packet); - Util.FireAndForget(ProcessSpecificPacketAsync, obj, packet.Type.ToString()); + + if (pprocessor.InEngine) + m_udpServer.IpahEngine.QueueRequest( + packet.Type.ToString(), + ProcessSpecificPacketAsync, + obj); + else + Util.FireAndForget(ProcessSpecificPacketAsync, obj, packet.Type.ToString()); + result = true; } else { - ClientInfo cinfo = UDPClient.GetClientInfo(); if (!cinfo.SyncRequests.ContainsKey(packet.Type.ToString())) cinfo.SyncRequests[packet.Type.ToString()] = 0; cinfo.SyncRequests[packet.Type.ToString()]++; @@ -5554,10 +5586,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP AddLocalPacketHandler(PacketType.ParcelBuy, HandleParcelBuyRequest, false); AddLocalPacketHandler(PacketType.UUIDGroupNameRequest, HandleUUIDGroupNameRequest); AddLocalPacketHandler(PacketType.ObjectGroup, HandleObjectGroupRequest); - AddLocalPacketHandler(PacketType.GenericMessage, HandleGenericMessage); - AddLocalPacketHandler(PacketType.AvatarPropertiesRequest, HandleAvatarPropertiesRequest); + AddLocalPacketHandler(PacketType.GenericMessage, HandleGenericMessage, true, true); + AddLocalPacketHandler(PacketType.AvatarPropertiesRequest, HandleAvatarPropertiesRequest, true, true); AddLocalPacketHandler(PacketType.ChatFromViewer, HandleChatFromViewer); - AddLocalPacketHandler(PacketType.AvatarPropertiesUpdate, HandlerAvatarPropertiesUpdate); + AddLocalPacketHandler(PacketType.AvatarPropertiesUpdate, HandlerAvatarPropertiesUpdate, true, true); AddLocalPacketHandler(PacketType.ScriptDialogReply, HandlerScriptDialogReply); AddLocalPacketHandler(PacketType.ImprovedInstantMessage, HandlerImprovedInstantMessage); AddLocalPacketHandler(PacketType.AcceptFriendship, HandlerAcceptFriendship); @@ -5742,8 +5774,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP AddLocalPacketHandler(PacketType.PickDelete, HandlePickDelete); AddLocalPacketHandler(PacketType.PickGodDelete, HandlePickGodDelete); AddLocalPacketHandler(PacketType.PickInfoUpdate, HandlePickInfoUpdate); - AddLocalPacketHandler(PacketType.AvatarNotesUpdate, HandleAvatarNotesUpdate); - AddLocalPacketHandler(PacketType.AvatarInterestsUpdate, HandleAvatarInterestsUpdate); + AddLocalPacketHandler(PacketType.AvatarNotesUpdate, HandleAvatarNotesUpdate, true, true); + AddLocalPacketHandler(PacketType.AvatarInterestsUpdate, HandleAvatarInterestsUpdate, true, true); AddLocalPacketHandler(PacketType.GrantUserRights, HandleGrantUserRights); AddLocalPacketHandler(PacketType.PlacesQuery, HandlePlacesQuery); AddLocalPacketHandler(PacketType.UpdateMuteListEntry, HandleUpdateMuteListEntry); @@ -12801,8 +12833,21 @@ namespace OpenSim.Region.ClientStack.LindenUDP public struct PacketProcessor { - public PacketMethod method; - public bool Async; + /// + /// Packet handling method. + /// + public PacketMethod method { get; set; } + + /// + /// Should this packet be handled asynchronously? + /// + public bool Async { get; set; } + + /// + /// If async is true, should this packet be handled in the async engine or given directly to a threadpool + /// thread? + /// + public bool InEngine { get; set; } } public class AsyncPacketProcess diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs index 61e1d6a..d8cf7a5 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs @@ -364,6 +364,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP private IClientAPI m_currentIncomingClient; /// + /// Queue some low priority but potentially high volume async requests so that they don't overwhelm available + /// threadpool threads. + /// + public IncomingPacketAsyncHandlingEngine IpahEngine { get; private set; } + + /// /// Experimental facility to run queue empty processing within a controlled number of threads rather than /// requiring massive numbers of short-lived threads from the threadpool when there are a high number of /// connections. @@ -454,6 +460,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (usePools) EnablePools(); + IpahEngine = new IncomingPacketAsyncHandlingEngine(this); OqrEngine = new OutgoingQueueRefillEngine(this); } @@ -461,6 +468,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP { StartInbound(); StartOutbound(); + IpahEngine.Start(); OqrEngine.Start(); m_elapsedMSSinceLastStatReport = Environment.TickCount; @@ -506,6 +514,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + Scene.Name); base.StopOutbound(); base.StopInbound(); + IpahEngine.Stop(); OqrEngine.Stop(); } diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs index 5cf2e1f..718db31 100644 --- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs +++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs @@ -133,7 +133,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer Watchdog.StartThread( ProcessRequests, - "HG Incoming Scene Object Engine Thread", + string.Format("HG Incoming Scene Object Engine Thread ({0})", Name), ThreadPriority.Normal, false, true, -- cgit v1.1