From 134f86e8d5c414409631b25b8c6f0ee45fbd8631 Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Thu, 3 Nov 2016 21:44:39 +1000 Subject: Initial update to OpenSim 0.8.2.1 source code. --- .../HttpServer/PollServiceRequestManager.cs | 318 ++++++++++++++++----- 1 file changed, 242 insertions(+), 76 deletions(-) (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs') diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 3e84c55..28bba70 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs @@ -33,132 +33,298 @@ using log4net; using HttpServer; using OpenSim.Framework; using OpenSim.Framework.Monitoring; +using Amib.Threading; +using System.IO; +using System.Text; +using System.Collections.Generic; namespace OpenSim.Framework.Servers.HttpServer { public class PollServiceRequestManager { -// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + /// + /// Is the poll service request manager running? + /// + /// + /// Can be running either synchronously or asynchronously + /// + public bool IsRunning { get; private set; } + + /// + /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via + /// external calls)? + /// + public bool PerformResponsesAsync { get; private set; } + + /// + /// Number of responses actually processed and sent to viewer (or aborted due to error). + /// + public int ResponsesProcessed { get; private set; } private readonly BaseHttpServer m_server; - private static Queue m_requests = Queue.Synchronized(new Queue()); + + private BlockingQueue m_requests = new BlockingQueue(); + private static List m_longPollRequests = new List(); + private uint m_WorkerThreadCount = 0; private Thread[] m_workerThreads; - private PollServiceWorkerThread[] m_PollServiceWorkerThreads; - private volatile bool m_running = true; - private int m_pollTimeout; - public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) + private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); + +// private int m_timeout = 1000; // increase timeout 250; now use the event one + + public PollServiceRequestManager( + BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) { m_server = pSrv; + PerformResponsesAsync = performResponsesAsync; m_WorkerThreadCount = pWorkerThreadCount; - m_pollTimeout = pTimeout; + m_workerThreads = new Thread[m_WorkerThreadCount]; + + StatsManager.RegisterStat( + new Stat( + "QueuedPollResponses", + "Number of poll responses queued for processing.", + "", + "", + "httpserver", + m_server.Port.ToString(), + StatType.Pull, + MeasuresOfInterest.AverageChangeOverTime, + stat => stat.Value = m_requests.Count(), + StatVerbosity.Debug)); + + StatsManager.RegisterStat( + new Stat( + "ProcessedPollResponses", + "Number of poll responses processed.", + "", + "", + "httpserver", + m_server.Port.ToString(), + StatType.Pull, + MeasuresOfInterest.AverageChangeOverTime, + stat => stat.Value = ResponsesProcessed, + StatVerbosity.Debug)); } public void Start() { - m_running = true; - m_workerThreads = new Thread[m_WorkerThreadCount]; - m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount]; + IsRunning = true; - //startup worker threads - for (uint i = 0; i < m_WorkerThreadCount; i++) + if (PerformResponsesAsync) { - m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout); - m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent; - - m_workerThreads[i] - = Watchdog.StartThread( - m_PollServiceWorkerThreads[i].ThreadStart, - String.Format("PollServiceWorkerThread{0}", i), - ThreadPriority.Normal, - false, - true, - null, - int.MaxValue); - } + //startup worker threads + for (uint i = 0; i < m_WorkerThreadCount; i++) + { + m_workerThreads[i] + = WorkManager.StartThread( + PoolWorkerJob, + string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), + ThreadPriority.Normal, + false, + false, + null, + int.MaxValue); + } - Watchdog.StartThread( - this.ThreadStart, - "PollServiceWatcherThread", - ThreadPriority.Normal, - false, - true, - null, - 1000 * 60 * 10); + WorkManager.StartThread( + this.CheckLongPollThreads, + string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), + ThreadPriority.Normal, + false, + true, + null, + 1000 * 60 * 10); + } } - internal void ReQueueEvent(PollServiceHttpRequest req) + private void ReQueueEvent(PollServiceHttpRequest req) { - // Do accounting stuff here - Enqueue(req); - } + if (IsRunning) + { + // delay the enqueueing for 100ms. There's no need to have the event + // actively on the queue + Timer t = new Timer(self => { + ((Timer)self).Dispose(); + m_requests.Enqueue(req); + }); - public void Enqueue(PollServiceHttpRequest req) - { - lock (m_requests) - m_requests.Enqueue(req); + t.Change(100, Timeout.Infinite); + + } } - public void ThreadStart() + public void Enqueue(PollServiceHttpRequest req) { - while (m_running) + if (IsRunning) { - Watchdog.UpdateThread(); - ProcessQueuedRequests(); - Thread.Sleep(1000); + if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) + { + lock (m_longPollRequests) + m_longPollRequests.Add(req); + } + else + m_requests.Enqueue(req); } } - private void ProcessQueuedRequests() + private void CheckLongPollThreads() { - lock (m_requests) + // The only purpose of this thread is to check the EQs for events. + // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. + // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. + // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, + // so if they aren't ready to be served by a worker thread (no events), they are placed + // directly back in the "ready-to-serve" queue by the worker thread. + while (IsRunning) { - if (m_requests.Count == 0) - return; - -// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count); - - int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1; + Thread.Sleep(500); + Watchdog.UpdateThread(); - // For Each WorkerThread - for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) +// List not_ready = new List(); + lock (m_longPollRequests) { - //Loop over number of requests each thread handles. - for (int i = 0; i < reqperthread && m_requests.Count > 0; i++) + if (m_longPollRequests.Count > 0 && IsRunning) { - try - { - m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); - } - catch (InvalidOperationException) - { - // The queue is empty, we did our calculations wrong! - return; - } - + List ready = m_longPollRequests.FindAll(req => + (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ + (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout + ); + + ready.ForEach(req => + { + m_requests.Enqueue(req); + m_longPollRequests.Remove(req); + }); + } + } } - } public void Stop() { - m_running = false; + IsRunning = false; +// m_timeout = -10000; // cause all to expire + Thread.Sleep(1000); // let the world move + + foreach (Thread t in m_workerThreads) + Watchdog.AbortThread(t.ManagedThreadId); + + PollServiceHttpRequest wreq; - foreach (object o in m_requests) + lock (m_longPollRequests) { - PollServiceHttpRequest req = (PollServiceHttpRequest) o; - PollServiceWorkerThread.DoHTTPGruntWork( - m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); + if (m_longPollRequests.Count > 0 && IsRunning) + m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); } + while (m_requests.Count() > 0) + { + try + { + wreq = m_requests.Dequeue(0); + ResponsesProcessed++; + wreq.DoHTTPGruntWork( + m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); + } + catch + { + } + } + + m_longPollRequests.Clear(); m_requests.Clear(); + } - foreach (Thread t in m_workerThreads) + // work threads + + private void PoolWorkerJob() + { + while (IsRunning) { - t.Abort(); + Watchdog.UpdateThread(); + WaitPerformResponse(); + } + } + + public void WaitPerformResponse() + { + PollServiceHttpRequest req = m_requests.Dequeue(5000); +// m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); + + if (req != null) + { + try + { + if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) + { + Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); + + if (responsedata == null) + return; + + // This is the event queue. + // Even if we're not running we can still perform responses by explicit request. + if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll + || !PerformResponsesAsync) + { + try + { + ResponsesProcessed++; + req.DoHTTPGruntWork(m_server, responsedata); + } + catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream + { + // Ignore it, no need to reply + m_log.Error(e); + } + } + else + { + m_threadPool.QueueWorkItem(x => + { + try + { + ResponsesProcessed++; + req.DoHTTPGruntWork(m_server, responsedata); + } + catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream + { + // Ignore it, no need to reply + m_log.Error(e); + } + catch (Exception e) + { + m_log.Error(e); + } + + return null; + }, null); + } + } + else + { + if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) + { + ResponsesProcessed++; + req.DoHTTPGruntWork( + m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); + } + else + { + ReQueueEvent(req); + } + } + } + catch (Exception e) + { + m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); + } } } } -- cgit v1.1