From 8e1e8a0920a9e94305619e9afb8e053b4daefb89 Mon Sep 17 00:00:00 2001 From: Justin Clark-Casey (justincc) Date: Mon, 12 Jan 2015 20:56:37 +0000 Subject: Make the performance controlling job processing threads introduced in conference code use a generic JobEngine class rather than 4 slightly different copy/pasted versions. --- OpenSim/Framework/Monitoring/JobEngine.cs | 320 ---------------------------- OpenSim/Framework/Monitoring/WorkManager.cs | 76 ++++++- 2 files changed, 74 insertions(+), 322 deletions(-) delete mode 100644 OpenSim/Framework/Monitoring/JobEngine.cs (limited to 'OpenSim/Framework') diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs deleted file mode 100644 index 5925867..0000000 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Copyright (c) Contributors, http://opensimulator.org/ - * See CONTRIBUTORS.TXT for a full list of copyright holders. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of the OpenSimulator Project nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -using System; -using System.Collections.Concurrent; -using System.Reflection; -using System.Threading; -using log4net; -using OpenSim.Framework; - -namespace OpenSim.Framework.Monitoring -{ - public class Job - { - public string Name; - public WaitCallback Callback; - public object O; - - public Job(string name, WaitCallback callback, object o) - { - Name = name; - Callback = callback; - O = o; - } - } - - public class JobEngine - { - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - public int LogLevel { get; set; } - - public bool IsRunning { get; private set; } - - /// - /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. - /// - public int RequestProcessTimeoutOnStop { get; set; } - - /// - /// Controls whether we need to warn in the log about exceeding the max queue size. - /// - /// - /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in - /// order to avoid spamming the log with lots of warnings. - /// - private bool m_warnOverMaxQueue = true; - - private BlockingCollection m_requestQueue; - - private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); - - private Stat m_requestsWaitingStat; - - private Job m_currentJob; - - /// - /// Used to signal that we are ready to complete stop. - /// - private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); - - public JobEngine() - { - RequestProcessTimeoutOnStop = 5000; - - MainConsole.Instance.Commands.AddCommand( - "Debug", - false, - "debug jobengine", - "debug jobengine ", - "Start, stop, get status or set logging level of the job engine.", - "If stopped then all outstanding jobs are processed immediately.", - HandleControlCommand); - } - - public void Start() - { - lock (this) - { - if (IsRunning) - return; - - IsRunning = true; - - m_finishedProcessingAfterStop.Reset(); - - m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); - - m_requestsWaitingStat = - new Stat( - "JobsWaiting", - "Number of jobs waiting for processing.", - "", - "", - "server", - "jobengine", - StatType.Pull, - MeasuresOfInterest.None, - stat => stat.Value = m_requestQueue.Count, - StatVerbosity.Debug); - - StatsManager.RegisterStat(m_requestsWaitingStat); - - WorkManager.StartThread( - ProcessRequests, - "JobEngineThread", - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); - } - } - - public void Stop() - { - lock (this) - { - try - { - if (!IsRunning) - return; - - IsRunning = false; - - int requestsLeft = m_requestQueue.Count; - - if (requestsLeft <= 0) - { - m_cancelSource.Cancel(); - } - else - { - m_log.InfoFormat("[JOB ENGINE]: Waiting to write {0} events after stop.", requestsLeft); - - while (requestsLeft > 0) - { - if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) - { - // After timeout no events have been written - if (requestsLeft == m_requestQueue.Count) - { - m_log.WarnFormat( - "[JOB ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", - RequestProcessTimeoutOnStop, requestsLeft); - - break; - } - } - - requestsLeft = m_requestQueue.Count; - } - } - } - finally - { - m_cancelSource.Dispose(); - StatsManager.DeregisterStat(m_requestsWaitingStat); - m_requestsWaitingStat = null; - m_requestQueue = null; - } - } - } - - public bool QueueRequest(string name, WaitCallback req, object o) - { - if (LogLevel >= 1) - m_log.DebugFormat("[JOB ENGINE]: Queued job {0}", name); - - if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) - { - // m_log.DebugFormat( - // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", - // categories, client.AgentID, m_udpServer.Scene.Name); - - m_requestQueue.Add(new Job(name, req, o)); - - if (!m_warnOverMaxQueue) - m_warnOverMaxQueue = true; - - return true; - } - else - { - if (m_warnOverMaxQueue) - { -// m_log.WarnFormat( -// "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", -// client.AgentID, m_udpServer.Scene.Name); - - m_log.WarnFormat("[JOB ENGINE]: Request queue at maximum capacity, not recording job"); - - m_warnOverMaxQueue = false; - } - - return false; - } - } - - private void ProcessRequests() - { - try - { - while (IsRunning || m_requestQueue.Count > 0) - { - m_currentJob = m_requestQueue.Take(m_cancelSource.Token); - - // QueueEmpty callback = req.Client.OnQueueEmpty; - // - // if (callback != null) - // { - // try - // { - // callback(req.Categories); - // } - // catch (Exception e) - // { - // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); - // } - // } - - if (LogLevel >= 1) - m_log.DebugFormat("[JOB ENGINE]: Processing job {0}", m_currentJob.Name); - - try - { - m_currentJob.Callback.Invoke(m_currentJob.O); - } - catch (Exception e) - { - m_log.Error( - string.Format( - "[JOB ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e); - } - - if (LogLevel >= 1) - m_log.DebugFormat("[JOB ENGINE]: Processed job {0}", m_currentJob.Name); - - m_currentJob = null; - } - } - catch (OperationCanceledException) - { - } - - m_finishedProcessingAfterStop.Set(); - } - - private void HandleControlCommand(string module, string[] args) - { -// if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) -// return; - - if (args.Length < 3) - { - MainConsole.Instance.Output("Usage: debug jobengine "); - return; - } - - string subCommand = args[2]; - - if (subCommand == "stop") - { - Stop(); - MainConsole.Instance.OutputFormat("Stopped job engine."); - } - else if (subCommand == "start") - { - Start(); - MainConsole.Instance.OutputFormat("Started job engine."); - } - else if (subCommand == "status") - { - MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning); - MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none"); - MainConsole.Instance.OutputFormat( - "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); - MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel); - } - else if (subCommand == "log") - { -// int logLevel; - int logLevel = int.Parse(args[3]); -// if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel)) -// { - LogLevel = logLevel; - MainConsole.Instance.OutputFormat("Set debug log level to {0}", LogLevel); -// } - } - else - { - MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand); - } - } - } -} diff --git a/OpenSim/Framework/Monitoring/WorkManager.cs b/OpenSim/Framework/Monitoring/WorkManager.cs index 9d0eefc..134661b 100644 --- a/OpenSim/Framework/Monitoring/WorkManager.cs +++ b/OpenSim/Framework/Monitoring/WorkManager.cs @@ -57,7 +57,29 @@ namespace OpenSim.Framework.Monitoring static WorkManager() { - JobEngine = new JobEngine(); + JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE"); + + StatsManager.RegisterStat( + new Stat( + "JobsWaiting", + "Number of jobs waiting for processing.", + "", + "", + "server", + "jobengine", + StatType.Pull, + MeasuresOfInterest.None, + stat => stat.Value = JobEngine.JobsWaiting, + StatVerbosity.Debug)); + + MainConsole.Instance.Commands.AddCommand( + "Debug", + false, + "debug jobengine", + "debug jobengine ", + "Start, stop, get status or set logging level of the job engine.", + "If stopped then all outstanding jobs are processed immediately.", + HandleControlCommand); } /// @@ -200,7 +222,7 @@ namespace OpenSim.Framework.Monitoring } if (JobEngine.IsRunning) - JobEngine.QueueRequest(name, callback, obj); + JobEngine.QueueJob(name, () => callback(obj)); else if (canRunInThisThread) callback(obj); else if (mustNotTimeout) @@ -208,5 +230,55 @@ namespace OpenSim.Framework.Monitoring else Util.FireAndForget(callback, obj, name); } + + private static void HandleControlCommand(string module, string[] args) + { + // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) + // return; + + if (args.Length < 3) + { + MainConsole.Instance.Output("Usage: debug jobengine "); + return; + } + + string subCommand = args[2]; + + if (subCommand == "stop") + { + JobEngine.Stop(); + MainConsole.Instance.OutputFormat("Stopped job engine."); + } + else if (subCommand == "start") + { + JobEngine.Start(); + MainConsole.Instance.OutputFormat("Started job engine."); + } + else if (subCommand == "status") + { + MainConsole.Instance.OutputFormat("Job engine running: {0}", JobEngine.IsRunning); + + JobEngine.Job job = JobEngine.CurrentJob; + MainConsole.Instance.OutputFormat("Current job {0}", job != null ? job.Name : "none"); + + MainConsole.Instance.OutputFormat( + "Jobs waiting: {0}", JobEngine.IsRunning ? JobEngine.JobsWaiting.ToString() : "n/a"); + MainConsole.Instance.OutputFormat("Log Level: {0}", JobEngine.LogLevel); + } + else if (subCommand == "log") + { + // int logLevel; + int logLevel = int.Parse(args[3]); + // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel)) + // { + JobEngine.LogLevel = logLevel; + MainConsole.Instance.OutputFormat("Set debug log level to {0}", JobEngine.LogLevel); + // } + } + else + { + MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand); + } + } } } \ No newline at end of file -- cgit v1.1