From 134f86e8d5c414409631b25b8c6f0ee45fbd8631 Mon Sep 17 00:00:00 2001
From: David Walter Seikel
Date: Thu, 3 Nov 2016 21:44:39 +1000
Subject: Initial update to OpenSim 0.8.2.1 source code.
---
.../HttpServer/PollServiceRequestManager.cs | 318 ++++++++++++++++-----
1 file changed, 242 insertions(+), 76 deletions(-)
(limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3e84c55..28bba70 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,132 +33,298 @@ using log4net;
using HttpServer;
using OpenSim.Framework;
using OpenSim.Framework.Monitoring;
+using Amib.Threading;
+using System.IO;
+using System.Text;
+using System.Collections.Generic;
namespace OpenSim.Framework.Servers.HttpServer
{
public class PollServiceRequestManager
{
-// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+ private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ ///
+ /// Is the poll service request manager running?
+ ///
+ ///
+ /// Can be running either synchronously or asynchronously
+ ///
+ public bool IsRunning { get; private set; }
+
+ ///
+ /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via
+ /// external calls)?
+ ///
+ public bool PerformResponsesAsync { get; private set; }
+
+ ///
+ /// Number of responses actually processed and sent to viewer (or aborted due to error).
+ ///
+ public int ResponsesProcessed { get; private set; }
private readonly BaseHttpServer m_server;
- private static Queue m_requests = Queue.Synchronized(new Queue());
+
+ private BlockingQueue m_requests = new BlockingQueue();
+ private static List m_longPollRequests = new List();
+
private uint m_WorkerThreadCount = 0;
private Thread[] m_workerThreads;
- private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
- private volatile bool m_running = true;
- private int m_pollTimeout;
- public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
+ private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
+
+// private int m_timeout = 1000; // increase timeout 250; now use the event one
+
+ public PollServiceRequestManager(
+ BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
{
m_server = pSrv;
+ PerformResponsesAsync = performResponsesAsync;
m_WorkerThreadCount = pWorkerThreadCount;
- m_pollTimeout = pTimeout;
+ m_workerThreads = new Thread[m_WorkerThreadCount];
+
+ StatsManager.RegisterStat(
+ new Stat(
+ "QueuedPollResponses",
+ "Number of poll responses queued for processing.",
+ "",
+ "",
+ "httpserver",
+ m_server.Port.ToString(),
+ StatType.Pull,
+ MeasuresOfInterest.AverageChangeOverTime,
+ stat => stat.Value = m_requests.Count(),
+ StatVerbosity.Debug));
+
+ StatsManager.RegisterStat(
+ new Stat(
+ "ProcessedPollResponses",
+ "Number of poll responses processed.",
+ "",
+ "",
+ "httpserver",
+ m_server.Port.ToString(),
+ StatType.Pull,
+ MeasuresOfInterest.AverageChangeOverTime,
+ stat => stat.Value = ResponsesProcessed,
+ StatVerbosity.Debug));
}
public void Start()
{
- m_running = true;
- m_workerThreads = new Thread[m_WorkerThreadCount];
- m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount];
+ IsRunning = true;
- //startup worker threads
- for (uint i = 0; i < m_WorkerThreadCount; i++)
+ if (PerformResponsesAsync)
{
- m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout);
- m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent;
-
- m_workerThreads[i]
- = Watchdog.StartThread(
- m_PollServiceWorkerThreads[i].ThreadStart,
- String.Format("PollServiceWorkerThread{0}", i),
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
- }
+ //startup worker threads
+ for (uint i = 0; i < m_WorkerThreadCount; i++)
+ {
+ m_workerThreads[i]
+ = WorkManager.StartThread(
+ PoolWorkerJob,
+ string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port),
+ ThreadPriority.Normal,
+ false,
+ false,
+ null,
+ int.MaxValue);
+ }
- Watchdog.StartThread(
- this.ThreadStart,
- "PollServiceWatcherThread",
- ThreadPriority.Normal,
- false,
- true,
- null,
- 1000 * 60 * 10);
+ WorkManager.StartThread(
+ this.CheckLongPollThreads,
+ string.Format("LongPollServiceWatcherThread:{0}", m_server.Port),
+ ThreadPriority.Normal,
+ false,
+ true,
+ null,
+ 1000 * 60 * 10);
+ }
}
- internal void ReQueueEvent(PollServiceHttpRequest req)
+ private void ReQueueEvent(PollServiceHttpRequest req)
{
- // Do accounting stuff here
- Enqueue(req);
- }
+ if (IsRunning)
+ {
+ // delay the enqueueing for 100ms. There's no need to have the event
+ // actively on the queue
+ Timer t = new Timer(self => {
+ ((Timer)self).Dispose();
+ m_requests.Enqueue(req);
+ });
- public void Enqueue(PollServiceHttpRequest req)
- {
- lock (m_requests)
- m_requests.Enqueue(req);
+ t.Change(100, Timeout.Infinite);
+
+ }
}
- public void ThreadStart()
+ public void Enqueue(PollServiceHttpRequest req)
{
- while (m_running)
+ if (IsRunning)
{
- Watchdog.UpdateThread();
- ProcessQueuedRequests();
- Thread.Sleep(1000);
+ if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll)
+ {
+ lock (m_longPollRequests)
+ m_longPollRequests.Add(req);
+ }
+ else
+ m_requests.Enqueue(req);
}
}
- private void ProcessQueuedRequests()
+ private void CheckLongPollThreads()
{
- lock (m_requests)
+ // The only purpose of this thread is to check the EQs for events.
+ // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
+ // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
+ // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
+ // so if they aren't ready to be served by a worker thread (no events), they are placed
+ // directly back in the "ready-to-serve" queue by the worker thread.
+ while (IsRunning)
{
- if (m_requests.Count == 0)
- return;
-
-// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count);
-
- int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1;
+ Thread.Sleep(500);
+ Watchdog.UpdateThread();
- // For Each WorkerThread
- for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++)
+// List not_ready = new List();
+ lock (m_longPollRequests)
{
- //Loop over number of requests each thread handles.
- for (int i = 0; i < reqperthread && m_requests.Count > 0; i++)
+ if (m_longPollRequests.Count > 0 && IsRunning)
{
- try
- {
- m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue());
- }
- catch (InvalidOperationException)
- {
- // The queue is empty, we did our calculations wrong!
- return;
- }
-
+ List ready = m_longPollRequests.FindAll(req =>
+ (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ
+ (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout
+ );
+
+ ready.ForEach(req =>
+ {
+ m_requests.Enqueue(req);
+ m_longPollRequests.Remove(req);
+ });
+
}
+
}
}
-
}
public void Stop()
{
- m_running = false;
+ IsRunning = false;
+// m_timeout = -10000; // cause all to expire
+ Thread.Sleep(1000); // let the world move
+
+ foreach (Thread t in m_workerThreads)
+ Watchdog.AbortThread(t.ManagedThreadId);
+
+ PollServiceHttpRequest wreq;
- foreach (object o in m_requests)
+ lock (m_longPollRequests)
{
- PollServiceHttpRequest req = (PollServiceHttpRequest) o;
- PollServiceWorkerThread.DoHTTPGruntWork(
- m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
+ if (m_longPollRequests.Count > 0 && IsRunning)
+ m_longPollRequests.ForEach(req => m_requests.Enqueue(req));
}
+ while (m_requests.Count() > 0)
+ {
+ try
+ {
+ wreq = m_requests.Dequeue(0);
+ ResponsesProcessed++;
+ wreq.DoHTTPGruntWork(
+ m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
+ }
+ catch
+ {
+ }
+ }
+
+ m_longPollRequests.Clear();
m_requests.Clear();
+ }
- foreach (Thread t in m_workerThreads)
+ // work threads
+
+ private void PoolWorkerJob()
+ {
+ while (IsRunning)
{
- t.Abort();
+ Watchdog.UpdateThread();
+ WaitPerformResponse();
+ }
+ }
+
+ public void WaitPerformResponse()
+ {
+ PollServiceHttpRequest req = m_requests.Dequeue(5000);
+// m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
+
+ if (req != null)
+ {
+ try
+ {
+ if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
+ {
+ Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
+
+ if (responsedata == null)
+ return;
+
+ // This is the event queue.
+ // Even if we're not running we can still perform responses by explicit request.
+ if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll
+ || !PerformResponsesAsync)
+ {
+ try
+ {
+ ResponsesProcessed++;
+ req.DoHTTPGruntWork(m_server, responsedata);
+ }
+ catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
+ {
+ // Ignore it, no need to reply
+ m_log.Error(e);
+ }
+ }
+ else
+ {
+ m_threadPool.QueueWorkItem(x =>
+ {
+ try
+ {
+ ResponsesProcessed++;
+ req.DoHTTPGruntWork(m_server, responsedata);
+ }
+ catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
+ {
+ // Ignore it, no need to reply
+ m_log.Error(e);
+ }
+ catch (Exception e)
+ {
+ m_log.Error(e);
+ }
+
+ return null;
+ }, null);
+ }
+ }
+ else
+ {
+ if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
+ {
+ ResponsesProcessed++;
+ req.DoHTTPGruntWork(
+ m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
+ }
+ else
+ {
+ ReQueueEvent(req);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
+ }
}
}
}
--
cgit v1.1