From 5e4d6cab00cb29cd088ab7b62ab13aff103b64cb Mon Sep 17 00:00:00 2001 From: onefang Date: Sun, 19 May 2019 21:24:15 +1000 Subject: Dump OpenSim 0.9.0.1 into it's own branch. --- .../HttpServer/PollServiceRequestManager.cs | 327 +++++++++------------ 1 file changed, 146 insertions(+), 181 deletions(-) (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs') diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 28bba70..c6a3e65 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs @@ -44,200 +44,183 @@ namespace OpenSim.Framework.Servers.HttpServer { private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - /// - /// Is the poll service request manager running? - /// - /// - /// Can be running either synchronously or asynchronously - /// - public bool IsRunning { get; private set; } - - /// - /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via - /// external calls)? - /// - public bool PerformResponsesAsync { get; private set; } - - /// - /// Number of responses actually processed and sent to viewer (or aborted due to error). - /// - public int ResponsesProcessed { get; private set; } - private readonly BaseHttpServer m_server; + private Dictionary> m_bycontext; private BlockingQueue m_requests = new BlockingQueue(); - private static List m_longPollRequests = new List(); + private static Queue m_retryRequests = new Queue(); private uint m_WorkerThreadCount = 0; private Thread[] m_workerThreads; + private Thread m_retrysThread; - private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); + private bool m_running = false; -// private int m_timeout = 1000; // increase timeout 250; now use the event one + private SmartThreadPool m_threadPool; public PollServiceRequestManager( BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) { m_server = pSrv; - PerformResponsesAsync = performResponsesAsync; m_WorkerThreadCount = pWorkerThreadCount; m_workerThreads = new Thread[m_WorkerThreadCount]; - StatsManager.RegisterStat( - new Stat( - "QueuedPollResponses", - "Number of poll responses queued for processing.", - "", - "", - "httpserver", - m_server.Port.ToString(), - StatType.Pull, - MeasuresOfInterest.AverageChangeOverTime, - stat => stat.Value = m_requests.Count(), - StatVerbosity.Debug)); - - StatsManager.RegisterStat( - new Stat( - "ProcessedPollResponses", - "Number of poll responses processed.", - "", - "", - "httpserver", - m_server.Port.ToString(), - StatType.Pull, - MeasuresOfInterest.AverageChangeOverTime, - stat => stat.Value = ResponsesProcessed, - StatVerbosity.Debug)); + PollServiceHttpRequestComparer preqCp = new PollServiceHttpRequestComparer(); + m_bycontext = new Dictionary>(preqCp); + + STPStartInfo startInfo = new STPStartInfo(); + startInfo.IdleTimeout = 30000; + startInfo.MaxWorkerThreads = 20; + startInfo.MinWorkerThreads = 1; + startInfo.ThreadPriority = ThreadPriority.Normal; + startInfo.StartSuspended = true; + startInfo.ThreadPoolName = "PoolService"; + + m_threadPool = new SmartThreadPool(startInfo); } public void Start() { - IsRunning = true; - - if (PerformResponsesAsync) + m_running = true; + m_threadPool.Start(); + //startup worker threads + for (uint i = 0; i < m_WorkerThreadCount; i++) { - //startup worker threads - for (uint i = 0; i < m_WorkerThreadCount; i++) - { - m_workerThreads[i] - = WorkManager.StartThread( - PoolWorkerJob, - string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), - ThreadPriority.Normal, - false, - false, - null, - int.MaxValue); - } - - WorkManager.StartThread( - this.CheckLongPollThreads, - string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), - ThreadPriority.Normal, - false, - true, - null, - 1000 * 60 * 10); + m_workerThreads[i] + = WorkManager.StartThread( + PoolWorkerJob, + string.Format("PollServiceWorkerThread {0}:{1}", i, m_server.Port), + ThreadPriority.Normal, + true, + false, + null, + int.MaxValue); } + + m_retrysThread = WorkManager.StartThread( + this.CheckRetries, + string.Format("PollServiceWatcherThread:{0}", m_server.Port), + ThreadPriority.Normal, + true, + true, + null, + 1000 * 60 * 10); + + } private void ReQueueEvent(PollServiceHttpRequest req) { - if (IsRunning) + if (m_running) { - // delay the enqueueing for 100ms. There's no need to have the event - // actively on the queue - Timer t = new Timer(self => { - ((Timer)self).Dispose(); - m_requests.Enqueue(req); - }); - - t.Change(100, Timeout.Infinite); - + lock (m_retryRequests) + m_retryRequests.Enqueue(req); } } public void Enqueue(PollServiceHttpRequest req) { - if (IsRunning) + lock (m_bycontext) { - if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) + Queue ctxQeueue; + if (m_bycontext.TryGetValue(req, out ctxQeueue)) { - lock (m_longPollRequests) - m_longPollRequests.Add(req); + ctxQeueue.Enqueue(req); } else - m_requests.Enqueue(req); + { + ctxQeueue = new Queue(); + m_bycontext[req] = ctxQeueue; + EnqueueInt(req); + } } } - private void CheckLongPollThreads() + public void byContextDequeue(PollServiceHttpRequest req) { - // The only purpose of this thread is to check the EQs for events. - // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. - // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. - // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, - // so if they aren't ready to be served by a worker thread (no events), they are placed - // directly back in the "ready-to-serve" queue by the worker thread. - while (IsRunning) + Queue ctxQeueue; + lock (m_bycontext) { - Thread.Sleep(500); - Watchdog.UpdateThread(); - -// List not_ready = new List(); - lock (m_longPollRequests) + if (m_bycontext.TryGetValue(req, out ctxQeueue)) { - if (m_longPollRequests.Count > 0 && IsRunning) + if (ctxQeueue.Count > 0) + { + PollServiceHttpRequest newreq = ctxQeueue.Dequeue(); + EnqueueInt(newreq); + } + else { - List ready = m_longPollRequests.FindAll(req => - (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ - (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout - ); + m_bycontext.Remove(req); + } + } + } + } - ready.ForEach(req => - { - m_requests.Enqueue(req); - m_longPollRequests.Remove(req); - }); + public void EnqueueInt(PollServiceHttpRequest req) + { + if (m_running) + m_requests.Enqueue(req); + } - } + private void CheckRetries() + { + while (m_running) + { + Thread.Sleep(100); // let the world move .. back to faster rate + Watchdog.UpdateThread(); + lock (m_retryRequests) + { + while (m_retryRequests.Count > 0 && m_running) + m_requests.Enqueue(m_retryRequests.Dequeue()); } } } public void Stop() { - IsRunning = false; -// m_timeout = -10000; // cause all to expire - Thread.Sleep(1000); // let the world move + m_running = false; + + Thread.Sleep(100); // let the world move foreach (Thread t in m_workerThreads) Watchdog.AbortThread(t.ManagedThreadId); - PollServiceHttpRequest wreq; + m_threadPool.Shutdown(); + + // any entry in m_bycontext should have a active request on the other queues + // so just delete contents to easy GC + foreach (Queue qu in m_bycontext.Values) + qu.Clear(); + m_bycontext.Clear(); - lock (m_longPollRequests) + try + { + foreach (PollServiceHttpRequest req in m_retryRequests) + { + req.DoHTTPstop(m_server); + } + } + catch { - if (m_longPollRequests.Count > 0 && IsRunning) - m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); } + PollServiceHttpRequest wreq; + + m_retryRequests.Clear(); + while (m_requests.Count() > 0) { try { wreq = m_requests.Dequeue(0); - ResponsesProcessed++; - wreq.DoHTTPGruntWork( - m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); + wreq.DoHTTPstop(m_server); } catch { } } - m_longPollRequests.Clear(); m_requests.Clear(); } @@ -245,87 +228,69 @@ namespace OpenSim.Framework.Servers.HttpServer private void PoolWorkerJob() { - while (IsRunning) + while (m_running) { + PollServiceHttpRequest req = m_requests.Dequeue(4500); Watchdog.UpdateThread(); - WaitPerformResponse(); - } - } - - public void WaitPerformResponse() - { - PollServiceHttpRequest req = m_requests.Dequeue(5000); -// m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); - - if (req != null) - { - try + if (req != null) { - if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) + try { - Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); - - if (responsedata == null) - return; - - // This is the event queue. - // Even if we're not running we can still perform responses by explicit request. - if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll - || !PerformResponsesAsync) - { - try - { - ResponsesProcessed++; - req.DoHTTPGruntWork(m_server, responsedata); - } - catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream - { - // Ignore it, no need to reply - m_log.Error(e); - } - } - else + if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) { + Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); + m_threadPool.QueueWorkItem(x => { try { - ResponsesProcessed++; req.DoHTTPGruntWork(m_server, responsedata); } - catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream + catch (ObjectDisposedException) { - // Ignore it, no need to reply - m_log.Error(e); } - catch (Exception e) + finally { - m_log.Error(e); + byContextDequeue(req); } - return null; }, null); } - } - else - { - if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) - { - ResponsesProcessed++; - req.DoHTTPGruntWork( - m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); - } else { - ReQueueEvent(req); + if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) + { + m_threadPool.QueueWorkItem(x => + { + try + { + req.DoHTTPGruntWork(m_server, + req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); + } + catch (ObjectDisposedException) + { + // Ignore it, no need to reply + } + finally + { + byContextDequeue(req); + } + return null; + }, null); + } + else + { + ReQueueEvent(req); + } } } - } - catch (Exception e) - { - m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); + catch (Exception e) + { + m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); + } } } } + } -} \ No newline at end of file +} -- cgit v1.1