From 7c0bfca7a03584dd65c5659f177b434ee94ddc9d Mon Sep 17 00:00:00 2001 From: Melanie Date: Fri, 7 Jun 2013 23:43:45 +0100 Subject: Adding Avination's PollService to round out the HTTP inventory changes --- .../Framework/Servers/HttpServer/BaseHttpServer.cs | 3 +- .../Servers/HttpServer/PollServiceEventArgs.cs | 19 +- .../HttpServer/PollServiceRequestManager.cs | 248 ++++++++++++++++----- .../Servers/HttpServer/PollServiceWorkerThread.cs | 165 -------------- 4 files changed, 209 insertions(+), 226 deletions(-) delete mode 100644 OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs (limited to 'OpenSim/Framework/Servers') diff --git a/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs b/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs index 96a030b..eb7c578 100644 --- a/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs +++ b/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs @@ -1805,7 +1805,6 @@ namespace OpenSim.Framework.Servers.HttpServer // Long Poll Service Manager with 3 worker threads a 25 second timeout for no events m_PollServiceManager = new PollServiceRequestManager(this, 3, 25000); - m_PollServiceManager.Start(); HTTPDRunning = true; //HttpListenerContext context; @@ -1856,7 +1855,7 @@ namespace OpenSim.Framework.Servers.HttpServer HTTPDRunning = false; try { - m_PollServiceManager.Stop(); +// m_PollServiceManager.Stop(); m_httpListener2.ExceptionThrown -= httpServerException; //m_httpListener2.DisconnectHandler = null; diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs index 3089351..c19ac32 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs @@ -34,7 +34,7 @@ namespace OpenSim.Framework.Servers.HttpServer public delegate void RequestMethod(UUID requestID, Hashtable request); public delegate bool HasEventsMethod(UUID requestID, UUID pId); - public delegate Hashtable GetEventsMethod(UUID requestID, UUID pId, string request); + public delegate Hashtable GetEventsMethod(UUID requestID, UUID pId); public delegate Hashtable NoEventsMethod(UUID requestID, UUID pId); @@ -45,17 +45,30 @@ namespace OpenSim.Framework.Servers.HttpServer public NoEventsMethod NoEvents; public RequestMethod Request; public UUID Id; + public int TimeOutms; + public EventType Type; + + public enum EventType : int + { + Normal = 0, + LslHttp = 1, + Inventory = 2, + Texture = 3, + Mesh = 4 + } public PollServiceEventArgs( RequestMethod pRequest, HasEventsMethod pHasEvents, GetEventsMethod pGetEvents, NoEventsMethod pNoEvents, - UUID pId) + UUID pId, int pTimeOutms) { Request = pRequest; HasEvents = pHasEvents; GetEvents = pGetEvents; NoEvents = pNoEvents; Id = pId; + TimeOutms = pTimeOutms; + Type = EventType.Normal; } } -} \ No newline at end of file +} diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 3e84c55..a5380c1 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs @@ -33,53 +33,56 @@ using log4net; using HttpServer; using OpenSim.Framework; using OpenSim.Framework.Monitoring; +using Amib.Threading; +using System.IO; +using System.Text; +using System.Collections.Generic; namespace OpenSim.Framework.Servers.HttpServer { public class PollServiceRequestManager { -// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); private readonly BaseHttpServer m_server; - private static Queue m_requests = Queue.Synchronized(new Queue()); + + private BlockingQueue m_requests = new BlockingQueue(); + private static Queue m_slowRequests = new Queue(); + private static Queue m_retryRequests = new Queue(); + private uint m_WorkerThreadCount = 0; private Thread[] m_workerThreads; - private PollServiceWorkerThread[] m_PollServiceWorkerThreads; - private volatile bool m_running = true; - private int m_pollTimeout; + private Thread m_retrysThread; + + private bool m_running = true; + private int slowCount = 0; + + private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); + +// private int m_timeout = 1000; // increase timeout 250; now use the event one public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) { m_server = pSrv; m_WorkerThreadCount = pWorkerThreadCount; - m_pollTimeout = pTimeout; - } - - public void Start() - { - m_running = true; m_workerThreads = new Thread[m_WorkerThreadCount]; - m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount]; //startup worker threads for (uint i = 0; i < m_WorkerThreadCount; i++) { - m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout); - m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent; - m_workerThreads[i] = Watchdog.StartThread( - m_PollServiceWorkerThreads[i].ThreadStart, + PoolWorkerJob, String.Format("PollServiceWorkerThread{0}", i), ThreadPriority.Normal, false, - true, + false, null, int.MaxValue); } - Watchdog.StartThread( - this.ThreadStart, + m_retrysThread = Watchdog.StartThread( + this.CheckRetries, "PollServiceWatcherThread", ThreadPriority.Normal, false, @@ -88,78 +91,211 @@ namespace OpenSim.Framework.Servers.HttpServer 1000 * 60 * 10); } - internal void ReQueueEvent(PollServiceHttpRequest req) + + private void ReQueueEvent(PollServiceHttpRequest req) { - // Do accounting stuff here - Enqueue(req); + if (m_running) + { + lock (m_retryRequests) + m_retryRequests.Enqueue(req); + } } public void Enqueue(PollServiceHttpRequest req) { - lock (m_requests) - m_requests.Enqueue(req); + if (m_running) + { + if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal) + { + m_requests.Enqueue(req); + } + else + { + lock (m_slowRequests) + m_slowRequests.Enqueue(req); + } + } } - public void ThreadStart() + private void CheckRetries() { while (m_running) { + Thread.Sleep(100); // let the world move .. back to faster rate Watchdog.UpdateThread(); - ProcessQueuedRequests(); - Thread.Sleep(1000); + lock (m_retryRequests) + { + while (m_retryRequests.Count > 0 && m_running) + m_requests.Enqueue(m_retryRequests.Dequeue()); + } + slowCount++; + if (slowCount >= 10) + { + slowCount = 0; + + lock (m_slowRequests) + { + while (m_slowRequests.Count > 0 && m_running) + m_requests.Enqueue(m_slowRequests.Dequeue()); + } + } } } - private void ProcessQueuedRequests() + ~PollServiceRequestManager() { - lock (m_requests) + m_running = false; +// m_timeout = -10000; // cause all to expire + Thread.Sleep(1000); // let the world move + + foreach (Thread t in m_workerThreads) + Watchdog.AbortThread(t.ManagedThreadId); + + try + { + foreach (PollServiceHttpRequest req in m_retryRequests) + { + DoHTTPGruntWork(m_server,req, + req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); + } + } + catch + { + } + + PollServiceHttpRequest wreq; + m_retryRequests.Clear(); + + lock (m_slowRequests) { - if (m_requests.Count == 0) - return; + while (m_slowRequests.Count > 0 && m_running) + m_requests.Enqueue(m_slowRequests.Dequeue()); + } -// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count); + while (m_requests.Count() > 0) + { + try + { + wreq = m_requests.Dequeue(0); + DoHTTPGruntWork(m_server,wreq, + wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); + } + catch + { + } + } - int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1; + m_requests.Clear(); + } - // For Each WorkerThread - for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) + // work threads + + private void PoolWorkerJob() + { + while (m_running) + { + PollServiceHttpRequest req = m_requests.Dequeue(5000); + + Watchdog.UpdateThread(); + if (req != null) { - //Loop over number of requests each thread handles. - for (int i = 0; i < reqperthread && m_requests.Count > 0; i++) + try { - try + if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) { - m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); + Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); + + if (responsedata == null) + continue; + + if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue + { + try + { + DoHTTPGruntWork(m_server, req, responsedata); + } + catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream + { + // Ignore it, no need to reply + } + } + else + { + m_threadPool.QueueWorkItem(x => + { + try + { + DoHTTPGruntWork(m_server, req, responsedata); + } + catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream + { + // Ignore it, no need to reply + } + + return null; + }, null); + } } - catch (InvalidOperationException) + else { - // The queue is empty, we did our calculations wrong! - return; + if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) + { + DoHTTPGruntWork(m_server, req, + req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); + } + else + { + ReQueueEvent(req); + } } - + } + catch (Exception e) + { + m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); } } } - } - public void Stop() + // DoHTTPGruntWork changed, not sending response + // do the same work around as core + + internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata) { - m_running = false; + OSHttpResponse response + = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext); - foreach (object o in m_requests) - { - PollServiceHttpRequest req = (PollServiceHttpRequest) o; - PollServiceWorkerThread.DoHTTPGruntWork( - m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); - } + byte[] buffer = server.DoHTTPGruntWork(responsedata, response); - m_requests.Clear(); + response.SendChunked = false; + response.ContentLength64 = buffer.Length; + response.ContentEncoding = Encoding.UTF8; - foreach (Thread t in m_workerThreads) + try + { + response.OutputStream.Write(buffer, 0, buffer.Length); + } + catch (Exception ex) { - t.Abort(); + m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex)); + } + finally + { + //response.OutputStream.Close(); + try + { + response.OutputStream.Flush(); + response.Send(); + + //if (!response.KeepAlive && response.ReuseContext) + // response.FreeContext(); + } + catch (Exception e) + { + m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e)); + } } } } -} \ No newline at end of file +} + diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs deleted file mode 100644 index 5adbcd1..0000000 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright (c) Contributors, http://opensimulator.org/ - * See CONTRIBUTORS.TXT for a full list of copyright holders. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of the OpenSimulator Project nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -using System; -using System.Collections; -using System.Collections.Generic; -using System.IO; -using System.Text; -using HttpServer; -using OpenMetaverse; -using System.Reflection; -using log4net; -using OpenSim.Framework.Monitoring; - -namespace OpenSim.Framework.Servers.HttpServer -{ - public delegate void ReQueuePollServiceItem(PollServiceHttpRequest req); - - public class PollServiceWorkerThread - { - private static readonly ILog m_log = - LogManager.GetLogger( - MethodBase.GetCurrentMethod().DeclaringType); - - public event ReQueuePollServiceItem ReQueue; - - private readonly BaseHttpServer m_server; - private BlockingQueue m_request; - private bool m_running = true; - private int m_timeout = 250; - - public PollServiceWorkerThread(BaseHttpServer pSrv, int pTimeout) - { - m_request = new BlockingQueue(); - m_server = pSrv; - m_timeout = pTimeout; - } - - public void ThreadStart() - { - Run(); - } - - public void Run() - { - while (m_running) - { - PollServiceHttpRequest req = m_request.Dequeue(); - - Watchdog.UpdateThread(); - - try - { - if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) - { - StreamReader str; - try - { - str = new StreamReader(req.Request.Body); - } - catch (System.ArgumentException) - { - // Stream was not readable means a child agent - // was closed due to logout, leaving the - // Event Queue request orphaned. - continue; - } - - Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd()); - DoHTTPGruntWork(m_server, req, responsedata); - } - else - { - if ((Environment.TickCount - req.RequestTime) > m_timeout) - { - DoHTTPGruntWork( - m_server, - req, - req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); - } - else - { - ReQueuePollServiceItem reQueueItem = ReQueue; - if (reQueueItem != null) - reQueueItem(req); - } - } - } - catch (Exception e) - { - m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); - } - } - } - - internal void Enqueue(PollServiceHttpRequest pPollServiceHttpRequest) - { - m_request.Enqueue(pPollServiceHttpRequest); - } - - /// - /// FIXME: This should be part of BaseHttpServer - /// - internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata) - { - OSHttpResponse response - = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext); - - byte[] buffer = server.DoHTTPGruntWork(responsedata, response); - - response.SendChunked = false; - response.ContentLength64 = buffer.Length; - response.ContentEncoding = Encoding.UTF8; - - try - { - response.OutputStream.Write(buffer, 0, buffer.Length); - } - catch (Exception ex) - { - m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex)); - } - finally - { - //response.OutputStream.Close(); - try - { - response.OutputStream.Flush(); - response.Send(); - - //if (!response.KeepAlive && response.ReuseContext) - // response.FreeContext(); - } - catch (Exception e) - { - m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e)); - } - } - } - } -} \ No newline at end of file -- cgit v1.1