aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs289
1 files changed, 282 insertions, 7 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3252251..a3bd330 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,14 +33,18 @@ using log4net;
33using HttpServer; 33using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35 35
36
37/*
36namespace OpenSim.Framework.Servers.HttpServer 38namespace OpenSim.Framework.Servers.HttpServer
37{ 39{
40
38 public class PollServiceRequestManager 41 public class PollServiceRequestManager
39 { 42 {
40// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 43// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
41 44
42 private readonly BaseHttpServer m_server; 45 private readonly BaseHttpServer m_server;
43 private static Queue m_requests = Queue.Synchronized(new Queue()); 46 private static Queue m_requests = Queue.Synchronized(new Queue());
47 private static ManualResetEvent m_ev = new ManualResetEvent(false);
44 private uint m_WorkerThreadCount = 0; 48 private uint m_WorkerThreadCount = 0;
45 private Thread[] m_workerThreads; 49 private Thread[] m_workerThreads;
46 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 50 private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
@@ -66,7 +70,6 @@ namespace OpenSim.Framework.Servers.HttpServer
66 ThreadPriority.Normal, 70 ThreadPriority.Normal,
67 false, 71 false,
68 true, 72 true,
69 null,
70 int.MaxValue); 73 int.MaxValue);
71 } 74 }
72 75
@@ -76,7 +79,6 @@ namespace OpenSim.Framework.Servers.HttpServer
76 ThreadPriority.Normal, 79 ThreadPriority.Normal,
77 false, 80 false,
78 true, 81 true,
79 null,
80 1000 * 60 * 10); 82 1000 * 60 * 10);
81 } 83 }
82 84
@@ -90,15 +92,17 @@ namespace OpenSim.Framework.Servers.HttpServer
90 { 92 {
91 lock (m_requests) 93 lock (m_requests)
92 m_requests.Enqueue(req); 94 m_requests.Enqueue(req);
95 m_ev.Set();
93 } 96 }
94 97
95 public void ThreadStart() 98 public void ThreadStart()
96 { 99 {
97 while (m_running) 100 while (m_running)
98 { 101 {
102 m_ev.WaitOne(1000);
103 m_ev.Reset();
99 Watchdog.UpdateThread(); 104 Watchdog.UpdateThread();
100 ProcessQueuedRequests(); 105 ProcessQueuedRequests();
101 Thread.Sleep(1000);
102 } 106 }
103 } 107 }
104 108
@@ -140,8 +144,9 @@ namespace OpenSim.Framework.Servers.HttpServer
140 foreach (object o in m_requests) 144 foreach (object o in m_requests)
141 { 145 {
142 PollServiceHttpRequest req = (PollServiceHttpRequest) o; 146 PollServiceHttpRequest req = (PollServiceHttpRequest) o;
143 PollServiceWorkerThread.DoHTTPGruntWork( 147 m_server.DoHTTPGruntWork(
144 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); 148 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id),
149 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
145 } 150 }
146 151
147 m_requests.Clear(); 152 m_requests.Clear();
@@ -150,8 +155,278 @@ namespace OpenSim.Framework.Servers.HttpServer
150 { 155 {
151 t.Abort(); 156 t.Abort();
152 } 157 }
153
154 m_running = false; 158 m_running = false;
155 } 159 }
156 } 160 }
157} \ No newline at end of file 161}
162 */
163
164using System.IO;
165using System.Text;
166using System.Collections.Generic;
167
168namespace OpenSim.Framework.Servers.HttpServer
169{
170 public class PollServiceRequestManager
171 {
172 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
173
174 private readonly BaseHttpServer m_server;
175
176 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
177 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
178 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
179
180 private uint m_WorkerThreadCount = 0;
181 private Thread[] m_workerThreads;
182 private Thread m_retrysThread;
183
184 private bool m_running = true;
185 private int slowCount = 0;
186
187// private int m_timeout = 1000; // increase timeout 250; now use the event one
188
189 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
190 {
191 m_server = pSrv;
192 m_WorkerThreadCount = pWorkerThreadCount;
193 m_workerThreads = new Thread[m_WorkerThreadCount];
194
195 //startup worker threads
196 for (uint i = 0; i < m_WorkerThreadCount; i++)
197 {
198 m_workerThreads[i]
199 = Watchdog.StartThread(
200 PoolWorkerJob,
201 String.Format("PollServiceWorkerThread{0}", i),
202 ThreadPriority.Normal,
203 false,
204 true,
205 null,
206 int.MaxValue);
207 }
208
209 m_retrysThread = Watchdog.StartThread(
210 this.CheckRetries,
211 "PollServiceWatcherThread",
212 ThreadPriority.Normal,
213 false,
214 true,
215 null,
216 1000 * 60 * 10);
217 }
218
219
220 private void ReQueueEvent(PollServiceHttpRequest req)
221 {
222 if (m_running)
223 {
224 lock (m_retryRequests)
225 m_retryRequests.Enqueue(req);
226 }
227 }
228
229 public void Enqueue(PollServiceHttpRequest req)
230 {
231 if (m_running)
232 {
233 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LslHttp)
234 {
235 m_requests.Enqueue(req);
236 }
237 else
238 {
239 lock (m_slowRequests)
240 m_slowRequests.Enqueue(req);
241 }
242 }
243 }
244
245 private void CheckRetries()
246 {
247 while (m_running)
248 {
249 Thread.Sleep(100); // let the world move .. back to faster rate
250 Watchdog.UpdateThread();
251 lock (m_retryRequests)
252 {
253 while (m_retryRequests.Count > 0 && m_running)
254 m_requests.Enqueue(m_retryRequests.Dequeue());
255 }
256 slowCount++;
257 if (slowCount >= 10)
258 {
259 slowCount = 0;
260
261 lock (m_slowRequests)
262 {
263 while (m_slowRequests.Count > 0 && m_running)
264 m_requests.Enqueue(m_slowRequests.Dequeue());
265 }
266 }
267 }
268 }
269
270 ~PollServiceRequestManager()
271 {
272 m_running = false;
273// m_timeout = -10000; // cause all to expire
274 Thread.Sleep(1000); // let the world move
275
276 foreach (Thread t in m_workerThreads)
277 {
278 try
279 {
280 t.Abort();
281 }
282 catch
283 {
284 }
285 }
286
287 try
288 {
289 foreach (PollServiceHttpRequest req in m_retryRequests)
290 {
291 DoHTTPGruntWork(m_server,req,
292 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
293 }
294 }
295 catch
296 {
297 }
298
299 PollServiceHttpRequest wreq;
300 m_retryRequests.Clear();
301
302 lock (m_slowRequests)
303 {
304 while (m_slowRequests.Count > 0 && m_running)
305 m_requests.Enqueue(m_slowRequests.Dequeue());
306 }
307
308 while (m_requests.Count() > 0)
309 {
310 try
311 {
312 wreq = m_requests.Dequeue(0);
313 DoHTTPGruntWork(m_server,wreq,
314 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
315 }
316 catch
317 {
318 }
319 }
320
321 m_requests.Clear();
322 }
323
324 // work threads
325
326 private void PoolWorkerJob()
327 {
328 PollServiceHttpRequest req;
329 StreamReader str;
330
331// while (true)
332 while (m_running)
333 {
334 req = m_requests.Dequeue(5000);
335
336 Watchdog.UpdateThread();
337 if (req != null)
338 {
339 try
340 {
341 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
342 {
343 try
344 {
345 str = new StreamReader(req.Request.Body);
346 }
347 catch (System.ArgumentException)
348 {
349 // Stream was not readable means a child agent
350 // was closed due to logout, leaving the
351 // Event Queue request orphaned.
352 continue;
353 }
354
355 try
356 {
357 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd());
358 DoHTTPGruntWork(m_server, req, responsedata);
359 }
360 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
361 {
362 // Ignore it, no need to reply
363 }
364
365 str.Close();
366
367 }
368 else
369 {
370// if ((Environment.TickCount - req.RequestTime) > m_timeout)
371
372 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
373 {
374 DoHTTPGruntWork(m_server, req,
375 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
376 }
377 else
378 {
379 ReQueueEvent(req);
380 }
381 }
382 }
383 catch (Exception e)
384 {
385 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
386 }
387 }
388 }
389 }
390
391 // DoHTTPGruntWork changed, not sending response
392 // do the same work around as core
393
394 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
395 {
396 OSHttpResponse response
397 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
398
399 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
400
401 response.SendChunked = false;
402 response.ContentLength64 = buffer.Length;
403 response.ContentEncoding = Encoding.UTF8;
404
405 try
406 {
407 response.OutputStream.Write(buffer, 0, buffer.Length);
408 }
409 catch (Exception ex)
410 {
411 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
412 }
413 finally
414 {
415 //response.OutputStream.Close();
416 try
417 {
418 response.OutputStream.Flush();
419 response.Send();
420
421 //if (!response.KeepAlive && response.ReuseContext)
422 // response.FreeContext();
423 }
424 catch (Exception e)
425 {
426 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
427 }
428 }
429 }
430 }
431}
432