diff options
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 327 |
1 files changed, 146 insertions, 181 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 28bba70..c6a3e65 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -44,200 +44,183 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
44 | { | 44 | { |
45 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | 45 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
46 | 46 | ||
47 | /// <summary> | ||
48 | /// Is the poll service request manager running? | ||
49 | /// </summary> | ||
50 | /// <remarks> | ||
51 | /// Can be running either synchronously or asynchronously | ||
52 | /// </remarks> | ||
53 | public bool IsRunning { get; private set; } | ||
54 | |||
55 | /// <summary> | ||
56 | /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via | ||
57 | /// external calls)? | ||
58 | /// </summary> | ||
59 | public bool PerformResponsesAsync { get; private set; } | ||
60 | |||
61 | /// <summary> | ||
62 | /// Number of responses actually processed and sent to viewer (or aborted due to error). | ||
63 | /// </summary> | ||
64 | public int ResponsesProcessed { get; private set; } | ||
65 | |||
66 | private readonly BaseHttpServer m_server; | 47 | private readonly BaseHttpServer m_server; |
67 | 48 | ||
49 | private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext; | ||
68 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | 50 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); |
69 | private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); | 51 | private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); |
70 | 52 | ||
71 | private uint m_WorkerThreadCount = 0; | 53 | private uint m_WorkerThreadCount = 0; |
72 | private Thread[] m_workerThreads; | 54 | private Thread[] m_workerThreads; |
55 | private Thread m_retrysThread; | ||
73 | 56 | ||
74 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); | 57 | private bool m_running = false; |
75 | 58 | ||
76 | // private int m_timeout = 1000; // increase timeout 250; now use the event one | 59 | private SmartThreadPool m_threadPool; |
77 | 60 | ||
78 | public PollServiceRequestManager( | 61 | public PollServiceRequestManager( |
79 | BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) | 62 | BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) |
80 | { | 63 | { |
81 | m_server = pSrv; | 64 | m_server = pSrv; |
82 | PerformResponsesAsync = performResponsesAsync; | ||
83 | m_WorkerThreadCount = pWorkerThreadCount; | 65 | m_WorkerThreadCount = pWorkerThreadCount; |
84 | m_workerThreads = new Thread[m_WorkerThreadCount]; | 66 | m_workerThreads = new Thread[m_WorkerThreadCount]; |
85 | 67 | ||
86 | StatsManager.RegisterStat( | 68 | PollServiceHttpRequestComparer preqCp = new PollServiceHttpRequestComparer(); |
87 | new Stat( | 69 | m_bycontext = new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp); |
88 | "QueuedPollResponses", | 70 | |
89 | "Number of poll responses queued for processing.", | 71 | STPStartInfo startInfo = new STPStartInfo(); |
90 | "", | 72 | startInfo.IdleTimeout = 30000; |
91 | "", | 73 | startInfo.MaxWorkerThreads = 20; |
92 | "httpserver", | 74 | startInfo.MinWorkerThreads = 1; |
93 | m_server.Port.ToString(), | 75 | startInfo.ThreadPriority = ThreadPriority.Normal; |
94 | StatType.Pull, | 76 | startInfo.StartSuspended = true; |
95 | MeasuresOfInterest.AverageChangeOverTime, | 77 | startInfo.ThreadPoolName = "PoolService"; |
96 | stat => stat.Value = m_requests.Count(), | 78 | |
97 | StatVerbosity.Debug)); | 79 | m_threadPool = new SmartThreadPool(startInfo); |
98 | |||
99 | StatsManager.RegisterStat( | ||
100 | new Stat( | ||
101 | "ProcessedPollResponses", | ||
102 | "Number of poll responses processed.", | ||
103 | "", | ||
104 | "", | ||
105 | "httpserver", | ||
106 | m_server.Port.ToString(), | ||
107 | StatType.Pull, | ||
108 | MeasuresOfInterest.AverageChangeOverTime, | ||
109 | stat => stat.Value = ResponsesProcessed, | ||
110 | StatVerbosity.Debug)); | ||
111 | } | 80 | } |
112 | 81 | ||
113 | public void Start() | 82 | public void Start() |
114 | { | 83 | { |
115 | IsRunning = true; | 84 | m_running = true; |
116 | 85 | m_threadPool.Start(); | |
117 | if (PerformResponsesAsync) | 86 | //startup worker threads |
87 | for (uint i = 0; i < m_WorkerThreadCount; i++) | ||
118 | { | 88 | { |
119 | //startup worker threads | 89 | m_workerThreads[i] |
120 | for (uint i = 0; i < m_WorkerThreadCount; i++) | 90 | = WorkManager.StartThread( |
121 | { | 91 | PoolWorkerJob, |
122 | m_workerThreads[i] | 92 | string.Format("PollServiceWorkerThread {0}:{1}", i, m_server.Port), |
123 | = WorkManager.StartThread( | 93 | ThreadPriority.Normal, |
124 | PoolWorkerJob, | 94 | true, |
125 | string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), | 95 | false, |
126 | ThreadPriority.Normal, | 96 | null, |
127 | false, | 97 | int.MaxValue); |
128 | false, | ||
129 | null, | ||
130 | int.MaxValue); | ||
131 | } | ||
132 | |||
133 | WorkManager.StartThread( | ||
134 | this.CheckLongPollThreads, | ||
135 | string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), | ||
136 | ThreadPriority.Normal, | ||
137 | false, | ||
138 | true, | ||
139 | null, | ||
140 | 1000 * 60 * 10); | ||
141 | } | 98 | } |
99 | |||
100 | m_retrysThread = WorkManager.StartThread( | ||
101 | this.CheckRetries, | ||
102 | string.Format("PollServiceWatcherThread:{0}", m_server.Port), | ||
103 | ThreadPriority.Normal, | ||
104 | true, | ||
105 | true, | ||
106 | null, | ||
107 | 1000 * 60 * 10); | ||
108 | |||
109 | |||
142 | } | 110 | } |
143 | 111 | ||
144 | private void ReQueueEvent(PollServiceHttpRequest req) | 112 | private void ReQueueEvent(PollServiceHttpRequest req) |
145 | { | 113 | { |
146 | if (IsRunning) | 114 | if (m_running) |
147 | { | 115 | { |
148 | // delay the enqueueing for 100ms. There's no need to have the event | 116 | lock (m_retryRequests) |
149 | // actively on the queue | 117 | m_retryRequests.Enqueue(req); |
150 | Timer t = new Timer(self => { | ||
151 | ((Timer)self).Dispose(); | ||
152 | m_requests.Enqueue(req); | ||
153 | }); | ||
154 | |||
155 | t.Change(100, Timeout.Infinite); | ||
156 | |||
157 | } | 118 | } |
158 | } | 119 | } |
159 | 120 | ||
160 | public void Enqueue(PollServiceHttpRequest req) | 121 | public void Enqueue(PollServiceHttpRequest req) |
161 | { | 122 | { |
162 | if (IsRunning) | 123 | lock (m_bycontext) |
163 | { | 124 | { |
164 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) | 125 | Queue<PollServiceHttpRequest> ctxQeueue; |
126 | if (m_bycontext.TryGetValue(req, out ctxQeueue)) | ||
165 | { | 127 | { |
166 | lock (m_longPollRequests) | 128 | ctxQeueue.Enqueue(req); |
167 | m_longPollRequests.Add(req); | ||
168 | } | 129 | } |
169 | else | 130 | else |
170 | m_requests.Enqueue(req); | 131 | { |
132 | ctxQeueue = new Queue<PollServiceHttpRequest>(); | ||
133 | m_bycontext[req] = ctxQeueue; | ||
134 | EnqueueInt(req); | ||
135 | } | ||
171 | } | 136 | } |
172 | } | 137 | } |
173 | 138 | ||
174 | private void CheckLongPollThreads() | 139 | public void byContextDequeue(PollServiceHttpRequest req) |
175 | { | 140 | { |
176 | // The only purpose of this thread is to check the EQs for events. | 141 | Queue<PollServiceHttpRequest> ctxQeueue; |
177 | // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. | 142 | lock (m_bycontext) |
178 | // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. | ||
179 | // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, | ||
180 | // so if they aren't ready to be served by a worker thread (no events), they are placed | ||
181 | // directly back in the "ready-to-serve" queue by the worker thread. | ||
182 | while (IsRunning) | ||
183 | { | 143 | { |
184 | Thread.Sleep(500); | 144 | if (m_bycontext.TryGetValue(req, out ctxQeueue)) |
185 | Watchdog.UpdateThread(); | ||
186 | |||
187 | // List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>(); | ||
188 | lock (m_longPollRequests) | ||
189 | { | 145 | { |
190 | if (m_longPollRequests.Count > 0 && IsRunning) | 146 | if (ctxQeueue.Count > 0) |
147 | { | ||
148 | PollServiceHttpRequest newreq = ctxQeueue.Dequeue(); | ||
149 | EnqueueInt(newreq); | ||
150 | } | ||
151 | else | ||
191 | { | 152 | { |
192 | List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => | 153 | m_bycontext.Remove(req); |
193 | (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ | 154 | } |
194 | (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout | 155 | } |
195 | ); | 156 | } |
157 | } | ||
196 | 158 | ||
197 | ready.ForEach(req => | 159 | public void EnqueueInt(PollServiceHttpRequest req) |
198 | { | 160 | { |
199 | m_requests.Enqueue(req); | 161 | if (m_running) |
200 | m_longPollRequests.Remove(req); | 162 | m_requests.Enqueue(req); |
201 | }); | 163 | } |
202 | 164 | ||
203 | } | 165 | private void CheckRetries() |
166 | { | ||
167 | while (m_running) | ||
204 | 168 | ||
169 | { | ||
170 | Thread.Sleep(100); // let the world move .. back to faster rate | ||
171 | Watchdog.UpdateThread(); | ||
172 | lock (m_retryRequests) | ||
173 | { | ||
174 | while (m_retryRequests.Count > 0 && m_running) | ||
175 | m_requests.Enqueue(m_retryRequests.Dequeue()); | ||
205 | } | 176 | } |
206 | } | 177 | } |
207 | } | 178 | } |
208 | 179 | ||
209 | public void Stop() | 180 | public void Stop() |
210 | { | 181 | { |
211 | IsRunning = false; | 182 | m_running = false; |
212 | // m_timeout = -10000; // cause all to expire | 183 | |
213 | Thread.Sleep(1000); // let the world move | 184 | Thread.Sleep(100); // let the world move |
214 | 185 | ||
215 | foreach (Thread t in m_workerThreads) | 186 | foreach (Thread t in m_workerThreads) |
216 | Watchdog.AbortThread(t.ManagedThreadId); | 187 | Watchdog.AbortThread(t.ManagedThreadId); |
217 | 188 | ||
218 | PollServiceHttpRequest wreq; | 189 | m_threadPool.Shutdown(); |
190 | |||
191 | // any entry in m_bycontext should have a active request on the other queues | ||
192 | // so just delete contents to easy GC | ||
193 | foreach (Queue<PollServiceHttpRequest> qu in m_bycontext.Values) | ||
194 | qu.Clear(); | ||
195 | m_bycontext.Clear(); | ||
219 | 196 | ||
220 | lock (m_longPollRequests) | 197 | try |
198 | { | ||
199 | foreach (PollServiceHttpRequest req in m_retryRequests) | ||
200 | { | ||
201 | req.DoHTTPstop(m_server); | ||
202 | } | ||
203 | } | ||
204 | catch | ||
221 | { | 205 | { |
222 | if (m_longPollRequests.Count > 0 && IsRunning) | ||
223 | m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); | ||
224 | } | 206 | } |
225 | 207 | ||
208 | PollServiceHttpRequest wreq; | ||
209 | |||
210 | m_retryRequests.Clear(); | ||
211 | |||
226 | while (m_requests.Count() > 0) | 212 | while (m_requests.Count() > 0) |
227 | { | 213 | { |
228 | try | 214 | try |
229 | { | 215 | { |
230 | wreq = m_requests.Dequeue(0); | 216 | wreq = m_requests.Dequeue(0); |
231 | ResponsesProcessed++; | 217 | wreq.DoHTTPstop(m_server); |
232 | wreq.DoHTTPGruntWork( | ||
233 | m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); | ||
234 | } | 218 | } |
235 | catch | 219 | catch |
236 | { | 220 | { |
237 | } | 221 | } |
238 | } | 222 | } |
239 | 223 | ||
240 | m_longPollRequests.Clear(); | ||
241 | m_requests.Clear(); | 224 | m_requests.Clear(); |
242 | } | 225 | } |
243 | 226 | ||
@@ -245,87 +228,69 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
245 | 228 | ||
246 | private void PoolWorkerJob() | 229 | private void PoolWorkerJob() |
247 | { | 230 | { |
248 | while (IsRunning) | 231 | while (m_running) |
249 | { | 232 | { |
233 | PollServiceHttpRequest req = m_requests.Dequeue(4500); | ||
250 | Watchdog.UpdateThread(); | 234 | Watchdog.UpdateThread(); |
251 | WaitPerformResponse(); | 235 | if (req != null) |
252 | } | ||
253 | } | ||
254 | |||
255 | public void WaitPerformResponse() | ||
256 | { | ||
257 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | ||
258 | // m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); | ||
259 | |||
260 | if (req != null) | ||
261 | { | ||
262 | try | ||
263 | { | 236 | { |
264 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) | 237 | try |
265 | { | 238 | { |
266 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); | 239 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) |
267 | |||
268 | if (responsedata == null) | ||
269 | return; | ||
270 | |||
271 | // This is the event queue. | ||
272 | // Even if we're not running we can still perform responses by explicit request. | ||
273 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll | ||
274 | || !PerformResponsesAsync) | ||
275 | { | ||
276 | try | ||
277 | { | ||
278 | ResponsesProcessed++; | ||
279 | req.DoHTTPGruntWork(m_server, responsedata); | ||
280 | } | ||
281 | catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream | ||
282 | { | ||
283 | // Ignore it, no need to reply | ||
284 | m_log.Error(e); | ||
285 | } | ||
286 | } | ||
287 | else | ||
288 | { | 240 | { |
241 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); | ||
242 | |||
289 | m_threadPool.QueueWorkItem(x => | 243 | m_threadPool.QueueWorkItem(x => |
290 | { | 244 | { |
291 | try | 245 | try |
292 | { | 246 | { |
293 | ResponsesProcessed++; | ||
294 | req.DoHTTPGruntWork(m_server, responsedata); | 247 | req.DoHTTPGruntWork(m_server, responsedata); |
295 | } | 248 | } |
296 | catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream | 249 | catch (ObjectDisposedException) |
297 | { | 250 | { |
298 | // Ignore it, no need to reply | ||
299 | m_log.Error(e); | ||
300 | } | 251 | } |
301 | catch (Exception e) | 252 | finally |
302 | { | 253 | { |
303 | m_log.Error(e); | 254 | byContextDequeue(req); |
304 | } | 255 | } |
305 | |||
306 | return null; | 256 | return null; |
307 | }, null); | 257 | }, null); |
308 | } | 258 | } |
309 | } | ||
310 | else | ||
311 | { | ||
312 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | ||
313 | { | ||
314 | ResponsesProcessed++; | ||
315 | req.DoHTTPGruntWork( | ||
316 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
317 | } | ||
318 | else | 259 | else |
319 | { | 260 | { |
320 | ReQueueEvent(req); | 261 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) |
262 | { | ||
263 | m_threadPool.QueueWorkItem(x => | ||
264 | { | ||
265 | try | ||
266 | { | ||
267 | req.DoHTTPGruntWork(m_server, | ||
268 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
269 | } | ||
270 | catch (ObjectDisposedException) | ||
271 | { | ||
272 | // Ignore it, no need to reply | ||
273 | } | ||
274 | finally | ||
275 | { | ||
276 | byContextDequeue(req); | ||
277 | } | ||
278 | return null; | ||
279 | }, null); | ||
280 | } | ||
281 | else | ||
282 | { | ||
283 | ReQueueEvent(req); | ||
284 | } | ||
321 | } | 285 | } |
322 | } | 286 | } |
323 | } | 287 | catch (Exception e) |
324 | catch (Exception e) | 288 | { |
325 | { | 289 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); |
326 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); | 290 | } |
327 | } | 291 | } |
328 | } | 292 | } |
329 | } | 293 | } |
294 | |||
330 | } | 295 | } |
331 | } \ No newline at end of file | 296 | } |