aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs361
1 files changed, 179 insertions, 182 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 28bba70..0e4323a 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -44,183 +44,199 @@ namespace OpenSim.Framework.Servers.HttpServer
44 { 44 {
45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
46 46
47 /// <summary>
48 /// Is the poll service request manager running?
49 /// </summary>
50 /// <remarks>
51 /// Can be running either synchronously or asynchronously
52 /// </remarks>
53 public bool IsRunning { get; private set; }
54
55 /// <summary>
56 /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via
57 /// external calls)?
58 /// </summary>
59 public bool PerformResponsesAsync { get; private set; }
60
61 /// <summary>
62 /// Number of responses actually processed and sent to viewer (or aborted due to error).
63 /// </summary>
64 public int ResponsesProcessed { get; private set; }
65
66 private readonly BaseHttpServer m_server; 47 private readonly BaseHttpServer m_server;
67 48
49 private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext;
68 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); 50 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
69 private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); 51 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
52 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
70 53
71 private uint m_WorkerThreadCount = 0; 54 private uint m_WorkerThreadCount = 0;
72 private Thread[] m_workerThreads; 55 private Thread[] m_workerThreads;
56 private Thread m_retrysThread;
57
58 private bool m_running = false;
59 private int slowCount = 0;
73 60
74 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); 61 private SmartThreadPool m_threadPool;
75 62
76// private int m_timeout = 1000; // increase timeout 250; now use the event one
77 63
78 public PollServiceRequestManager( 64 public PollServiceRequestManager(
79 BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) 65 BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
80 { 66 {
81 m_server = pSrv; 67 m_server = pSrv;
82 PerformResponsesAsync = performResponsesAsync;
83 m_WorkerThreadCount = pWorkerThreadCount; 68 m_WorkerThreadCount = pWorkerThreadCount;
84 m_workerThreads = new Thread[m_WorkerThreadCount]; 69 m_workerThreads = new Thread[m_WorkerThreadCount];
85 70
86 StatsManager.RegisterStat( 71 PollServiceHttpRequestComparer preqCp = new PollServiceHttpRequestComparer();
87 new Stat( 72 m_bycontext = new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp);
88 "QueuedPollResponses", 73
89 "Number of poll responses queued for processing.", 74 STPStartInfo startInfo = new STPStartInfo();
90 "", 75 startInfo.IdleTimeout = 30000;
91 "", 76 startInfo.MaxWorkerThreads = 15;
92 "httpserver", 77 startInfo.MinWorkerThreads = 1;
93 m_server.Port.ToString(), 78 startInfo.ThreadPriority = ThreadPriority.Normal;
94 StatType.Pull, 79 startInfo.StartSuspended = true;
95 MeasuresOfInterest.AverageChangeOverTime, 80 startInfo.ThreadPoolName = "PoolService";
96 stat => stat.Value = m_requests.Count(), 81
97 StatVerbosity.Debug)); 82 m_threadPool = new SmartThreadPool(startInfo);
98 83
99 StatsManager.RegisterStat(
100 new Stat(
101 "ProcessedPollResponses",
102 "Number of poll responses processed.",
103 "",
104 "",
105 "httpserver",
106 m_server.Port.ToString(),
107 StatType.Pull,
108 MeasuresOfInterest.AverageChangeOverTime,
109 stat => stat.Value = ResponsesProcessed,
110 StatVerbosity.Debug));
111 } 84 }
112 85
113 public void Start() 86 public void Start()
114 { 87 {
115 IsRunning = true; 88 m_running = true;
116 89 m_threadPool.Start();
117 if (PerformResponsesAsync) 90 //startup worker threads
91 for (uint i = 0; i < m_WorkerThreadCount; i++)
118 { 92 {
119 //startup worker threads 93 m_workerThreads[i]
120 for (uint i = 0; i < m_WorkerThreadCount; i++) 94 = WorkManager.StartThread(
121 { 95 PoolWorkerJob,
122 m_workerThreads[i] 96 string.Format("PollServiceWorkerThread {0}:{1}", i, m_server.Port),
123 = WorkManager.StartThread( 97 ThreadPriority.Normal,
124 PoolWorkerJob, 98 false,
125 string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), 99 false,
126 ThreadPriority.Normal, 100 null,
127 false, 101 int.MaxValue);
128 false,
129 null,
130 int.MaxValue);
131 }
132
133 WorkManager.StartThread(
134 this.CheckLongPollThreads,
135 string.Format("LongPollServiceWatcherThread:{0}", m_server.Port),
136 ThreadPriority.Normal,
137 false,
138 true,
139 null,
140 1000 * 60 * 10);
141 } 102 }
103
104 m_retrysThread = WorkManager.StartThread(
105 this.CheckRetries,
106 string.Format("PollServiceWatcherThread:{0}", m_server.Port),
107 ThreadPriority.Normal,
108 false,
109 true,
110 null,
111 1000 * 60 * 10);
112
113
142 } 114 }
143 115
144 private void ReQueueEvent(PollServiceHttpRequest req) 116 private void ReQueueEvent(PollServiceHttpRequest req)
145 { 117 {
146 if (IsRunning) 118 if (m_running)
147 { 119 {
148 // delay the enqueueing for 100ms. There's no need to have the event 120 lock (m_retryRequests)
149 // actively on the queue 121 m_retryRequests.Enqueue(req);
150 Timer t = new Timer(self => {
151 ((Timer)self).Dispose();
152 m_requests.Enqueue(req);
153 });
154
155 t.Change(100, Timeout.Infinite);
156
157 } 122 }
158 } 123 }
159 124
160 public void Enqueue(PollServiceHttpRequest req) 125 public void Enqueue(PollServiceHttpRequest req)
161 { 126 {
162 if (IsRunning) 127 lock (m_bycontext)
163 { 128 {
164 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) 129 Queue<PollServiceHttpRequest> ctxQeueue;
130 if (m_bycontext.TryGetValue(req, out ctxQeueue))
165 { 131 {
166 lock (m_longPollRequests) 132 ctxQeueue.Enqueue(req);
167 m_longPollRequests.Add(req);
168 } 133 }
169 else 134 else
170 m_requests.Enqueue(req); 135 {
136 ctxQeueue = new Queue<PollServiceHttpRequest>();
137 m_bycontext[req] = ctxQeueue;
138 EnqueueInt(req);
139 }
171 } 140 }
172 } 141 }
173 142
174 private void CheckLongPollThreads() 143 public void byContextDequeue(PollServiceHttpRequest req)
175 { 144 {
176 // The only purpose of this thread is to check the EQs for events. 145 Queue<PollServiceHttpRequest> ctxQeueue;
177 // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. 146 lock (m_bycontext)
178 // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
179 // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
180 // so if they aren't ready to be served by a worker thread (no events), they are placed
181 // directly back in the "ready-to-serve" queue by the worker thread.
182 while (IsRunning)
183 { 147 {
184 Thread.Sleep(500); 148 if (m_bycontext.TryGetValue(req, out ctxQeueue))
185 Watchdog.UpdateThread();
186
187// List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>();
188 lock (m_longPollRequests)
189 { 149 {
190 if (m_longPollRequests.Count > 0 && IsRunning) 150 if (ctxQeueue.Count > 0)
191 { 151 {
192 List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => 152 PollServiceHttpRequest newreq = ctxQeueue.Dequeue();
193 (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ 153 EnqueueInt(newreq);
194 (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout 154 }
195 ); 155 else
156 {
157 m_bycontext.Remove(req);
158 }
159 }
160 }
161 }
196 162
197 ready.ForEach(req =>
198 {
199 m_requests.Enqueue(req);
200 m_longPollRequests.Remove(req);
201 });
202 163
203 } 164 public void EnqueueInt(PollServiceHttpRequest req)
165 {
166 if (m_running)
167 {
168 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.LongPoll)
169 {
170 m_requests.Enqueue(req);
171 }
172 else
173 {
174 lock (m_slowRequests)
175 m_slowRequests.Enqueue(req);
176 }
177 }
178 }
179
180 private void CheckRetries()
181 {
182 while (m_running)
183
184 {
185 Thread.Sleep(100); // let the world move .. back to faster rate
186 Watchdog.UpdateThread();
187 lock (m_retryRequests)
188 {
189 while (m_retryRequests.Count > 0 && m_running)
190 m_requests.Enqueue(m_retryRequests.Dequeue());
191 }
192 slowCount++;
193 if (slowCount >= 10)
194 {
195 slowCount = 0;
204 196
197 lock (m_slowRequests)
198 {
199 while (m_slowRequests.Count > 0 && m_running)
200 m_requests.Enqueue(m_slowRequests.Dequeue());
201 }
205 } 202 }
206 } 203 }
207 } 204 }
208 205
209 public void Stop() 206 public void Stop()
210 { 207 {
211 IsRunning = false; 208 m_running = false;
212// m_timeout = -10000; // cause all to expire 209
213 Thread.Sleep(1000); // let the world move 210 Thread.Sleep(1000); // let the world move
214 211
215 foreach (Thread t in m_workerThreads) 212 foreach (Thread t in m_workerThreads)
216 Watchdog.AbortThread(t.ManagedThreadId); 213 Watchdog.AbortThread(t.ManagedThreadId);
217 214
215 // any entry in m_bycontext should have a active request on the other queues
216 // so just delete contents to easy GC
217 foreach (Queue<PollServiceHttpRequest> qu in m_bycontext.Values)
218 qu.Clear();
219 m_bycontext.Clear();
220
221 try
222 {
223 foreach (PollServiceHttpRequest req in m_retryRequests)
224 {
225 req.DoHTTPstop(m_server);
226 }
227 }
228 catch
229 {
230 }
231
218 PollServiceHttpRequest wreq; 232 PollServiceHttpRequest wreq;
233 m_retryRequests.Clear();
219 234
220 lock (m_longPollRequests) 235 lock (m_slowRequests)
221 { 236 {
222 if (m_longPollRequests.Count > 0 && IsRunning) 237 while (m_slowRequests.Count > 0)
223 m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); 238 m_requests.Enqueue(m_slowRequests.Dequeue());
239
224 } 240 }
225 241
226 while (m_requests.Count() > 0) 242 while (m_requests.Count() > 0)
@@ -228,16 +244,14 @@ namespace OpenSim.Framework.Servers.HttpServer
228 try 244 try
229 { 245 {
230 wreq = m_requests.Dequeue(0); 246 wreq = m_requests.Dequeue(0);
231 ResponsesProcessed++; 247 wreq.DoHTTPstop(m_server);
232 wreq.DoHTTPGruntWork( 248
233 m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
234 } 249 }
235 catch 250 catch
236 { 251 {
237 } 252 }
238 } 253 }
239 254
240 m_longPollRequests.Clear();
241 m_requests.Clear(); 255 m_requests.Clear();
242 } 256 }
243 257
@@ -245,87 +259,70 @@ namespace OpenSim.Framework.Servers.HttpServer
245 259
246 private void PoolWorkerJob() 260 private void PoolWorkerJob()
247 { 261 {
248 while (IsRunning) 262 while (m_running)
249 { 263 {
250 Watchdog.UpdateThread(); 264 PollServiceHttpRequest req = m_requests.Dequeue(5000);
251 WaitPerformResponse();
252 }
253 }
254
255 public void WaitPerformResponse()
256 {
257 PollServiceHttpRequest req = m_requests.Dequeue(5000);
258// m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
259 265
260 if (req != null) 266 Watchdog.UpdateThread();
261 { 267 if (req != null)
262 try
263 { 268 {
264 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) 269 try
265 { 270 {
266 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); 271 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
267
268 if (responsedata == null)
269 return;
270
271 // This is the event queue.
272 // Even if we're not running we can still perform responses by explicit request.
273 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll
274 || !PerformResponsesAsync)
275 { 272 {
276 try 273 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
277 { 274
278 ResponsesProcessed++; 275 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
279 req.DoHTTPGruntWork(m_server, responsedata);
280 }
281 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
282 {
283 // Ignore it, no need to reply
284 m_log.Error(e);
285 }
286 }
287 else
288 {
289 m_threadPool.QueueWorkItem(x =>
290 { 276 {
291 try 277 try
292 { 278 {
293 ResponsesProcessed++;
294 req.DoHTTPGruntWork(m_server, responsedata); 279 req.DoHTTPGruntWork(m_server, responsedata);
280 byContextDequeue(req);
295 } 281 }
296 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream 282 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
297 { 283 {
298 // Ignore it, no need to reply 284 // Ignore it, no need to reply
299 m_log.Error(e);
300 } 285 }
301 catch (Exception e) 286 }
287 else
288 {
289 m_threadPool.QueueWorkItem(x =>
302 { 290 {
303 m_log.Error(e); 291 try
304 } 292 {
305 293 req.DoHTTPGruntWork(m_server, responsedata);
306 return null; 294 byContextDequeue(req);
307 }, null); 295 }
308 } 296 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
309 } 297 {
310 else 298 // Ignore it, no need to reply
311 { 299 }
312 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) 300
313 { 301 return null;
314 ResponsesProcessed++; 302 }, null);
315 req.DoHTTPGruntWork( 303 }
316 m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
317 } 304 }
318 else 305 else
319 { 306 {
320 ReQueueEvent(req); 307 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
308 {
309 req.DoHTTPGruntWork(m_server,
310 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
311 byContextDequeue(req);
312 }
313 else
314 {
315 ReQueueEvent(req);
316 }
321 } 317 }
322 } 318 }
323 } 319 catch (Exception e)
324 catch (Exception e) 320 {
325 { 321 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
326 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); 322 }
327 } 323 }
328 } 324 }
329 } 325 }
326
330 } 327 }
331} \ No newline at end of file 328}