From 482ff06e13d6694027eec8d4146f733d69908658 Mon Sep 17 00:00:00 2001 From: UbitUmarov Date: Tue, 13 Jun 2017 18:50:34 +0100 Subject: make JobEngine be a workitem of mail pool (smartThread), with the option to release thread after a idle time, so is free to do other service elsewhere --- OpenSim/Framework/Monitoring/JobEngine.cs | 89 ++++++++++++++----------------- 1 file changed, 41 insertions(+), 48 deletions(-) (limited to 'OpenSim/Framework') diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index a6a059d..115871e 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs @@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring /// /// Will be null if no job is currently running. /// - public Job CurrentJob { get; private set; } + private Job m_currentJob; + public Job CurrentJob { get { return m_currentJob;} } /// /// Number of jobs waiting to be processed. @@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring private CancellationTokenSource m_cancelSource; - /// - /// Used to signal that we are ready to complete stop. - /// - private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); + private int m_timeout = -1; + + private bool m_threadRunnig = false; - public JobEngine(string name, string loggingName) + public JobEngine(string name, string loggingName, int timeout = -1) { Name = name; LoggingName = loggingName; - + m_timeout = timeout; RequestProcessTimeoutOnStop = 5000; } @@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring IsRunning = true; - m_finishedProcessingAfterStop.Reset(); - m_cancelSource = new CancellationTokenSource(); - - WorkManager.StartThread( - ProcessRequests, - Name, - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); + WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); + m_threadRunnig = true; } } @@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring m_log.DebugFormat("[JobEngine] Stopping {0}", Name); IsRunning = false; - - m_finishedProcessingAfterStop.Reset(); - if(m_jobQueue.Count <= 0) + if(m_threadRunnig) + { m_cancelSource.Cancel(); - - m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); - m_finishedProcessingAfterStop.Close(); + m_threadRunnig = false; + } } finally { if(m_cancelSource != null) m_cancelSource.Dispose(); - if(m_finishedProcessingAfterStop != null) - m_finishedProcessingAfterStop.Dispose(); } } } @@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring /// public bool QueueJob(Job job) { + lock(JobLock) + { + if(!IsRunning) + return false; + + if(!m_threadRunnig) + { + WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); + m_threadRunnig = true; + } + } + if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) { m_jobQueue.Add(job); @@ -222,59 +221,53 @@ namespace OpenSim.Framework.Monitoring m_warnOverMaxQueue = false; } - return false; } } - private void ProcessRequests() + private void ProcessRequests(Object o) { - while(IsRunning || m_jobQueue.Count > 0) + while(IsRunning) { try { - CurrentJob = m_jobQueue.Take(m_cancelSource.Token); - } - catch(ObjectDisposedException e) - { - // If we see this whilst not running then it may be due to a race where this thread checks - // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. - if(IsRunning) - throw e; - else + if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token)) { - m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", - Name,m_jobQueue.Count); + lock(JobLock) + m_threadRunnig = false; break; } } + catch(ObjectDisposedException e) + { + m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", + Name,m_jobQueue.Count); + break; + } catch(OperationCanceledException) { break; } if(LogLevel >= 1) - m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); + m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name); try { - CurrentJob.Action(); + m_currentJob.Action(); } catch(Exception e) { m_log.Error( string.Format( - "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); + "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e); } if(LogLevel >= 1) - m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); + m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name); - CurrentJob = null; + m_currentJob = null; } - - Watchdog.RemoveThread(false); - m_finishedProcessingAfterStop.Set(); } public class Job -- cgit v1.1