From 7c0bfca7a03584dd65c5659f177b434ee94ddc9d Mon Sep 17 00:00:00 2001 From: Melanie Date: Fri, 7 Jun 2013 23:43:45 +0100 Subject: Adding Avination's PollService to round out the HTTP inventory changes --- .../HttpServer/PollServiceRequestManager.cs | 248 ++++++++++++++++----- 1 file changed, 192 insertions(+), 56 deletions(-) (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs') diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 3e84c55..a5380c1 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs @@ -33,53 +33,56 @@ using log4net; using HttpServer; using OpenSim.Framework; using OpenSim.Framework.Monitoring; +using Amib.Threading; +using System.IO; +using System.Text; +using System.Collections.Generic; namespace OpenSim.Framework.Servers.HttpServer { public class PollServiceRequestManager { -// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); private readonly BaseHttpServer m_server; - private static Queue m_requests = Queue.Synchronized(new Queue()); + + private BlockingQueue m_requests = new BlockingQueue(); + private static Queue m_slowRequests = new Queue(); + private static Queue m_retryRequests = new Queue(); + private uint m_WorkerThreadCount = 0; private Thread[] m_workerThreads; - private PollServiceWorkerThread[] m_PollServiceWorkerThreads; - private volatile bool m_running = true; - private int m_pollTimeout; + private Thread m_retrysThread; + + private bool m_running = true; + private int slowCount = 0; + + private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); + +// private int m_timeout = 1000; // increase timeout 250; now use the event one public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) { m_server = pSrv; m_WorkerThreadCount = pWorkerThreadCount; - m_pollTimeout = pTimeout; - } - - public void Start() - { - m_running = true; m_workerThreads = new Thread[m_WorkerThreadCount]; - m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount]; //startup worker threads for (uint i = 0; i < m_WorkerThreadCount; i++) { - m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout); - m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent; - m_workerThreads[i] = Watchdog.StartThread( - m_PollServiceWorkerThreads[i].ThreadStart, + PoolWorkerJob, String.Format("PollServiceWorkerThread{0}", i), ThreadPriority.Normal, false, - true, + false, null, int.MaxValue); } - Watchdog.StartThread( - this.ThreadStart, + m_retrysThread = Watchdog.StartThread( + this.CheckRetries, "PollServiceWatcherThread", ThreadPriority.Normal, false, @@ -88,78 +91,211 @@ namespace OpenSim.Framework.Servers.HttpServer 1000 * 60 * 10); } - internal void ReQueueEvent(PollServiceHttpRequest req) + + private void ReQueueEvent(PollServiceHttpRequest req) { - // Do accounting stuff here - Enqueue(req); + if (m_running) + { + lock (m_retryRequests) + m_retryRequests.Enqueue(req); + } } public void Enqueue(PollServiceHttpRequest req) { - lock (m_requests) - m_requests.Enqueue(req); + if (m_running) + { + if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal) + { + m_requests.Enqueue(req); + } + else + { + lock (m_slowRequests) + m_slowRequests.Enqueue(req); + } + } } - public void ThreadStart() + private void CheckRetries() { while (m_running) { + Thread.Sleep(100); // let the world move .. back to faster rate Watchdog.UpdateThread(); - ProcessQueuedRequests(); - Thread.Sleep(1000); + lock (m_retryRequests) + { + while (m_retryRequests.Count > 0 && m_running) + m_requests.Enqueue(m_retryRequests.Dequeue()); + } + slowCount++; + if (slowCount >= 10) + { + slowCount = 0; + + lock (m_slowRequests) + { + while (m_slowRequests.Count > 0 && m_running) + m_requests.Enqueue(m_slowRequests.Dequeue()); + } + } } } - private void ProcessQueuedRequests() + ~PollServiceRequestManager() { - lock (m_requests) + m_running = false; +// m_timeout = -10000; // cause all to expire + Thread.Sleep(1000); // let the world move + + foreach (Thread t in m_workerThreads) + Watchdog.AbortThread(t.ManagedThreadId); + + try + { + foreach (PollServiceHttpRequest req in m_retryRequests) + { + DoHTTPGruntWork(m_server,req, + req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); + } + } + catch + { + } + + PollServiceHttpRequest wreq; + m_retryRequests.Clear(); + + lock (m_slowRequests) { - if (m_requests.Count == 0) - return; + while (m_slowRequests.Count > 0 && m_running) + m_requests.Enqueue(m_slowRequests.Dequeue()); + } -// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count); + while (m_requests.Count() > 0) + { + try + { + wreq = m_requests.Dequeue(0); + DoHTTPGruntWork(m_server,wreq, + wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); + } + catch + { + } + } - int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1; + m_requests.Clear(); + } - // For Each WorkerThread - for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) + // work threads + + private void PoolWorkerJob() + { + while (m_running) + { + PollServiceHttpRequest req = m_requests.Dequeue(5000); + + Watchdog.UpdateThread(); + if (req != null) { - //Loop over number of requests each thread handles. - for (int i = 0; i < reqperthread && m_requests.Count > 0; i++) + try { - try + if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) { - m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); + Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); + + if (responsedata == null) + continue; + + if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue + { + try + { + DoHTTPGruntWork(m_server, req, responsedata); + } + catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream + { + // Ignore it, no need to reply + } + } + else + { + m_threadPool.QueueWorkItem(x => + { + try + { + DoHTTPGruntWork(m_server, req, responsedata); + } + catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream + { + // Ignore it, no need to reply + } + + return null; + }, null); + } } - catch (InvalidOperationException) + else { - // The queue is empty, we did our calculations wrong! - return; + if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) + { + DoHTTPGruntWork(m_server, req, + req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); + } + else + { + ReQueueEvent(req); + } } - + } + catch (Exception e) + { + m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); } } } - } - public void Stop() + // DoHTTPGruntWork changed, not sending response + // do the same work around as core + + internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata) { - m_running = false; + OSHttpResponse response + = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext); - foreach (object o in m_requests) - { - PollServiceHttpRequest req = (PollServiceHttpRequest) o; - PollServiceWorkerThread.DoHTTPGruntWork( - m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); - } + byte[] buffer = server.DoHTTPGruntWork(responsedata, response); - m_requests.Clear(); + response.SendChunked = false; + response.ContentLength64 = buffer.Length; + response.ContentEncoding = Encoding.UTF8; - foreach (Thread t in m_workerThreads) + try + { + response.OutputStream.Write(buffer, 0, buffer.Length); + } + catch (Exception ex) { - t.Abort(); + m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex)); + } + finally + { + //response.OutputStream.Close(); + try + { + response.OutputStream.Flush(); + response.Send(); + + //if (!response.KeepAlive && response.ReuseContext) + // response.FreeContext(); + } + catch (Exception e) + { + m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e)); + } } } } -} \ No newline at end of file +} + -- cgit v1.1