aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs286
1 files changed, 279 insertions, 7 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 8d50151..4be8bf4 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,15 +33,20 @@ using log4net;
33using HttpServer; 33using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 35using OpenSim.Framework.Monitoring;
36using Amib.Threading;
36 37
38
39/*
37namespace OpenSim.Framework.Servers.HttpServer 40namespace OpenSim.Framework.Servers.HttpServer
38{ 41{
42
39 public class PollServiceRequestManager 43 public class PollServiceRequestManager
40 { 44 {
41// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 45// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
42 46
43 private readonly BaseHttpServer m_server; 47 private readonly BaseHttpServer m_server;
44 private static Queue m_requests = Queue.Synchronized(new Queue()); 48 private static Queue m_requests = Queue.Synchronized(new Queue());
49 private static ManualResetEvent m_ev = new ManualResetEvent(false);
45 private uint m_WorkerThreadCount = 0; 50 private uint m_WorkerThreadCount = 0;
46 private Thread[] m_workerThreads; 51 private Thread[] m_workerThreads;
47 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 52 private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
@@ -67,7 +72,6 @@ namespace OpenSim.Framework.Servers.HttpServer
67 ThreadPriority.Normal, 72 ThreadPriority.Normal,
68 false, 73 false,
69 true, 74 true,
70 null,
71 int.MaxValue); 75 int.MaxValue);
72 } 76 }
73 77
@@ -77,7 +81,6 @@ namespace OpenSim.Framework.Servers.HttpServer
77 ThreadPriority.Normal, 81 ThreadPriority.Normal,
78 false, 82 false,
79 true, 83 true,
80 null,
81 1000 * 60 * 10); 84 1000 * 60 * 10);
82 } 85 }
83 86
@@ -91,15 +94,17 @@ namespace OpenSim.Framework.Servers.HttpServer
91 { 94 {
92 lock (m_requests) 95 lock (m_requests)
93 m_requests.Enqueue(req); 96 m_requests.Enqueue(req);
97 m_ev.Set();
94 } 98 }
95 99
96 public void ThreadStart() 100 public void ThreadStart()
97 { 101 {
98 while (m_running) 102 while (m_running)
99 { 103 {
104 m_ev.WaitOne(1000);
105 m_ev.Reset();
100 Watchdog.UpdateThread(); 106 Watchdog.UpdateThread();
101 ProcessQueuedRequests(); 107 ProcessQueuedRequests();
102 Thread.Sleep(1000);
103 } 108 }
104 } 109 }
105 110
@@ -141,8 +146,9 @@ namespace OpenSim.Framework.Servers.HttpServer
141 foreach (object o in m_requests) 146 foreach (object o in m_requests)
142 { 147 {
143 PollServiceHttpRequest req = (PollServiceHttpRequest) o; 148 PollServiceHttpRequest req = (PollServiceHttpRequest) o;
144 PollServiceWorkerThread.DoHTTPGruntWork( 149 m_server.DoHTTPGruntWork(
145 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); 150 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id),
151 new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext));
146 } 152 }
147 153
148 m_requests.Clear(); 154 m_requests.Clear();
@@ -151,8 +157,274 @@ namespace OpenSim.Framework.Servers.HttpServer
151 { 157 {
152 t.Abort(); 158 t.Abort();
153 } 159 }
154
155 m_running = false; 160 m_running = false;
156 } 161 }
157 } 162 }
158} \ No newline at end of file 163}
164 */
165
166using System.IO;
167using System.Text;
168using System.Collections.Generic;
169
170namespace OpenSim.Framework.Servers.HttpServer
171{
172 public class PollServiceRequestManager
173 {
174 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
175
176 private readonly BaseHttpServer m_server;
177
178 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
179 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
180 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
181
182 private uint m_WorkerThreadCount = 0;
183 private Thread[] m_workerThreads;
184 private Thread m_retrysThread;
185
186 private bool m_running = true;
187 private int slowCount = 0;
188
189 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
190
191// private int m_timeout = 1000; // increase timeout 250; now use the event one
192
193 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
194 {
195 m_server = pSrv;
196 m_WorkerThreadCount = pWorkerThreadCount;
197 m_workerThreads = new Thread[m_WorkerThreadCount];
198
199 //startup worker threads
200 for (uint i = 0; i < m_WorkerThreadCount; i++)
201 {
202 m_workerThreads[i]
203 = Watchdog.StartThread(
204 PoolWorkerJob,
205 String.Format("PollServiceWorkerThread{0}", i),
206 ThreadPriority.Normal,
207 false,
208 false,
209 null,
210 int.MaxValue);
211 }
212
213 m_retrysThread = Watchdog.StartThread(
214 this.CheckRetries,
215 "PollServiceWatcherThread",
216 ThreadPriority.Normal,
217 false,
218 true,
219 null,
220 1000 * 60 * 10);
221 }
222
223
224 private void ReQueueEvent(PollServiceHttpRequest req)
225 {
226 if (m_running)
227 {
228 lock (m_retryRequests)
229 m_retryRequests.Enqueue(req);
230 }
231 }
232
233 public void Enqueue(PollServiceHttpRequest req)
234 {
235 if (m_running)
236 {
237 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
238 {
239 m_requests.Enqueue(req);
240 }
241 else
242 {
243 lock (m_slowRequests)
244 m_slowRequests.Enqueue(req);
245 }
246 }
247 }
248
249 private void CheckRetries()
250 {
251 while (m_running)
252 {
253 Thread.Sleep(100); // let the world move .. back to faster rate
254 Watchdog.UpdateThread();
255 lock (m_retryRequests)
256 {
257 while (m_retryRequests.Count > 0 && m_running)
258 m_requests.Enqueue(m_retryRequests.Dequeue());
259 }
260 slowCount++;
261 if (slowCount >= 10)
262 {
263 slowCount = 0;
264
265 lock (m_slowRequests)
266 {
267 while (m_slowRequests.Count > 0 && m_running)
268 m_requests.Enqueue(m_slowRequests.Dequeue());
269 }
270 }
271 }
272 }
273
274 ~PollServiceRequestManager()
275 {
276 m_running = false;
277// m_timeout = -10000; // cause all to expire
278 Thread.Sleep(1000); // let the world move
279
280 foreach (Thread t in m_workerThreads)
281 Watchdog.AbortThread(t.ManagedThreadId);
282
283 try
284 {
285 foreach (PollServiceHttpRequest req in m_retryRequests)
286 {
287 DoHTTPGruntWork(m_server,req,
288 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
289 }
290 }
291 catch
292 {
293 }
294
295 PollServiceHttpRequest wreq;
296 m_retryRequests.Clear();
297
298 lock (m_slowRequests)
299 {
300 while (m_slowRequests.Count > 0 && m_running)
301 m_requests.Enqueue(m_slowRequests.Dequeue());
302 }
303
304 while (m_requests.Count() > 0)
305 {
306 try
307 {
308 wreq = m_requests.Dequeue(0);
309 DoHTTPGruntWork(m_server,wreq,
310 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
311 }
312 catch
313 {
314 }
315 }
316
317 m_requests.Clear();
318 }
319
320 // work threads
321
322 private void PoolWorkerJob()
323 {
324 while (m_running)
325 {
326 PollServiceHttpRequest req = m_requests.Dequeue(5000);
327
328 Watchdog.UpdateThread();
329 if (req != null)
330 {
331 try
332 {
333 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
334 {
335 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
336
337 if (responsedata == null)
338 continue;
339
340 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal)
341 {
342 try
343 {
344 DoHTTPGruntWork(m_server, req, responsedata);
345 }
346 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
347 {
348 // Ignore it, no need to reply
349 }
350 }
351 else
352 {
353 m_threadPool.QueueWorkItem(x =>
354 {
355 try
356 {
357 DoHTTPGruntWork(m_server, req, responsedata);
358 }
359 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
360 {
361 // Ignore it, no need to reply
362 }
363
364 return null;
365 }, null);
366 }
367 }
368 else
369 {
370 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
371 {
372 DoHTTPGruntWork(m_server, req,
373 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
374 }
375 else
376 {
377 ReQueueEvent(req);
378 }
379 }
380 }
381 catch (Exception e)
382 {
383 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
384 }
385 }
386 }
387 }
388
389 // DoHTTPGruntWork changed, not sending response
390 // do the same work around as core
391
392 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
393 {
394 OSHttpResponse response
395 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
396
397 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
398
399 response.SendChunked = false;
400 response.ContentLength64 = buffer.Length;
401 response.ContentEncoding = Encoding.UTF8;
402
403 try
404 {
405 response.OutputStream.Write(buffer, 0, buffer.Length);
406 }
407 catch (Exception ex)
408 {
409 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
410 }
411 finally
412 {
413 //response.OutputStream.Close();
414 try
415 {
416 response.OutputStream.Flush();
417 response.Send();
418
419 //if (!response.KeepAlive && response.ReuseContext)
420 // response.FreeContext();
421 }
422 catch (Exception e)
423 {
424 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
425 }
426 }
427 }
428 }
429}
430