From 8e1e8a0920a9e94305619e9afb8e053b4daefb89 Mon Sep 17 00:00:00 2001
From: Justin Clark-Casey (justincc)
Date: Mon, 12 Jan 2015 20:56:37 +0000
Subject: Make the performance controlling job processing threads introduced in
conference code use a generic JobEngine class rather than 4 slightly
different copy/pasted versions.
---
.../EntityTransfer/HGEntityTransferModule.cs | 41 ++-
.../EntityTransfer/HGIncomingSceneObjectEngine.cs | 344 ---------------------
2 files changed, 29 insertions(+), 356 deletions(-)
delete mode 100644 OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
(limited to 'OpenSim/Region/CoreModules')
diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
index fceda80..fa23590 100644
--- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
+++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGEntityTransferModule.cs
@@ -31,6 +31,7 @@ using System.Reflection;
using OpenSim.Framework;
using OpenSim.Framework.Client;
+using OpenSim.Framework.Monitoring;
using OpenSim.Region.Framework.Interfaces;
using OpenSim.Region.Framework.Scenes;
using OpenSim.Services.Connectors.Hypergrid;
@@ -113,7 +114,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
///
/// Used for processing analysis of incoming attachments in a controlled fashion.
///
- private HGIncomingSceneObjectEngine m_incomingSceneObjectEngine;
+ private JobEngine m_incomingSceneObjectEngine;
#region ISharedRegionModule
@@ -160,7 +161,24 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
scene.RegisterModuleInterface(this);
//scene.EventManager.OnIncomingSceneObject += OnIncomingSceneObject;
- m_incomingSceneObjectEngine = new HGIncomingSceneObjectEngine(scene.Name);
+ m_incomingSceneObjectEngine
+ = new JobEngine(
+ string.Format("HG Incoming Scene Object Engine ({0})", scene.Name),
+ "HG INCOMING SCENE OBJECT ENGINE");
+
+ StatsManager.RegisterStat(
+ new Stat(
+ "HGIncomingAttachmentsWaiting",
+ "Number of incoming attachments waiting for processing.",
+ "",
+ "",
+ "entitytransfer",
+ Name,
+ StatType.Pull,
+ MeasuresOfInterest.None,
+ stat => stat.Value = m_incomingSceneObjectEngine.JobsWaiting,
+ StatVerbosity.Debug));
+
m_incomingSceneObjectEngine.Start();
}
}
@@ -548,11 +566,11 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
private void RemoveIncomingSceneObjectJobs(string commonIdToRemove)
{
- List jobsToReinsert = new List();
+ List jobsToReinsert = new List();
int jobsRemoved = 0;
- Job job;
- while ((job = m_incomingSceneObjectEngine.RemoveNextRequest()) != null)
+ JobEngine.Job job;
+ while ((job = m_incomingSceneObjectEngine.RemoveNextJob()) != null)
{
if (job.CommonId != commonIdToRemove)
jobsToReinsert.Add(job);
@@ -566,8 +584,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
if (jobsToReinsert.Count > 0)
{
- foreach (Job jobToReinsert in jobsToReinsert)
- m_incomingSceneObjectEngine.QueueRequest(jobToReinsert);
+ foreach (JobEngine.Job jobToReinsert in jobsToReinsert)
+ m_incomingSceneObjectEngine.QueueJob(jobToReinsert);
}
}
@@ -594,10 +612,9 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
{
if (aCircuit.ServiceURLs != null && aCircuit.ServiceURLs.ContainsKey("AssetServerURI"))
{
- m_incomingSceneObjectEngine.QueueRequest(
+ m_incomingSceneObjectEngine.QueueJob(
string.Format("HG UUID Gather for attachment {0} for {1}", so.Name, aCircuit.Name),
- so.OwnerID.ToString(),
- o =>
+ () =>
{
string url = aCircuit.ServiceURLs["AssetServerURI"].ToString();
// m_log.DebugFormat(
@@ -663,8 +680,8 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
// m_log.DebugFormat(
// "[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}",
// so.Name, so.OwnerID, url);
- },
- null);
+ },
+ so.OwnerID.ToString());
}
}
}
diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
deleted file mode 100644
index f62e7f4..0000000
--- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Copyright (c) Contributors, http://opensimulator.org/
- * See CONTRIBUTORS.TXT for a full list of copyright holders.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of the OpenSimulator Project nor the
- * names of its contributors may be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Reflection;
-using System.Threading;
-using log4net;
-using OpenSim.Framework;
-using OpenSim.Framework.Monitoring;
-using OpenSim.Region.Framework.Scenes;
-
-namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
-{
- public class Job
- {
- public string Name { get; private set; }
- public string CommonId { get; private set; }
- public WaitCallback Callback { get; private set; }
- public object O { get; private set; }
-
- public Job(string name, string commonId, WaitCallback callback, object o)
- {
- Name = name;
- CommonId = commonId;
- Callback = callback;
- O = o;
- }
- }
-
- // TODO: These kinds of classes MUST be generalized with JobEngine, etc.
- public class HGIncomingSceneObjectEngine
- {
- private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- public int LogLevel { get; set; }
-
- public bool IsRunning { get; private set; }
-
- public string Name { get; set; }
-
- ///
- /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
- ///
- public int RequestProcessTimeoutOnStop { get; set; }
-
- ///
- /// Controls whether we need to warn in the log about exceeding the max queue size.
- ///
- ///
- /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
- /// order to avoid spamming the log with lots of warnings.
- ///
- private bool m_warnOverMaxQueue = true;
-
- private BlockingCollection m_requestQueue;
-
- private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
-
- private Stat m_requestsWaitingStat;
-
- private Job m_currentJob;
-
- ///
- /// Used to signal that we are ready to complete stop.
- ///
- private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
-
- public HGIncomingSceneObjectEngine(string name)
- {
-// LogLevel = 1;
- Name = name;
- RequestProcessTimeoutOnStop = 5000;
-
-// MainConsole.Instance.Commands.AddCommand(
-// "Debug",
-// false,
-// "debug jobengine",
-// "debug jobengine ",
-// "Start, stop or get status of the job engine.",
-// "If stopped then all jobs are processed immediately.",
-// HandleControlCommand);
- }
-
- public void Start()
- {
- lock (this)
- {
- if (IsRunning)
- return;
-
- IsRunning = true;
-
- m_finishedProcessingAfterStop.Reset();
-
- m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000);
-
- m_requestsWaitingStat =
- new Stat(
- "HGIncomingAttachmentsWaiting",
- "Number of incoming attachments waiting for processing.",
- "",
- "",
- "entitytransfer",
- Name,
- StatType.Pull,
- MeasuresOfInterest.None,
- stat => stat.Value = m_requestQueue.Count,
- StatVerbosity.Debug);
-
- StatsManager.RegisterStat(m_requestsWaitingStat);
-
- WorkManager.StartThread(
- ProcessRequests,
- string.Format("HG Incoming Scene Object Engine Thread ({0})", Name),
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
- }
- }
-
- public void Stop()
- {
- lock (this)
- {
- try
- {
- if (!IsRunning)
- return;
-
- IsRunning = false;
-
- int requestsLeft = m_requestQueue.Count;
-
- if (requestsLeft <= 0)
- {
- m_cancelSource.Cancel();
- }
- else
- {
- m_log.InfoFormat("[HG INCOMING SCENE OBJECT ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
-
- while (requestsLeft > 0)
- {
- if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
- {
- // After timeout no events have been written
- if (requestsLeft == m_requestQueue.Count)
- {
- m_log.WarnFormat(
- "[HG INCOMING SCENE OBJECT ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
- RequestProcessTimeoutOnStop, requestsLeft);
-
- break;
- }
- }
-
- requestsLeft = m_requestQueue.Count;
- }
- }
- }
- finally
- {
- m_cancelSource.Dispose();
- StatsManager.DeregisterStat(m_requestsWaitingStat);
- m_requestsWaitingStat = null;
- m_requestQueue = null;
- }
- }
- }
-
- public Job RemoveNextRequest()
- {
- Job nextRequest;
- m_requestQueue.TryTake(out nextRequest);
-
- return nextRequest;
- }
-
- public bool QueueRequest(string name, string commonId, WaitCallback req, object o)
- {
- return QueueRequest(new Job(name, commonId, req, o));
- }
-
- public bool QueueRequest(Job job)
- {
- if (LogLevel >= 1)
- m_log.DebugFormat(
- "[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}, common ID {1}", job.Name, job.CommonId);
-
- if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
- {
- // m_log.DebugFormat(
- // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
- // categories, client.AgentID, m_udpServer.Scene.Name);
-
- m_requestQueue.Add(job);
-
- if (!m_warnOverMaxQueue)
- m_warnOverMaxQueue = true;
-
- return true;
- }
- else
- {
- if (m_warnOverMaxQueue)
- {
- // m_log.WarnFormat(
- // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
- // client.AgentID, m_udpServer.Scene.Name);
-
- m_log.WarnFormat("[HG INCOMING SCENE OBJECT ENGINE]: Request queue at maximum capacity, not recording job");
-
- m_warnOverMaxQueue = false;
- }
-
- return false;
- }
- }
-
- private void ProcessRequests()
- {
- try
- {
- while (IsRunning || m_requestQueue.Count > 0)
- {
- m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
-
- // QueueEmpty callback = req.Client.OnQueueEmpty;
- //
- // if (callback != null)
- // {
- // try
- // {
- // callback(req.Categories);
- // }
- // catch (Exception e)
- // {
- // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
- // }
- // }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processing job {0}", m_currentJob.Name);
-
- try
- {
- m_currentJob.Callback.Invoke(m_currentJob.O);
- }
- catch (Exception e)
- {
- m_log.Error(
- string.Format(
- "[HG INCOMING SCENE OBJECT ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
- }
-
- if (LogLevel >= 1)
- m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Processed job {0}", m_currentJob.Name);
-
- m_currentJob = null;
- }
- }
- catch (OperationCanceledException)
- {
- }
-
- m_finishedProcessingAfterStop.Set();
- }
-
-// private void HandleControlCommand(string module, string[] args)
-// {
-// // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
-// // return;
-//
-// if (args.Length < 3)
-// {
-// MainConsole.Instance.Output("Usage: debug jobengine ");
-// return;
-// }
-//
-// string subCommand = args[2];
-//
-// if (subCommand == "stop")
-// {
-// Stop();
-// MainConsole.Instance.OutputFormat("Stopped job engine.");
-// }
-// else if (subCommand == "start")
-// {
-// Start();
-// MainConsole.Instance.OutputFormat("Started job engine.");
-// }
-// else if (subCommand == "status")
-// {
-// MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
-// MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
-// MainConsole.Instance.OutputFormat(
-// "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
-// MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
-// }
-//
-// else if (subCommand == "loglevel")
-// {
-// // int logLevel;
-// int logLevel = int.Parse(args[3]);
-// // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
-// // {
-// LogLevel = logLevel;
-// MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel);
-// // }
-// }
-// else
-// {
-// MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
-// }
-// }
- }
-}
--
cgit v1.1