aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs290
1 files changed, 283 insertions, 7 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 8d50151..a1dee4e 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -34,14 +34,18 @@ using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 35using OpenSim.Framework.Monitoring;
36 36
37
38/*
37namespace OpenSim.Framework.Servers.HttpServer 39namespace OpenSim.Framework.Servers.HttpServer
38{ 40{
41
39 public class PollServiceRequestManager 42 public class PollServiceRequestManager
40 { 43 {
41// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 44// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
42 45
43 private readonly BaseHttpServer m_server; 46 private readonly BaseHttpServer m_server;
44 private static Queue m_requests = Queue.Synchronized(new Queue()); 47 private static Queue m_requests = Queue.Synchronized(new Queue());
48 private static ManualResetEvent m_ev = new ManualResetEvent(false);
45 private uint m_WorkerThreadCount = 0; 49 private uint m_WorkerThreadCount = 0;
46 private Thread[] m_workerThreads; 50 private Thread[] m_workerThreads;
47 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 51 private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
@@ -67,7 +71,6 @@ namespace OpenSim.Framework.Servers.HttpServer
67 ThreadPriority.Normal, 71 ThreadPriority.Normal,
68 false, 72 false,
69 true, 73 true,
70 null,
71 int.MaxValue); 74 int.MaxValue);
72 } 75 }
73 76
@@ -77,7 +80,6 @@ namespace OpenSim.Framework.Servers.HttpServer
77 ThreadPriority.Normal, 80 ThreadPriority.Normal,
78 false, 81 false,
79 true, 82 true,
80 null,
81 1000 * 60 * 10); 83 1000 * 60 * 10);
82 } 84 }
83 85
@@ -91,15 +93,17 @@ namespace OpenSim.Framework.Servers.HttpServer
91 { 93 {
92 lock (m_requests) 94 lock (m_requests)
93 m_requests.Enqueue(req); 95 m_requests.Enqueue(req);
96 m_ev.Set();
94 } 97 }
95 98
96 public void ThreadStart() 99 public void ThreadStart()
97 { 100 {
98 while (m_running) 101 while (m_running)
99 { 102 {
103 m_ev.WaitOne(1000);
104 m_ev.Reset();
100 Watchdog.UpdateThread(); 105 Watchdog.UpdateThread();
101 ProcessQueuedRequests(); 106 ProcessQueuedRequests();
102 Thread.Sleep(1000);
103 } 107 }
104 } 108 }
105 109
@@ -141,8 +145,9 @@ namespace OpenSim.Framework.Servers.HttpServer
141 foreach (object o in m_requests) 145 foreach (object o in m_requests)
142 { 146 {
143 PollServiceHttpRequest req = (PollServiceHttpRequest) o; 147 PollServiceHttpRequest req = (PollServiceHttpRequest) o;
144 PollServiceWorkerThread.DoHTTPGruntWork( 148 m_server.DoHTTPGruntWork(
145 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); 149 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id),
150 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
146 } 151 }
147 152
148 m_requests.Clear(); 153 m_requests.Clear();
@@ -151,8 +156,279 @@ namespace OpenSim.Framework.Servers.HttpServer
151 { 156 {
152 t.Abort(); 157 t.Abort();
153 } 158 }
154
155 m_running = false; 159 m_running = false;
156 } 160 }
157 } 161 }
158} \ No newline at end of file 162}
163 */
164
165using System.IO;
166using System.Text;
167using System.Collections.Generic;
168
169namespace OpenSim.Framework.Servers.HttpServer
170{
171 public class PollServiceRequestManager
172 {
173 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
174
175 private readonly BaseHttpServer m_server;
176
177 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
178 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
179 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
180
181 private uint m_WorkerThreadCount = 0;
182 private Thread[] m_workerThreads;
183 private Thread m_retrysThread;
184
185 private bool m_running = true;
186 private int slowCount = 0;
187
188// private int m_timeout = 1000; // increase timeout 250; now use the event one
189
190 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
191 {
192 m_server = pSrv;
193 m_WorkerThreadCount = pWorkerThreadCount;
194 m_workerThreads = new Thread[m_WorkerThreadCount];
195
196 //startup worker threads
197 for (uint i = 0; i < m_WorkerThreadCount; i++)
198 {
199 m_workerThreads[i]
200 = Watchdog.StartThread(
201 PoolWorkerJob,
202 String.Format("PollServiceWorkerThread{0}", i),
203 ThreadPriority.Normal,
204 false,
205 true,
206 null,
207 int.MaxValue);
208 }
209
210 m_retrysThread = Watchdog.StartThread(
211 this.CheckRetries,
212 "PollServiceWatcherThread",
213 ThreadPriority.Normal,
214 false,
215 true,
216 null,
217 1000 * 60 * 10);
218 }
219
220
221 private void ReQueueEvent(PollServiceHttpRequest req)
222 {
223 if (m_running)
224 {
225 lock (m_retryRequests)
226 m_retryRequests.Enqueue(req);
227 }
228 }
229
230 public void Enqueue(PollServiceHttpRequest req)
231 {
232 if (m_running)
233 {
234 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LslHttp ||
235 req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Inventory)
236 {
237 m_requests.Enqueue(req);
238 }
239 else
240 {
241 lock (m_slowRequests)
242 m_slowRequests.Enqueue(req);
243 }
244 }
245 }
246
247 private void CheckRetries()
248 {
249 while (m_running)
250 {
251 Thread.Sleep(100); // let the world move .. back to faster rate
252 Watchdog.UpdateThread();
253 lock (m_retryRequests)
254 {
255 while (m_retryRequests.Count > 0 && m_running)
256 m_requests.Enqueue(m_retryRequests.Dequeue());
257 }
258 slowCount++;
259 if (slowCount >= 10)
260 {
261 slowCount = 0;
262
263 lock (m_slowRequests)
264 {
265 while (m_slowRequests.Count > 0 && m_running)
266 m_requests.Enqueue(m_slowRequests.Dequeue());
267 }
268 }
269 }
270 }
271
272 ~PollServiceRequestManager()
273 {
274 m_running = false;
275// m_timeout = -10000; // cause all to expire
276 Thread.Sleep(1000); // let the world move
277
278 foreach (Thread t in m_workerThreads)
279 {
280 try
281 {
282 t.Abort();
283 }
284 catch
285 {
286 }
287 }
288
289 try
290 {
291 foreach (PollServiceHttpRequest req in m_retryRequests)
292 {
293 DoHTTPGruntWork(m_server,req,
294 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
295 }
296 }
297 catch
298 {
299 }
300
301 PollServiceHttpRequest wreq;
302 m_retryRequests.Clear();
303
304 lock (m_slowRequests)
305 {
306 while (m_slowRequests.Count > 0 && m_running)
307 m_requests.Enqueue(m_slowRequests.Dequeue());
308 }
309
310 while (m_requests.Count() > 0)
311 {
312 try
313 {
314 wreq = m_requests.Dequeue(0);
315 DoHTTPGruntWork(m_server,wreq,
316 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
317 }
318 catch
319 {
320 }
321 }
322
323 m_requests.Clear();
324 }
325
326 // work threads
327
328 private void PoolWorkerJob()
329 {
330 PollServiceHttpRequest req;
331 StreamReader str;
332
333// while (true)
334 while (m_running)
335 {
336 req = m_requests.Dequeue(5000);
337
338 Watchdog.UpdateThread();
339 if (req != null)
340 {
341 try
342 {
343 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
344 {
345 try
346 {
347 str = new StreamReader(req.Request.Body);
348 }
349 catch (System.ArgumentException)
350 {
351 // Stream was not readable means a child agent
352 // was closed due to logout, leaving the
353 // Event Queue request orphaned.
354 continue;
355 }
356
357 try
358 {
359 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd());
360 DoHTTPGruntWork(m_server, req, responsedata);
361 }
362 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
363 {
364 // Ignore it, no need to reply
365 }
366
367 str.Close();
368
369 }
370 else
371 {
372// if ((Environment.TickCount - req.RequestTime) > m_timeout)
373
374 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
375 {
376 DoHTTPGruntWork(m_server, req,
377 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
378 }
379 else
380 {
381 ReQueueEvent(req);
382 }
383 }
384 }
385 catch (Exception e)
386 {
387 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
388 }
389 }
390 }
391 }
392
393 // DoHTTPGruntWork changed, not sending response
394 // do the same work around as core
395
396 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
397 {
398 OSHttpResponse response
399 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
400
401 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
402
403 response.SendChunked = false;
404 response.ContentLength64 = buffer.Length;
405 response.ContentEncoding = Encoding.UTF8;
406
407 try
408 {
409 response.OutputStream.Write(buffer, 0, buffer.Length);
410 }
411 catch (Exception ex)
412 {
413 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
414 }
415 finally
416 {
417 //response.OutputStream.Close();
418 try
419 {
420 response.OutputStream.Flush();
421 response.Send();
422
423 //if (!response.KeepAlive && response.ReuseContext)
424 // response.FreeContext();
425 }
426 catch (Exception e)
427 {
428 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
429 }
430 }
431 }
432 }
433}
434