aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs328
1 files changed, 328 insertions, 0 deletions
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs
new file mode 100644
index 0000000..874ddae
--- /dev/null
+++ b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs
@@ -0,0 +1,328 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34using OpenSim.Framework.Monitoring;
35using OpenSim.Region.Framework.Scenes;
36
37namespace OpenSim.Region.ClientStack.LindenUDP
38{
39 public class Job
40 {
41 public string Name;
42 public WaitCallback Callback;
43 public object O;
44
45 public Job(string name, WaitCallback callback, object o)
46 {
47 Name = name;
48 Callback = callback;
49 O = o;
50 }
51 }
52
53 // TODO: These kinds of classes MUST be generalized with JobEngine, etc.
54 public class IncomingPacketAsyncHandlingEngine
55 {
56 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
57
58 public int LogLevel { get; set; }
59
60 public bool IsRunning { get; private set; }
61
62 /// <summary>
63 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
64 /// </summary>
65 public int RequestProcessTimeoutOnStop { get; set; }
66
67 /// <summary>
68 /// Controls whether we need to warn in the log about exceeding the max queue size.
69 /// </summary>
70 /// <remarks>
71 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
72 /// order to avoid spamming the log with lots of warnings.
73 /// </remarks>
74 private bool m_warnOverMaxQueue = true;
75
76 private BlockingCollection<Job> m_requestQueue;
77
78 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
79
80 private LLUDPServer m_udpServer;
81
82 private Stat m_requestsWaitingStat;
83
84 private Job m_currentJob;
85
86 /// <summary>
87 /// Used to signal that we are ready to complete stop.
88 /// </summary>
89 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
90
91 public IncomingPacketAsyncHandlingEngine(LLUDPServer server)
92 {
93 //LogLevel = 1;
94 m_udpServer = server;
95 RequestProcessTimeoutOnStop = 5000;
96
97 // MainConsole.Instance.Commands.AddCommand(
98 // "Debug",
99 // false,
100 // "debug jobengine",
101 // "debug jobengine <start|stop|status>",
102 // "Start, stop or get status of the job engine.",
103 // "If stopped then all jobs are processed immediately.",
104 // HandleControlCommand);
105 }
106
107 public void Start()
108 {
109 lock (this)
110 {
111 if (IsRunning)
112 return;
113
114 IsRunning = true;
115
116 m_finishedProcessingAfterStop.Reset();
117
118 m_requestQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);
119
120 m_requestsWaitingStat =
121 new Stat(
122 "IncomingPacketAsyncRequestsWaiting",
123 "Number of incoming packets waiting for async processing in engine.",
124 "",
125 "",
126 "clientstack",
127 m_udpServer.Scene.Name,
128 StatType.Pull,
129 MeasuresOfInterest.None,
130 stat => stat.Value = m_requestQueue.Count,
131 StatVerbosity.Debug);
132
133 StatsManager.RegisterStat(m_requestsWaitingStat);
134
135 Watchdog.StartThread(
136 ProcessRequests,
137 string.Format("Incoming Packet Async Handling Engine Thread ({0})", m_udpServer.Scene.Name),
138 ThreadPriority.Normal,
139 false,
140 true,
141 null,
142 int.MaxValue);
143 }
144 }
145
146 public void Stop()
147 {
148 lock (this)
149 {
150 try
151 {
152 if (!IsRunning)
153 return;
154
155 IsRunning = false;
156
157 int requestsLeft = m_requestQueue.Count;
158
159 if (requestsLeft <= 0)
160 {
161 m_cancelSource.Cancel();
162 }
163 else
164 {
165 m_log.InfoFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
166
167 while (requestsLeft > 0)
168 {
169 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
170 {
171 // After timeout no events have been written
172 if (requestsLeft == m_requestQueue.Count)
173 {
174 m_log.WarnFormat(
175 "[INCOMING PACKET ASYNC HANDLING ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
176 RequestProcessTimeoutOnStop, requestsLeft);
177
178 break;
179 }
180 }
181
182 requestsLeft = m_requestQueue.Count;
183 }
184 }
185 }
186 finally
187 {
188 m_cancelSource.Dispose();
189 StatsManager.DeregisterStat(m_requestsWaitingStat);
190 m_requestsWaitingStat = null;
191 m_requestQueue = null;
192 }
193 }
194 }
195
196 public bool QueueRequest(string name, WaitCallback req, object o)
197 {
198 if (LogLevel >= 1)
199 m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Queued job {0}", name);
200
201 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
202 {
203 // m_log.DebugFormat(
204 // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
205 // categories, client.AgentID, m_udpServer.Scene.Name);
206
207 m_requestQueue.Add(new Job(name, req, o));
208
209 if (!m_warnOverMaxQueue)
210 m_warnOverMaxQueue = true;
211
212 return true;
213 }
214 else
215 {
216 if (m_warnOverMaxQueue)
217 {
218 // m_log.WarnFormat(
219 // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
220 // client.AgentID, m_udpServer.Scene.Name);
221
222 m_log.WarnFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Request queue at maximum capacity, not recording job");
223
224 m_warnOverMaxQueue = false;
225 }
226
227 return false;
228 }
229 }
230
231 private void ProcessRequests()
232 {
233 try
234 {
235 while (IsRunning || m_requestQueue.Count > 0)
236 {
237 m_currentJob = m_requestQueue.Take(m_cancelSource.Token);
238
239 // QueueEmpty callback = req.Client.OnQueueEmpty;
240 //
241 // if (callback != null)
242 // {
243 // try
244 // {
245 // callback(req.Categories);
246 // }
247 // catch (Exception e)
248 // {
249 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
250 // }
251 // }
252
253 if (LogLevel >= 1)
254 m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processing job {0}", m_currentJob.Name);
255
256 try
257 {
258 m_currentJob.Callback.Invoke(m_currentJob.O);
259 }
260 catch (Exception e)
261 {
262 m_log.Error(
263 string.Format(
264 "[INCOMING PACKET ASYNC HANDLING ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e);
265 }
266
267 if (LogLevel >= 1)
268 m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processed job {0}", m_currentJob.Name);
269
270 m_currentJob = null;
271 }
272 }
273 catch (OperationCanceledException)
274 {
275 }
276
277 m_finishedProcessingAfterStop.Set();
278 }
279
280 // private void HandleControlCommand(string module, string[] args)
281 // {
282 // // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
283 // // return;
284 //
285 // if (args.Length < 3)
286 // {
287 // MainConsole.Instance.Output("Usage: debug jobengine <stop|start|status|loglevel>");
288 // return;
289 // }
290 //
291 // string subCommand = args[2];
292 //
293 // if (subCommand == "stop")
294 // {
295 // Stop();
296 // MainConsole.Instance.OutputFormat("Stopped job engine.");
297 // }
298 // else if (subCommand == "start")
299 // {
300 // Start();
301 // MainConsole.Instance.OutputFormat("Started job engine.");
302 // }
303 // else if (subCommand == "status")
304 // {
305 // MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning);
306 // MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none");
307 // MainConsole.Instance.OutputFormat(
308 // "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
309 // MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel);
310 // }
311 //
312 // else if (subCommand == "loglevel")
313 // {
314 // // int logLevel;
315 // int logLevel = int.Parse(args[3]);
316 // // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel))
317 // // {
318 // LogLevel = logLevel;
319 // MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel);
320 // // }
321 // }
322 // else
323 // {
324 // MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand);
325 // }
326 // }
327 }
328}