aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs215
1 files changed, 193 insertions, 22 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 28bba70..4ffe6e5 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -65,15 +65,25 @@ namespace OpenSim.Framework.Servers.HttpServer
65 65
66 private readonly BaseHttpServer m_server; 66 private readonly BaseHttpServer m_server;
67 67
68 private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext;
68 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); 69 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
69 private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); 70 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
71 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
70 72
71 private uint m_WorkerThreadCount = 0; 73 private uint m_WorkerThreadCount = 0;
72 private Thread[] m_workerThreads; 74 private Thread[] m_workerThreads;
75 private Thread m_retrysThread;
73 76
77<<<<<<< HEAD
74 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); 78 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
75 79
76// private int m_timeout = 1000; // increase timeout 250; now use the event one 80// private int m_timeout = 1000; // increase timeout 250; now use the event one
81=======
82 private bool m_running = true;
83 private int slowCount = 0;
84
85 private SmartThreadPool m_threadPool;
86>>>>>>> avn/ubitvar
77 87
78 public PollServiceRequestManager( 88 public PollServiceRequestManager(
79 BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) 89 BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
@@ -83,6 +93,7 @@ namespace OpenSim.Framework.Servers.HttpServer
83 m_WorkerThreadCount = pWorkerThreadCount; 93 m_WorkerThreadCount = pWorkerThreadCount;
84 m_workerThreads = new Thread[m_WorkerThreadCount]; 94 m_workerThreads = new Thread[m_WorkerThreadCount];
85 95
96<<<<<<< HEAD
86 StatsManager.RegisterStat( 97 StatsManager.RegisterStat(
87 new Stat( 98 new Stat(
88 "QueuedPollResponses", 99 "QueuedPollResponses",
@@ -108,10 +119,25 @@ namespace OpenSim.Framework.Servers.HttpServer
108 MeasuresOfInterest.AverageChangeOverTime, 119 MeasuresOfInterest.AverageChangeOverTime,
109 stat => stat.Value = ResponsesProcessed, 120 stat => stat.Value = ResponsesProcessed,
110 StatVerbosity.Debug)); 121 StatVerbosity.Debug));
122=======
123 PollServiceHttpRequestComparer preqCp = new PollServiceHttpRequestComparer();
124 m_bycontext = new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp);
125
126 STPStartInfo startInfo = new STPStartInfo();
127 startInfo.IdleTimeout = 30000;
128 startInfo.MaxWorkerThreads = 15;
129 startInfo.MinWorkerThreads = 1;
130 startInfo.ThreadPriority = ThreadPriority.Normal;
131 startInfo.StartSuspended = true;
132 startInfo.ThreadPoolName = "PoolService";
133
134 m_threadPool = new SmartThreadPool(startInfo);
135>>>>>>> avn/ubitvar
111 } 136 }
112 137
113 public void Start() 138 public void Start()
114 { 139 {
140<<<<<<< HEAD
115 IsRunning = true; 141 IsRunning = true;
116 142
117 if (PerformResponsesAsync) 143 if (PerformResponsesAsync)
@@ -139,40 +165,100 @@ namespace OpenSim.Framework.Servers.HttpServer
139 null, 165 null,
140 1000 * 60 * 10); 166 1000 * 60 * 10);
141 } 167 }
168=======
169 m_threadPool.Start();
170 //startup worker threads
171 for (uint i = 0; i < m_WorkerThreadCount; i++)
172 {
173 m_workerThreads[i]
174 = Watchdog.StartThread(
175 PoolWorkerJob,
176 string.Format("PollServiceWorkerThread {0}:{1}", i, m_server.Port),
177 ThreadPriority.Normal,
178 false,
179 false,
180 null,
181 int.MaxValue);
182 }
183
184 m_retrysThread = Watchdog.StartThread(
185 this.CheckRetries,
186 string.Format("PollServiceWatcherThread:{0}", m_server.Port),
187 ThreadPriority.Normal,
188 false,
189 true,
190 null,
191 1000 * 60 * 10);
192>>>>>>> avn/ubitvar
142 } 193 }
143 194
144 private void ReQueueEvent(PollServiceHttpRequest req) 195 private void ReQueueEvent(PollServiceHttpRequest req)
145 { 196 {
146 if (IsRunning) 197 if (IsRunning)
147 { 198 {
148 // delay the enqueueing for 100ms. There's no need to have the event 199 lock (m_retryRequests)
149 // actively on the queue 200 m_retryRequests.Enqueue(req);
150 Timer t = new Timer(self => { 201 }
151 ((Timer)self).Dispose(); 202 }
152 m_requests.Enqueue(req);
153 });
154 203
155 t.Change(100, Timeout.Infinite); 204 public void Enqueue(PollServiceHttpRequest req)
205 {
206 lock (m_bycontext)
207 {
208 Queue<PollServiceHttpRequest> ctxQeueue;
209 if (m_bycontext.TryGetValue(req, out ctxQeueue))
210 {
211 ctxQeueue.Enqueue(req);
212 }
213 else
214 {
215 ctxQeueue = new Queue<PollServiceHttpRequest>();
216 m_bycontext[req] = ctxQeueue;
217 EnqueueInt(req);
218 }
219 }
220 }
156 221
222 public void byContextDequeue(PollServiceHttpRequest req)
223 {
224 Queue<PollServiceHttpRequest> ctxQeueue;
225 lock (m_bycontext)
226 {
227 if (m_bycontext.TryGetValue(req, out ctxQeueue))
228 {
229 if (ctxQeueue.Count > 0)
230 {
231 PollServiceHttpRequest newreq = ctxQeueue.Dequeue();
232 EnqueueInt(newreq);
233 }
234 else
235 {
236 m_bycontext.Remove(req);
237 }
238 }
157 } 239 }
158 } 240 }
159 241
160 public void Enqueue(PollServiceHttpRequest req) 242
243 public void EnqueueInt(PollServiceHttpRequest req)
161 { 244 {
162 if (IsRunning) 245 if (IsRunning)
163 { 246 {
164 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) 247 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.LongPoll)
165 { 248 {
166 lock (m_longPollRequests) 249 m_requests.Enqueue(req);
167 m_longPollRequests.Add(req);
168 } 250 }
169 else 251 else
170 m_requests.Enqueue(req); 252 {
253 lock (m_slowRequests)
254 m_slowRequests.Enqueue(req);
255 }
171 } 256 }
172 } 257 }
173 258
174 private void CheckLongPollThreads() 259 private void CheckRetries()
175 { 260 {
261<<<<<<< HEAD
176 // The only purpose of this thread is to check the EQs for events. 262 // The only purpose of this thread is to check the EQs for events.
177 // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. 263 // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
178 // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. 264 // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
@@ -180,13 +266,15 @@ namespace OpenSim.Framework.Servers.HttpServer
180 // so if they aren't ready to be served by a worker thread (no events), they are placed 266 // so if they aren't ready to be served by a worker thread (no events), they are placed
181 // directly back in the "ready-to-serve" queue by the worker thread. 267 // directly back in the "ready-to-serve" queue by the worker thread.
182 while (IsRunning) 268 while (IsRunning)
269=======
270 while (m_running)
271>>>>>>> avn/ubitvar
183 { 272 {
184 Thread.Sleep(500); 273 Thread.Sleep(100); // let the world move .. back to faster rate
185 Watchdog.UpdateThread(); 274 Watchdog.UpdateThread();
186 275 lock (m_retryRequests)
187// List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>();
188 lock (m_longPollRequests)
189 { 276 {
277<<<<<<< HEAD
190 if (m_longPollRequests.Count > 0 && IsRunning) 278 if (m_longPollRequests.Count > 0 && IsRunning)
191 { 279 {
192 List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => 280 List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req =>
@@ -199,28 +287,67 @@ namespace OpenSim.Framework.Servers.HttpServer
199 m_requests.Enqueue(req); 287 m_requests.Enqueue(req);
200 m_longPollRequests.Remove(req); 288 m_longPollRequests.Remove(req);
201 }); 289 });
290=======
291 while (m_retryRequests.Count > 0 && m_running)
292 m_requests.Enqueue(m_retryRequests.Dequeue());
293 }
294 slowCount++;
295 if (slowCount >= 10)
296 {
297 slowCount = 0;
298>>>>>>> avn/ubitvar
202 299
300 lock (m_slowRequests)
301 {
302 while (m_slowRequests.Count > 0 && m_running)
303 m_requests.Enqueue(m_slowRequests.Dequeue());
203 } 304 }
204
205 } 305 }
206 } 306 }
207 } 307 }
208 308
209 public void Stop() 309 public void Stop()
210 { 310 {
311<<<<<<< HEAD
211 IsRunning = false; 312 IsRunning = false;
212// m_timeout = -10000; // cause all to expire 313// m_timeout = -10000; // cause all to expire
314=======
315 m_running = false;
316>>>>>>> avn/ubitvar
213 Thread.Sleep(1000); // let the world move 317 Thread.Sleep(1000); // let the world move
214 318
215 foreach (Thread t in m_workerThreads) 319 foreach (Thread t in m_workerThreads)
216 Watchdog.AbortThread(t.ManagedThreadId); 320 Watchdog.AbortThread(t.ManagedThreadId);
217 321
322 // any entry in m_bycontext should have a active request on the other queues
323 // so just delete contents to easy GC
324 foreach (Queue<PollServiceHttpRequest> qu in m_bycontext.Values)
325 qu.Clear();
326 m_bycontext.Clear();
327
328 try
329 {
330 foreach (PollServiceHttpRequest req in m_retryRequests)
331 {
332 req.DoHTTPstop(m_server);
333 }
334 }
335 catch
336 {
337 }
338
218 PollServiceHttpRequest wreq; 339 PollServiceHttpRequest wreq;
340 m_retryRequests.Clear();
219 341
220 lock (m_longPollRequests) 342 lock (m_slowRequests)
221 { 343 {
344<<<<<<< HEAD
222 if (m_longPollRequests.Count > 0 && IsRunning) 345 if (m_longPollRequests.Count > 0 && IsRunning)
223 m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); 346 m_longPollRequests.ForEach(req => m_requests.Enqueue(req));
347=======
348 while (m_slowRequests.Count > 0)
349 m_requests.Enqueue(m_slowRequests.Dequeue());
350>>>>>>> avn/ubitvar
224 } 351 }
225 352
226 while (m_requests.Count() > 0) 353 while (m_requests.Count() > 0)
@@ -228,16 +355,19 @@ namespace OpenSim.Framework.Servers.HttpServer
228 try 355 try
229 { 356 {
230 wreq = m_requests.Dequeue(0); 357 wreq = m_requests.Dequeue(0);
358<<<<<<< HEAD
231 ResponsesProcessed++; 359 ResponsesProcessed++;
232 wreq.DoHTTPGruntWork( 360 wreq.DoHTTPGruntWork(
233 m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); 361 m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
362=======
363 wreq.DoHTTPstop(m_server);
364>>>>>>> avn/ubitvar
234 } 365 }
235 catch 366 catch
236 { 367 {
237 } 368 }
238 } 369 }
239 370
240 m_longPollRequests.Clear();
241 m_requests.Clear(); 371 m_requests.Clear();
242 } 372 }
243 373
@@ -247,6 +377,11 @@ namespace OpenSim.Framework.Servers.HttpServer
247 { 377 {
248 while (IsRunning) 378 while (IsRunning)
249 { 379 {
380<<<<<<< HEAD
381=======
382 PollServiceHttpRequest req = m_requests.Dequeue(5000);
383
384>>>>>>> avn/ubitvar
250 Watchdog.UpdateThread(); 385 Watchdog.UpdateThread();
251 WaitPerformResponse(); 386 WaitPerformResponse();
252 } 387 }
@@ -265,6 +400,7 @@ namespace OpenSim.Framework.Servers.HttpServer
265 { 400 {
266 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); 401 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
267 402
403<<<<<<< HEAD
268 if (responsedata == null) 404 if (responsedata == null)
269 return; 405 return;
270 406
@@ -287,11 +423,15 @@ namespace OpenSim.Framework.Servers.HttpServer
287 else 423 else
288 { 424 {
289 m_threadPool.QueueWorkItem(x => 425 m_threadPool.QueueWorkItem(x =>
426=======
427 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
428>>>>>>> avn/ubitvar
290 { 429 {
291 try 430 try
292 { 431 {
293 ResponsesProcessed++; 432 ResponsesProcessed++;
294 req.DoHTTPGruntWork(m_server, responsedata); 433 req.DoHTTPGruntWork(m_server, responsedata);
434 byContextDequeue(req);
295 } 435 }
296 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream 436 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
297 { 437 {
@@ -300,6 +440,7 @@ namespace OpenSim.Framework.Servers.HttpServer
300 } 440 }
301 catch (Exception e) 441 catch (Exception e)
302 { 442 {
443<<<<<<< HEAD
303 m_log.Error(e); 444 m_log.Error(e);
304 } 445 }
305 446
@@ -318,6 +459,34 @@ namespace OpenSim.Framework.Servers.HttpServer
318 else 459 else
319 { 460 {
320 ReQueueEvent(req); 461 ReQueueEvent(req);
462=======
463 try
464 {
465 req.DoHTTPGruntWork(m_server, responsedata);
466 byContextDequeue(req);
467 }
468 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
469 {
470 // Ignore it, no need to reply
471 }
472
473 return null;
474 }, null);
475 }
476 }
477 else
478 {
479 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
480 {
481 req.DoHTTPGruntWork(m_server,
482 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
483 byContextDequeue(req);
484 }
485 else
486 {
487 ReQueueEvent(req);
488 }
489>>>>>>> avn/ubitvar
321 } 490 }
322 } 491 }
323 } 492 }
@@ -327,5 +496,7 @@ namespace OpenSim.Framework.Servers.HttpServer
327 } 496 }
328 } 497 }
329 } 498 }
499
330 } 500 }
331} \ No newline at end of file 501}
502