aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs
diff options
context:
space:
mode:
authorRobert Adams2014-08-21 06:36:19 -0700
committerRobert Adams2014-08-21 06:36:19 -0700
commit7ba3b88fb66de8c22ae1e067224b40214c23957c (patch)
tree24fec969bc6894d273565bb4d53da0b4196dae90 /OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs
parentFix typo in OpenSimDefaults.ini comment (diff)
parentExtend drop command to "debug lludp drop <in|out>..." to allow drop of inboun... (diff)
downloadopensim-SC_OLD-7ba3b88fb66de8c22ae1e067224b40214c23957c.zip
opensim-SC_OLD-7ba3b88fb66de8c22ae1e067224b40214c23957c.tar.gz
opensim-SC_OLD-7ba3b88fb66de8c22ae1e067224b40214c23957c.tar.bz2
opensim-SC_OLD-7ba3b88fb66de8c22ae1e067224b40214c23957c.tar.xz
Merge branch 'master' into bullet-2.82
Diffstat (limited to 'OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs')
-rw-r--r--OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs286
1 files changed, 286 insertions, 0 deletions
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs
new file mode 100644
index 0000000..0659d8e
--- /dev/null
+++ b/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs
@@ -0,0 +1,286 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections.Concurrent;
30using System.Reflection;
31using System.Threading;
32using log4net;
33using OpenSim.Framework;
34using OpenSim.Framework.Monitoring;
35using OpenSim.Region.Framework.Scenes;
36
37namespace OpenSim.Region.ClientStack.LindenUDP
38{
39 public struct RefillRequest
40 {
41 public LLUDPClient Client;
42 public ThrottleOutPacketTypeFlags Categories;
43
44 public RefillRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories)
45 {
46 Client = client;
47 Categories = categories;
48 }
49 }
50
51 public class OutgoingQueueRefillEngine
52 {
53 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
54
55 public bool IsRunning { get; private set; }
56
57 /// <summary>
58 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
59 /// </summary>
60 public int RequestProcessTimeoutOnStop { get; set; }
61
62 /// <summary>
63 /// Controls whether we need to warn in the log about exceeding the max queue size.
64 /// </summary>
65 /// <remarks>
66 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
67 /// order to avoid spamming the log with lots of warnings.
68 /// </remarks>
69 private bool m_warnOverMaxQueue = true;
70
71 private BlockingCollection<RefillRequest> m_requestQueue;
72
73 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
74
75 private LLUDPServer m_udpServer;
76
77 private Stat m_oqreRequestsWaitingStat;
78
79 /// <summary>
80 /// Used to signal that we are ready to complete stop.
81 /// </summary>
82 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
83
84 public OutgoingQueueRefillEngine(LLUDPServer server)
85 {
86 RequestProcessTimeoutOnStop = 5000;
87 m_udpServer = server;
88
89 MainConsole.Instance.Commands.AddCommand(
90 "Debug",
91 false,
92 "debug lludp oqre",
93 "debug lludp oqre <start|stop|status>",
94 "Start, stop or get status of OutgoingQueueRefillEngine.",
95 "Experimental.",
96 HandleOqreCommand);
97 }
98
99 public void Start()
100 {
101 lock (this)
102 {
103 if (IsRunning)
104 return;
105
106 IsRunning = true;
107
108 m_finishedProcessingAfterStop.Reset();
109
110 m_requestQueue = new BlockingCollection<RefillRequest>(new ConcurrentQueue<RefillRequest>(), 5000);
111
112 m_oqreRequestsWaitingStat =
113 new Stat(
114 "OQRERequestsWaiting",
115 "Number of outgong queue refill requests waiting for processing.",
116 "",
117 "",
118 "clientstack",
119 m_udpServer.Scene.Name,
120 StatType.Pull,
121 MeasuresOfInterest.None,
122 stat => stat.Value = m_requestQueue.Count,
123 StatVerbosity.Debug);
124
125 StatsManager.RegisterStat(m_oqreRequestsWaitingStat);
126
127 Watchdog.StartThread(
128 ProcessRequests,
129 String.Format("OutgoingQueueRefillEngineThread ({0})", m_udpServer.Scene.Name),
130 ThreadPriority.Normal,
131 false,
132 true,
133 null,
134 int.MaxValue);
135 }
136 }
137
138 public void Stop()
139 {
140 lock (this)
141 {
142 try
143 {
144 if (!IsRunning)
145 return;
146
147 IsRunning = false;
148
149 int requestsLeft = m_requestQueue.Count;
150
151 if (requestsLeft <= 0)
152 {
153 m_cancelSource.Cancel();
154 }
155 else
156 {
157 m_log.InfoFormat("[OUTGOING QUEUE REFILL ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
158
159 while (requestsLeft > 0)
160 {
161 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
162 {
163 // After timeout no events have been written
164 if (requestsLeft == m_requestQueue.Count)
165 {
166 m_log.WarnFormat(
167 "[OUTGOING QUEUE REFILL ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
168 RequestProcessTimeoutOnStop, requestsLeft);
169
170 break;
171 }
172 }
173
174 requestsLeft = m_requestQueue.Count;
175 }
176 }
177 }
178 finally
179 {
180 m_cancelSource.Dispose();
181 StatsManager.DeregisterStat(m_oqreRequestsWaitingStat);
182 m_oqreRequestsWaitingStat = null;
183 m_requestQueue = null;
184 }
185 }
186 }
187
188 public bool QueueRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories)
189 {
190 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
191 {
192// m_log.DebugFormat(
193// "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
194// categories, client.AgentID, m_udpServer.Scene.Name);
195
196 m_requestQueue.Add(new RefillRequest(client, categories));
197
198 if (!m_warnOverMaxQueue)
199 m_warnOverMaxQueue = true;
200
201 return true;
202 }
203 else
204 {
205 if (m_warnOverMaxQueue)
206 {
207 m_log.WarnFormat(
208 "[OUTGOING QUEUE REFILL ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
209 client.AgentID, m_udpServer.Scene.Name);
210
211 m_warnOverMaxQueue = false;
212 }
213
214 return false;
215 }
216 }
217
218 private void ProcessRequests()
219 {
220 try
221 {
222 while (IsRunning || m_requestQueue.Count > 0)
223 {
224 RefillRequest req = m_requestQueue.Take(m_cancelSource.Token);
225
226 // QueueEmpty callback = req.Client.OnQueueEmpty;
227 //
228 // if (callback != null)
229 // {
230 // try
231 // {
232 // callback(req.Categories);
233 // }
234 // catch (Exception e)
235 // {
236 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
237 // }
238 // }
239
240 req.Client.FireQueueEmpty(req.Categories);
241 }
242 }
243 catch (OperationCanceledException)
244 {
245 }
246
247 m_finishedProcessingAfterStop.Set();
248 }
249
250 private void HandleOqreCommand(string module, string[] args)
251 {
252 if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
253 return;
254
255 if (args.Length != 4)
256 {
257 MainConsole.Instance.Output("Usage: debug lludp oqre <stop|start|status>");
258 return;
259 }
260
261 string subCommand = args[3];
262
263 if (subCommand == "stop")
264 {
265 Stop();
266 MainConsole.Instance.OutputFormat("Stopped OQRE for {0}", m_udpServer.Scene.Name);
267 }
268 else if (subCommand == "start")
269 {
270 Start();
271 MainConsole.Instance.OutputFormat("Started OQRE for {0}", m_udpServer.Scene.Name);
272 }
273 else if (subCommand == "status")
274 {
275 MainConsole.Instance.OutputFormat("OQRE in {0}", m_udpServer.Scene.Name);
276 MainConsole.Instance.OutputFormat("Running: {0}", IsRunning);
277 MainConsole.Instance.OutputFormat(
278 "Requests waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
279 }
280 else
281 {
282 MainConsole.Instance.OutputFormat("Unrecognized OQRE subcommand {0}", subCommand);
283 }
284 }
285 }
286} \ No newline at end of file