From 0c31eb0a5d0a124e0cd7c3db53464e62e87b8fb3 Mon Sep 17 00:00:00 2001
From: Justin Clark-Casey (justincc)
Date: Mon, 12 Jan 2015 20:59:58 +0000
Subject: Add the missing deleted and re-added JobEngine class from the
previous commit 8e1e8a0
---
OpenSim/Framework/Monitoring/JobEngine.cs | 329 ++++++++++++++++++++++++++++++
1 file changed, 329 insertions(+)
create mode 100644 OpenSim/Framework/Monitoring/JobEngine.cs
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
new file mode 100644
index 0000000..44f5d9a
--- /dev/null
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -0,0 +1,329 @@
+/*
+ * Copyright (c) Contributors, http://opensimulator.org/
+ * See CONTRIBUTORS.TXT for a full list of copyright holders.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of the OpenSimulator Project nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Reflection;
+using System.Threading;
+using log4net;
+using OpenSim.Framework;
+
+namespace OpenSim.Framework.Monitoring
+{
+ public class JobEngine
+ {
+ private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ public int LogLevel { get; set; }
+
+ public string Name { get; private set; }
+
+ public string LoggingName { get; private set; }
+
+ ///
+ /// Is this engine running?
+ ///
+ public bool IsRunning { get; private set; }
+
+ ///
+ /// The current job that the engine is running.
+ ///
+ ///
+ /// Will be null if no job is currently running.
+ ///
+ public Job CurrentJob { get; private set; }
+
+ ///
+ /// Number of jobs waiting to be processed.
+ ///
+ public int JobsWaiting { get { return m_jobQueue.Count; } }
+
+ ///
+ /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
+ ///
+ public int RequestProcessTimeoutOnStop { get; set; }
+
+ ///
+ /// Controls whether we need to warn in the log about exceeding the max queue size.
+ ///
+ ///
+ /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
+ /// order to avoid spamming the log with lots of warnings.
+ ///
+ private bool m_warnOverMaxQueue = true;
+
+ private BlockingCollection m_jobQueue;
+
+ private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
+
+ ///
+ /// Used to signal that we are ready to complete stop.
+ ///
+ private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
+
+ public JobEngine(string name, string loggingName)
+ {
+ Name = name;
+ LoggingName = loggingName;
+
+ RequestProcessTimeoutOnStop = 5000;
+ }
+
+ public void Start()
+ {
+ lock (this)
+ {
+ if (IsRunning)
+ return;
+
+ IsRunning = true;
+
+ m_finishedProcessingAfterStop.Reset();
+
+ m_jobQueue = new BlockingCollection(new ConcurrentQueue(), 5000);
+
+ WorkManager.StartThread(
+ ProcessRequests,
+ Name,
+ ThreadPriority.Normal,
+ false,
+ true,
+ null,
+ int.MaxValue);
+ }
+ }
+
+ public void Stop()
+ {
+ lock (this)
+ {
+ try
+ {
+ if (!IsRunning)
+ return;
+
+ IsRunning = false;
+
+ int requestsLeft = m_jobQueue.Count;
+
+ if (requestsLeft <= 0)
+ {
+ m_cancelSource.Cancel();
+ }
+ else
+ {
+ m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);
+
+ while (requestsLeft > 0)
+ {
+ if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
+ {
+ // After timeout no events have been written
+ if (requestsLeft == m_jobQueue.Count)
+ {
+ m_log.WarnFormat(
+ "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests",
+ LoggingName, RequestProcessTimeoutOnStop, requestsLeft);
+
+ break;
+ }
+ }
+
+ requestsLeft = m_jobQueue.Count;
+ }
+ }
+ }
+ finally
+ {
+ m_cancelSource.Dispose();
+ m_jobQueue = null;
+ }
+ }
+ }
+
+ ///
+ /// Make a job.
+ ///
+ ///
+ /// We provide this method to replace the constructor so that we can later pool job objects if necessary to
+ /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway.
+ ///
+ ///
+ /// Name.
+ /// Action.
+ /// Common identifier.
+ public static Job MakeJob(string name, Action action, string commonId = null)
+ {
+ return Job.MakeJob(name, action, commonId);
+ }
+
+ ///
+ /// Remove the next job queued for processing.
+ ///
+ ///
+ /// Returns null if there is no next job.
+ /// Will not remove a job currently being performed.
+ ///
+ public Job RemoveNextJob()
+ {
+ Job nextJob;
+ m_jobQueue.TryTake(out nextJob);
+
+ return nextJob;
+ }
+
+ ///
+ /// Queue the job for processing.
+ ///
+ /// true, if job was queued, false otherwise.
+ /// Name of job. This appears on the console and in logging.
+ /// Action to perform.
+ ///
+ /// Common identifier for a set of jobs. This is allows a set of jobs to be removed
+ /// if required (e.g. all jobs for a given agent. Optional.
+ ///
+ public bool QueueJob(string name, Action action, string commonId = null)
+ {
+ return QueueJob(MakeJob(name, action, commonId));
+ }
+
+ ///
+ /// Queue the job for processing.
+ ///
+ /// true, if job was queued, false otherwise.
+ /// The job
+ ///
+ public bool QueueJob(Job job)
+ {
+ if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
+ {
+ m_jobQueue.Add(job);
+
+ if (!m_warnOverMaxQueue)
+ m_warnOverMaxQueue = true;
+
+ return true;
+ }
+ else
+ {
+ if (m_warnOverMaxQueue)
+ {
+ m_log.WarnFormat(
+ "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}",
+ LoggingName, job.Name, Name);
+
+ m_warnOverMaxQueue = false;
+ }
+
+ return false;
+ }
+ }
+
+ private void ProcessRequests()
+ {
+ try
+ {
+ while (IsRunning || m_jobQueue.Count > 0)
+ {
+ CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
+
+ if (LogLevel >= 1)
+ m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name);
+
+ try
+ {
+ CurrentJob.Action();
+ }
+ catch (Exception e)
+ {
+ m_log.Error(
+ string.Format(
+ "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e);
+ }
+
+ if (LogLevel >= 1)
+ m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name);
+
+ CurrentJob = null;
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ }
+
+ m_finishedProcessingAfterStop.Set();
+ }
+
+ public class Job
+ {
+ ///
+ /// Name of the job.
+ ///
+ ///
+ /// This appears on console and debug output.
+ ///
+ public string Name { get; private set; }
+
+ ///
+ /// Common ID for this job.
+ ///
+ ///
+ /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required.
+ /// Can be null if this is not required.
+ ///
+ public string CommonId { get; private set; }
+
+ ///
+ /// Action to perform when this job is processed.
+ ///
+ public Action Action { get; private set; }
+
+ private Job(string name, string commonId, Action action)
+ {
+ Name = name;
+ CommonId = commonId;
+ Action = action;
+ }
+
+ ///
+ /// Make a job. It needs to be separately queued.
+ ///
+ ///
+ /// We provide this method to replace the constructor so that we can pool job objects if necessary to
+ /// to reduce memory churn. Normally one would directly call JobEngine.QueueJob() with parameters anyway.
+ ///
+ ///
+ /// Name.
+ /// Action.
+ /// Common identifier.
+ public static Job MakeJob(string name, Action action, string commonId = null)
+ {
+ return new Job(name, commonId, action);
+ }
+ }
+ }
+}
\ No newline at end of file
--
cgit v1.1