From 134f86e8d5c414409631b25b8c6f0ee45fbd8631 Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Thu, 3 Nov 2016 21:44:39 +1000 Subject: Initial update to OpenSim 0.8.2.1 source code. --- OpenSim/Framework/Monitoring/JobEngine.cs | 341 ++++++++++++++++++++++++++++++ 1 file changed, 341 insertions(+) create mode 100644 OpenSim/Framework/Monitoring/JobEngine.cs (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs') diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs new file mode 100644 index 0000000..6db9a67 --- /dev/null +++ b/OpenSim/Framework/Monitoring/JobEngine.cs @@ -0,0 +1,341 @@ +/* + * Copyright (c) Contributors, http://opensimulator.org/ + * See CONTRIBUTORS.TXT for a full list of copyright holders. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the OpenSimulator Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Threading; +using log4net; +using OpenSim.Framework; + +namespace OpenSim.Framework.Monitoring +{ + public class JobEngine + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + public int LogLevel { get; set; } + + public string Name { get; private set; } + + public string LoggingName { get; private set; } + + /// + /// Is this engine running? + /// + public bool IsRunning { get; private set; } + + /// + /// The current job that the engine is running. + /// + /// + /// Will be null if no job is currently running. + /// + public Job CurrentJob { get; private set; } + + /// + /// Number of jobs waiting to be processed. + /// + public int JobsWaiting { get { return m_jobQueue.Count; } } + + /// + /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. + /// + public int RequestProcessTimeoutOnStop { get; set; } + + /// + /// Controls whether we need to warn in the log about exceeding the max queue size. + /// + /// + /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in + /// order to avoid spamming the log with lots of warnings. + /// + private bool m_warnOverMaxQueue = true; + + private BlockingCollection m_jobQueue; + + private CancellationTokenSource m_cancelSource; + + /// + /// Used to signal that we are ready to complete stop. + /// + private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); + + public JobEngine(string name, string loggingName) + { + Name = name; + LoggingName = loggingName; + + RequestProcessTimeoutOnStop = 5000; + } + + public void Start() + { + lock (this) + { + if (IsRunning) + return; + + IsRunning = true; + + m_finishedProcessingAfterStop.Reset(); + + m_jobQueue = new BlockingCollection(new ConcurrentQueue(), 5000); + m_cancelSource = new CancellationTokenSource(); + + WorkManager.StartThread( + ProcessRequests, + Name, + ThreadPriority.Normal, + false, + true, + null, + int.MaxValue); + } + } + + public void Stop() + { + lock (this) + { + try + { + if (!IsRunning) + return; + + IsRunning = false; + + int requestsLeft = m_jobQueue.Count; + + if (requestsLeft <= 0) + { + m_cancelSource.Cancel(); + } + else + { + m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); + + while (requestsLeft > 0) + { + if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) + { + // After timeout no events have been written + if (requestsLeft == m_jobQueue.Count) + { + m_log.WarnFormat( + "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", + LoggingName, RequestProcessTimeoutOnStop, requestsLeft); + + break; + } + } + + requestsLeft = m_jobQueue.Count; + } + } + } + finally + { + m_cancelSource.Dispose(); + } + } + } + + /// + /// Make a job. + /// + /// + /// We provide this method to replace the constructor so that we can later pool job objects if necessary to + /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway. + /// + /// + /// Name. + /// Action. + /// Common identifier. + public static Job MakeJob(string name, Action action, string commonId = null) + { + return Job.MakeJob(name, action, commonId); + } + + /// + /// Remove the next job queued for processing. + /// + /// + /// Returns null if there is no next job. + /// Will not remove a job currently being performed. + /// + public Job RemoveNextJob() + { + Job nextJob; + m_jobQueue.TryTake(out nextJob); + + return nextJob; + } + + /// + /// Queue the job for processing. + /// + /// true, if job was queued, false otherwise. + /// Name of job. This appears on the console and in logging. + /// Action to perform. + /// + /// Common identifier for a set of jobs. This is allows a set of jobs to be removed + /// if required (e.g. all jobs for a given agent. Optional. + /// + public bool QueueJob(string name, Action action, string commonId = null) + { + return QueueJob(MakeJob(name, action, commonId)); + } + + /// + /// Queue the job for processing. + /// + /// true, if job was queued, false otherwise. + /// The job + /// + public bool QueueJob(Job job) + { + if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) + { + m_jobQueue.Add(job); + + if (!m_warnOverMaxQueue) + m_warnOverMaxQueue = true; + + return true; + } + else + { + if (m_warnOverMaxQueue) + { + m_log.WarnFormat( + "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}", + LoggingName, job.Name, Name); + + m_warnOverMaxQueue = false; + } + + return false; + } + } + + private void ProcessRequests() + { + try + { + while (IsRunning || m_jobQueue.Count > 0) + { + try + { + CurrentJob = m_jobQueue.Take(m_cancelSource.Token); + } + catch (ObjectDisposedException e) + { + // If we see this whilst not running then it may be due to a race where this thread checks + // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. + if (IsRunning) + throw e; + else + break; + } + + if (LogLevel >= 1) + m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); + + try + { + CurrentJob.Action(); + } + catch (Exception e) + { + m_log.Error( + string.Format( + "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); + } + + if (LogLevel >= 1) + m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); + + CurrentJob = null; + } + } + catch (OperationCanceledException) + { + } + + m_finishedProcessingAfterStop.Set(); + } + + public class Job + { + /// + /// Name of the job. + /// + /// + /// This appears on console and debug output. + /// + public string Name { get; private set; } + + /// + /// Common ID for this job. + /// + /// + /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required. + /// Can be null if this is not required. + /// + public string CommonId { get; private set; } + + /// + /// Action to perform when this job is processed. + /// + public Action Action { get; private set; } + + private Job(string name, string commonId, Action action) + { + Name = name; + CommonId = commonId; + Action = action; + } + + /// + /// Make a job. It needs to be separately queued. + /// + /// + /// We provide this method to replace the constructor so that we can pool job objects if necessary to + /// to reduce memory churn. Normally one would directly call JobEngine.QueueJob() with parameters anyway. + /// + /// + /// Name. + /// Action. + /// Common identifier. + public static Job MakeJob(string name, Action action, string commonId = null) + { + return new Job(name, commonId, action); + } + } + } +} \ No newline at end of file -- cgit v1.1