From 482ff06e13d6694027eec8d4146f733d69908658 Mon Sep 17 00:00:00 2001
From: UbitUmarov
Date: Tue, 13 Jun 2017 18:50:34 +0100
Subject: make JobEngine be a workitem of mail pool (smartThread), with the
option to release thread after a idle time, so is free to do other service
elsewhere
---
OpenSim/Framework/Monitoring/JobEngine.cs | 89 ++++++++++++++-----------------
1 file changed, 41 insertions(+), 48 deletions(-)
(limited to 'OpenSim/Framework')
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
index a6a059d..115871e 100644
--- a/OpenSim/Framework/Monitoring/JobEngine.cs
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring
///
/// Will be null if no job is currently running.
///
- public Job CurrentJob { get; private set; }
+ private Job m_currentJob;
+ public Job CurrentJob { get { return m_currentJob;} }
///
/// Number of jobs waiting to be processed.
@@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring
private CancellationTokenSource m_cancelSource;
- ///
- /// Used to signal that we are ready to complete stop.
- ///
- private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
+ private int m_timeout = -1;
+
+ private bool m_threadRunnig = false;
- public JobEngine(string name, string loggingName)
+ public JobEngine(string name, string loggingName, int timeout = -1)
{
Name = name;
LoggingName = loggingName;
-
+ m_timeout = timeout;
RequestProcessTimeoutOnStop = 5000;
}
@@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring
IsRunning = true;
- m_finishedProcessingAfterStop.Reset();
-
m_cancelSource = new CancellationTokenSource();
-
- WorkManager.StartThread(
- ProcessRequests,
- Name,
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
+ WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
+ m_threadRunnig = true;
}
}
@@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring
m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
IsRunning = false;
-
- m_finishedProcessingAfterStop.Reset();
- if(m_jobQueue.Count <= 0)
+ if(m_threadRunnig)
+ {
m_cancelSource.Cancel();
-
- m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop);
- m_finishedProcessingAfterStop.Close();
+ m_threadRunnig = false;
+ }
}
finally
{
if(m_cancelSource != null)
m_cancelSource.Dispose();
- if(m_finishedProcessingAfterStop != null)
- m_finishedProcessingAfterStop.Dispose();
}
}
}
@@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring
///
public bool QueueJob(Job job)
{
+ lock(JobLock)
+ {
+ if(!IsRunning)
+ return false;
+
+ if(!m_threadRunnig)
+ {
+ WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
+ m_threadRunnig = true;
+ }
+ }
+
if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
{
m_jobQueue.Add(job);
@@ -222,59 +221,53 @@ namespace OpenSim.Framework.Monitoring
m_warnOverMaxQueue = false;
}
-
return false;
}
}
- private void ProcessRequests()
+ private void ProcessRequests(Object o)
{
- while(IsRunning || m_jobQueue.Count > 0)
+ while(IsRunning)
{
try
{
- CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
- }
- catch(ObjectDisposedException e)
- {
- // If we see this whilst not running then it may be due to a race where this thread checks
- // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
- if(IsRunning)
- throw e;
- else
+ if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token))
{
- m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
- Name,m_jobQueue.Count);
+ lock(JobLock)
+ m_threadRunnig = false;
break;
}
}
+ catch(ObjectDisposedException e)
+ {
+ m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
+ Name,m_jobQueue.Count);
+ break;
+ }
catch(OperationCanceledException)
{
break;
}
if(LogLevel >= 1)
- m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name);
+ m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name);
try
{
- CurrentJob.Action();
+ m_currentJob.Action();
}
catch(Exception e)
{
m_log.Error(
string.Format(
- "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e);
+ "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e);
}
if(LogLevel >= 1)
- m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name);
+ m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name);
- CurrentJob = null;
+ m_currentJob = null;
}
-
- Watchdog.RemoveThread(false);
- m_finishedProcessingAfterStop.Set();
}
public class Job
--
cgit v1.1