aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs285
1 files changed, 279 insertions, 6 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3e84c55..07bd48a 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,15 +33,20 @@ using log4net;
33using HttpServer; 33using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 35using OpenSim.Framework.Monitoring;
36using Amib.Threading;
36 37
38
39/*
37namespace OpenSim.Framework.Servers.HttpServer 40namespace OpenSim.Framework.Servers.HttpServer
38{ 41{
42
39 public class PollServiceRequestManager 43 public class PollServiceRequestManager
40 { 44 {
41// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 45// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
42 46
43 private readonly BaseHttpServer m_server; 47 private readonly BaseHttpServer m_server;
44 private static Queue m_requests = Queue.Synchronized(new Queue()); 48 private static Queue m_requests = Queue.Synchronized(new Queue());
49 private static ManualResetEvent m_ev = new ManualResetEvent(false);
45 private uint m_WorkerThreadCount = 0; 50 private uint m_WorkerThreadCount = 0;
46 private Thread[] m_workerThreads; 51 private Thread[] m_workerThreads;
47 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 52 private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
@@ -74,7 +79,6 @@ namespace OpenSim.Framework.Servers.HttpServer
74 ThreadPriority.Normal, 79 ThreadPriority.Normal,
75 false, 80 false,
76 true, 81 true,
77 null,
78 int.MaxValue); 82 int.MaxValue);
79 } 83 }
80 84
@@ -84,7 +88,6 @@ namespace OpenSim.Framework.Servers.HttpServer
84 ThreadPriority.Normal, 88 ThreadPriority.Normal,
85 false, 89 false,
86 true, 90 true,
87 null,
88 1000 * 60 * 10); 91 1000 * 60 * 10);
89 } 92 }
90 93
@@ -98,15 +101,17 @@ namespace OpenSim.Framework.Servers.HttpServer
98 { 101 {
99 lock (m_requests) 102 lock (m_requests)
100 m_requests.Enqueue(req); 103 m_requests.Enqueue(req);
104 m_ev.Set();
101 } 105 }
102 106
103 public void ThreadStart() 107 public void ThreadStart()
104 { 108 {
105 while (m_running) 109 while (m_running)
106 { 110 {
111 m_ev.WaitOne(1000);
112 m_ev.Reset();
107 Watchdog.UpdateThread(); 113 Watchdog.UpdateThread();
108 ProcessQueuedRequests(); 114 ProcessQueuedRequests();
109 Thread.Sleep(1000);
110 } 115 }
111 } 116 }
112 117
@@ -150,8 +155,9 @@ namespace OpenSim.Framework.Servers.HttpServer
150 foreach (object o in m_requests) 155 foreach (object o in m_requests)
151 { 156 {
152 PollServiceHttpRequest req = (PollServiceHttpRequest) o; 157 PollServiceHttpRequest req = (PollServiceHttpRequest) o;
153 PollServiceWorkerThread.DoHTTPGruntWork( 158 m_server.DoHTTPGruntWork(
154 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); 159 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id),
160 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
155 } 161 }
156 162
157 m_requests.Clear(); 163 m_requests.Clear();
@@ -162,4 +168,271 @@ namespace OpenSim.Framework.Servers.HttpServer
162 } 168 }
163 } 169 }
164 } 170 }
165} \ No newline at end of file 171}
172 */
173
174using System.IO;
175using System.Text;
176using System.Collections.Generic;
177
178namespace OpenSim.Framework.Servers.HttpServer
179{
180 public class PollServiceRequestManager
181 {
182 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
183
184 private readonly BaseHttpServer m_server;
185
186 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
187 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
188 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
189
190 private uint m_WorkerThreadCount = 0;
191 private Thread[] m_workerThreads;
192 private Thread m_retrysThread;
193
194 private bool m_running = true;
195 private int slowCount = 0;
196
197 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
198
199// private int m_timeout = 1000; // increase timeout 250; now use the event one
200
201 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
202 {
203 m_server = pSrv;
204 m_WorkerThreadCount = pWorkerThreadCount;
205 m_workerThreads = new Thread[m_WorkerThreadCount];
206
207 //startup worker threads
208 for (uint i = 0; i < m_WorkerThreadCount; i++)
209 {
210 m_workerThreads[i]
211 = Watchdog.StartThread(
212 PoolWorkerJob,
213 String.Format("PollServiceWorkerThread{0}", i),
214 ThreadPriority.Normal,
215 false,
216 false,
217 null,
218 int.MaxValue);
219 }
220
221 m_retrysThread = Watchdog.StartThread(
222 this.CheckRetries,
223 "PollServiceWatcherThread",
224 ThreadPriority.Normal,
225 false,
226 true,
227 null,
228 1000 * 60 * 10);
229 }
230
231
232 private void ReQueueEvent(PollServiceHttpRequest req)
233 {
234 if (m_running)
235 {
236 lock (m_retryRequests)
237 m_retryRequests.Enqueue(req);
238 }
239 }
240
241 public void Enqueue(PollServiceHttpRequest req)
242 {
243 if (m_running)
244 {
245 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
246 {
247 m_requests.Enqueue(req);
248 }
249 else
250 {
251 lock (m_slowRequests)
252 m_slowRequests.Enqueue(req);
253 }
254 }
255 }
256
257 private void CheckRetries()
258 {
259 while (m_running)
260 {
261 Thread.Sleep(100); // let the world move .. back to faster rate
262 Watchdog.UpdateThread();
263 lock (m_retryRequests)
264 {
265 while (m_retryRequests.Count > 0 && m_running)
266 m_requests.Enqueue(m_retryRequests.Dequeue());
267 }
268 slowCount++;
269 if (slowCount >= 10)
270 {
271 slowCount = 0;
272
273 lock (m_slowRequests)
274 {
275 while (m_slowRequests.Count > 0 && m_running)
276 m_requests.Enqueue(m_slowRequests.Dequeue());
277 }
278 }
279 }
280 }
281
282 ~PollServiceRequestManager()
283 {
284 m_running = false;
285// m_timeout = -10000; // cause all to expire
286 Thread.Sleep(1000); // let the world move
287
288 foreach (Thread t in m_workerThreads)
289 Watchdog.AbortThread(t.ManagedThreadId);
290
291 try
292 {
293 foreach (PollServiceHttpRequest req in m_retryRequests)
294 {
295 DoHTTPGruntWork(m_server,req,
296 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
297 }
298 }
299 catch
300 {
301 }
302
303 PollServiceHttpRequest wreq;
304 m_retryRequests.Clear();
305
306 lock (m_slowRequests)
307 {
308 while (m_slowRequests.Count > 0 && m_running)
309 m_requests.Enqueue(m_slowRequests.Dequeue());
310 }
311
312 while (m_requests.Count() > 0)
313 {
314 try
315 {
316 wreq = m_requests.Dequeue(0);
317 DoHTTPGruntWork(m_server,wreq,
318 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
319 }
320 catch
321 {
322 }
323 }
324
325 m_requests.Clear();
326 }
327
328 // work threads
329
330 private void PoolWorkerJob()
331 {
332 while (m_running)
333 {
334 PollServiceHttpRequest req = m_requests.Dequeue(5000);
335
336 Watchdog.UpdateThread();
337 if (req != null)
338 {
339 try
340 {
341 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
342 {
343 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
344
345 if (responsedata == null)
346 continue;
347
348 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal)
349 {
350 try
351 {
352 DoHTTPGruntWork(m_server, req, responsedata);
353 }
354 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
355 {
356 // Ignore it, no need to reply
357 }
358 }
359 else
360 {
361 m_threadPool.QueueWorkItem(x =>
362 {
363 try
364 {
365 DoHTTPGruntWork(m_server, req, responsedata);
366 }
367 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
368 {
369 // Ignore it, no need to reply
370 }
371
372 return null;
373 }, null);
374 }
375 }
376 else
377 {
378 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
379 {
380 DoHTTPGruntWork(m_server, req,
381 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
382 }
383 else
384 {
385 ReQueueEvent(req);
386 }
387 }
388 }
389 catch (Exception e)
390 {
391 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
392 }
393 }
394 }
395 }
396
397 // DoHTTPGruntWork changed, not sending response
398 // do the same work around as core
399
400 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
401 {
402 OSHttpResponse response
403 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
404
405 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
406
407 response.SendChunked = false;
408 response.ContentLength64 = buffer.Length;
409 response.ContentEncoding = Encoding.UTF8;
410
411 try
412 {
413 response.OutputStream.Write(buffer, 0, buffer.Length);
414 }
415 catch (Exception ex)
416 {
417 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
418 }
419 finally
420 {
421 //response.OutputStream.Close();
422 try
423 {
424 response.OutputStream.Flush();
425 response.Send();
426
427 //if (!response.KeepAlive && response.ReuseContext)
428 // response.FreeContext();
429 }
430 catch (Exception e)
431 {
432 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
433 }
434 }
435 }
436 }
437}
438