From 7c0bfca7a03584dd65c5659f177b434ee94ddc9d Mon Sep 17 00:00:00 2001
From: Melanie
Date: Fri, 7 Jun 2013 23:43:45 +0100
Subject: Adding Avination's PollService to round out the HTTP inventory
 changes

---
 OpenSim/Framework/Console/RemoteConsole.cs         |   4 +-
 .../Framework/Servers/HttpServer/BaseHttpServer.cs |   3 +-
 .../Servers/HttpServer/PollServiceEventArgs.cs     |  19 +-
 .../HttpServer/PollServiceRequestManager.cs        | 248 ++++++++++++++++-----
 .../Servers/HttpServer/PollServiceWorkerThread.cs  | 165 --------------
 5 files changed, 211 insertions(+), 228 deletions(-)
 delete mode 100644 OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs

(limited to 'OpenSim/Framework')

diff --git a/OpenSim/Framework/Console/RemoteConsole.cs b/OpenSim/Framework/Console/RemoteConsole.cs
index 27edd4b..3e3c2b3 100644
--- a/OpenSim/Framework/Console/RemoteConsole.cs
+++ b/OpenSim/Framework/Console/RemoteConsole.cs
@@ -234,7 +234,7 @@ namespace OpenSim.Framework.Console
             string uri = "/ReadResponses/" + sessionID.ToString() + "/";
 
             m_Server.AddPollServiceHTTPHandler(
-                uri, new PollServiceEventArgs(null, HasEvents, GetEvents, NoEvents, sessionID));
+                uri, new PollServiceEventArgs(null, HasEvents, GetEvents, NoEvents, sessionID,25000)); // 25 secs timeout
 
             XmlDocument xmldoc = new XmlDocument();
             XmlNode xmlnode = xmldoc.CreateNode(XmlNodeType.XmlDeclaration,
@@ -425,7 +425,7 @@ namespace OpenSim.Framework.Console
             return false;
         }
 
-        private Hashtable GetEvents(UUID RequestID, UUID sessionID, string request)
+        private Hashtable GetEvents(UUID RequestID, UUID sessionID)
         {
             ConsoleConnection c = null;
 
diff --git a/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs b/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs
index 96a030b..eb7c578 100644
--- a/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs
+++ b/OpenSim/Framework/Servers/HttpServer/BaseHttpServer.cs
@@ -1805,7 +1805,6 @@ namespace OpenSim.Framework.Servers.HttpServer
 
                 // Long Poll Service Manager with 3 worker threads a 25 second timeout for no events
                 m_PollServiceManager = new PollServiceRequestManager(this, 3, 25000);
-                m_PollServiceManager.Start();
                 HTTPDRunning = true;
 
                 //HttpListenerContext context;
@@ -1856,7 +1855,7 @@ namespace OpenSim.Framework.Servers.HttpServer
             HTTPDRunning = false;
             try
             {
-                m_PollServiceManager.Stop();
+//                m_PollServiceManager.Stop();
 
                 m_httpListener2.ExceptionThrown -= httpServerException;
                 //m_httpListener2.DisconnectHandler = null;
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs
index 3089351..c19ac32 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs
@@ -34,7 +34,7 @@ namespace OpenSim.Framework.Servers.HttpServer
     public delegate void RequestMethod(UUID requestID, Hashtable request);
     public delegate bool HasEventsMethod(UUID requestID, UUID pId);
 
-    public delegate Hashtable GetEventsMethod(UUID requestID, UUID pId, string request);
+    public delegate Hashtable GetEventsMethod(UUID requestID, UUID pId);
 
     public delegate Hashtable NoEventsMethod(UUID requestID, UUID pId);
 
@@ -45,17 +45,30 @@ namespace OpenSim.Framework.Servers.HttpServer
         public NoEventsMethod NoEvents;
         public RequestMethod Request;
         public UUID Id;
+        public int TimeOutms;
+        public EventType Type;    
+
+        public enum EventType : int
+        {
+            Normal = 0,
+            LslHttp = 1,
+            Inventory = 2,
+            Texture = 3, 
+            Mesh = 4
+        }
 
         public PollServiceEventArgs(
             RequestMethod pRequest,
             HasEventsMethod pHasEvents, GetEventsMethod pGetEvents, NoEventsMethod pNoEvents,
-            UUID pId)
+            UUID pId, int pTimeOutms)
         {
             Request = pRequest;
             HasEvents = pHasEvents;
             GetEvents = pGetEvents;
             NoEvents = pNoEvents;
             Id = pId;
+            TimeOutms = pTimeOutms;
+            Type = EventType.Normal;
         }
     }
