diff options
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 285 |
1 files changed, 279 insertions, 6 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 3e84c55..07bd48a 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -33,15 +33,20 @@ using log4net; | |||
33 | using HttpServer; | 33 | using HttpServer; |
34 | using OpenSim.Framework; | 34 | using OpenSim.Framework; |
35 | using OpenSim.Framework.Monitoring; | 35 | using OpenSim.Framework.Monitoring; |
36 | using Amib.Threading; | ||
36 | 37 | ||
38 | |||
39 | /* | ||
37 | namespace OpenSim.Framework.Servers.HttpServer | 40 | namespace OpenSim.Framework.Servers.HttpServer |
38 | { | 41 | { |
42 | |||
39 | public class PollServiceRequestManager | 43 | public class PollServiceRequestManager |
40 | { | 44 | { |
41 | // private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | 45 | // private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
42 | 46 | ||
43 | private readonly BaseHttpServer m_server; | 47 | private readonly BaseHttpServer m_server; |
44 | private static Queue m_requests = Queue.Synchronized(new Queue()); | 48 | private static Queue m_requests = Queue.Synchronized(new Queue()); |
49 | private static ManualResetEvent m_ev = new ManualResetEvent(false); | ||
45 | private uint m_WorkerThreadCount = 0; | 50 | private uint m_WorkerThreadCount = 0; |
46 | private Thread[] m_workerThreads; | 51 | private Thread[] m_workerThreads; |
47 | private PollServiceWorkerThread[] m_PollServiceWorkerThreads; | 52 | private PollServiceWorkerThread[] m_PollServiceWorkerThreads; |
@@ -74,7 +79,6 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
74 | ThreadPriority.Normal, | 79 | ThreadPriority.Normal, |
75 | false, | 80 | false, |
76 | true, | 81 | true, |
77 | null, | ||
78 | int.MaxValue); | 82 | int.MaxValue); |
79 | } | 83 | } |
80 | 84 | ||
@@ -84,7 +88,6 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
84 | ThreadPriority.Normal, | 88 | ThreadPriority.Normal, |
85 | false, | 89 | false, |
86 | true, | 90 | true, |
87 | null, | ||
88 | 1000 * 60 * 10); | 91 | 1000 * 60 * 10); |
89 | } | 92 | } |
90 | 93 | ||
@@ -98,15 +101,17 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
98 | { | 101 | { |
99 | lock (m_requests) | 102 | lock (m_requests) |
100 | m_requests.Enqueue(req); | 103 | m_requests.Enqueue(req); |
104 | m_ev.Set(); | ||
101 | } | 105 | } |
102 | 106 | ||
103 | public void ThreadStart() | 107 | public void ThreadStart() |
104 | { | 108 | { |
105 | while (m_running) | 109 | while (m_running) |
106 | { | 110 | { |
111 | m_ev.WaitOne(1000); | ||
112 | m_ev.Reset(); | ||
107 | Watchdog.UpdateThread(); | 113 | Watchdog.UpdateThread(); |
108 | ProcessQueuedRequests(); | 114 | ProcessQueuedRequests(); |
109 | Thread.Sleep(1000); | ||
110 | } | 115 | } |
111 | } | 116 | } |
112 | 117 | ||
@@ -150,8 +155,9 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
150 | foreach (object o in m_requests) | 155 | foreach (object o in m_requests) |
151 | { | 156 | { |
152 | PollServiceHttpRequest req = (PollServiceHttpRequest) o; | 157 | PollServiceHttpRequest req = (PollServiceHttpRequest) o; |
153 | PollServiceWorkerThread.DoHTTPGruntWork( | 158 | m_server.DoHTTPGruntWork( |
154 | m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | 159 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id), |
160 | new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext)); | ||
155 | } | 161 | } |
156 | 162 | ||
157 | m_requests.Clear(); | 163 | m_requests.Clear(); |
@@ -162,4 +168,271 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
162 | } | 168 | } |
163 | } | 169 | } |
164 | } | 170 | } |
165 | } \ No newline at end of file | 171 | } |
172 | */ | ||
173 | |||
174 | using System.IO; | ||
175 | using System.Text; | ||
176 | using System.Collections.Generic; | ||
177 | |||
178 | namespace OpenSim.Framework.Servers.HttpServer | ||
179 | { | ||
180 | public class PollServiceRequestManager | ||
181 | { | ||
182 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | ||
183 | |||
184 | private readonly BaseHttpServer m_server; | ||
185 | |||
186 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | ||
187 | private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>(); | ||
188 | private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); | ||
189 | |||
190 | private uint m_WorkerThreadCount = 0; | ||
191 | private Thread[] m_workerThreads; | ||
192 | private Thread m_retrysThread; | ||
193 | |||
194 | private bool m_running = true; | ||
195 | private int slowCount = 0; | ||
196 | |||
197 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); | ||
198 | |||
199 | // private int m_timeout = 1000; // increase timeout 250; now use the event one | ||
200 | |||
201 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) | ||
202 | { | ||
203 | m_server = pSrv; | ||
204 | m_WorkerThreadCount = pWorkerThreadCount; | ||
205 | m_workerThreads = new Thread[m_WorkerThreadCount]; | ||
206 | |||
207 | //startup worker threads | ||
208 | for (uint i = 0; i < m_WorkerThreadCount; i++) | ||
209 | { | ||
210 | m_workerThreads[i] | ||
211 | = Watchdog.StartThread( | ||
212 | PoolWorkerJob, | ||
213 | String.Format("PollServiceWorkerThread{0}", i), | ||
214 | ThreadPriority.Normal, | ||
215 | false, | ||
216 | false, | ||
217 | null, | ||
218 | int.MaxValue); | ||
219 | } | ||
220 | |||
221 | m_retrysThread = Watchdog.StartThread( | ||
222 | this.CheckRetries, | ||
223 | "PollServiceWatcherThread", | ||
224 | ThreadPriority.Normal, | ||
225 | false, | ||
226 | true, | ||
227 | null, | ||
228 | 1000 * 60 * 10); | ||
229 | } | ||
230 | |||
231 | |||
232 | private void ReQueueEvent(PollServiceHttpRequest req) | ||
233 | { | ||
234 | if (m_running) | ||
235 | { | ||
236 | lock (m_retryRequests) | ||
237 | m_retryRequests.Enqueue(req); | ||
238 | } | ||
239 | } | ||
240 | |||
241 | public void Enqueue(PollServiceHttpRequest req) | ||
242 | { | ||
243 | if (m_running) | ||
244 | { | ||
245 | if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal) | ||
246 | { | ||
247 | m_requests.Enqueue(req); | ||
248 | } | ||
249 | else | ||
250 | { | ||
251 | lock (m_slowRequests) | ||
252 | m_slowRequests.Enqueue(req); | ||
253 | } | ||
254 | } | ||
255 | } | ||
256 | |||
257 | private void CheckRetries() | ||
258 | { | ||
259 | while (m_running) | ||
260 | { | ||
261 | Thread.Sleep(100); // let the world move .. back to faster rate | ||
262 | Watchdog.UpdateThread(); | ||
263 | lock (m_retryRequests) | ||
264 | { | ||
265 | while (m_retryRequests.Count > 0 && m_running) | ||
266 | m_requests.Enqueue(m_retryRequests.Dequeue()); | ||
267 | } | ||
268 | slowCount++; | ||
269 | if (slowCount >= 10) | ||
270 | { | ||
271 | slowCount = 0; | ||
272 | |||
273 | lock (m_slowRequests) | ||
274 | { | ||
275 | while (m_slowRequests.Count > 0 && m_running) | ||
276 | m_requests.Enqueue(m_slowRequests.Dequeue()); | ||
277 | } | ||
278 | } | ||
279 | } | ||
280 | } | ||
281 | |||
282 | ~PollServiceRequestManager() | ||
283 | { | ||
284 | m_running = false; | ||
285 | // m_timeout = -10000; // cause all to expire | ||
286 | Thread.Sleep(1000); // let the world move | ||
287 | |||
288 | foreach (Thread t in m_workerThreads) | ||
289 | Watchdog.AbortThread(t.ManagedThreadId); | ||
290 | |||
291 | try | ||
292 | { | ||
293 | foreach (PollServiceHttpRequest req in m_retryRequests) | ||
294 | { | ||
295 | DoHTTPGruntWork(m_server,req, | ||
296 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
297 | } | ||
298 | } | ||
299 | catch | ||
300 | { | ||
301 | } | ||
302 | |||
303 | PollServiceHttpRequest wreq; | ||
304 | m_retryRequests.Clear(); | ||
305 | |||
306 | lock (m_slowRequests) | ||
307 | { | ||
308 | while (m_slowRequests.Count > 0 && m_running) | ||
309 | m_requests.Enqueue(m_slowRequests.Dequeue()); | ||
310 | } | ||
311 | |||
312 | while (m_requests.Count() > 0) | ||
313 | { | ||
314 | try | ||
315 | { | ||
316 | wreq = m_requests.Dequeue(0); | ||
317 | DoHTTPGruntWork(m_server,wreq, | ||
318 | wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); | ||
319 | } | ||
320 | catch | ||
321 | { | ||
322 | } | ||
323 | } | ||
324 | |||
325 | m_requests.Clear(); | ||
326 | } | ||
327 | |||
328 | // work threads | ||
329 | |||
330 | private void PoolWorkerJob() | ||
331 | { | ||
332 | while (m_running) | ||
333 | { | ||
334 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | ||
335 | |||
336 | Watchdog.UpdateThread(); | ||
337 | if (req != null) | ||
338 | { | ||
339 | try | ||
340 | { | ||
341 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) | ||
342 | { | ||
343 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); | ||
344 | |||
345 | if (responsedata == null) | ||
346 | continue; | ||
347 | |||
348 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) | ||
349 | { | ||
350 | try | ||
351 | { | ||
352 | DoHTTPGruntWork(m_server, req, responsedata); | ||
353 | } | ||
354 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | ||
355 | { | ||
356 | // Ignore it, no need to reply | ||
357 | } | ||
358 | } | ||
359 | else | ||
360 | { | ||
361 | m_threadPool.QueueWorkItem(x => | ||
362 | { | ||
363 | try | ||
364 | { | ||
365 | DoHTTPGruntWork(m_server, req, responsedata); | ||
366 | } | ||
367 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | ||
368 | { | ||
369 | // Ignore it, no need to reply | ||
370 | } | ||
371 | |||
372 | return null; | ||
373 | }, null); | ||
374 | } | ||
375 | } | ||
376 | else | ||
377 | { | ||
378 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | ||
379 | { | ||
380 | DoHTTPGruntWork(m_server, req, | ||
381 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
382 | } | ||
383 | else | ||
384 | { | ||
385 | ReQueueEvent(req); | ||
386 | } | ||
387 | } | ||
388 | } | ||
389 | catch (Exception e) | ||
390 | { | ||
391 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); | ||
392 | } | ||
393 | } | ||
394 | } | ||
395 | } | ||
396 | |||
397 | // DoHTTPGruntWork changed, not sending response | ||
398 | // do the same work around as core | ||
399 | |||
400 | internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata) | ||
401 | { | ||
402 | OSHttpResponse response | ||
403 | = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext); | ||
404 | |||
405 | byte[] buffer = server.DoHTTPGruntWork(responsedata, response); | ||
406 | |||
407 | response.SendChunked = false; | ||
408 | response.ContentLength64 = buffer.Length; | ||
409 | response.ContentEncoding = Encoding.UTF8; | ||
410 | |||
411 | try | ||
412 | { | ||
413 | response.OutputStream.Write(buffer, 0, buffer.Length); | ||
414 | } | ||
415 | catch (Exception ex) | ||
416 | { | ||
417 | m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex)); | ||
418 | } | ||
419 | finally | ||
420 | { | ||
421 | //response.OutputStream.Close(); | ||
422 | try | ||
423 | { | ||
424 | response.OutputStream.Flush(); | ||
425 | response.Send(); | ||
426 | |||
427 | //if (!response.KeepAlive && response.ReuseContext) | ||
428 | // response.FreeContext(); | ||
429 | } | ||
430 | catch (Exception e) | ||
431 | { | ||
432 | m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e)); | ||
433 | } | ||
434 | } | ||
435 | } | ||
436 | } | ||
437 | } | ||
438 | |||