aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs243
1 files changed, 241 insertions, 2 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3252251..3a14b6f 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,14 +33,18 @@ using log4net;
33using HttpServer; 33using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35 35
36
37/*
36namespace OpenSim.Framework.Servers.HttpServer 38namespace OpenSim.Framework.Servers.HttpServer
37{ 39{
40
38 public class PollServiceRequestManager 41 public class PollServiceRequestManager
39 { 42 {
40// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 43// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
41 44
42 private readonly BaseHttpServer m_server; 45 private readonly BaseHttpServer m_server;
43 private static Queue m_requests = Queue.Synchronized(new Queue()); 46 private static Queue m_requests = Queue.Synchronized(new Queue());
47 private static ManualResetEvent m_ev = new ManualResetEvent(false);
44 private uint m_WorkerThreadCount = 0; 48 private uint m_WorkerThreadCount = 0;
45 private Thread[] m_workerThreads; 49 private Thread[] m_workerThreads;
46 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 50 private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
@@ -90,15 +94,17 @@ namespace OpenSim.Framework.Servers.HttpServer
90 { 94 {
91 lock (m_requests) 95 lock (m_requests)
92 m_requests.Enqueue(req); 96 m_requests.Enqueue(req);
97 m_ev.Set();
93 } 98 }
94 99
95 public void ThreadStart() 100 public void ThreadStart()
96 { 101 {
97 while (m_running) 102 while (m_running)
98 { 103 {
104 m_ev.WaitOne(1000);
105 m_ev.Reset();
99 Watchdog.UpdateThread(); 106 Watchdog.UpdateThread();
100 ProcessQueuedRequests(); 107 ProcessQueuedRequests();
101 Thread.Sleep(1000);
102 } 108 }
103 } 109 }
104 110
@@ -154,4 +160,237 @@ namespace OpenSim.Framework.Servers.HttpServer
154 m_running = false; 160 m_running = false;
155 } 161 }
156 } 162 }
157} \ No newline at end of file 163}
164 */
165
166using System.IO;
167using System.Text;
168using System.Collections.Generic;
169
170namespace OpenSim.Framework.Servers.HttpServer
171{
172 public class PollServiceRequestManager
173 {
174 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
175
176 private readonly BaseHttpServer m_server;
177
178 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
179 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
180 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
181
182 private uint m_WorkerThreadCount = 0;
183 private Thread[] m_workerThreads;
184 private Thread m_retrysThread;
185
186 private bool m_running = true;
187 private int slowCount = 0;
188
189 // private int m_timeout = 250; // increase timeout 250; now use the event one
190
191 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
192 {
193 m_server = pSrv;
194 m_WorkerThreadCount = pWorkerThreadCount;
195 m_workerThreads = new Thread[m_WorkerThreadCount];
196
197 //startup worker threads
198 for (uint i = 0; i < m_WorkerThreadCount; i++)
199 {
200 m_workerThreads[i]
201 = Watchdog.StartThread(
202 PoolWorkerJob,
203 String.Format("PollServiceWorkerThread{0}", i),
204 ThreadPriority.Normal,
205 false,
206 true,
207 null,
208 int.MaxValue);
209 }
210
211 m_retrysThread = Watchdog.StartThread(
212 this.CheckRetries,
213 "PollServiceWatcherThread",
214 ThreadPriority.Normal,
215 false,
216 true,
217 null,
218 1000 * 60 * 10);
219 }
220
221
222 private void ReQueueEvent(PollServiceHttpRequest req)
223 {
224 if (m_running)
225 {
226 lock (m_retryRequests)
227 m_retryRequests.Enqueue(req);
228 }
229 }
230
231 public void Enqueue(PollServiceHttpRequest req)
232 {
233 if (m_running)
234 {
235 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LslHttp)
236 {
237 m_requests.Enqueue(req);
238 }
239 else
240 {
241 lock (m_slowRequests)
242 m_slowRequests.Enqueue(req);
243 }
244 }
245 }
246
247 private void CheckRetries()
248 {
249 while (m_running)
250 {
251 Thread.Sleep(100); // let the world move .. back to faster rate
252 Watchdog.UpdateThread();
253 lock (m_retryRequests)
254 {
255 while (m_retryRequests.Count > 0 && m_running)
256 m_requests.Enqueue(m_retryRequests.Dequeue());
257 }
258 slowCount++;
259 if (slowCount >= 10)
260 {
261 slowCount = 0;
262
263 lock (m_slowRequests)
264 {
265 while (m_slowRequests.Count > 0 && m_running)
266 m_requests.Enqueue(m_slowRequests.Dequeue());
267 }
268 }
269 }
270 }
271
272 ~PollServiceRequestManager()
273 {
274 m_running = false;
275// m_timeout = -10000; // cause all to expire
276 Thread.Sleep(1000); // let the world move
277
278 foreach (Thread t in m_workerThreads)
279 {
280 try
281 {
282 t.Abort();
283 }
284 catch
285 {
286 }
287 }
288
289 try
290 {
291 foreach (PollServiceHttpRequest req in m_retryRequests)
292 {
293 m_server.DoHTTPGruntWork(
294 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id),
295 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
296 }
297 }
298 catch
299 {
300 }
301
302 PollServiceHttpRequest wreq;
303 m_retryRequests.Clear();
304
305 lock (m_slowRequests)
306 {
307 while (m_slowRequests.Count > 0 && m_running)
308 m_requests.Enqueue(m_slowRequests.Dequeue());
309 }
310
311 while (m_requests.Count() > 0)
312 {
313 try
314 {
315 wreq = m_requests.Dequeue(0);
316 m_server.DoHTTPGruntWork(
317 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id),
318 new OSHttpResponse(new HttpResponse(wreq.HttpContext, wreq.Request), wreq.HttpContext));
319 }
320 catch
321 {
322 }
323 }
324
325 m_requests.Clear();
326 }
327
328 // work threads
329
330 private void PoolWorkerJob()
331 {
332 PollServiceHttpRequest req;
333 StreamReader str;
334
335// while (true)
336 while (m_running)
337 {
338 req = m_requests.Dequeue(5000);
339
340 Watchdog.UpdateThread();
341 if (req != null)
342 {
343 try
344 {
345 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
346 {
347 try
348 {
349 str = new StreamReader(req.Request.Body);
350 }
351 catch (System.ArgumentException)
352 {
353 // Stream was not readable means a child agent
354 // was closed due to logout, leaving the
355 // Event Queue request orphaned.
356 continue;
357 }
358
359 try
360 {
361 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd());
362 m_server.DoHTTPGruntWork(responsedata,
363 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
364 }
365 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
366 {
367 // Ignore it, no need to reply
368 }
369
370 str.Close();
371
372 }
373 else
374 {
375 // if ((Environment.TickCount - req.RequestTime) > m_timeout)
376 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
377 {
378 m_server.DoHTTPGruntWork(req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id),
379 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
380 }
381 else
382 {
383 ReQueueEvent(req);
384 }
385 }
386 }
387 catch (Exception e)
388 {
389 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
390 }
391 }
392 }
393 }
394 }
395}
396