/*
 * Copyright (c) Contributors, http://opensimulator.org/
 * See CONTRIBUTORS.TXT for a full list of copyright holders.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of the OpenSimulator Project nor the
 *       names of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading;
using log4net;
using OpenSim.Framework;

namespace OpenSim.Framework.Monitoring
{
    /// <summary>
    /// Runs queued jobs one at a time, in order, on a single dedicated thread.
    /// </summary>
    /// <remarks>
    /// Jobs are held in a bounded queue.  When the queue is full, new jobs are rejected rather than blocking the
    /// caller.
    /// </remarks>
    public class JobEngine
    {
        private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        /// <summary>
        /// Logging verbosity.  At 1 or above, a debug line is logged before and after each job is processed.
        /// </summary>
        public int LogLevel { get; set; }

        /// <summary>
        /// Name of the engine.  Also used as the name of the processing thread.
        /// </summary>
        public string Name { get; private set; }

        /// <summary>
        /// Name used as the prefix of log messages written by this engine.
        /// </summary>
        public string LoggingName { get; private set; }

        /// <summary>
        /// Is this engine running?
        /// </summary>
        public bool IsRunning { get; private set; }

        /// <summary>
        /// The current job that the engine is running.
        /// </summary>
        /// <remarks>
        /// Will be null if no job is currently running.
        /// </remarks>
        public Job CurrentJob { get; private set; }

        /// <summary>
        /// Number of jobs waiting to be processed.
        /// </summary>
        public int JobsWaiting { get { return m_jobQueue.Count; } }

        /// <summary>
        /// The timeout in milliseconds to wait for at least one queued job to be processed when the engine is
        /// stopping.
        /// </summary>
        public int RequestProcessTimeoutOnStop { get; set; }

        /// <summary>
        /// Controls whether we need to warn in the log about exceeding the max queue size.
        /// </summary>
        /// <remarks>
        /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
        /// order to avoid spamming the log with lots of warnings.
        /// </remarks>
        private bool m_warnOverMaxQueue = true;

        /// <summary>
        /// Pending jobs, bounded to 5000 entries.
        /// </summary>
        private readonly BlockingCollection<Job> m_jobQueue
            = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);

        /// <summary>
        /// Used to wake the processing thread out of a blocking take when the engine stops.
        /// </summary>
        private CancellationTokenSource m_cancelSource;

        /// <summary>
        /// Used to signal that we are ready to complete stop.
        /// </summary>
        private readonly ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);

        /// <summary>
        /// Serializes Start() and Stop().  A private object is used so that external code locking on the engine
        /// instance cannot interfere.
        /// </summary>
        private readonly object m_startStopLock = new object();

        /// <summary>
        /// Create a stopped engine.  Call <see cref="Start"/> to begin processing.
        /// </summary>
        /// <param name="name">Name of the engine and of its processing thread.</param>
        /// <param name="loggingName">Prefix used in log messages.</param>
        public JobEngine(string name, string loggingName)
        {
            Name = name;
            LoggingName = loggingName;

            RequestProcessTimeoutOnStop = 5000;
        }

        /// <summary>
        /// Start the processing thread.  Does nothing if the engine is already running.
        /// </summary>
        public void Start()
        {
            lock (m_startStopLock)
            {
                if (IsRunning)
                    return;

                IsRunning = true;

                m_finishedProcessingAfterStop.Reset();
                m_cancelSource = new CancellationTokenSource();

                WorkManager.StartThread(
                    ProcessRequests,
                    Name,
                    ThreadPriority.Normal,
                    false,
                    true,
                    null,
                    int.MaxValue);
            }
        }

        /// <summary>
        /// Stop the engine.  Does nothing if the engine is not running.
        /// </summary>
        /// <remarks>
        /// If jobs are still queued, this waits for them to be processed.  It gives up and discards the remainder
        /// if a whole <see cref="RequestProcessTimeoutOnStop"/> period passes without the queue length changing.
        /// </remarks>
        public void Stop()
        {
            lock (m_startStopLock)
            {
                // Checked before the try block: on a never-started engine m_cancelSource is null and must not be
                // touched by the cleanup code below.
                if (!IsRunning)
                    return;

                IsRunning = false;

                try
                {
                    int requestsLeft = m_jobQueue.Count;

                    if (requestsLeft > 0)
                    {
                        m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);

                        while (requestsLeft > 0)
                        {
                            // Once the processing thread has signalled that it has exited, nothing further will be
                            // taken from the queue, so waiting any longer would only spin.
                            if (m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
                                break;

                            // After timeout no events have been written
                            if (requestsLeft == m_jobQueue.Count)
                            {
                                m_log.WarnFormat(
                                    "[{0}]: No requests processed after {1} ms wait.  Discarding remaining {2} requests",
                                    LoggingName, RequestProcessTimeoutOnStop, requestsLeft);

                                break;
                            }

                            requestsLeft = m_jobQueue.Count;
                        }
                    }

                    // Always cancel before disposing.  Disposing alone does not wake a thread that is blocked
                    // waiting for a job, which would leave the processing thread alive indefinitely.
                    m_cancelSource.Cancel();
                }
                finally
                {
                    m_cancelSource.Dispose();
                }
            }
        }

        /// <summary>
        /// Make a job.
        /// </summary>
        /// <remarks>
        /// We provide this method to replace the constructor so that we can later pool job objects if necessary to
        /// reduce memory churn.  Normally one would directly call QueueJob() with parameters anyway.
        /// </remarks>
        /// <returns>The new job.  It is not queued.</returns>
        /// <param name="name">Name.</param>
        /// <param name="action">Action.</param>
        /// <param name="commonId">Common identifier.</param>
        public static Job MakeJob(string name, Action action, string commonId = null)
        {
            return Job.MakeJob(name, action, commonId);
        }

        /// <summary>
        /// Remove the next job queued for processing.
        /// </summary>
        /// <remarks>
        /// Returns null if there is no next job.
        /// Will not remove a job currently being performed.
        /// </remarks>
        /// <returns>The removed job, or null if the queue was empty.</returns>
        public Job RemoveNextJob()
        {
            Job nextJob;
            m_jobQueue.TryTake(out nextJob);

            return nextJob;
        }

        /// <summary>
        /// Queue the job for processing.
        /// </summary>
        /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
        /// <param name="name">Name of job.  This appears on the console and in logging.</param>
        /// <param name="action">Action to perform.</param>
        /// <param name="commonId">
        /// Common identifier for a set of jobs.  This allows a set of jobs to be removed
        /// if required (e.g. all jobs for a given agent).  Optional.
        /// </param>
        public bool QueueJob(string name, Action action, string commonId = null)
        {
            return QueueJob(MakeJob(name, action, commonId));
        }

        /// <summary>
        /// Queue the job for processing.
        /// </summary>
        /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns>
        /// <param name="job">The job</param>
        public bool QueueJob(Job job)
        {
            // TryAdd checks capacity and inserts atomically, and never blocks.  A separate Count check followed by
            // Add() could block the caller if another producer filled the queue in between.
            if (m_jobQueue.TryAdd(job))
            {
                m_warnOverMaxQueue = true;

                return true;
            }

            if (m_warnOverMaxQueue)
            {
                m_log.WarnFormat(
                    "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}",
                    LoggingName, job.Name, Name);

                m_warnOverMaxQueue = false;
            }

            return false;
        }

        /// <summary>
        /// Body of the processing thread.  Takes jobs off the queue and runs them until the engine is stopped and
        /// the queue is empty, or until the wait for a job is cancelled.
        /// </summary>
        private void ProcessRequests()
        {
            try
            {
                while (IsRunning || m_jobQueue.Count > 0)
                {
                    try
                    {
                        CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
                    }
                    catch (ObjectDisposedException)
                    {
                        // If we see this whilst not running then it may be due to a race where this thread checks
                        // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
                        if (IsRunning)
                            throw; // rethrow without resetting the stack trace
                        else
                            break;
                    }

                    if (LogLevel >= 1)
                        m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name);

                    try
                    {
                        CurrentJob.Action();
                    }
                    catch (Exception e)
                    {
                        // A failing job must not take down the processing thread.
                        m_log.Error(
                            string.Format(
                                "[{0}]: Job {1} failed, continuing.  Exception  ", LoggingName, CurrentJob.Name), e);
                    }

                    if (LogLevel >= 1)
                        m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name);

                    CurrentJob = null;
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Stop() cancels the wait for the next job.
            }
            finally
            {
                // Signal on every exit path so that Stop() never waits on a thread that has already gone.
                m_finishedProcessingAfterStop.Set();
            }
        }

        /// <summary>
        /// A unit of work for a <see cref="JobEngine"/>.
        /// </summary>
        public class Job
        {
            /// <summary>
            /// Name of the job.
            /// </summary>
            /// <remarks>
            /// This appears on console and debug output.
            /// </remarks>
            public string Name { get; private set; }

            /// <summary>
            /// Common ID for this job.
            /// </summary>
            /// <remarks>
            /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required.
            /// Can be null if this is not required.
            /// </remarks>
            public string CommonId { get; private set; }

            /// <summary>
            /// Action to perform when this job is processed.
            /// </summary>
            public Action Action { get; private set; }

            private Job(string name, string commonId, Action action)
            {
                Name = name;
                CommonId = commonId;
                Action = action;
            }

            /// <summary>
            /// Make a job.  It needs to be separately queued.
            /// </summary>
            /// <remarks>
            /// We provide this method to replace the constructor so that we can pool job objects if necessary
            /// to reduce memory churn.  Normally one would directly call JobEngine.QueueJob() with parameters anyway.
            /// </remarks>
            /// <returns>The new job.</returns>
            /// <param name="name">Name.</param>
            /// <param name="action">Action.</param>
            /// <param name="commonId">Common identifier.</param>
            public static Job MakeJob(string name, Action action, string commonId = null)
            {
                return new Job(name, commonId, action);
            }
        }
    }
}