diff options
-rw-r--r-- | OpenSim/Framework/Monitoring/JobEngine.cs | 329 |
1 files changed, 329 insertions, 0 deletions
diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs new file mode 100644 index 0000000..44f5d9a --- /dev/null +++ b/OpenSim/Framework/Monitoring/JobEngine.cs | |||
@@ -0,0 +1,329 @@ | |||
1 | /* | ||
2 | * Copyright (c) Contributors, http://opensimulator.org/ | ||
3 | * See CONTRIBUTORS.TXT for a full list of copyright holders. | ||
4 | * | ||
5 | * Redistribution and use in source and binary forms, with or without | ||
6 | * modification, are permitted provided that the following conditions are met: | ||
7 | * * Redistributions of source code must retain the above copyright | ||
8 | * notice, this list of conditions and the following disclaimer. | ||
9 | * * Redistributions in binary form must reproduce the above copyright | ||
10 | * notice, this list of conditions and the following disclaimer in the | ||
11 | * documentation and/or other materials provided with the distribution. | ||
12 | * * Neither the name of the OpenSimulator Project nor the | ||
13 | * names of its contributors may be used to endorse or promote products | ||
14 | * derived from this software without specific prior written permission. | ||
15 | * | ||
16 | * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY | ||
17 | * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | ||
18 | * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
19 | * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY | ||
20 | * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | ||
21 | * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | ||
22 | * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | ||
23 | * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
24 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | ||
25 | * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
26 | */ | ||
27 | |||
28 | using System; | ||
29 | using System.Collections.Concurrent; | ||
30 | using System.Reflection; | ||
31 | using System.Threading; | ||
32 | using log4net; | ||
33 | using OpenSim.Framework; | ||
34 | |||
35 | namespace OpenSim.Framework.Monitoring | ||
36 | { | ||
37 | public class JobEngine | ||
38 | { | ||
39 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | ||
40 | |||
41 | public int LogLevel { get; set; } | ||
42 | |||
43 | public string Name { get; private set; } | ||
44 | |||
45 | public string LoggingName { get; private set; } | ||
46 | |||
47 | /// <summary> | ||
48 | /// Is this engine running? | ||
49 | /// </summary> | ||
50 | public bool IsRunning { get; private set; } | ||
51 | |||
52 | /// <summary> | ||
53 | /// The current job that the engine is running. | ||
54 | /// </summary> | ||
55 | /// <remarks> | ||
56 | /// Will be null if no job is currently running. | ||
57 | /// </remarks> | ||
58 | public Job CurrentJob { get; private set; } | ||
59 | |||
60 | /// <summary> | ||
61 | /// Number of jobs waiting to be processed. | ||
62 | /// </summary> | ||
63 | public int JobsWaiting { get { return m_jobQueue.Count; } } | ||
64 | |||
65 | /// <summary> | ||
66 | /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. | ||
67 | /// </summary> | ||
68 | public int RequestProcessTimeoutOnStop { get; set; } | ||
69 | |||
70 | /// <summary> | ||
71 | /// Controls whether we need to warn in the log about exceeding the max queue size. | ||
72 | /// </summary> | ||
73 | /// <remarks> | ||
74 | /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in | ||
75 | /// order to avoid spamming the log with lots of warnings. | ||
76 | /// </remarks> | ||
77 | private bool m_warnOverMaxQueue = true; | ||
78 | |||
79 | private BlockingCollection<Job> m_jobQueue; | ||
80 | |||
81 | private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); | ||
82 | |||
83 | /// <summary> | ||
84 | /// Used to signal that we are ready to complete stop. | ||
85 | /// </summary> | ||
86 | private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); | ||
87 | |||
88 | public JobEngine(string name, string loggingName) | ||
89 | { | ||
90 | Name = name; | ||
91 | LoggingName = loggingName; | ||
92 | |||
93 | RequestProcessTimeoutOnStop = 5000; | ||
94 | } | ||
95 | |||
96 | public void Start() | ||
97 | { | ||
98 | lock (this) | ||
99 | { | ||
100 | if (IsRunning) | ||
101 | return; | ||
102 | |||
103 | IsRunning = true; | ||
104 | |||
105 | m_finishedProcessingAfterStop.Reset(); | ||
106 | |||
107 | m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000); | ||
108 | |||
109 | WorkManager.StartThread( | ||
110 | ProcessRequests, | ||
111 | Name, | ||
112 | ThreadPriority.Normal, | ||
113 | false, | ||
114 | true, | ||
115 | null, | ||
116 | int.MaxValue); | ||
117 | } | ||
118 | } | ||
119 | |||
120 | public void Stop() | ||
121 | { | ||
122 | lock (this) | ||
123 | { | ||
124 | try | ||
125 | { | ||
126 | if (!IsRunning) | ||
127 | return; | ||
128 | |||
129 | IsRunning = false; | ||
130 | |||
131 | int requestsLeft = m_jobQueue.Count; | ||
132 | |||
133 | if (requestsLeft <= 0) | ||
134 | { | ||
135 | m_cancelSource.Cancel(); | ||
136 | } | ||
137 | else | ||
138 | { | ||
139 | m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); | ||
140 | |||
141 | while (requestsLeft > 0) | ||
142 | { | ||
143 | if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) | ||
144 | { | ||
145 | // After timeout no events have been written | ||
146 | if (requestsLeft == m_jobQueue.Count) | ||
147 | { | ||
148 | m_log.WarnFormat( | ||
149 | "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", | ||
150 | LoggingName, RequestProcessTimeoutOnStop, requestsLeft); | ||
151 | |||
152 | break; | ||
153 | } | ||
154 | } | ||
155 | |||
156 | requestsLeft = m_jobQueue.Count; | ||
157 | } | ||
158 | } | ||
159 | } | ||
160 | finally | ||
161 | { | ||
162 | m_cancelSource.Dispose(); | ||
163 | m_jobQueue = null; | ||
164 | } | ||
165 | } | ||
166 | } | ||
167 | |||
168 | /// <summary> | ||
169 | /// Make a job. | ||
170 | /// </summary> | ||
171 | /// <remarks> | ||
172 | /// We provide this method to replace the constructor so that we can later pool job objects if necessary to | ||
173 | /// reduce memory churn. Normally one would directly call QueueJob() with parameters anyway. | ||
174 | /// </remarks> | ||
175 | /// <returns></returns> | ||
176 | /// <param name="name">Name.</param> | ||
177 | /// <param name="action">Action.</param> | ||
178 | /// <param name="commonId">Common identifier.</param> | ||
179 | public static Job MakeJob(string name, Action action, string commonId = null) | ||
180 | { | ||
181 | return Job.MakeJob(name, action, commonId); | ||
182 | } | ||
183 | |||
184 | /// <summary> | ||
185 | /// Remove the next job queued for processing. | ||
186 | /// </summary> | ||
187 | /// <remarks> | ||
188 | /// Returns null if there is no next job. | ||
189 | /// Will not remove a job currently being performed. | ||
190 | /// </remarks> | ||
191 | public Job RemoveNextJob() | ||
192 | { | ||
193 | Job nextJob; | ||
194 | m_jobQueue.TryTake(out nextJob); | ||
195 | |||
196 | return nextJob; | ||
197 | } | ||
198 | |||
199 | /// <summary> | ||
200 | /// Queue the job for processing. | ||
201 | /// </summary> | ||
202 | /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns> | ||
203 | /// <param name="name">Name of job. This appears on the console and in logging.</param> | ||
204 | /// <param name="action">Action to perform.</param> | ||
205 | /// <param name="commonId"> | ||
206 | /// Common identifier for a set of jobs. This is allows a set of jobs to be removed | ||
207 | /// if required (e.g. all jobs for a given agent. Optional. | ||
208 | /// </param> | ||
209 | public bool QueueJob(string name, Action action, string commonId = null) | ||
210 | { | ||
211 | return QueueJob(MakeJob(name, action, commonId)); | ||
212 | } | ||
213 | |||
214 | /// <summary> | ||
215 | /// Queue the job for processing. | ||
216 | /// </summary> | ||
217 | /// <returns><c>true</c>, if job was queued, <c>false</c> otherwise.</returns> | ||
218 | /// <param name="job">The job</param> | ||
219 | /// </param> | ||
220 | public bool QueueJob(Job job) | ||
221 | { | ||
222 | if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) | ||
223 | { | ||
224 | m_jobQueue.Add(job); | ||
225 | |||
226 | if (!m_warnOverMaxQueue) | ||
227 | m_warnOverMaxQueue = true; | ||
228 | |||
229 | return true; | ||
230 | } | ||
231 | else | ||
232 | { | ||
233 | if (m_warnOverMaxQueue) | ||
234 | { | ||
235 | m_log.WarnFormat( | ||
236 | "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}", | ||
237 | LoggingName, job.Name, Name); | ||
238 | |||
239 | m_warnOverMaxQueue = false; | ||
240 | } | ||
241 | |||
242 | return false; | ||
243 | } | ||
244 | } | ||
245 | |||
246 | private void ProcessRequests() | ||
247 | { | ||
248 | try | ||
249 | { | ||
250 | while (IsRunning || m_jobQueue.Count > 0) | ||
251 | { | ||
252 | CurrentJob = m_jobQueue.Take(m_cancelSource.Token); | ||
253 | |||
254 | if (LogLevel >= 1) | ||
255 | m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); | ||
256 | |||
257 | try | ||
258 | { | ||
259 | CurrentJob.Action(); | ||
260 | } | ||
261 | catch (Exception e) | ||
262 | { | ||
263 | m_log.Error( | ||
264 | string.Format( | ||
265 | "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); | ||
266 | } | ||
267 | |||
268 | if (LogLevel >= 1) | ||
269 | m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); | ||
270 | |||
271 | CurrentJob = null; | ||
272 | } | ||
273 | } | ||
274 | catch (OperationCanceledException) | ||
275 | { | ||
276 | } | ||
277 | |||
278 | m_finishedProcessingAfterStop.Set(); | ||
279 | } | ||
280 | |||
281 | public class Job | ||
282 | { | ||
283 | /// <summary> | ||
284 | /// Name of the job. | ||
285 | /// </summary> | ||
286 | /// <remarks> | ||
287 | /// This appears on console and debug output. | ||
288 | /// </remarks> | ||
289 | public string Name { get; private set; } | ||
290 | |||
291 | /// <summary> | ||
292 | /// Common ID for this job. | ||
293 | /// </summary> | ||
294 | /// <remarks> | ||
295 | /// This allows all jobs with a certain common ID (e.g. a client UUID) to be removed en-masse if required. | ||
296 | /// Can be null if this is not required. | ||
297 | /// </remarks> | ||
298 | public string CommonId { get; private set; } | ||
299 | |||
300 | /// <summary> | ||
301 | /// Action to perform when this job is processed. | ||
302 | /// </summary> | ||
303 | public Action Action { get; private set; } | ||
304 | |||
305 | private Job(string name, string commonId, Action action) | ||
306 | { | ||
307 | Name = name; | ||
308 | CommonId = commonId; | ||
309 | Action = action; | ||
310 | } | ||
311 | |||
312 | /// <summary> | ||
313 | /// Make a job. It needs to be separately queued. | ||
314 | /// </summary> | ||
315 | /// <remarks> | ||
316 | /// We provide this method to replace the constructor so that we can pool job objects if necessary to | ||
317 | /// to reduce memory churn. Normally one would directly call JobEngine.QueueJob() with parameters anyway. | ||
318 | /// </remarks> | ||
319 | /// <returns></returns> | ||
320 | /// <param name="name">Name.</param> | ||
321 | /// <param name="action">Action.</param> | ||
322 | /// <param name="commonId">Common identifier.</param> | ||
323 | public static Job MakeJob(string name, Action action, string commonId = null) | ||
324 | { | ||
325 | return new Job(name, commonId, action); | ||
326 | } | ||
327 | } | ||
328 | } | ||
329 | } \ No newline at end of file | ||