aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs145
1 files changed, 89 insertions, 56 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 6aa5907..44f7045 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -47,17 +47,18 @@ namespace OpenSim.Framework.Servers.HttpServer
47 private readonly BaseHttpServer m_server; 47 private readonly BaseHttpServer m_server;
48 48
49 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); 49 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
50 private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); 50 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
51 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
51 52
52 private uint m_WorkerThreadCount = 0; 53 private uint m_WorkerThreadCount = 0;
53 private Thread[] m_workerThreads; 54 private Thread[] m_workerThreads;
55 private Thread m_retrysThread;
54 56
55 private bool m_running = true; 57 private bool m_running = true;
58 private int slowCount = 0;
56 59
57 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); 60 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
58 61
59// private int m_timeout = 1000; // increase timeout 250; now use the event one
60
61 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) 62 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
62 { 63 {
63 m_server = pSrv; 64 m_server = pSrv;
@@ -81,9 +82,9 @@ namespace OpenSim.Framework.Servers.HttpServer
81 int.MaxValue); 82 int.MaxValue);
82 } 83 }
83 84
84 Watchdog.StartThread( 85 m_retrysThread = Watchdog.StartThread(
85 this.CheckLongPollThreads, 86 this.CheckRetries,
86 string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), 87 string.Format("PollServiceWatcherThread:{0}", m_server.Port),
87 ThreadPriority.Normal, 88 ThreadPriority.Normal,
88 false, 89 false,
89 true, 90 true,
@@ -95,15 +96,8 @@ namespace OpenSim.Framework.Servers.HttpServer
95 { 96 {
96 if (m_running) 97 if (m_running)
97 { 98 {
98 // delay the enqueueing for 100ms. There's no need to have the event 99 lock (m_retryRequests)
99 // actively on the queue 100 m_retryRequests.Enqueue(req);
100 Timer t = new Timer(self => {
101 ((Timer)self).Dispose();
102 m_requests.Enqueue(req);
103 });
104
105 t.Change(100, Timeout.Infinite);
106
107 } 101 }
108 } 102 }
109 103
@@ -111,47 +105,39 @@ namespace OpenSim.Framework.Servers.HttpServer
111 { 105 {
112 if (m_running) 106 if (m_running)
113 { 107 {
114 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) 108 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.LongPoll)
115 { 109 {
116 lock (m_longPollRequests) 110 m_requests.Enqueue(req);
117 m_longPollRequests.Add(req);
118 } 111 }
119 else 112 else
120 m_requests.Enqueue(req); 113 {
114 lock (m_slowRequests)
115 m_slowRequests.Enqueue(req);
116 }
121 } 117 }
122 } 118 }
123 119
124 private void CheckLongPollThreads() 120 private void CheckRetries()
125 { 121 {
126 // The only purpose of this thread is to check the EQs for events.
127 // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
128 // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
129 // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
130 // so if they aren't ready to be served by a worker thread (no events), they are placed
131 // directly back in the "ready-to-serve" queue by the worker thread.
132 while (m_running) 122 while (m_running)
133 { 123 {
134 Thread.Sleep(500); 124 Thread.Sleep(100); // let the world move .. back to faster rate
135 Watchdog.UpdateThread(); 125 Watchdog.UpdateThread();
136 126 lock (m_retryRequests)
137// List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>();
138 lock (m_longPollRequests)
139 { 127 {
140 if (m_longPollRequests.Count > 0 && m_running) 128 while (m_retryRequests.Count > 0 && m_running)
141 { 129 m_requests.Enqueue(m_retryRequests.Dequeue());
142 List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => 130 }
143 (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ 131 slowCount++;
144 (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout 132 if (slowCount >= 10)
145 ); 133 {
146 134 slowCount = 0;
147 ready.ForEach(req =>
148 {
149 m_requests.Enqueue(req);
150 m_longPollRequests.Remove(req);
151 });
152 135
136 lock (m_slowRequests)
137 {
138 while (m_slowRequests.Count > 0 && m_running)
139 m_requests.Enqueue(m_slowRequests.Dequeue());
153 } 140 }
154
155 } 141 }
156 } 142 }
157 } 143 }
@@ -159,18 +145,30 @@ namespace OpenSim.Framework.Servers.HttpServer
159 public void Stop() 145 public void Stop()
160 { 146 {
161 m_running = false; 147 m_running = false;
162// m_timeout = -10000; // cause all to expire
163 Thread.Sleep(1000); // let the world move 148 Thread.Sleep(1000); // let the world move
164 149
165 foreach (Thread t in m_workerThreads) 150 foreach (Thread t in m_workerThreads)
166 Watchdog.AbortThread(t.ManagedThreadId); 151 Watchdog.AbortThread(t.ManagedThreadId);
167 152
153 try
154 {
155 foreach (PollServiceHttpRequest req in m_retryRequests)
156 {
157 DoHTTPGruntWork(m_server,req,
158 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
159 }
160 }
161 catch
162 {
163 }
164
168 PollServiceHttpRequest wreq; 165 PollServiceHttpRequest wreq;
166 m_retryRequests.Clear();
169 167
170 lock (m_longPollRequests) 168 lock (m_slowRequests)
171 { 169 {
172 if (m_longPollRequests.Count > 0 && m_running) 170 while (m_slowRequests.Count > 0 && m_running)
173 m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); 171 m_requests.Enqueue(m_slowRequests.Dequeue());
174 } 172 }
175 173
176 while (m_requests.Count() > 0) 174 while (m_requests.Count() > 0)
@@ -178,15 +176,14 @@ namespace OpenSim.Framework.Servers.HttpServer
178 try 176 try
179 { 177 {
180 wreq = m_requests.Dequeue(0); 178 wreq = m_requests.Dequeue(0);
181 wreq.DoHTTPGruntWork( 179 DoHTTPGruntWork(m_server,wreq,
182 m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); 180 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
183 } 181 }
184 catch 182 catch
185 { 183 {
186 } 184 }
187 } 185 }
188 186
189 m_longPollRequests.Clear();
190 m_requests.Clear(); 187 m_requests.Clear();
191 } 188 }
192 189
@@ -197,7 +194,6 @@ namespace OpenSim.Framework.Servers.HttpServer
197 while (m_running) 194 while (m_running)
198 { 195 {
199 PollServiceHttpRequest req = m_requests.Dequeue(5000); 196 PollServiceHttpRequest req = m_requests.Dequeue(5000);
200 //m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
201 197
202 Watchdog.UpdateThread(); 198 Watchdog.UpdateThread();
203 if (req != null) 199 if (req != null)
@@ -215,7 +211,7 @@ namespace OpenSim.Framework.Servers.HttpServer
215 { 211 {
216 try 212 try
217 { 213 {
218 req.DoHTTPGruntWork(m_server, responsedata); 214 DoHTTPGruntWork(m_server, req, responsedata);
219 } 215 }
220 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream 216 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
221 { 217 {
@@ -228,7 +224,7 @@ namespace OpenSim.Framework.Servers.HttpServer
228 { 224 {
229 try 225 try
230 { 226 {
231 req.DoHTTPGruntWork(m_server, responsedata); 227 DoHTTPGruntWork(m_server, req, responsedata);
232 } 228 }
233 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream 229 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
234 { 230 {
@@ -243,8 +239,8 @@ namespace OpenSim.Framework.Servers.HttpServer
243 { 239 {
244 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) 240 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
245 { 241 {
246 req.DoHTTPGruntWork( 242 DoHTTPGruntWork(m_server, req,
247 m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); 243 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
248 } 244 }
249 else 245 else
250 { 246 {
@@ -259,5 +255,42 @@ namespace OpenSim.Framework.Servers.HttpServer
259 } 255 }
260 } 256 }
261 } 257 }
258
259 // DoHTTPGruntWork changed, not sending response
260 // do the same work around as core
261
262 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
263 {
264 OSHttpResponse response
265 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
266
267 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
268
269 response.SendChunked = false;
270 response.ContentLength64 = buffer.Length;
271 response.ContentEncoding = Encoding.UTF8;
272
273 try
274 {
275 response.OutputStream.Write(buffer, 0, buffer.Length);
276 }
277 catch (Exception ex)
278 {
279 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
280 }
281 finally
282 {
283 try
284 {
285 response.OutputStream.Flush();
286 response.Send();
287 }
288 catch (Exception e)
289 {
290 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
291 }
292 }
293 }
262 } 294 }
263} \ No newline at end of file 295}
296