aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs188
1 files changed, 103 insertions, 85 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index c6a3e65..a2f6a11 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -30,13 +30,10 @@ using System.Collections;
30using System.Threading; 30using System.Threading;
31using System.Reflection; 31using System.Reflection;
32using log4net; 32using log4net;
33using HttpServer;
34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 33using OpenSim.Framework.Monitoring;
36using Amib.Threading; 34using Amib.Threading;
37using System.IO;
38using System.Text;
39using System.Collections.Generic; 35using System.Collections.Generic;
36using System.Collections.Concurrent;
40 37
41namespace OpenSim.Framework.Servers.HttpServer 38namespace OpenSim.Framework.Servers.HttpServer
42{ 39{
@@ -46,9 +43,9 @@ namespace OpenSim.Framework.Servers.HttpServer
46 43
47 private readonly BaseHttpServer m_server; 44 private readonly BaseHttpServer m_server;
48 45
49 private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext; 46 private Dictionary<int, Queue<PollServiceHttpRequest>> m_bycontext;
50 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); 47 private BlockingCollection<PollServiceHttpRequest> m_requests = new BlockingCollection<PollServiceHttpRequest>();
51 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); 48 private static ConcurrentQueue<PollServiceHttpRequest> m_retryRequests = new ConcurrentQueue<PollServiceHttpRequest>();
52 49
53 private uint m_WorkerThreadCount = 0; 50 private uint m_WorkerThreadCount = 0;
54 private Thread[] m_workerThreads; 51 private Thread[] m_workerThreads;
@@ -65,8 +62,7 @@ namespace OpenSim.Framework.Servers.HttpServer
65 m_WorkerThreadCount = pWorkerThreadCount; 62 m_WorkerThreadCount = pWorkerThreadCount;
66 m_workerThreads = new Thread[m_WorkerThreadCount]; 63 m_workerThreads = new Thread[m_WorkerThreadCount];
67 64
68 PollServiceHttpRequestComparer preqCp = new PollServiceHttpRequestComparer(); 65 m_bycontext = new Dictionary<int, Queue<PollServiceHttpRequest>>(256);
69 m_bycontext = new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp);
70 66
71 STPStartInfo startInfo = new STPStartInfo(); 67 STPStartInfo startInfo = new STPStartInfo();
72 startInfo.IdleTimeout = 30000; 68 startInfo.IdleTimeout = 30000;
@@ -105,32 +101,28 @@ namespace OpenSim.Framework.Servers.HttpServer
105 true, 101 true,
106 null, 102 null,
107 1000 * 60 * 10); 103 1000 * 60 * 10);
108
109
110 } 104 }
111 105
112 private void ReQueueEvent(PollServiceHttpRequest req) 106 private void ReQueueEvent(PollServiceHttpRequest req)
113 { 107 {
114 if (m_running) 108 if (m_running)
115 { 109 m_retryRequests.Enqueue(req);
116 lock (m_retryRequests)
117 m_retryRequests.Enqueue(req);
118 }
119 } 110 }
120 111
121 public void Enqueue(PollServiceHttpRequest req) 112 public void Enqueue(PollServiceHttpRequest req)
122 { 113 {
114 Queue<PollServiceHttpRequest> ctxQeueue;
115 int rhash = req.contextHash;
123 lock (m_bycontext) 116 lock (m_bycontext)
124 { 117 {
125 Queue<PollServiceHttpRequest> ctxQeueue; 118 if (m_bycontext.TryGetValue(rhash, out ctxQeueue))
126 if (m_bycontext.TryGetValue(req, out ctxQeueue))
127 { 119 {
128 ctxQeueue.Enqueue(req); 120 ctxQeueue.Enqueue(req);
129 } 121 }
130 else 122 else
131 { 123 {
132 ctxQeueue = new Queue<PollServiceHttpRequest>(); 124 ctxQeueue = new Queue<PollServiceHttpRequest>();
133 m_bycontext[req] = ctxQeueue; 125 m_bycontext[rhash] = ctxQeueue;
134 EnqueueInt(req); 126 EnqueueInt(req);
135 } 127 }
136 } 128 }
@@ -139,9 +131,10 @@ namespace OpenSim.Framework.Servers.HttpServer
139 public void byContextDequeue(PollServiceHttpRequest req) 131 public void byContextDequeue(PollServiceHttpRequest req)
140 { 132 {
141 Queue<PollServiceHttpRequest> ctxQeueue; 133 Queue<PollServiceHttpRequest> ctxQeueue;
134 int rhash = req.contextHash;
142 lock (m_bycontext) 135 lock (m_bycontext)
143 { 136 {
144 if (m_bycontext.TryGetValue(req, out ctxQeueue)) 137 if (m_bycontext.TryGetValue(rhash, out ctxQeueue))
145 { 138 {
146 if (ctxQeueue.Count > 0) 139 if (ctxQeueue.Count > 0)
147 { 140 {
@@ -150,30 +143,41 @@ namespace OpenSim.Framework.Servers.HttpServer
150 } 143 }
151 else 144 else
152 { 145 {
153 m_bycontext.Remove(req); 146 m_bycontext.Remove(rhash);
154 } 147 }
155 } 148 }
156 } 149 }
157 } 150 }
158 151
152 public void DropByContext(PollServiceHttpRequest req)
153 {
154 Queue<PollServiceHttpRequest> ctxQeueue;
155 int rhash = req.contextHash;
156 lock (m_bycontext)
157 {
158 if (m_bycontext.TryGetValue(rhash, out ctxQeueue))
159 {
160 ctxQeueue.Clear();
161 m_bycontext.Remove(rhash);
162 }
163 }
164 }
165
159 public void EnqueueInt(PollServiceHttpRequest req) 166 public void EnqueueInt(PollServiceHttpRequest req)
160 { 167 {
161 if (m_running) 168 if (m_running)
162 m_requests.Enqueue(req); 169 m_requests.Add(req);
163 } 170 }
164 171
165 private void CheckRetries() 172 private void CheckRetries()
166 { 173 {
174 PollServiceHttpRequest preq;
167 while (m_running) 175 while (m_running)
168
169 { 176 {
170 Thread.Sleep(100); // let the world move .. back to faster rate 177 Thread.Sleep(100);
171 Watchdog.UpdateThread(); 178 Watchdog.UpdateThread();
172 lock (m_retryRequests) 179 while (m_running && m_retryRequests.TryDequeue(out preq))
173 { 180 m_requests.Add(preq);
174 while (m_retryRequests.Count > 0 && m_running)
175 m_requests.Enqueue(m_retryRequests.Dequeue());
176 }
177 } 181 }
178 } 182 }
179 183
@@ -194,103 +198,117 @@ namespace OpenSim.Framework.Servers.HttpServer
194 qu.Clear(); 198 qu.Clear();
195 m_bycontext.Clear(); 199 m_bycontext.Clear();
196 200
201 PollServiceHttpRequest req;
197 try 202 try
198 { 203 {
199 foreach (PollServiceHttpRequest req in m_retryRequests) 204 while(m_retryRequests.TryDequeue(out req))
200 {
201 req.DoHTTPstop(m_server); 205 req.DoHTTPstop(m_server);
202 }
203 } 206 }
204 catch 207 catch
205 { 208 {
206 } 209 }
207 210
208 PollServiceHttpRequest wreq; 211 try
209 212 {
210 m_retryRequests.Clear(); 213 while(m_requests.TryTake(out req, 0))
211 214 req.DoHTTPstop(m_server);
212 while (m_requests.Count() > 0) 215 }
216 catch
213 { 217 {
214 try
215 {
216 wreq = m_requests.Dequeue(0);
217 wreq.DoHTTPstop(m_server);
218 }
219 catch
220 {
221 }
222 } 218 }
223 219
224 m_requests.Clear(); 220 m_requests.Dispose();
221
225 } 222 }
226 223
227 // work threads 224 // work threads
228 225
229 private void PoolWorkerJob() 226 private void PoolWorkerJob()
230 { 227 {
228 PollServiceHttpRequest req;
231 while (m_running) 229 while (m_running)
232 { 230 {
233 PollServiceHttpRequest req = m_requests.Dequeue(4500); 231 req = null;
234 Watchdog.UpdateThread(); 232 if(!m_requests.TryTake(out req, 4500) || req == null)
235 if (req != null)
236 { 233 {
237 try 234 Watchdog.UpdateThread();
235 continue;
236 }
237
238 Watchdog.UpdateThread();
239
240 try
241 {
242 if(!req.HttpContext.CanSend())
238 { 243 {
239 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) 244 req.PollServiceArgs.Drop(req.RequestID, req.PollServiceArgs.Id);
245 byContextDequeue(req);
246 continue;
247 }
248
249 if(req.HttpContext.IsSending())
250 {
251 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
240 { 252 {
241 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); 253 req.PollServiceArgs.Drop(req.RequestID, req.PollServiceArgs.Id);
254 byContextDequeue(req);
255 }
256 else
257 ReQueueEvent(req);
258 continue;
259 }
242 260
261 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
262 {
263 PollServiceHttpRequest nreq = req;
264 m_threadPool.QueueWorkItem(x =>
265 {
266 try
267 {
268 Hashtable responsedata = nreq.PollServiceArgs.GetEvents(nreq.RequestID, nreq.PollServiceArgs.Id);
269 nreq.DoHTTPGruntWork(m_server, responsedata);
270 }
271 catch (ObjectDisposedException) { }
272 finally
273 {
274 byContextDequeue(nreq);
275 nreq = null;
276 }
277 return null;
278 }, null);
279 }
280 else
281 {
282 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
283 {
284 PollServiceHttpRequest nreq = req;
243 m_threadPool.QueueWorkItem(x => 285 m_threadPool.QueueWorkItem(x =>
244 { 286 {
245 try 287 try
246 { 288 {
247 req.DoHTTPGruntWork(m_server, responsedata); 289 nreq.DoHTTPGruntWork(m_server,
248 } 290 nreq.PollServiceArgs.NoEvents(nreq.RequestID, nreq.PollServiceArgs.Id));
249 catch (ObjectDisposedException)
250 {
251 } 291 }
292 catch (ObjectDisposedException) {}
252 finally 293 finally
253 { 294 {
254 byContextDequeue(req); 295 byContextDequeue(nreq);
296 nreq = null;
255 } 297 }
256 return null; 298 return null;
257 }, null); 299 }, null);
258 } 300 }
259 else 301 else
260 { 302 {
261 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) 303 ReQueueEvent(req);
262 {
263 m_threadPool.QueueWorkItem(x =>
264 {
265 try
266 {
267 req.DoHTTPGruntWork(m_server,
268 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
269 }
270 catch (ObjectDisposedException)
271 {
272 // Ignore it, no need to reply
273 }
274 finally
275 {
276 byContextDequeue(req);
277 }
278 return null;
279 }, null);
280 }
281 else
282 {
283 ReQueueEvent(req);
284 }
285 } 304 }
286 } 305 }
287 catch (Exception e) 306 }
288 { 307 catch (Exception e)
289 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); 308 {
290 } 309 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
291 } 310 }
292 } 311 }
293 } 312 }
294
295 } 313 }
296} 314}