From 5e4d6cab00cb29cd088ab7b62ab13aff103b64cb Mon Sep 17 00:00:00 2001 From: onefang Date: Sun, 19 May 2019 21:24:15 +1000 Subject: Dump OpenSim 0.9.0.1 into it's own branch. --- OpenSim/Framework/Monitoring/JobEngine.cs | 162 +++++++++++++----------------- 1 file changed, 71 insertions(+), 91 deletions(-) (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs') diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index 6db9a67..115871e 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs @@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring public int LogLevel { get; set; } + private object JobLock = new object(); + public string Name { get; private set; } public string LoggingName { get; private set; } @@ -47,7 +49,7 @@ namespace OpenSim.Framework.Monitoring /// /// Is this engine running? /// - public bool IsRunning { get; private set; } + public bool IsRunning { get; private set; } /// /// The current job that the engine is running. @@ -55,7 +57,8 @@ namespace OpenSim.Framework.Monitoring /// /// Will be null if no job is currently running. /// - public Job CurrentJob { get; private set; } + private Job m_currentJob; + public Job CurrentJob { get { return m_currentJob;} } /// /// Number of jobs waiting to be processed. @@ -71,96 +74,64 @@ namespace OpenSim.Framework.Monitoring /// Controls whether we need to warn in the log about exceeding the max queue size. /// /// - /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in + /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in /// order to avoid spamming the log with lots of warnings. /// private bool m_warnOverMaxQueue = true; - private BlockingCollection m_jobQueue; + private BlockingCollection m_jobQueue = new BlockingCollection(new ConcurrentQueue(), 5000); private CancellationTokenSource m_cancelSource; - /// - /// Used to signal that we are ready to complete stop. - /// - private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); + private int m_timeout = -1; - public JobEngine(string name, string loggingName) + private bool m_threadRunnig = false; + + public JobEngine(string name, string loggingName, int timeout = -1) { Name = name; LoggingName = loggingName; - + m_timeout = timeout; RequestProcessTimeoutOnStop = 5000; } public void Start() { - lock (this) + lock (JobLock) { if (IsRunning) return; IsRunning = true; - m_finishedProcessingAfterStop.Reset(); - - m_jobQueue = new BlockingCollection(new ConcurrentQueue(), 5000); m_cancelSource = new CancellationTokenSource(); - - WorkManager.StartThread( - ProcessRequests, - Name, - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); + WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); + m_threadRunnig = true; } } public void Stop() - { - lock (this) + { + lock (JobLock) { try { if (!IsRunning) return; - IsRunning = false; + m_log.DebugFormat("[JobEngine] Stopping {0}", Name); - int requestsLeft = m_jobQueue.Count; - - if (requestsLeft <= 0) + IsRunning = false; + if(m_threadRunnig) { m_cancelSource.Cancel(); - } - else - { - m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); - - while (requestsLeft > 0) - { - if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) - { - // After timeout no events have been written - if (requestsLeft == m_jobQueue.Count) - { - m_log.WarnFormat( - "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", - LoggingName, RequestProcessTimeoutOnStop, requestsLeft); - - break; - } - } - - requestsLeft = m_jobQueue.Count; - } + m_threadRunnig = false; } } finally { - m_cancelSource.Dispose(); + if(m_cancelSource != null) + m_cancelSource.Dispose(); } } } @@ -169,7 +140,7 @@ namespace OpenSim.Framework.Monitoring /// Make a job. /// /// - /// We provide this method to replace the constructor so that we can later pool job objects if necessary to + /// We provide this method to replace the constructor so that we can later pool job objects if necessary to /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway. /// /// @@ -219,6 +190,18 @@ namespace OpenSim.Framework.Monitoring /// public bool QueueJob(Job job) { + lock(JobLock) + { + if(!IsRunning) + return false; + + if(!m_threadRunnig) + { + WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); + m_threadRunnig = true; + } + } + if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) { m_jobQueue.Add(job); @@ -238,56 +221,53 @@ namespace OpenSim.Framework.Monitoring m_warnOverMaxQueue = false; } - return false; } } - private void ProcessRequests() + private void ProcessRequests(Object o) { - try + while(IsRunning) { - while (IsRunning || m_jobQueue.Count > 0) + try { - try + if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token)) { - CurrentJob = m_jobQueue.Take(m_cancelSource.Token); - } - catch (ObjectDisposedException e) - { - // If we see this whilst not running then it may be due to a race where this thread checks - // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. - if (IsRunning) - throw e; - else - break; + lock(JobLock) + m_threadRunnig = false; + break; } + } + catch(ObjectDisposedException e) + { + m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", + Name,m_jobQueue.Count); + break; + } + catch(OperationCanceledException) + { + break; + } - if (LogLevel >= 1) - m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); + if(LogLevel >= 1) + m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name); - try - { - CurrentJob.Action(); - } - catch (Exception e) - { - m_log.Error( - string.Format( - "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); - } + try + { + m_currentJob.Action(); + } + catch(Exception e) + { + m_log.Error( + string.Format( + "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e); + } - if (LogLevel >= 1) - m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); + if(LogLevel >= 1) + m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name); - CurrentJob = null; - } - } - catch (OperationCanceledException) - { + m_currentJob = null; } - - m_finishedProcessingAfterStop.Set(); } public class Job @@ -320,7 +300,7 @@ namespace OpenSim.Framework.Monitoring CommonId = commonId; Action = action; } - + /// /// Make a job. It needs to be separately queued. /// @@ -338,4 +318,4 @@ namespace OpenSim.Framework.Monitoring } } } -} \ No newline at end of file +} -- cgit v1.1