aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs318
1 files changed, 242 insertions, 76 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3e84c55..28bba70 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,132 +33,298 @@ using log4net;
33using HttpServer; 33using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 35using OpenSim.Framework.Monitoring;
36using Amib.Threading;
37using System.IO;
38using System.Text;
39using System.Collections.Generic;
36 40
37namespace OpenSim.Framework.Servers.HttpServer 41namespace OpenSim.Framework.Servers.HttpServer
38{ 42{
39 public class PollServiceRequestManager 43 public class PollServiceRequestManager
40 { 44 {
41// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
46
47 /// <summary>
48 /// Is the poll service request manager running?
49 /// </summary>
50 /// <remarks>
51 /// Can be running either synchronously or asynchronously
52 /// </remarks>
53 public bool IsRunning { get; private set; }
54
55 /// <summary>
56 /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via
57 /// external calls)?
58 /// </summary>
59 public bool PerformResponsesAsync { get; private set; }
60
61 /// <summary>
62 /// Number of responses actually processed and sent to viewer (or aborted due to error).
63 /// </summary>
64 public int ResponsesProcessed { get; private set; }
42 65
43 private readonly BaseHttpServer m_server; 66 private readonly BaseHttpServer m_server;
44 private static Queue m_requests = Queue.Synchronized(new Queue()); 67
68 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
69 private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>();
70
45 private uint m_WorkerThreadCount = 0; 71 private uint m_WorkerThreadCount = 0;
46 private Thread[] m_workerThreads; 72 private Thread[] m_workerThreads;
47 private PollServiceWorkerThread[] m_PollServiceWorkerThreads;
48 private volatile bool m_running = true;
49 private int m_pollTimeout;
50 73
51 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) 74 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
75
76// private int m_timeout = 1000; // increase timeout 250; now use the event one
77
78 public PollServiceRequestManager(
79 BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
52 { 80 {
53 m_server = pSrv; 81 m_server = pSrv;
82 PerformResponsesAsync = performResponsesAsync;
54 m_WorkerThreadCount = pWorkerThreadCount; 83 m_WorkerThreadCount = pWorkerThreadCount;
55 m_pollTimeout = pTimeout; 84 m_workerThreads = new Thread[m_WorkerThreadCount];
85
86 StatsManager.RegisterStat(
87 new Stat(
88 "QueuedPollResponses",
89 "Number of poll responses queued for processing.",
90 "",
91 "",
92 "httpserver",
93 m_server.Port.ToString(),
94 StatType.Pull,
95 MeasuresOfInterest.AverageChangeOverTime,
96 stat => stat.Value = m_requests.Count(),
97 StatVerbosity.Debug));
98
99 StatsManager.RegisterStat(
100 new Stat(
101 "ProcessedPollResponses",
102 "Number of poll responses processed.",
103 "",
104 "",
105 "httpserver",
106 m_server.Port.ToString(),
107 StatType.Pull,
108 MeasuresOfInterest.AverageChangeOverTime,
109 stat => stat.Value = ResponsesProcessed,
110 StatVerbosity.Debug));
56 } 111 }
57 112
58 public void Start() 113 public void Start()
59 { 114 {
60 m_running = true; 115 IsRunning = true;
61 m_workerThreads = new Thread[m_WorkerThreadCount];
62 m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount];
63 116
64 //startup worker threads 117 if (PerformResponsesAsync)
65 for (uint i = 0; i < m_WorkerThreadCount; i++)
66 { 118 {
67 m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout); 119 //startup worker threads
68 m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent; 120 for (uint i = 0; i < m_WorkerThreadCount; i++)
69 121 {
70 m_workerThreads[i] 122 m_workerThreads[i]
71 = Watchdog.StartThread( 123 = WorkManager.StartThread(
72 m_PollServiceWorkerThreads[i].ThreadStart, 124 PoolWorkerJob,
73 String.Format("PollServiceWorkerThread{0}", i), 125 string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port),
74 ThreadPriority.Normal, 126 ThreadPriority.Normal,
75 false, 127 false,
76 true, 128 false,
77 null, 129 null,
78 int.MaxValue); 130 int.MaxValue);
79 } 131 }
80 132
81 Watchdog.StartThread( 133 WorkManager.StartThread(
82 this.ThreadStart, 134 this.CheckLongPollThreads,
83 "PollServiceWatcherThread", 135 string.Format("LongPollServiceWatcherThread:{0}", m_server.Port),
84 ThreadPriority.Normal, 136 ThreadPriority.Normal,
85 false, 137 false,
86 true, 138 true,
87 null, 139 null,
88 1000 * 60 * 10); 140 1000 * 60 * 10);
141 }
89 } 142 }
90 143
91 internal void ReQueueEvent(PollServiceHttpRequest req) 144 private void ReQueueEvent(PollServiceHttpRequest req)
92 { 145 {
93 // Do accounting stuff here 146 if (IsRunning)
94 Enqueue(req); 147 {
95 } 148 // delay the enqueueing for 100ms. There's no need to have the event
149 // actively on the queue
150 Timer t = new Timer(self => {
151 ((Timer)self).Dispose();
152 m_requests.Enqueue(req);
153 });
96 154
97 public void Enqueue(PollServiceHttpRequest req) 155 t.Change(100, Timeout.Infinite);
98 { 156
99 lock (m_requests) 157 }
100 m_requests.Enqueue(req);
101 } 158 }
102 159
103 public void ThreadStart() 160 public void Enqueue(PollServiceHttpRequest req)
104 { 161 {
105 while (m_running) 162 if (IsRunning)
106 { 163 {
107 Watchdog.UpdateThread(); 164 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll)
108 ProcessQueuedRequests(); 165 {
109 Thread.Sleep(1000); 166 lock (m_longPollRequests)
167 m_longPollRequests.Add(req);
168 }
169 else
170 m_requests.Enqueue(req);
110 } 171 }
111 } 172 }
112 173
113 private void ProcessQueuedRequests() 174 private void CheckLongPollThreads()
114 { 175 {
115 lock (m_requests) 176 // The only purpose of this thread is to check the EQs for events.
177 // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
178 // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
179 // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
180 // so if they aren't ready to be served by a worker thread (no events), they are placed
181 // directly back in the "ready-to-serve" queue by the worker thread.
182 while (IsRunning)
116 { 183 {
117 if (m_requests.Count == 0) 184 Thread.Sleep(500);
118 return; 185 Watchdog.UpdateThread();
119
120// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count);
121
122 int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1;
123 186
124 // For Each WorkerThread 187// List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>();
125 for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) 188 lock (m_longPollRequests)
126 { 189 {
127 //Loop over number of requests each thread handles. 190 if (m_longPollRequests.Count > 0 && IsRunning)
128 for (int i = 0; i < reqperthread && m_requests.Count > 0; i++)
129 { 191 {
130 try 192 List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req =>
131 { 193 (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ
132 m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); 194 (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout
133 } 195 );
134 catch (InvalidOperationException) 196
135 { 197 ready.ForEach(req =>
136 // The queue is empty, we did our calculations wrong! 198 {
137 return; 199 m_requests.Enqueue(req);
138 } 200 m_longPollRequests.Remove(req);
139 201 });
202
140 } 203 }
204
141 } 205 }
142 } 206 }
143
144 } 207 }
145 208
146 public void Stop() 209 public void Stop()
147 { 210 {
148 m_running = false; 211 IsRunning = false;
212// m_timeout = -10000; // cause all to expire
213 Thread.Sleep(1000); // let the world move
214
215 foreach (Thread t in m_workerThreads)
216 Watchdog.AbortThread(t.ManagedThreadId);
217
218 PollServiceHttpRequest wreq;
149 219
150 foreach (object o in m_requests) 220 lock (m_longPollRequests)
151 { 221 {
152 PollServiceHttpRequest req = (PollServiceHttpRequest) o; 222 if (m_longPollRequests.Count > 0 && IsRunning)
153 PollServiceWorkerThread.DoHTTPGruntWork( 223 m_longPollRequests.ForEach(req => m_requests.Enqueue(req));
154 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
155 } 224 }
156 225
226 while (m_requests.Count() > 0)
227 {
228 try
229 {
230 wreq = m_requests.Dequeue(0);
231 ResponsesProcessed++;
232 wreq.DoHTTPGruntWork(
233 m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
234 }
235 catch
236 {
237 }
238 }
239
240 m_longPollRequests.Clear();
157 m_requests.Clear(); 241 m_requests.Clear();
242 }
158 243
159 foreach (Thread t in m_workerThreads) 244 // work threads
245
246 private void PoolWorkerJob()
247 {
248 while (IsRunning)
160 { 249 {
161 t.Abort(); 250 Watchdog.UpdateThread();
251 WaitPerformResponse();
252 }
253 }
254
255 public void WaitPerformResponse()
256 {
257 PollServiceHttpRequest req = m_requests.Dequeue(5000);
258// m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
259
260 if (req != null)
261 {
262 try
263 {
264 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
265 {
266 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
267
268 if (responsedata == null)
269 return;
270
271 // This is the event queue.
272 // Even if we're not running we can still perform responses by explicit request.
273 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll
274 || !PerformResponsesAsync)
275 {
276 try
277 {
278 ResponsesProcessed++;
279 req.DoHTTPGruntWork(m_server, responsedata);
280 }
281 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
282 {
283 // Ignore it, no need to reply
284 m_log.Error(e);
285 }
286 }
287 else
288 {
289 m_threadPool.QueueWorkItem(x =>
290 {
291 try
292 {
293 ResponsesProcessed++;
294 req.DoHTTPGruntWork(m_server, responsedata);
295 }
296 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
297 {
298 // Ignore it, no need to reply
299 m_log.Error(e);
300 }
301 catch (Exception e)
302 {
303 m_log.Error(e);
304 }
305
306 return null;
307 }, null);
308 }
309 }
310 else
311 {
312 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
313 {
314 ResponsesProcessed++;
315 req.DoHTTPGruntWork(
316 m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
317 }
318 else
319 {
320 ReQueueEvent(req);
321 }
322 }
323 }
324 catch (Exception e)
325 {
326 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
327 }
162 } 328 }
163 } 329 }
164 } 330 }