diff options
Diffstat (limited to 'OpenSim/Framework/Monitoring/JobEngine.cs')
-rw-r--r-- | OpenSim/Framework/Monitoring/JobEngine.cs | 341 |
1 files changed, 341 insertions, 0 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs new file mode 100644 index 0000000..6db9a67 --- /dev/null +++ b/OpenSim/Framework/Monitoring/JobEngine.cs | |||
@@ -0,0 +1,341 @@ | |||
1 | /* | ||
2 | * Copyright (c) Contributors, http://opensimulator.org/ | ||
3 | * See CONTRIBUTORS.TXT for a full list of copyright holders. | ||
4 | * | ||
5 | * Redistribution and use in source and binary forms, with or without | ||
6 | * modification, are permitted provided that the following conditions are met: | ||
7 | * * Redistributions of source code must retain the above copyright | ||
8 | * notice, this list of conditions and the following disclaimer. | ||
9 | * * Redistributions in binary form must reproduce the above copyright | ||
10 | * notice, this list of conditions and the following disclaimer in the | ||
11 | * documentation and/or other materials provided with the distribution. | ||
12 | * * Neither the name of the OpenSimulator Project nor the | ||
13 | * names of its contributors may be used to endorse or promote products | ||
14 | * derived from this software without specific prior written permission. | ||
15 | * | ||
16 | * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY | ||
17 | * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | ||
18 | * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
19 | * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY | ||
20 | * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | ||
21 | * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | ||
22 | * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | ||
23 | * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
24 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | ||
25 | * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
26 | */ | ||
27 | |||
28 | using System; | ||
29 | using System.Collections.Concurrent; | ||
30 | using System.Reflection; | ||
31 | using System.Threading; | ||
32 | using log4net; | ||
33 | using OpenSim.Framework; | ||
34 | |||
35 | namespace OpenSim.Framework.Monitoring | ||
36 | { | ||
37 | public class JobEngine | ||
38 | { | ||
39 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | ||
40 | |||
41 | public int LogLevel { get; set; } | ||
42 | |||
43 | public string Name { get; private set; } | ||
44 | |||
45 | public string LoggingName { get; private set; } | ||
46 | |||
47 | /// <summary> | ||
48 | /// Is this engine running? | ||
49 | /// </summary> | ||
50 | public bool IsRunning { get; private set; } | ||
51 | |||
52 | /// <summary> | ||
53 | /// The current job that the engine is running. | ||
54 | /// </summary> | ||
55 | /// <remarks> | ||
56 | /// Will be null if no job is currently running. | ||
57 | /// </remarks> | ||
58 | public Job CurrentJob { get; private set; } | ||
59 | |||
60 | /// <summary> | ||
61 | /// Number of jobs waiting to be processed. | ||
62 | /// </summary> | ||
63 | public int JobsWaiting { get { return m_jobQueue.Count; } } | ||
64 | |||
65 | /// <summary> | ||
66 | /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. | ||
67 | /// </summary> | ||
68 | public int RequestProcessTimeoutOnStop { get; set; } | ||
69 | |||
70 | /// <summary> | ||
71 | /// Controls whether we need to warn in the log about exceeding the max queue size. | ||
72 | /// </summary> | ||
73 | /// <remarks> | ||
74 | /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in | ||
75 | /// order to avoid spamming the log with lots of warnings. | ||
76 | /// </remarks> | ||
77 | private bool m_warnOverMaxQueue = true; | ||
78 | |||
79 | private BlockingCollection<Job> m_jobQueue; | ||
80 | |||
81 | private CancellationTokenSource m_cancelSource; | ||
82 | |||
83 | /// <summary> | ||
84 | /// Used to signal that we are ready to complete stop. | ||
85 | /// </summary> | ||
86 | private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); | ||
87 | |||
88 | public JobEngine(string name, string loggingName) | ||
89 | { | ||
90 | Name = name; | ||
91 | LoggingName = loggingName; | ||
92 | |||
93 | RequestProcessTimeoutOnStop = 5000; | ||
94 | } | ||
95 | |||
96 | public void Start() | ||
97 | { | ||
98 | lock (this) | ||
99 | { | ||
100 | if (IsRunning) | ||
101 | return; | ||
102 | |||
103 | IsRunning = true; | ||
104 | |||
105 | m_finishedProcessingAfterStop.Reset(); | ||
106 | |||
107 | m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000); | ||
108 | m_cancelSource = new CancellationTokenSource(); | ||
109 | |||
110 | WorkManager.StartThread( | ||
111 | ProcessRequests, | ||
112 | Name, | ||
113 | ThreadPriority.Normal, | ||
114 | false, | ||
115 | true, | ||
116 | null, | ||
117 | int.MaxValue); | ||
118 | } | ||
119 | } | ||
120 | |||
121 | public void Stop() | ||
122 | { | ||
123 | lock (this) | ||
124 | { | ||
125 | try | ||
126 | { | ||
127 | if (!IsRunning) | ||
128 | return; | ||
129 | |||
130 | IsRunning = false; | ||
131 | |||
132 | int requestsLeft = m_jobQueue.Count; | ||
133 | |||
134 | if (requestsLeft <= 0) | ||
135 | { | ||
136 | m_cancelSource.Cancel(); | ||
137 | } | ||
138 | else | ||
139 | { | ||
140 | m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); | ||
141 | |||
142 | while (requestsLeft > 0) | ||
143 | { | ||
144 | if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) | ||
145 | { | ||
146 | // After timeout no events have been written | ||
147 | if (requestsLeft == m_jobQueue.Count) | ||
148 | { | ||
149 | m_log.WarnFormat( | ||
150 | "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", | ||
151 | LoggingName, RequestProcessTimeoutOnStop, requestsLeft); | ||
152 | |||
153 | break; | ||
154 | } | ||
155 | } | ||
156 | |||
157 | requestsLeft = m_jobQueue.Count; | ||
158 | } | ||
159 | } | ||
160 | } | ||
161 | finally | ||
162 | { | ||
163 | m_cancelSource.Dispose(); | ||
164 | } | ||
165 | } | ||
166 | } | ||
167 | |||
168 | /// <summary> | ||
169 | /// Make a job. | ||
170 | /// </summary> | ||
171 | /// <remarks> | ||
172 | /// We provide this method to replace the constructor so that we can later pool job objects if necessary to | ||
173 | /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway. | ||
174 | /// </remarks> | ||
175 | /// <returns></returns> | ||
176 | /// <param name="name">Name.</param> | ||
177 | /// <param name="action">Action.</param> | ||
178 | /// <param name="commonId">Common identifier.</param> | ||
179 | public static Job MakeJob(string name, Action action, string commonId = null) | ||
180 | { | ||
181 | return Job.MakeJob(name, action, commonId); | ||
182 | } | ||
183 | |||
184 | /// <summary> | ||
185 | /// Remove the next job queued for processing. | ||
186 | /// </summary> | ||
187 | /// <remarks> | ||
188 | /// Returns null if there is no next job. | ||
189 | /// Will not remove a job currently being performed. | ||
190 | /// </remarks> | ||
191 | public Job RemoveNextJob() | ||
192 | { | ||
193 | Job nextJob; | ||
194 | m_jobQueue.TryTake(out nextJob); | ||
195 | |||
196 | return nextJob; | ||
197 | } | ||
198 | |||
199 | /// <summary> | ||
200 | /// Queue the job for processing. | ||
201 | /// </summary> | ||
202 | /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns> | ||
203 | /// <param name="name">Name of job. This appears on the console and in logging.</param> | ||
204 | /// <param name="action">Action to perform.</param> | ||
205 | /// <param name="commonId"> | ||
206 | /// Common identifier for a set of jobs. This is allows a set of jobs to be removed | ||
207 | /// if required (e.g. all jobs for a given agent. Optional. | ||
208 | /// </param> | ||
209 | public bool QueueJob(string name, Action action, string commonId = null) | ||
210 | { | ||
211 | return QueueJob(MakeJob(name, action, commonId)); | ||
212 | } | ||
213 | |||
214 | /// <summary> | ||
215 | /// Queue the job for processing. | ||
216 | /// </summary> | ||
217 | /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns> | ||
218 | /// <param name="job">The job</param> | ||
219 | /// </param> | ||
220 | public bool QueueJob(Job job) | ||
221 | { | ||
222 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) | ||
223 | { | ||
224 | m_jobQueue.Add(job); | ||
225 | |||
226 | if (!m_warnOverMaxQueue) | ||
227 | m_warnOverMaxQueue = true; | ||
228 | |||
229 | return true; | ||
230 | } | ||
231 | else | ||
232 | { | ||
233 | if (m_warnOverMaxQueue) | ||
234 | { | ||
235 | m_log.WarnFormat( | ||
236 | "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}", | ||
237 | LoggingName, job.Name, Name); | ||
238 | |||
239 | m_warnOverMaxQueue = false; | ||
240 | } | ||
241 | |||
242 | return false; | ||
243 | } | ||
244 | } | ||
245 | |||
246 | private void ProcessRequests() | ||
247 | { | ||
248 | try | ||
249 | { | ||
250 | while (IsRunning || m_jobQueue.Count > 0) | ||
251 | { | ||
252 | try | ||
253 | { | ||
254 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); | ||
255 | } | ||
256 | catch (ObjectDisposedException e) | ||
257 | { | ||
258 | // If we see this whilst not running then it may be due to a race where this thread checks | ||
259 | // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. | ||
260 | if (IsRunning) | ||
261 | throw e; | ||
262 | else | ||
263 | break; | ||
264 | } | ||
265 | |||
266 | if (LogLevel >= 1) | ||
267 | m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); | ||
268 | |||
269 | try | ||
270 | { | ||
271 | CurrentJob.Action(); | ||
272 | } | ||
273 | catch (Exception e) | ||
274 | { | ||
275 | m_log.Error( | ||
276 | string.Format( | ||
277 | "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); | ||
278 | } | ||
279 | |||
280 | if (LogLevel >= 1) | ||
281 | m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); | ||
282 | |||
283 | CurrentJob = null; | ||
284 | } | ||
285 | } | ||
286 | catch (OperationCanceledException) | ||
287 | { | ||
288 | } | ||
289 | |||
290 | m_finishedProcessingAfterStop.Set(); | ||
291 | } | ||
292 | |||
293 | public class Job | ||
294 | { | ||
295 | /// <summary> | ||
296 | /// Name of the job. | ||
297 | /// </summary> | ||
298 | /// <remarks> | ||
299 | /// This appears on console and debug output. | ||
300 | /// </remarks> | ||
301 | public string Name { get; private set; } | ||
302 | |||
303 | /// <summary> | ||
304 | /// Common ID for this job. | ||
305 | /// </summary> | ||
306 | /// <remarks> | ||
307 | /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required. | ||
308 | /// Can be null if this is not required. | ||
309 | /// </remarks> | ||
310 | public string CommonId { get; private set; } | ||
311 | |||
312 | /// <summary> | ||
313 | /// Action to perform when this job is processed. | ||
314 | /// </summary> | ||
315 | public Action Action { get; private set; } | ||
316 | |||
317 | private Job(string name, string commonId, Action action) | ||
318 | { | ||
319 | Name = name; | ||
320 | CommonId = commonId; | ||
321 | Action = action; | ||
322 | } | ||
323 | |||
324 | /// <summary> | ||
325 | /// Make a job. It needs to be separately queued. | ||
326 | /// </summary> | ||
327 | /// <remarks> | ||
328 | /// We provide this method to replace the constructor so that we can pool job objects if necessary to | ||
329 | /// to reduce memory churn. Normally one would directly call JobEngine.QueueJob() with parameters anyway. | ||
330 | /// </remarks> | ||
331 | /// <returns></returns> | ||
332 | /// <param name="name">Name.</param> | ||
333 | /// <param name="action">Action.</param> | ||
334 | /// <param name="commonId">Common identifier.</param> | ||
335 | public static Job MakeJob(string name, Action action, string commonId = null) | ||
336 | { | ||
337 | return new Job(name, commonId, action); | ||
338 | } | ||
339 | } | ||
340 | } | ||
341 | } \ No newline at end of file | ||