diff options
Diffstat (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs')
-rw-r--r-- | OpenSim/Framework/Monitoring/JobEngine.cs | 162 |
1 files changed, 71 insertions, 91 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index 6db9a67..115871e 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs | |||
@@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring | |||
40 | 40 | ||
41 | public int LogLevel { get; set; } | 41 | public int LogLevel { get; set; } |
42 | 42 | ||
43 | private object JobLock = new object(); | ||
44 | |||
43 | public string Name { get; private set; } | 45 | public string Name { get; private set; } |
44 | 46 | ||
45 | public string LoggingName { get; private set; } | 47 | public string LoggingName { get; private set; } |
@@ -47,7 +49,7 @@ namespace OpenSim.Framework.Monitoring | |||
47 | /// <summary> | 49 | /// <summary> |
48 | /// Is this engine running? | 50 | /// Is this engine running? |
49 | /// </summary> | 51 | /// </summary> |
50 | public bool IsRunning { get; private set; } | 52 | public bool IsRunning { get; private set; } |
51 | 53 | ||
52 | /// <summary> | 54 | /// <summary> |
53 | /// The current job that the engine is running. | 55 | /// The current job that the engine is running. |
@@ -55,7 +57,8 @@ namespace OpenSim.Framework.Monitoring | |||
55 | /// <remarks> | 57 | /// <remarks> |
56 | /// Will be null if no job is currently running. | 58 | /// Will be null if no job is currently running. |
57 | /// </remarks> | 59 | /// </remarks> |
58 | public Job CurrentJob { get; private set; } | 60 | private Job m_currentJob; |
61 | public Job CurrentJob { get { return m_currentJob;} } | ||
59 | 62 | ||
60 | /// <summary> | 63 | /// <summary> |
61 | /// Number of jobs waiting to be processed. | 64 | /// Number of jobs waiting to be processed. |
@@ -71,96 +74,64 @@ namespace OpenSim.Framework.Monitoring | |||
71 | /// Controls whether we need to warn in the log about exceeding the max queue size. | 74 | /// Controls whether we need to warn in the log about exceeding the max queue size. |
72 | /// </summary> | 75 | /// </summary> |
73 | /// <remarks> | 76 | /// <remarks> |
74 | /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in | 77 | /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in |
75 | /// order to avoid spamming the log with lots of warnings. | 78 | /// order to avoid spamming the log with lots of warnings. |
76 | /// </remarks> | 79 | /// </remarks> |
77 | private bool m_warnOverMaxQueue = true; | 80 | private bool m_warnOverMaxQueue = true; |
78 | 81 | ||
79 | private BlockingCollection<Job> m_jobQueue; | 82 | private BlockingCollection<Job> m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000); |
80 | 83 | ||
81 | private CancellationTokenSource m_cancelSource; | 84 | private CancellationTokenSource m_cancelSource; |
82 | 85 | ||
83 | /// <summary> | 86 | private int m_timeout = -1; |
84 | /// Used to signal that we are ready to complete stop. | ||
85 | /// </summary> | ||
86 | private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); | ||
87 | 87 | ||
88 | public JobEngine(string name, string loggingName) | 88 | private bool m_threadRunnig = false; |
89 | |||
90 | public JobEngine(string name, string loggingName, int timeout = -1) | ||
89 | { | 91 | { |
90 | Name = name; | 92 | Name = name; |
91 | LoggingName = loggingName; | 93 | LoggingName = loggingName; |
92 | 94 | m_timeout = timeout; | |
93 | RequestProcessTimeoutOnStop = 5000; | 95 | RequestProcessTimeoutOnStop = 5000; |
94 | } | 96 | } |
95 | 97 | ||
96 | public void Start() | 98 | public void Start() |
97 | { | 99 | { |
98 | lock (this) | 100 | lock (JobLock) |
99 | { | 101 | { |
100 | if (IsRunning) | 102 | if (IsRunning) |
101 | return; | 103 | return; |
102 | 104 | ||
103 | IsRunning = true; | 105 | IsRunning = true; |
104 | 106 | ||
105 | m_finishedProcessingAfterStop.Reset(); | ||
106 | |||
107 | m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000); | ||
108 | m_cancelSource = new CancellationTokenSource(); | 107 | m_cancelSource = new CancellationTokenSource(); |
109 | 108 | WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); | |
110 | WorkManager.StartThread( | 109 | m_threadRunnig = true; |
111 | ProcessRequests, | ||
112 | Name, | ||
113 | ThreadPriority.Normal, | ||
114 | false, | ||
115 | true, | ||
116 | null, | ||
117 | int.MaxValue); | ||
118 | } | 110 | } |
119 | } | 111 | } |
120 | 112 | ||
121 | public void Stop() | 113 | public void Stop() |
122 | { | 114 | { |
123 | lock (this) | 115 | lock (JobLock) |
124 | { | 116 | { |
125 | try | 117 | try |
126 | { | 118 | { |
127 | if (!IsRunning) | 119 | if (!IsRunning) |
128 | return; | 120 | return; |
129 | 121 | ||
130 | IsRunning = false; | 122 | m_log.DebugFormat("[JobEngine] Stopping {0}", Name); |
131 | 123 | ||
132 | int requestsLeft = m_jobQueue.Count; | 124 | IsRunning = false; |
133 | 125 | if(m_threadRunnig) | |
134 | if (requestsLeft <= 0) | ||
135 | { | 126 | { |
136 | m_cancelSource.Cancel(); | 127 | m_cancelSource.Cancel(); |
137 | } | 128 | m_threadRunnig = false; |
138 | else | ||
139 | { | ||
140 | m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); | ||
141 | |||
142 | while (requestsLeft > 0) | ||
143 | { | ||
144 | if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) | ||
145 | { | ||
146 | // After timeout no events have been written | ||
147 | if (requestsLeft == m_jobQueue.Count) | ||
148 | { | ||
149 | m_log.WarnFormat( | ||
150 | "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", | ||
151 | LoggingName, RequestProcessTimeoutOnStop, requestsLeft); | ||
152 | |||
153 | break; | ||
154 | } | ||
155 | } | ||
156 | |||
157 | requestsLeft = m_jobQueue.Count; | ||
158 | } | ||
159 | } | 129 | } |
160 | } | 130 | } |
161 | finally | 131 | finally |
162 | { | 132 | { |
163 | m_cancelSource.Dispose(); | 133 | if(m_cancelSource != null) |
134 | m_cancelSource.Dispose(); | ||
164 | } | 135 | } |
165 | } | 136 | } |
166 | } | 137 | } |
@@ -169,7 +140,7 @@ namespace OpenSim.Framework.Monitoring | |||
169 | /// Make a job. | 140 | /// Make a job. |
170 | /// </summary> | 141 | /// </summary> |
171 | /// <remarks> | 142 | /// <remarks> |
172 | /// We provide this method to replace the constructor so that we can later pool job objects if necessary to | 143 | /// We provide this method to replace the constructor so that we can later pool job objects if necessary to |
173 | /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway. | 144 | /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway. |
174 | /// </remarks> | 145 | /// </remarks> |
175 | /// <returns></returns> | 146 | /// <returns></returns> |
@@ -219,6 +190,18 @@ namespace OpenSim.Framework.Monitoring | |||
219 | /// </param> | 190 | /// </param> |
220 | public bool QueueJob(Job job) | 191 | public bool QueueJob(Job job) |
221 | { | 192 | { |
193 | lock(JobLock) | ||
194 | { | ||
195 | if(!IsRunning) | ||
196 | return false; | ||
197 | |||
198 | if(!m_threadRunnig) | ||
199 | { | ||
200 | WorkManager.RunInThreadPool(ProcessRequests, null, Name, false); | ||
201 | m_threadRunnig = true; | ||
202 | } | ||
203 | } | ||
204 | |||
222 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) | 205 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) |
223 | { | 206 | { |
224 | m_jobQueue.Add(job); | 207 | m_jobQueue.Add(job); |
@@ -238,56 +221,53 @@ namespace OpenSim.Framework.Monitoring | |||
238 | 221 | ||
239 | m_warnOverMaxQueue = false; | 222 | m_warnOverMaxQueue = false; |
240 | } | 223 | } |
241 | |||
242 | return false; | 224 | return false; |
243 | } | 225 | } |
244 | } | 226 | } |
245 | 227 | ||
246 | private void ProcessRequests() | 228 | private void ProcessRequests(Object o) |
247 | { | 229 | { |
248 | try | 230 | while(IsRunning) |
249 | { | 231 | { |
250 | while (IsRunning || m_jobQueue.Count > 0) | 232 | try |
251 | { | 233 | { |
252 | try | 234 | if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token)) |
253 | { | 235 | { |
254 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); | 236 | lock(JobLock) |
255 | } | 237 | m_threadRunnig = false; |
256 | catch (ObjectDisposedException e) | 238 | break; |
257 | { | ||
258 | // If we see this whilst not running then it may be due to a race where this thread checks | ||
259 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | ||
260 | if (IsRunning) | ||
261 | throw e; | ||
262 | else | ||
263 | break; | ||
264 | } | 239 | } |
240 | } | ||
241 | catch(ObjectDisposedException e) | ||
242 | { | ||
243 | m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", | ||
244 | Name,m_jobQueue.Count); | ||
245 | break; | ||
246 | } | ||
247 | catch(OperationCanceledException) | ||
248 | { | ||
249 | break; | ||
250 | } | ||
265 | 251 | ||
266 | if (LogLevel >= 1) | 252 | if(LogLevel >= 1) |
267 | m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); | 253 | m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name); |
268 | 254 | ||
269 | try | 255 | try |
270 | { | 256 | { |
271 | CurrentJob.Action(); | 257 | m_currentJob.Action(); |
272 | } | 258 | } |
273 | catch (Exception e) | 259 | catch(Exception e) |
274 | { | 260 | { |
275 | m_log.Error( | 261 | m_log.Error( |
276 | string.Format( | 262 | string.Format( |
277 | "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); | 263 | "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e); |
278 | } | 264 | } |
279 | 265 | ||
280 | if (LogLevel >= 1) | 266 | if(LogLevel >= 1) |
281 | m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); | 267 | m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name); |
282 | 268 | ||
283 | CurrentJob = null; | 269 | m_currentJob = null; |
284 | } | ||
285 | } | ||
286 | catch (OperationCanceledException) | ||
287 | { | ||
288 | } | 270 | } |
289 | |||
290 | m_finishedProcessingAfterStop.Set(); | ||
291 | } | 271 | } |
292 | 272 | ||
293 | public class Job | 273 | public class Job |
@@ -320,7 +300,7 @@ namespace OpenSim.Framework.Monitoring | |||
320 | CommonId = commonId; | 300 | CommonId = commonId; |
321 | Action = action; | 301 | Action = action; |
322 | } | 302 | } |
323 | 303 | ||
324 | /// <summary> | 304 | /// <summary> |
325 | /// Make a job. It needs to be separately queued. | 305 | /// Make a job. It needs to be separately queued. |
326 | /// </summary> | 306 | /// </summary> |
@@ -338,4 +318,4 @@ namespace OpenSim.Framework.Monitoring | |||
338 | } | 318 | } |
339 | } | 319 | } |
340 | } | 320 | } |
341 | } \ No newline at end of file | 321 | } |