diff options
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 188 |
1 files changed, 103 insertions, 85 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index c6a3e65..a2f6a11 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -30,13 +30,10 @@ using System.Collections; | |||
30 | using System.Threading; | 30 | using System.Threading; |
31 | using System.Reflection; | 31 | using System.Reflection; |
32 | using log4net; | 32 | using log4net; |
33 | using HttpServer; | ||
34 | using OpenSim.Framework; | ||
35 | using OpenSim.Framework.Monitoring; | 33 | using OpenSim.Framework.Monitoring; |
36 | using Amib.Threading; | 34 | using Amib.Threading; |
37 | using System.IO; | ||
38 | using System.Text; | ||
39 | using System.Collections.Generic; | 35 | using System.Collections.Generic; |
36 | using System.Collections.Concurrent; | ||
40 | 37 | ||
41 | namespace OpenSim.Framework.Servers.HttpServer | 38 | namespace OpenSim.Framework.Servers.HttpServer |
42 | { | 39 | { |
@@ -46,9 +43,9 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
46 | 43 | ||
47 | private readonly BaseHttpServer m_server; | 44 | private readonly BaseHttpServer m_server; |
48 | 45 | ||
49 | private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext; | 46 | private Dictionary<int, Queue<PollServiceHttpRequest>> m_bycontext; |
50 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | 47 | private BlockingCollection<PollServiceHttpRequest> m_requests = new BlockingCollection<PollServiceHttpRequest>(); |
51 | private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); | 48 | private static ConcurrentQueue<PollServiceHttpRequest> m_retryRequests = new ConcurrentQueue<PollServiceHttpRequest>(); |
52 | 49 | ||
53 | private uint m_WorkerThreadCount = 0; | 50 | private uint m_WorkerThreadCount = 0; |
54 | private Thread[] m_workerThreads; | 51 | private Thread[] m_workerThreads; |
@@ -65,8 +62,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
65 | m_WorkerThreadCount = pWorkerThreadCount; | 62 | m_WorkerThreadCount = pWorkerThreadCount; |
66 | m_workerThreads = new Thread[m_WorkerThreadCount]; | 63 | m_workerThreads = new Thread[m_WorkerThreadCount]; |
67 | 64 | ||
68 | PollServiceHttpRequestComparer preqCp = new PollServiceHttpRequestComparer(); | 65 | m_bycontext = new Dictionary<int, Queue<PollServiceHttpRequest>>(256); |
69 | m_bycontext = new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp); | ||
70 | 66 | ||
71 | STPStartInfo startInfo = new STPStartInfo(); | 67 | STPStartInfo startInfo = new STPStartInfo(); |
72 | startInfo.IdleTimeout = 30000; | 68 | startInfo.IdleTimeout = 30000; |
@@ -105,32 +101,28 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
105 | true, | 101 | true, |
106 | null, | 102 | null, |
107 | 1000 * 60 * 10); | 103 | 1000 * 60 * 10); |
108 | |||
109 | |||
110 | } | 104 | } |
111 | 105 | ||
112 | private void ReQueueEvent(PollServiceHttpRequest req) | 106 | private void ReQueueEvent(PollServiceHttpRequest req) |
113 | { | 107 | { |
114 | if (m_running) | 108 | if (m_running) |
115 | { | 109 | m_retryRequests.Enqueue(req); |
116 | lock (m_retryRequests) | ||
117 | m_retryRequests.Enqueue(req); | ||
118 | } | ||
119 | } | 110 | } |
120 | 111 | ||
121 | public void Enqueue(PollServiceHttpRequest req) | 112 | public void Enqueue(PollServiceHttpRequest req) |
122 | { | 113 | { |
114 | Queue<PollServiceHttpRequest> ctxQeueue; | ||
115 | int rhash = req.contextHash; | ||
123 | lock (m_bycontext) | 116 | lock (m_bycontext) |
124 | { | 117 | { |
125 | Queue<PollServiceHttpRequest> ctxQeueue; | 118 | if (m_bycontext.TryGetValue(rhash, out ctxQeueue)) |
126 | if (m_bycontext.TryGetValue(req, out ctxQeueue)) | ||
127 | { | 119 | { |
128 | ctxQeueue.Enqueue(req); | 120 | ctxQeueue.Enqueue(req); |
129 | } | 121 | } |
130 | else | 122 | else |
131 | { | 123 | { |
132 | ctxQeueue = new Queue<PollServiceHttpRequest>(); | 124 | ctxQeueue = new Queue<PollServiceHttpRequest>(); |
133 | m_bycontext[req] = ctxQeueue; | 125 | m_bycontext[rhash] = ctxQeueue; |
134 | EnqueueInt(req); | 126 | EnqueueInt(req); |
135 | } | 127 | } |
136 | } | 128 | } |
@@ -139,9 +131,10 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
139 | public void byContextDequeue(PollServiceHttpRequest req) | 131 | public void byContextDequeue(PollServiceHttpRequest req) |
140 | { | 132 | { |
141 | Queue<PollServiceHttpRequest> ctxQeueue; | 133 | Queue<PollServiceHttpRequest> ctxQeueue; |
134 | int rhash = req.contextHash; | ||
142 | lock (m_bycontext) | 135 | lock (m_bycontext) |
143 | { | 136 | { |
144 | if (m_bycontext.TryGetValue(req, out ctxQeueue)) | 137 | if (m_bycontext.TryGetValue(rhash, out ctxQeueue)) |
145 | { | 138 | { |
146 | if (ctxQeueue.Count > 0) | 139 | if (ctxQeueue.Count > 0) |
147 | { | 140 | { |
@@ -150,30 +143,41 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
150 | } | 143 | } |
151 | else | 144 | else |
152 | { | 145 | { |
153 | m_bycontext.Remove(req); | 146 | m_bycontext.Remove(rhash); |
154 | } | 147 | } |
155 | } | 148 | } |
156 | } | 149 | } |
157 | } | 150 | } |
158 | 151 | ||
152 | public void DropByContext(PollServiceHttpRequest req) | ||
153 | { | ||
154 | Queue<PollServiceHttpRequest> ctxQeueue; | ||
155 | int rhash = req.contextHash; | ||
156 | lock (m_bycontext) | ||
157 | { | ||
158 | if (m_bycontext.TryGetValue(rhash, out ctxQeueue)) | ||
159 | { | ||
160 | ctxQeueue.Clear(); | ||
161 | m_bycontext.Remove(rhash); | ||
162 | } | ||
163 | } | ||
164 | } | ||
165 | |||
159 | public void EnqueueInt(PollServiceHttpRequest req) | 166 | public void EnqueueInt(PollServiceHttpRequest req) |
160 | { | 167 | { |
161 | if (m_running) | 168 | if (m_running) |
162 | m_requests.Enqueue(req); | 169 | m_requests.Add(req); |
163 | } | 170 | } |
164 | 171 | ||
165 | private void CheckRetries() | 172 | private void CheckRetries() |
166 | { | 173 | { |
174 | PollServiceHttpRequest preq; | ||
167 | while (m_running) | 175 | while (m_running) |
168 | |||
169 | { | 176 | { |
170 | Thread.Sleep(100); // let the world move .. back to faster rate | 177 | Thread.Sleep(100); |
171 | Watchdog.UpdateThread(); | 178 | Watchdog.UpdateThread(); |
172 | lock (m_retryRequests) | 179 | while (m_running && m_retryRequests.TryDequeue(out preq)) |
173 | { | 180 | m_requests.Add(preq); |
174 | while (m_retryRequests.Count > 0 && m_running) | ||
175 | m_requests.Enqueue(m_retryRequests.Dequeue()); | ||
176 | } | ||
177 | } | 181 | } |
178 | } | 182 | } |
179 | 183 | ||
@@ -194,103 +198,117 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
194 | qu.Clear(); | 198 | qu.Clear(); |
195 | m_bycontext.Clear(); | 199 | m_bycontext.Clear(); |
196 | 200 | ||
201 | PollServiceHttpRequest req; | ||
197 | try | 202 | try |
198 | { | 203 | { |
199 | foreach (PollServiceHttpRequest req in m_retryRequests) | 204 | while(m_retryRequests.TryDequeue(out req)) |
200 | { | ||
201 | req.DoHTTPstop(m_server); | 205 | req.DoHTTPstop(m_server); |
202 | } | ||
203 | } | 206 | } |
204 | catch | 207 | catch |
205 | { | 208 | { |
206 | } | 209 | } |
207 | 210 | ||
208 | PollServiceHttpRequest wreq; | 211 | try |
209 | 212 | { | |
210 | m_retryRequests.Clear(); | 213 | while(m_requests.TryTake(out req, 0)) |
211 | 214 | req.DoHTTPstop(m_server); | |
212 | while (m_requests.Count() > 0) | 215 | } |
216 | catch | ||
213 | { | 217 | { |
214 | try | ||
215 | { | ||
216 | wreq = m_requests.Dequeue(0); | ||
217 | wreq.DoHTTPstop(m_server); | ||
218 | } | ||
219 | catch | ||
220 | { | ||
221 | } | ||
222 | } | 218 | } |
223 | 219 | ||
224 | m_requests.Clear(); | 220 | m_requests.Dispose(); |
221 | |||
225 | } | 222 | } |
226 | 223 | ||
227 | // work threads | 224 | // work threads |
228 | 225 | ||
229 | private void PoolWorkerJob() | 226 | private void PoolWorkerJob() |
230 | { | 227 | { |
228 | PollServiceHttpRequest req; | ||
231 | while (m_running) | 229 | while (m_running) |
232 | { | 230 | { |
233 | PollServiceHttpRequest req = m_requests.Dequeue(4500); | 231 | req = null; |
234 | Watchdog.UpdateThread(); | 232 | if(!m_requests.TryTake(out req, 4500) || req == null) |
235 | if (req != null) | ||
236 | { | 233 | { |
237 | try | 234 | Watchdog.UpdateThread(); |
235 | continue; | ||
236 | } | ||
237 | |||
238 | Watchdog.UpdateThread(); | ||
239 | |||
240 | try | ||
241 | { | ||
242 | if(!req.HttpContext.CanSend()) | ||
238 | { | 243 | { |
239 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) | 244 | req.PollServiceArgs.Drop(req.RequestID, req.PollServiceArgs.Id); |
245 | byContextDequeue(req); | ||
246 | continue; | ||
247 | } | ||
248 | |||
249 | if(req.HttpContext.IsSending()) | ||
250 | { | ||
251 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | ||
240 | { | 252 | { |
241 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); | 253 | req.PollServiceArgs.Drop(req.RequestID, req.PollServiceArgs.Id); |
254 | byContextDequeue(req); | ||
255 | } | ||
256 | else | ||
257 | ReQueueEvent(req); | ||
258 | continue; | ||
259 | } | ||
242 | 260 | ||
261 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) | ||
262 | { | ||
263 | PollServiceHttpRequest nreq = req; | ||
264 | m_threadPool.QueueWorkItem(x => | ||
265 | { | ||
266 | try | ||
267 | { | ||
268 | Hashtable responsedata = nreq.PollServiceArgs.GetEvents(nreq.RequestID, nreq.PollServiceArgs.Id); | ||
269 | nreq.DoHTTPGruntWork(m_server, responsedata); | ||
270 | } | ||
271 | catch (ObjectDisposedException) { } | ||
272 | finally | ||
273 | { | ||
274 | byContextDequeue(nreq); | ||
275 | nreq = null; | ||
276 | } | ||
277 | return null; | ||
278 | }, null); | ||
279 | } | ||
280 | else | ||
281 | { | ||
282 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | ||
283 | { | ||
284 | PollServiceHttpRequest nreq = req; | ||
243 | m_threadPool.QueueWorkItem(x => | 285 | m_threadPool.QueueWorkItem(x => |
244 | { | 286 | { |
245 | try | 287 | try |
246 | { | 288 | { |
247 | req.DoHTTPGruntWork(m_server, responsedata); | 289 | nreq.DoHTTPGruntWork(m_server, |
248 | } | 290 | nreq.PollServiceArgs.NoEvents(nreq.RequestID, nreq.PollServiceArgs.Id)); |
249 | catch (ObjectDisposedException) | ||
250 | { | ||
251 | } | 291 | } |
292 | catch (ObjectDisposedException) {} | ||
252 | finally | 293 | finally |
253 | { | 294 | { |
254 | byContextDequeue(req); | 295 | byContextDequeue(nreq); |
296 | nreq = null; | ||
255 | } | 297 | } |
256 | return null; | 298 | return null; |
257 | }, null); | 299 | }, null); |
258 | } | 300 | } |
259 | else | 301 | else |
260 | { | 302 | { |
261 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | 303 | ReQueueEvent(req); |
262 | { | ||
263 | m_threadPool.QueueWorkItem(x => | ||
264 | { | ||
265 | try | ||
266 | { | ||
267 | req.DoHTTPGruntWork(m_server, | ||
268 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
269 | } | ||
270 | catch (ObjectDisposedException) | ||
271 | { | ||
272 | // Ignore it, no need to reply | ||
273 | } | ||
274 | finally | ||
275 | { | ||
276 | byContextDequeue(req); | ||
277 | } | ||
278 | return null; | ||
279 | }, null); | ||
280 | } | ||
281 | else | ||
282 | { | ||
283 | ReQueueEvent(req); | ||
284 | } | ||
285 | } | 304 | } |
286 | } | 305 | } |
287 | catch (Exception e) | 306 | } |
288 | { | 307 | catch (Exception e) |
289 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); | 308 | { |
290 | } | 309 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); |
291 | } | 310 | } |
292 | } | 311 | } |
293 | } | 312 | } |
294 | |||
295 | } | 313 | } |
296 | } | 314 | } |