From 8e1e8a0920a9e94305619e9afb8e053b4daefb89 Mon Sep 17 00:00:00 2001 From: Justin Clark-Casey (justincc) Date: Mon, 12 Jan 2015 20:56:37 +0000 Subject: Make the performance controlling job processing threads introduced in conference code use a generic JobEngine class rather than 4 slightly different copy/pasted versions. --- .../EntityTransfer/HGEntityTransferModule.cs | 41 ++- .../EntityTransfer/HGIncomingSceneObjectEngine.cs | 344 --------------------- 2 files changed, 29 insertions(+), 356 deletions(-) delete mode 100644 OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs (limited to 'OpenSim/Region/CoreModules') diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs index fceda80..fa23590 100644 --- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs +++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs @@ -31,6 +31,7 @@ using System.Reflection; using OpenSim.Framework; using OpenSim.Framework.Client; +using OpenSim.Framework.Monitoring; using OpenSim.Region.Framework.Interfaces; using OpenSim.Region.Framework.Scenes; using OpenSim.Services.Connectors.Hypergrid; @@ -113,7 +114,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer /// /// Used for processing analysis of incoming attachments in a controlled fashion. /// - private HGIncomingSceneObjectEngine m_incomingSceneObjectEngine; + private JobEngine m_incomingSceneObjectEngine; #region ISharedRegionModule @@ -160,7 +161,24 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer scene.RegisterModuleInterface(this); //scene.EventManager.OnIncomingSceneObject += OnIncomingSceneObject; - m_incomingSceneObjectEngine = new HGIncomingSceneObjectEngine(scene.Name); + m_incomingSceneObjectEngine + = new JobEngine( + string.Format("HG Incoming Scene Object Engine ({0})", scene.Name), + "HG INCOMING SCENE OBJECT ENGINE"); + + StatsManager.RegisterStat( + new Stat( + "HGIncomingAttachmentsWaiting", + "Number of incoming attachments waiting for processing.", + "", + "", + "entitytransfer", + Name, + StatType.Pull, + MeasuresOfInterest.None, + stat => stat.Value = m_incomingSceneObjectEngine.JobsWaiting, + StatVerbosity.Debug)); + m_incomingSceneObjectEngine.Start(); } } @@ -548,11 +566,11 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer private void RemoveIncomingSceneObjectJobs(string commonIdToRemove) { - List jobsToReinsert = new List(); + List jobsToReinsert = new List(); int jobsRemoved = 0; - Job job; - while ((job = m_incomingSceneObjectEngine.RemoveNextRequest()) != null) + JobEngine.Job job; + while ((job = m_incomingSceneObjectEngine.RemoveNextJob()) != null) { if (job.CommonId != commonIdToRemove) jobsToReinsert.Add(job); @@ -566,8 +584,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer if (jobsToReinsert.Count > 0) { - foreach (Job jobToReinsert in jobsToReinsert) - m_incomingSceneObjectEngine.QueueRequest(jobToReinsert); + foreach (JobEngine.Job jobToReinsert in jobsToReinsert) + m_incomingSceneObjectEngine.QueueJob(jobToReinsert); } } @@ -594,10 +612,9 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer { if (aCircuit.ServiceURLs != null && aCircuit.ServiceURLs.ContainsKey("AssetServerURI")) { - m_incomingSceneObjectEngine.QueueRequest( + m_incomingSceneObjectEngine.QueueJob( string.Format("HG UUID Gather for attachment {0} for {1}", so.Name, aCircuit.Name), - so.OwnerID.ToString(), - o => + () => { string url = aCircuit.ServiceURLs["AssetServerURI"].ToString(); // m_log.DebugFormat( @@ -663,8 +680,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer // m_log.DebugFormat( // "[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}", // so.Name, so.OwnerID, url); - }, - null); + }, + so.OwnerID.ToString()); } } } diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs deleted file mode 100644 index f62e7f4..0000000 --- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Copyright (c) Contributors, http://opensimulator.org/ - * See CONTRIBUTORS.TXT for a full list of copyright holders. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of the OpenSimulator Project nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -using System; -using System.Collections.Concurrent; -using System.Reflection; -using System.Threading; -using log4net; -using OpenSim.Framework; -using OpenSim.Framework.Monitoring; -using OpenSim.Region.Framework.Scenes; - -namespace OpenSim.Region.CoreModules.Framework.EntityTransfer -{ - public class Job - { - public string Name { get; private set; } - public string CommonId { get; private set; } - public WaitCallback Callback { get; private set; } - public object O { get; private set; } - - public Job(string name, string commonId, WaitCallback callback, object o) - { - Name = name; - CommonId = commonId; - Callback = callback; - O = o; - } - } - - // TODO: These kinds of classes MUST be generalized with JobEngine, etc. - public class HGIncomingSceneObjectEngine - { - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - public int LogLevel { get; set; } - - public bool IsRunning { get; private set; } - - public string Name { get; set; } - - /// - /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. - /// - public int RequestProcessTimeoutOnStop { get; set; } - - /// - /// Controls whether we need to warn in the log about exceeding the max queue size. - /// - /// - /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in - /// order to avoid spamming the log with lots of warnings. - /// - private bool m_warnOverMaxQueue = true; - - private BlockingCollection m_requestQueue; - - private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); - - private Stat m_requestsWaitingStat; - - private Job m_currentJob; - - /// - /// Used to signal that we are ready to complete stop. - /// - private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); - - public HGIncomingSceneObjectEngine(string name) - { -// LogLevel = 1; - Name = name; - RequestProcessTimeoutOnStop = 5000; - -// MainConsole.Instance.Commands.AddCommand( -// "Debug", -// false, -// "debug jobengine", -// "debug jobengine ", -// "Start, stop or get status of the job engine.", -// "If stopped then all jobs are processed immediately.", -// HandleControlCommand); - } - - public void Start() - { - lock (this) - { - if (IsRunning) - return; - - IsRunning = true; - - m_finishedProcessingAfterStop.Reset(); - - m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); - - m_requestsWaitingStat = - new Stat( - "HGIncomingAttachmentsWaiting", - "Number of incoming attachments waiting for processing.", - "", - "", - "entitytransfer", - Name, - StatType.Pull, - MeasuresOfInterest.None, - stat => stat.Value = m_requestQueue.Count, - StatVerbosity.Debug); - - StatsManager.RegisterStat(m_requestsWaitingStat); - - WorkManager.StartThread( - ProcessRequests, - string.Format("HG Incoming Scene Object Engine Thread ({0})", Name), - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); - } - } - - public void Stop() - { - lock (this) - { - try - { - if (!IsRunning) - return; - - IsRunning = false; - - int requestsLeft = m_requestQueue.Count; - - if (requestsLeft <= 0) - { - m_cancelSource.Cancel(); - } - else - { - m_log.InfoFormat("[HG INCOMING SCENE OBJECT ENGINE]: Waiting to write {0} events after stop.", requestsLeft); - - while (requestsLeft > 0) - { - if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) - { - // After timeout no events have been written - if (requestsLeft == m_requestQueue.Count) - { - m_log.WarnFormat( - "[HG INCOMING SCENE OBJECT ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", - RequestProcessTimeoutOnStop, requestsLeft); - - break; - } - } - - requestsLeft = m_requestQueue.Count; - } - } - } - finally - { - m_cancelSource.Dispose(); - StatsManager.DeregisterStat(m_requestsWaitingStat); - m_requestsWaitingStat = null; - m_requestQueue = null; - } - } - } - - public Job RemoveNextRequest() - { - Job nextRequest; - m_requestQueue.TryTake(out nextRequest); - - return nextRequest; - } - - public bool QueueRequest(string name, string commonId, WaitCallback req, object o) - { - return QueueRequest(new Job(name, commonId, req, o)); - } - - public bool QueueRequest(Job job) - { - if (LogLevel >= 1) - m_log.DebugFormat( - "[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}, common ID {1}", job.Name, job.CommonId); - - if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) - { - // m_log.DebugFormat( - // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", - // categories, client.AgentID, m_udpServer.Scene.Name); - - m_requestQueue.Add(job); - - if (!m_warnOverMaxQueue) - m_warnOverMaxQueue = true; - - return true; - } - else - { - if (m_warnOverMaxQueue) - { - // m_log.WarnFormat( - // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", - // client.AgentID, m_udpServer.Scene.Name); - - m_log.WarnFormat("[HG INCOMING SCENE OBJECT ENGINE]: Request queue at maximum capacity, not recording job"); - - m_warnOverMaxQueue = false; - } - - return false; - } - } - - private void ProcessRequests() - { - try - { - while (IsRunning || m_requestQueue.Count > 0) - { - m_currentJob = m_requestQueue.Take(m_cancelSource.Token); - - // QueueEmpty callback = req.Client.OnQueueEmpty; - // - // if (callback != null) - // { - // try - // { - // callback(req.Categories); - // } - // catch (Exception e) - // { - // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); - // } - // } - - if (LogLevel >= 1) - m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processing job {0}", m_currentJob.Name); - - try - { - m_currentJob.Callback.Invoke(m_currentJob.O); - } - catch (Exception e) - { - m_log.Error( - string.Format( - "[HG INCOMING SCENE OBJECT ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e); - } - - if (LogLevel >= 1) - m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processed job {0}", m_currentJob.Name); - - m_currentJob = null; - } - } - catch (OperationCanceledException) - { - } - - m_finishedProcessingAfterStop.Set(); - } - -// private void HandleControlCommand(string module, string[] args) -// { -// // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) -// // return; -// -// if (args.Length < 3) -// { -// MainConsole.Instance.Output("Usage: debug jobengine "); -// return; -// } -// -// string subCommand = args[2]; -// -// if (subCommand == "stop") -// { -// Stop(); -// MainConsole.Instance.OutputFormat("Stopped job engine."); -// } -// else if (subCommand == "start") -// { -// Start(); -// MainConsole.Instance.OutputFormat("Started job engine."); -// } -// else if (subCommand == "status") -// { -// MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning); -// MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none"); -// MainConsole.Instance.OutputFormat( -// "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); -// MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel); -// } -// -// else if (subCommand == "loglevel") -// { -// // int logLevel; -// int logLevel = int.Parse(args[3]); -// // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel)) -// // { -// LogLevel = logLevel; -// MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel); -// // } -// } -// else -// { -// MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand); -// } -// } - } -} -- cgit v1.1