aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs289
1 files changed, 282 insertions, 7 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 8d50151..db088e7 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -34,14 +34,18 @@ using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 35using OpenSim.Framework.Monitoring;
36 36
37
38/*
37namespace OpenSim.Framework.Servers.HttpServer 39namespace OpenSim.Framework.Servers.HttpServer
38{ 40{
41
39 public class PollServiceRequestManager 42 public class PollServiceRequestManager
40 { 43 {
41// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 44// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
42 45
43 private readonly BaseHttpServer m_server; 46 private readonly BaseHttpServer m_server;
44 private static Queue m_requests = Queue.Synchronized(new Queue()); 47 private static Queue m_requests = Queue.Synchronized(new Queue());
48 private static ManualResetEvent m_ev = new ManualResetEvent(false);
45 private uint m_WorkerThreadCount = 0; 49 private uint m_WorkerThreadCount = 0;
46 private Thread[] m_workerThreads; 50 private Thread[] m_workerThreads;
47 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 51 private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
@@ -67,7 +71,6 @@ namespace OpenSim.Framework.Servers.HttpServer
67 ThreadPriority.Normal, 71 ThreadPriority.Normal,
68 false, 72 false,
69 true, 73 true,
70 null,
71 int.MaxValue); 74 int.MaxValue);
72 } 75 }
73 76
@@ -77,7 +80,6 @@ namespace OpenSim.Framework.Servers.HttpServer
77 ThreadPriority.Normal, 80 ThreadPriority.Normal,
78 false, 81 false,
79 true, 82 true,
80 null,
81 1000 * 60 * 10); 83 1000 * 60 * 10);
82 } 84 }
83 85
@@ -91,15 +93,17 @@ namespace OpenSim.Framework.Servers.HttpServer
91 { 93 {
92 lock (m_requests) 94 lock (m_requests)
93 m_requests.Enqueue(req); 95 m_requests.Enqueue(req);
96 m_ev.Set();
94 } 97 }
95 98
96 public void ThreadStart() 99 public void ThreadStart()
97 { 100 {
98 while (m_running) 101 while (m_running)
99 { 102 {
103 m_ev.WaitOne(1000);
104 m_ev.Reset();
100 Watchdog.UpdateThread(); 105 Watchdog.UpdateThread();
101 ProcessQueuedRequests(); 106 ProcessQueuedRequests();
102 Thread.Sleep(1000);
103 } 107 }
104 } 108 }
105 109
@@ -141,8 +145,9 @@ namespace OpenSim.Framework.Servers.HttpServer
141 foreach (object o in m_requests) 145 foreach (object o in m_requests)
142 { 146 {
143 PollServiceHttpRequest req = (PollServiceHttpRequest) o; 147 PollServiceHttpRequest req = (PollServiceHttpRequest) o;
144 PollServiceWorkerThread.DoHTTPGruntWork( 148 m_server.DoHTTPGruntWork(
145 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); 149 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id),
150 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
146 } 151 }
147 152
148 m_requests.Clear(); 153 m_requests.Clear();
@@ -151,8 +156,278 @@ namespace OpenSim.Framework.Servers.HttpServer
151 { 156 {
152 t.Abort(); 157 t.Abort();
153 } 158 }
154
155 m_running = false; 159 m_running = false;
156 } 160 }
157 } 161 }
158} \ No newline at end of file 162}
163 */
164
165using System.IO;
166using System.Text;
167using System.Collections.Generic;
168
169namespace OpenSim.Framework.Servers.HttpServer
170{
171 public class PollServiceRequestManager
172 {
173 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
174
175 private readonly BaseHttpServer m_server;
176
177 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
178 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
179 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
180
181 private uint m_WorkerThreadCount = 0;
182 private Thread[] m_workerThreads;
183 private Thread m_retrysThread;
184
185 private bool m_running = true;
186 private int slowCount = 0;
187
188// private int m_timeout = 1000; // increase timeout 250; now use the event one
189
190 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
191 {
192 m_server = pSrv;
193 m_WorkerThreadCount = pWorkerThreadCount;
194 m_workerThreads = new Thread[m_WorkerThreadCount];
195
196 //startup worker threads
197 for (uint i = 0; i < m_WorkerThreadCount; i++)
198 {
199 m_workerThreads[i]
200 = Watchdog.StartThread(
201 PoolWorkerJob,
202 String.Format("PollServiceWorkerThread{0}", i),
203 ThreadPriority.Normal,
204 false,
205 true,
206 null,
207 int.MaxValue);
208 }
209
210 m_retrysThread = Watchdog.StartThread(
211 this.CheckRetries,
212 "PollServiceWatcherThread",
213 ThreadPriority.Normal,
214 false,
215 true,
216 null,
217 1000 * 60 * 10);
218 }
219
220
221 private void ReQueueEvent(PollServiceHttpRequest req)
222 {
223 if (m_running)
224 {
225 lock (m_retryRequests)
226 m_retryRequests.Enqueue(req);
227 }
228 }
229
230 public void Enqueue(PollServiceHttpRequest req)
231 {
232 if (m_running)
233 {
234 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
235 {
236 m_requests.Enqueue(req);
237 }
238 else
239 {
240 lock (m_slowRequests)
241 m_slowRequests.Enqueue(req);
242 }
243 }
244 }
245
246 private void CheckRetries()
247 {
248 while (m_running)
249 {
250 Thread.Sleep(100); // let the world move .. back to faster rate
251 Watchdog.UpdateThread();
252 lock (m_retryRequests)
253 {
254 while (m_retryRequests.Count > 0 && m_running)
255 m_requests.Enqueue(m_retryRequests.Dequeue());
256 }
257 slowCount++;
258 if (slowCount >= 10)
259 {
260 slowCount = 0;
261
262 lock (m_slowRequests)
263 {
264 while (m_slowRequests.Count > 0 && m_running)
265 m_requests.Enqueue(m_slowRequests.Dequeue());
266 }
267 }
268 }
269 }
270
271 ~PollServiceRequestManager()
272 {
273 m_running = false;
274// m_timeout = -10000; // cause all to expire
275 Thread.Sleep(1000); // let the world move
276
277 foreach (Thread t in m_workerThreads)
278 {
279 try
280 {
281 t.Abort();
282 }
283 catch
284 {
285 }
286 }
287
288 try
289 {
290 foreach (PollServiceHttpRequest req in m_retryRequests)
291 {
292 DoHTTPGruntWork(m_server,req,
293 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
294 }
295 }
296 catch
297 {
298 }
299
300 PollServiceHttpRequest wreq;
301 m_retryRequests.Clear();
302
303 lock (m_slowRequests)
304 {
305 while (m_slowRequests.Count > 0 && m_running)
306 m_requests.Enqueue(m_slowRequests.Dequeue());
307 }
308
309 while (m_requests.Count() > 0)
310 {
311 try
312 {
313 wreq = m_requests.Dequeue(0);
314 DoHTTPGruntWork(m_server,wreq,
315 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
316 }
317 catch
318 {
319 }
320 }
321
322 m_requests.Clear();
323 }
324
325 // work threads
326
327 private void PoolWorkerJob()
328 {
329 PollServiceHttpRequest req;
330 StreamReader str;
331
332// while (true)
333 while (m_running)
334 {
335 req = m_requests.Dequeue(5000);
336
337 Watchdog.UpdateThread();
338 if (req != null)
339 {
340 try
341 {
342 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
343 {
344 try
345 {
346 str = new StreamReader(req.Request.Body);
347 }
348 catch (System.ArgumentException)
349 {
350 // Stream was not readable means a child agent
351 // was closed due to logout, leaving the
352 // Event Queue request orphaned.
353 continue;
354 }
355
356 try
357 {
358 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd());
359 DoHTTPGruntWork(m_server, req, responsedata);
360 }
361 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
362 {
363 // Ignore it, no need to reply
364 }
365
366 str.Close();
367
368 }
369 else
370 {
371// if ((Environment.TickCount - req.RequestTime) > m_timeout)
372
373 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
374 {
375 DoHTTPGruntWork(m_server, req,
376 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
377 }
378 else
379 {
380 ReQueueEvent(req);
381 }
382 }
383 }
384 catch (Exception e)
385 {
386 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
387 }
388 }
389 }
390 }
391
392 // DoHTTPGruntWork changed, not sending response
393 // do the same work around as core
394
395 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
396 {
397 OSHttpResponse response
398 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
399
400 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
401
402 response.SendChunked = false;
403 response.ContentLength64 = buffer.Length;
404 response.ContentEncoding = Encoding.UTF8;
405
406 try
407 {
408 response.OutputStream.Write(buffer, 0, buffer.Length);
409 }
410 catch (Exception ex)
411 {
412 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
413 }
414 finally
415 {
416 //response.OutputStream.Close();
417 try
418 {
419 response.OutputStream.Flush();
420 response.Send();
421
422 //if (!response.KeepAlive && response.ReuseContext)
423 // response.FreeContext();
424 }
425 catch (Exception e)
426 {
427 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
428 }
429 }
430 }
431 }
432}
433