-}
\ No newline at end of file
+}
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3e84c55..a5380c1 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,53 +33,56 @@ using log4net;
 using HttpServer;
 using OpenSim.Framework;
 using OpenSim.Framework.Monitoring;
+using Amib.Threading;
+using System.IO;
+using System.Text;
+using System.Collections.Generic;
 
 namespace OpenSim.Framework.Servers.HttpServer
 {
     public class PollServiceRequestManager
     {
-//        private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
 
         private readonly BaseHttpServer m_server;
-        private static Queue m_requests = Queue.Synchronized(new Queue());
+
+        private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
+        private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
+        private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
+
         private uint m_WorkerThreadCount = 0;
         private Thread[] m_workerThreads;
-        private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
-        private volatile bool m_running = true;
-        private int m_pollTimeout;
+        private Thread m_retrysThread;
+
+        private bool m_running = true;
+        private int slowCount = 0;
+
+        private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
+
+//        private int m_timeout = 1000;   //  increase timeout 250; now use the event one
 
         public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
         {
             m_server = pSrv;
             m_WorkerThreadCount = pWorkerThreadCount;
-            m_pollTimeout = pTimeout;
-        }
-
-        public void Start()
-        {
-            m_running = true;
             m_workerThreads = new Thread[m_WorkerThreadCount];
-            m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount];
 
             //startup worker threads
             for (uint i = 0; i < m_WorkerThreadCount; i++)
             {
-                m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout);
-                m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent;
-
                 m_workerThreads[i]
                     = Watchdog.StartThread(
-                        m_PollServiceWorkerThreads[i].ThreadStart,
+                        PoolWorkerJob,
                         String.Format("PollServiceWorkerThread{0}", i),
                         ThreadPriority.Normal,
                         false,
-                        true,
+                        false,
                         null,
                         int.MaxValue);
             }
 
-            Watchdog.StartThread(
-                this.ThreadStart,
+            m_retrysThread = Watchdog.StartThread(
+                this.CheckRetries,
                 "PollServiceWatcherThread",
                 ThreadPriority.Normal,
                 false,
@@ -88,78 +91,211 @@ namespace OpenSim.Framework.Servers.HttpServer
                 1000 * 60 * 10);
         }
 
-        internal void ReQueueEvent(PollServiceHttpRequest req)
+
+        private void ReQueueEvent(PollServiceHttpRequest req)
         {
-            // Do accounting stuff here
-            Enqueue(req);
+            if (m_running)
+            {
+                lock (m_retryRequests)
+                    m_retryRequests.Enqueue(req);
+            }
         }
 
         public void Enqueue(PollServiceHttpRequest req)
         {
-            lock (m_requests)
-                m_requests.Enqueue(req);
+            if (m_running)
+            {
+                if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
+                {
+                    m_requests.Enqueue(req);
+                }
+                else
+                {
+                    lock (m_slowRequests)
+                        m_slowRequests.Enqueue(req);
+                }
+            }
         }
 
-        public void ThreadStart()
+        private void CheckRetries()
         {
             while (m_running)
             {
+                Thread.Sleep(100); // let the world move  .. back to faster rate
                 Watchdog.UpdateThread();
-                ProcessQueuedRequests();
-                Thread.Sleep(1000);
+                lock (m_retryRequests)
+                {
+                    while (m_retryRequests.Count > 0 && m_running)
+                        m_requests.Enqueue(m_retryRequests.Dequeue());
+                }
+                slowCount++;
+                if (slowCount >= 10)
+                {
+                    slowCount = 0;
+
+                    lock (m_slowRequests)
+                    {
+                        while (m_slowRequests.Count > 0 && m_running)
+                            m_requests.Enqueue(m_slowRequests.Dequeue());
+                    }
+                }
             }
         }
 
-        private void ProcessQueuedRequests()
+        ~PollServiceRequestManager()
         {
-            lock (m_requests)
+            m_running = false;
+//            m_timeout = -10000; // cause all to expire
+            Thread.Sleep(1000); // let the world move
+
+            foreach (Thread t in m_workerThreads)
+                    Watchdog.AbortThread(t.ManagedThreadId);
+
+            try
+            {
+                foreach (PollServiceHttpRequest req in m_retryRequests)
+                {
+                   DoHTTPGruntWork(m_server,req,
+                        req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
+                }
+            }
+            catch
+            {
+            }
+
+            PollServiceHttpRequest wreq;
+            m_retryRequests.Clear();
+
+            lock (m_slowRequests)
             {
-                if (m_requests.Count == 0)
-                    return;
+                while (m_slowRequests.Count > 0 && m_running)
+                    m_requests.Enqueue(m_slowRequests.Dequeue());
+            }
 
-//                m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count);
+            while (m_requests.Count() > 0)
+            {
+                try
+                {
+                    wreq = m_requests.Dequeue(0);
+                    DoHTTPGruntWork(m_server,wreq,
+                        wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
+                }
+                catch
+                {
+                }
+            }
 
-                int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1;
+            m_requests.Clear();
+        }
 
-                // For Each WorkerThread
-                for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++)
+        // work threads
+
+        private void PoolWorkerJob()
+        {
+            while (m_running)
+            {
+                PollServiceHttpRequest req = m_requests.Dequeue(5000);
+
+                Watchdog.UpdateThread();
+                if (req != null)
                 {
-                    //Loop over number of requests each thread handles.
-                    for (int i = 0; i < reqperthread && m_requests.Count > 0; i++)
+                    try
                     {
-                        try
+                        if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
                         {
-                            m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue());
+                            Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
+
+                            if (responsedata == null)
+                                continue;
+
+                            if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue
+                            {
+                                try
+                                {
+                                    DoHTTPGruntWork(m_server, req, responsedata);
+                                }
+                                catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
+                                {
+                                    // Ignore it, no need to reply
+                                }
+                            }
+                            else
+                            {
+                                m_threadPool.QueueWorkItem(x =>
+                                {
+                                    try
+                                    {
+                                        DoHTTPGruntWork(m_server, req, responsedata);
+                                    }
+                                    catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
+                                    {
+                                        // Ignore it, no need to reply
+                                    }
+
+                                    return null;
+                                }, null);
+                            }
                         }
-                        catch (InvalidOperationException)
+                        else
                         {
-                            // The queue is empty, we did our calculations wrong!
-                            return;
+                            if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
+                            {
+                                DoHTTPGruntWork(m_server, req, 
+                                    req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
+                            }
+                            else
+                            {
+                                ReQueueEvent(req);
+                            }
                         }
-                        
+                    }
+                    catch (Exception e)
+                    {
+                        m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
                     }
                 }
             }
-            
         }
 
-        public void Stop()
+        // DoHTTPGruntWork  changed, not sending response
+        // do the same work around as core
+
+        internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
         {
-            m_running = false;
+            OSHttpResponse response
+                = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
 
-            foreach (object o in m_requests)
-            {
-                PollServiceHttpRequest req = (PollServiceHttpRequest) o;
-                PollServiceWorkerThread.DoHTTPGruntWork(
-                    m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
-            }
+            byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
 
-            m_requests.Clear();
+            response.SendChunked = false;
+            response.ContentLength64 = buffer.Length;
+            response.ContentEncoding = Encoding.UTF8;
 
-            foreach (Thread t in m_workerThreads)
+            try
+            {
+                response.OutputStream.Write(buffer, 0, buffer.Length);
+            }
+            catch (Exception ex)
             {
-                t.Abort();
+                m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
+            }
+            finally
+            {
+                //response.OutputStream.Close();
+                try
+                {
+                    response.OutputStream.Flush();
+                    response.Send();
+
+                    //if (!response.KeepAlive && response.ReuseContext)
+                    //    response.FreeContext();
+                }
+                catch (Exception e)
+                {
+                    m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
+                }
             }
         }
     }
-}
\ No newline at end of file
+}
+
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs
deleted file mode 100644
index 5adbcd1..0000000
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright (c) Contributors, http://opensimulator.org/
- * See CONTRIBUTORS.TXT for a full list of copyright holders.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *     * Redistributions of source code must retain the above copyright
- *       notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
- *     * Neither the name of the OpenSimulator Project nor the
- *       names of its contributors may be used to endorse or promote products
- *       derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.IO;
-using System.Text;
-using HttpServer;
-using OpenMetaverse;
-using System.Reflection;
-using log4net;
-using OpenSim.Framework.Monitoring;
-
-namespace OpenSim.Framework.Servers.HttpServer
-{
-    public delegate void ReQueuePollServiceItem(PollServiceHttpRequest req);
-    
-    public class PollServiceWorkerThread
-    {
-        private static readonly ILog m_log =
-                LogManager.GetLogger(
-                MethodBase.GetCurrentMethod().DeclaringType);
-
-        public event ReQueuePollServiceItem ReQueue;
-
-        private readonly BaseHttpServer m_server;
-        private BlockingQueue<PollServiceHttpRequest> m_request;
-        private bool m_running = true;
-        private int m_timeout = 250;
-
-        public PollServiceWorkerThread(BaseHttpServer pSrv, int pTimeout)
-        {
-            m_request = new BlockingQueue<PollServiceHttpRequest>();
-            m_server = pSrv;
-            m_timeout = pTimeout;
-        }
-
-        public void ThreadStart()
-        {
-            Run();
-        }
-
-        public void Run()
-        {
-            while (m_running)
-            {
-                PollServiceHttpRequest req = m_request.Dequeue();
-
-                Watchdog.UpdateThread();
-                
-                try
-                {
-                    if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
-                    {
-                        StreamReader str;
-                        try
-                        {
-                            str = new StreamReader(req.Request.Body);
-                        }
-                        catch (System.ArgumentException)
-                        {
-                            // Stream was not readable means a child agent
-                            // was closed due to logout, leaving the
-                            // Event Queue request orphaned.
-                            continue;
-                        }
-
-                        Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd());
-                        DoHTTPGruntWork(m_server, req, responsedata);
-                    }
-                    else
-                    {
-                        if ((Environment.TickCount - req.RequestTime) > m_timeout)
-                        {
-                            DoHTTPGruntWork(
-                                m_server,
-                                req,
-                                req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
-                        }
-                        else
-                        {
-                            ReQueuePollServiceItem reQueueItem = ReQueue;
-                            if (reQueueItem != null)
-                                reQueueItem(req);
-                        }
-                    }
-                }
-                catch (Exception e)
-                {
-                    m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
-                }
-            }
-        }
-
-        internal void Enqueue(PollServiceHttpRequest pPollServiceHttpRequest)
-        {
-            m_request.Enqueue(pPollServiceHttpRequest);
-        }
-
-        /// <summary>
-        /// FIXME: This should be part of BaseHttpServer
-        /// </summary>
-        internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
-        {
-            OSHttpResponse response
-                = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
-
-            byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
-
-            response.SendChunked = false;
-            response.ContentLength64 = buffer.Length;
-            response.ContentEncoding = Encoding.UTF8;
-
-            try
-            {
-                response.OutputStream.Write(buffer, 0, buffer.Length);
-            }
-            catch (Exception ex)
-            {
-                m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
-            }
-            finally
-            {
-                //response.OutputStream.Close();
-                try
-                {
-                    response.OutputStream.Flush();
-                    response.Send();
-
-                    //if (!response.KeepAlive && response.ReuseContext)
-                    //    response.FreeContext();
-                }
-                catch (Exception e)
-                {
-                    m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
-                }
-            }
-        }
-    }
-}
\ No newline at end of file
-- 
cgit v1.1