From 8e1e8a0920a9e94305619e9afb8e053b4daefb89 Mon Sep 17 00:00:00 2001
From: Justin Clark-Casey (justincc)
Date: Mon, 12 Jan 2015 20:56:37 +0000
Subject: Make the performance controlling job processing threads introduced in
conference code use a generic JobEngine class rather than 4 slightly
different copy/pasted versions.
---
OpenSim/Framework/Monitoring/JobEngine.cs | 320 ----------------------------
OpenSim/Framework/Monitoring/WorkManager.cs | 76 ++++++-
2 files changed, 74 insertions(+), 322 deletions(-)
delete mode 100644 OpenSim/Framework/Monitoring/JobEngine.cs
(limited to 'OpenSim/Framework')
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
deleted file mode 100644
index 5925867..0000000
--- a/OpenSim/Framework/Monitoring/JobEngine.cs
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Copyright (c) Contributors, http://opensimulator.org/
- * See CONTRIBUTORS.TXT for a full list of copyright holders.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of the OpenSimulator Project nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Reflection;
-using System.Threading;
-using log4net;
-using OpenSim.Framework;
-
-namespace OpenSim.Framework.Monitoring
-{
- public class Job
- {
- public string Name;
- public WaitCallback Callback;
- public object O;
-
- public Job(string name, WaitCallback callback, object o)
- {
- Name = name;
- Callback = callback;
- O = o;
- }
- }
-
- public class JobEngine
- {
- private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- public int LogLevel { get; set; }
-
- public bool IsRunning { get; private set; }
-
- ///
- /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
- ///
- public int RequestProcessTimeoutOnStop { get; set; }
-
- ///
- /// Controls whether we need to warn in the log about exceeding the max queue size.
- ///
- ///
- /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
- /// order to avoid spamming the log with lots of warnings.
- ///
- private bool m_warnOverMaxQueue = true;
-
- private BlockingCollection m_requestQueue;
-
- private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
-
- private Stat m_requestsWaitingStat;
-
- private Job m_currentJob;
-
- ///
- /// Used to signal that we are ready to complete stop.
- ///
- private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
-
- public JobEngine()
- {
- RequestProcessTimeoutOnStop = 5000;
-
- MainConsole.Instance.Commands.AddCommand(
- "Debug",
- false,
- "debug jobengine",
- "debug jobengine ",
- "Start, stop, get status or set logging level of the job engine.",
- "If stopped then all outstanding jobs are processed immediately.",
- HandleControlCommand);
- }
-
- public void Start()
- {
- lock (this)
- {
- if (IsRunning)
- return;
-
- IsRunning = true;
-
- m_finishedProcessingAfterStop.Reset();
-
- m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000);
-
- m_requestsWaitingStat =
- new Stat(
- "JobsWaiting",
- "Number of jobs waiting for processing.",
- "",
- "",
- "server",
- "jobengine",
- StatType.Pull,
- MeasuresOfInterest.None,
- stat => stat.Value = m_requestQueue.Count,
- StatVerbosity.Debug);
-
- StatsManager.RegisterStat(m_requestsWaitingStat);
-
- WorkManager.StartThread(
- ProcessRequests,
- "JobEngineThread",
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
- }
- }
-
- public void Stop()
- {
- lock (this)
- {
- try
- {
- if (!IsRunning)
- return;
-
- IsRunning = false;
-
- int requestsLeft = m_requestQueue.Count;
-
- if (requestsLeft <= 0)
- {
- m_cancelSource.Cancel();
- }
- else
- {
- m_log.InfoFormat("[JOB ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
-
- while (requestsLeft > 0)
- {
- if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
- {
- // After timeout no events have been written
- if (requestsLeft == m_requestQueue.Count)
- {
- m_log.WarnFormat(
- "[JOB ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
- RequestProcessTimeoutOnStop, requestsLeft);
-
- break;
- }
- }
-
- requestsLeft = m_requestQueue.Count;
- }
- }
- }
- finally
- {
- m_cancelSource.Dispose();
- StatsManager.DeregisterStat(m_requestsWaitingStat);
- m_requestsWaitingStat = null;
- m_requestQueue = null;
- }
- }
- }
-
- public bool QueueRequest(string name, WaitCallback req, object o)
- {
- if (LogLevel >= 1)
- m_log.DebugFormat("[JOB ENGINE]: Queued job {0}", name);
-
- if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
- {
- // m_log.DebugFormat(
- // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
- // categories, client.AgentID, m_udpServer.Scene.Name);
-
- m_requestQueue.Add(new Job(name, req, o));
-
- if (!m_warnOverMaxQueue)
- m_warnOverMaxQueue = true;
-
- return true;
- }
- else
- {
- if (m_warnOverMaxQueue)
- {
-// m_log.WarnFormat(
-// "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
-// client.AgentID, m_udpServer.Scene.Name);
-
- m_log.WarnFormat("[JOB ENGINE]: Request queue at maximum capacity, not recording job");
-
- m_warnOverMaxQueue = false;
- }
-
- return false;
- }
- }
-
- private void ProcessRequests()
- {
- try
- {
- while (IsRunning || m_requestQueue.Count > 0)
- {
- m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
-
- // QueueEmpty callback = req.Client.OnQueueEmpty;
- //
- // if (callback != null)
- // {
- // try
- // {
- // callback(req.Categories);
- // }
- // catch (Exception e)
- // {
- // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
- // }
- // }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[JOB ENGINE]: Processing job {0}", m_currentJob.Name);
-
- try
- {
- m_currentJob.Callback.Invoke(m_currentJob.O);
- }
- catch (Exception e)
- {
- m_log.Error(
- string.Format(
- "[JOB ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
- }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[JOB ENGINE]: Processed job {0}", m_currentJob.Name);
-
- m_currentJob = null;
- }
- }
- catch (OperationCanceledException)
- {
- }
-
- m_finishedProcessingAfterStop.Set();
- }
-
- private void HandleControlCommand(string module, string[] args)
- {
-// if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
-// return;
-
- if (args.Length < 3)
- {
- MainConsole.Instance.Output("Usage: debug jobengine ");
- return;
- }
-
- string subCommand = args[2];
-
- if (subCommand == "stop")
- {
- Stop();
- MainConsole.Instance.OutputFormat("Stopped job engine.");
- }
- else if (subCommand == "start")
- {
- Start();
- MainConsole.Instance.OutputFormat("Started job engine.");
- }
- else if (subCommand == "status")
- {
- MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
- MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
- MainConsole.Instance.OutputFormat(
- "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
- MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
- }
- else if (subCommand == "log")
- {
-// int logLevel;
- int logLevel = int.Parse(args[3]);
-// if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
-// {
- LogLevel = logLevel;
- MainConsole.Instance.OutputFormat("Set debug log level to {0}", LogLevel);
-// }
- }
- else
- {
- MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
- }
- }
- }
-}
diff --git a/OpenSim/Framework/Monitoring/WorkManager.cs b/OpenSim/Framework/Monitoring/WorkManager.cs
index 9d0eefc..134661b 100644
--- a/OpenSim/Framework/Monitoring/WorkManager.cs
+++ b/OpenSim/Framework/Monitoring/WorkManager.cs
@@ -57,7 +57,29 @@ namespace OpenSim.Framework.Monitoring
static WorkManager()
{
- JobEngine = new JobEngine();
+ JobEngine = new JobEngine("Non-blocking non-critical job engine", "JOB ENGINE");
+
+ StatsManager.RegisterStat(
+ new Stat(
+ "JobsWaiting",
+ "Number of jobs waiting for processing.",
+ "",
+ "",
+ "server",
+ "jobengine",
+ StatType.Pull,
+ MeasuresOfInterest.None,
+ stat => stat.Value = JobEngine.JobsWaiting,
+ StatVerbosity.Debug));
+
+ MainConsole.Instance.Commands.AddCommand(
+ "Debug",
+ false,
+ "debug jobengine",
+ "debug jobengine ",
+ "Start, stop, get status or set logging level of the job engine.",
+ "If stopped then all outstanding jobs are processed immediately.",
+ HandleControlCommand);
}
///
@@ -200,7 +222,7 @@ namespace OpenSim.Framework.Monitoring
}
if (JobEngine.IsRunning)
- JobEngine.QueueRequest(name, callback, obj);
+ JobEngine.QueueJob(name, () => callback(obj));
else if (canRunInThisThread)
callback(obj);
else if (mustNotTimeout)
@@ -208,5 +230,55 @@ namespace OpenSim.Framework.Monitoring
else
Util.FireAndForget(callback, obj, name);
}
+
+ private static void HandleControlCommand(string module, string[] args)
+ {
+ // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
+ // return;
+
+ if (args.Length < 3)
+ {
+ MainConsole.Instance.Output("Usage: debug jobengine ");
+ return;
+ }
+
+ string subCommand = args[2];
+
+ if (subCommand == "stop")
+ {
+ JobEngine.Stop();
+ MainConsole.Instance.OutputFormat("Stopped job engine.");
+ }
+ else if (subCommand == "start")
+ {
+ JobEngine.Start();
+ MainConsole.Instance.OutputFormat("Started job engine.");
+ }
+ else if (subCommand == "status")
+ {
+ MainConsole.Instance.OutputFormat("Job engine running: {0}", JobEngine.IsRunning);
+
+ JobEngine.Job job = JobEngine.CurrentJob;
+ MainConsole.Instance.OutputFormat("Current job {0}", job != null ? job.Name : "none");
+
+ MainConsole.Instance.OutputFormat(
+ "Jobs waiting: {0}", JobEngine.IsRunning ? JobEngine.JobsWaiting.ToString() : "n/a");
+ MainConsole.Instance.OutputFormat("Log Level: {0}", JobEngine.LogLevel);
+ }
+ else if (subCommand == "log")
+ {
+ // int logLevel;
+ int logLevel = int.Parse(args[3]);
+ // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
+ // {
+ JobEngine.LogLevel = logLevel;
+ MainConsole.Instance.OutputFormat("Set debug log level to {0}", JobEngine.LogLevel);
+ // }
+ }
+ else
+ {
+ MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
+ }
+ }
}
}
\ No newline at end of file
--
cgit v1.1