aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs327
1 files changed, 146 insertions, 181 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 28bba70..c6a3e65 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -44,200 +44,183 @@ namespace OpenSim.Framework.Servers.HttpServer
44 { 44 {
45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
46 46
47 /// <summary>
48 /// Is the poll service request manager running?
49 /// </summary>
50 /// <remarks>
51 /// Can be running either synchronously or asynchronously
52 /// </remarks>
53 public bool IsRunning { get; private set; }
54
55 /// <summary>
56 /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via
57 /// external calls)?
58 /// </summary>
59 public bool PerformResponsesAsync { get; private set; }
60
61 /// <summary>
62 /// Number of responses actually processed and sent to viewer (or aborted due to error).
63 /// </summary>
64 public int ResponsesProcessed { get; private set; }
65
66 private readonly BaseHttpServer m_server; 47 private readonly BaseHttpServer m_server;
67 48
49 private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext;
68 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); 50 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
69 private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); 51 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
70 52
71 private uint m_WorkerThreadCount = 0; 53 private uint m_WorkerThreadCount = 0;
72 private Thread[] m_workerThreads; 54 private Thread[] m_workerThreads;
55 private Thread m_retrysThread;
73 56
74 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); 57 private bool m_running = false;
75 58
76// private int m_timeout = 1000; // increase timeout 250; now use the event one 59 private SmartThreadPool m_threadPool;
77 60
78 public PollServiceRequestManager( 61 public PollServiceRequestManager(
79 BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) 62 BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
80 { 63 {
81 m_server = pSrv; 64 m_server = pSrv;
82 PerformResponsesAsync = performResponsesAsync;
83 m_WorkerThreadCount = pWorkerThreadCount; 65 m_WorkerThreadCount = pWorkerThreadCount;
84 m_workerThreads = new Thread[m_WorkerThreadCount]; 66 m_workerThreads = new Thread[m_WorkerThreadCount];
85 67
86 StatsManager.RegisterStat( 68 PollServiceHttpRequestComparer preqCp = new PollServiceHttpRequestComparer();
87 new Stat( 69 m_bycontext = new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp);
88 "QueuedPollResponses", 70
89 "Number of poll responses queued for processing.", 71 STPStartInfo startInfo = new STPStartInfo();
90 "", 72 startInfo.IdleTimeout = 30000;
91 "", 73 startInfo.MaxWorkerThreads = 20;
92 "httpserver", 74 startInfo.MinWorkerThreads = 1;
93 m_server.Port.ToString(), 75 startInfo.ThreadPriority = ThreadPriority.Normal;
94 StatType.Pull, 76 startInfo.StartSuspended = true;
95 MeasuresOfInterest.AverageChangeOverTime, 77 startInfo.ThreadPoolName = "PoolService";
96 stat => stat.Value = m_requests.Count(), 78
97 StatVerbosity.Debug)); 79 m_threadPool = new SmartThreadPool(startInfo);
98
99 StatsManager.RegisterStat(
100 new Stat(
101 "ProcessedPollResponses",
102 "Number of poll responses processed.",
103 "",
104 "",
105 "httpserver",
106 m_server.Port.ToString(),
107 StatType.Pull,
108 MeasuresOfInterest.AverageChangeOverTime,
109 stat => stat.Value = ResponsesProcessed,
110 StatVerbosity.Debug));
111 } 80 }
112 81
113 public void Start() 82 public void Start()
114 { 83 {
115 IsRunning = true; 84 m_running = true;
116 85 m_threadPool.Start();
117 if (PerformResponsesAsync) 86 //startup worker threads
87 for (uint i = 0; i < m_WorkerThreadCount; i++)
118 { 88 {
119 //startup worker threads 89 m_workerThreads[i]
120 for (uint i = 0; i < m_WorkerThreadCount; i++) 90 = WorkManager.StartThread(
121 { 91 PoolWorkerJob,
122 m_workerThreads[i] 92 string.Format("PollServiceWorkerThread {0}:{1}", i, m_server.Port),
123 = WorkManager.StartThread( 93 ThreadPriority.Normal,
124 PoolWorkerJob, 94 true,
125 string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), 95 false,
126 ThreadPriority.Normal, 96 null,
127 false, 97 int.MaxValue);
128 false,
129 null,
130 int.MaxValue);
131 }
132
133 WorkManager.StartThread(
134 this.CheckLongPollThreads,
135 string.Format("LongPollServiceWatcherThread:{0}", m_server.Port),
136 ThreadPriority.Normal,
137 false,
138 true,
139 null,
140 1000 * 60 * 10);
141 } 98 }
99
100 m_retrysThread = WorkManager.StartThread(
101 this.CheckRetries,
102 string.Format("PollServiceWatcherThread:{0}", m_server.Port),
103 ThreadPriority.Normal,
104 true,
105 true,
106 null,
107 1000 * 60 * 10);
108
109
142 } 110 }
143 111
144 private void ReQueueEvent(PollServiceHttpRequest req) 112 private void ReQueueEvent(PollServiceHttpRequest req)
145 { 113 {
146 if (IsRunning) 114 if (m_running)
147 { 115 {
148 // delay the enqueueing for 100ms. There's no need to have the event 116 lock (m_retryRequests)
149 // actively on the queue 117 m_retryRequests.Enqueue(req);
150 Timer t = new Timer(self => {
151 ((Timer)self).Dispose();
152 m_requests.Enqueue(req);
153 });
154
155 t.Change(100, Timeout.Infinite);
156
157 } 118 }
158 } 119 }
159 120
160 public void Enqueue(PollServiceHttpRequest req) 121 public void Enqueue(PollServiceHttpRequest req)
161 { 122 {
162 if (IsRunning) 123 lock (m_bycontext)
163 { 124 {
164 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) 125 Queue<PollServiceHttpRequest> ctxQeueue;
126 if (m_bycontext.TryGetValue(req, out ctxQeueue))
165 { 127 {
166 lock (m_longPollRequests) 128 ctxQeueue.Enqueue(req);
167 m_longPollRequests.Add(req);
168 } 129 }
169 else 130 else
170 m_requests.Enqueue(req); 131 {
132 ctxQeueue = new Queue<PollServiceHttpRequest>();
133 m_bycontext[req] = ctxQeueue;
134 EnqueueInt(req);
135 }
171 } 136 }
172 } 137 }
173 138
174 private void CheckLongPollThreads() 139 public void byContextDequeue(PollServiceHttpRequest req)
175 { 140 {
176 // The only purpose of this thread is to check the EQs for events. 141 Queue<PollServiceHttpRequest> ctxQeueue;
177 // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. 142 lock (m_bycontext)
178 // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
179 // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
180 // so if they aren't ready to be served by a worker thread (no events), they are placed
181 // directly back in the "ready-to-serve" queue by the worker thread.
182 while (IsRunning)
183 { 143 {
184 Thread.Sleep(500); 144 if (m_bycontext.TryGetValue(req, out ctxQeueue))
185 Watchdog.UpdateThread();
186
187// List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>();
188 lock (m_longPollRequests)
189 { 145 {
190 if (m_longPollRequests.Count > 0 && IsRunning) 146 if (ctxQeueue.Count > 0)
147 {
148 PollServiceHttpRequest newreq = ctxQeueue.Dequeue();
149 EnqueueInt(newreq);
150 }
151 else
191 { 152 {
192 List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => 153 m_bycontext.Remove(req);
193 (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ 154 }
194 (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout 155 }
195 ); 156 }
157 }
196 158
197 ready.ForEach(req => 159 public void EnqueueInt(PollServiceHttpRequest req)
198 { 160 {
199 m_requests.Enqueue(req); 161 if (m_running)
200 m_longPollRequests.Remove(req); 162 m_requests.Enqueue(req);
201 }); 163 }
202 164
203 } 165 private void CheckRetries()
166 {
167 while (m_running)
204 168
169 {
170 Thread.Sleep(100); // let the world move .. back to faster rate
171 Watchdog.UpdateThread();
172 lock (m_retryRequests)
173 {
174 while (m_retryRequests.Count > 0 && m_running)
175 m_requests.Enqueue(m_retryRequests.Dequeue());
205 } 176 }
206 } 177 }
207 } 178 }
208 179
209 public void Stop() 180 public void Stop()
210 { 181 {
211 IsRunning = false; 182 m_running = false;
212// m_timeout = -10000; // cause all to expire 183
213 Thread.Sleep(1000); // let the world move 184 Thread.Sleep(100); // let the world move
214 185
215 foreach (Thread t in m_workerThreads) 186 foreach (Thread t in m_workerThreads)
216 Watchdog.AbortThread(t.ManagedThreadId); 187 Watchdog.AbortThread(t.ManagedThreadId);
217 188
218 PollServiceHttpRequest wreq; 189 m_threadPool.Shutdown();
190
191 // any entry in m_bycontext should have a active request on the other queues
192 // so just delete contents to easy GC
193 foreach (Queue<PollServiceHttpRequest> qu in m_bycontext.Values)
194 qu.Clear();
195 m_bycontext.Clear();
219 196
220 lock (m_longPollRequests) 197 try
198 {
199 foreach (PollServiceHttpRequest req in m_retryRequests)
200 {
201 req.DoHTTPstop(m_server);
202 }
203 }
204 catch
221 { 205 {
222 if (m_longPollRequests.Count > 0 && IsRunning)
223 m_longPollRequests.ForEach(req => m_requests.Enqueue(req));
224 } 206 }
225 207
208 PollServiceHttpRequest wreq;
209
210 m_retryRequests.Clear();
211
226 while (m_requests.Count() > 0) 212 while (m_requests.Count() > 0)
227 { 213 {
228 try 214 try
229 { 215 {
230 wreq = m_requests.Dequeue(0); 216 wreq = m_requests.Dequeue(0);
231 ResponsesProcessed++; 217 wreq.DoHTTPstop(m_server);
232 wreq.DoHTTPGruntWork(
233 m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
234 } 218 }
235 catch 219 catch
236 { 220 {
237 } 221 }
238 } 222 }
239 223
240 m_longPollRequests.Clear();
241 m_requests.Clear(); 224 m_requests.Clear();
242 } 225 }
243 226
@@ -245,87 +228,69 @@ namespace OpenSim.Framework.Servers.HttpServer
245 228
246 private void PoolWorkerJob() 229 private void PoolWorkerJob()
247 { 230 {
248 while (IsRunning) 231 while (m_running)
249 { 232 {
233 PollServiceHttpRequest req = m_requests.Dequeue(4500);
250 Watchdog.UpdateThread(); 234 Watchdog.UpdateThread();
251 WaitPerformResponse(); 235 if (req != null)
252 }
253 }
254
255 public void WaitPerformResponse()
256 {
257 PollServiceHttpRequest req = m_requests.Dequeue(5000);
258// m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
259
260 if (req != null)
261 {
262 try
263 { 236 {
264 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) 237 try
265 { 238 {
266 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); 239 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
267
268 if (responsedata == null)
269 return;
270
271 // This is the event queue.
272 // Even if we're not running we can still perform responses by explicit request.
273 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll
274 || !PerformResponsesAsync)
275 {
276 try
277 {
278 ResponsesProcessed++;
279 req.DoHTTPGruntWork(m_server, responsedata);
280 }
281 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
282 {
283 // Ignore it, no need to reply
284 m_log.Error(e);
285 }
286 }
287 else
288 { 240 {
241 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
242
289 m_threadPool.QueueWorkItem(x => 243 m_threadPool.QueueWorkItem(x =>
290 { 244 {
291 try 245 try
292 { 246 {
293 ResponsesProcessed++;
294 req.DoHTTPGruntWork(m_server, responsedata); 247 req.DoHTTPGruntWork(m_server, responsedata);
295 } 248 }
296 catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream 249 catch (ObjectDisposedException)
297 { 250 {
298 // Ignore it, no need to reply
299 m_log.Error(e);
300 } 251 }
301 catch (Exception e) 252 finally
302 { 253 {
303 m_log.Error(e); 254 byContextDequeue(req);
304 } 255 }
305
306 return null; 256 return null;
307 }, null); 257 }, null);
308 } 258 }
309 }
310 else
311 {
312 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
313 {
314 ResponsesProcessed++;
315 req.DoHTTPGruntWork(
316 m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
317 }
318 else 259 else
319 { 260 {
320 ReQueueEvent(req); 261 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
262 {
263 m_threadPool.QueueWorkItem(x =>
264 {
265 try
266 {
267 req.DoHTTPGruntWork(m_server,
268 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
269 }
270 catch (ObjectDisposedException)
271 {
272 // Ignore it, no need to reply
273 }
274 finally
275 {
276 byContextDequeue(req);
277 }
278 return null;
279 }, null);
280 }
281 else
282 {
283 ReQueueEvent(req);
284 }
321 } 285 }
322 } 286 }
323 } 287 catch (Exception e)
324 catch (Exception e) 288 {
325 { 289 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
326 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); 290 }
327 } 291 }
328 } 292 }
329 } 293 }
294
330 } 295 }
331} \ No newline at end of file 296}