diff options
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 145 |
1 files changed, 88 insertions, 57 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index d83daab..44f7045 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -47,19 +47,18 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
47 | private readonly BaseHttpServer m_server; | 47 | private readonly BaseHttpServer m_server; |
48 | 48 | ||
49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | 49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); |
50 | private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); | 50 | private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>(); |
51 | private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); | ||
51 | 52 | ||
52 | private uint m_WorkerThreadCount = 0; | 53 | private uint m_WorkerThreadCount = 0; |
53 | private Thread[] m_workerThreads; | 54 | private Thread[] m_workerThreads; |
54 | private Thread m_longPollThread; | 55 | private Thread m_retrysThread; |
55 | 56 | ||
56 | private bool m_running = true; | 57 | private bool m_running = true; |
57 | private int slowCount = 0; | 58 | private int slowCount = 0; |
58 | 59 | ||
59 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); | 60 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); |
60 | 61 | ||
61 | // private int m_timeout = 1000; // increase timeout 250; now use the event one | ||
62 | |||
63 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) | 62 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) |
64 | { | 63 | { |
65 | m_server = pSrv; | 64 | m_server = pSrv; |
@@ -83,9 +82,9 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
83 | int.MaxValue); | 82 | int.MaxValue); |
84 | } | 83 | } |
85 | 84 | ||
86 | m_longPollThread = Watchdog.StartThread( | 85 | m_retrysThread = Watchdog.StartThread( |
87 | this.CheckLongPollThreads, | 86 | this.CheckRetries, |
88 | string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), | 87 | string.Format("PollServiceWatcherThread:{0}", m_server.Port), |
89 | ThreadPriority.Normal, | 88 | ThreadPriority.Normal, |
90 | false, | 89 | false, |
91 | true, | 90 | true, |
@@ -97,15 +96,8 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
97 | { | 96 | { |
98 | if (m_running) | 97 | if (m_running) |
99 | { | 98 | { |
100 | // delay the enqueueing for 100ms. There's no need to have the event | 99 | lock (m_retryRequests) |
101 | // actively on the queue | 100 | m_retryRequests.Enqueue(req); |
102 | Timer t = new Timer(self => { | ||
103 | ((Timer)self).Dispose(); | ||
104 | m_requests.Enqueue(req); | ||
105 | }); | ||
106 | |||
107 | t.Change(100, Timeout.Infinite); | ||
108 | |||
109 | } | 101 | } |
110 | } | 102 | } |
111 | 103 | ||
@@ -113,47 +105,39 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
113 | { | 105 | { |
114 | if (m_running) | 106 | if (m_running) |
115 | { | 107 | { |
116 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) | 108 | if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.LongPoll) |
117 | { | 109 | { |
118 | lock (m_longPollRequests) | 110 | m_requests.Enqueue(req); |
119 | m_longPollRequests.Add(req); | ||
120 | } | 111 | } |
121 | else | 112 | else |
122 | m_requests.Enqueue(req); | 113 | { |
114 | lock (m_slowRequests) | ||
115 | m_slowRequests.Enqueue(req); | ||
116 | } | ||
123 | } | 117 | } |
124 | } | 118 | } |
125 | 119 | ||
126 | private void CheckLongPollThreads() | 120 | private void CheckRetries() |
127 | { | 121 | { |
128 | // The only purpose of this thread is to check the EQs for events. | ||
129 | // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. | ||
130 | // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. | ||
131 | // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, | ||
132 | // so if they aren't ready to be served by a worker thread (no events), they are placed | ||
133 | // directly back in the "ready-to-serve" queue by the worker thread. | ||
134 | while (m_running) | 122 | while (m_running) |
135 | { | 123 | { |
136 | Thread.Sleep(1000); | 124 | Thread.Sleep(100); // let the world move .. back to faster rate |
137 | Watchdog.UpdateThread(); | 125 | Watchdog.UpdateThread(); |
138 | 126 | lock (m_retryRequests) | |
139 | List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>(); | ||
140 | lock (m_longPollRequests) | ||
141 | { | 127 | { |
142 | if (m_longPollRequests.Count > 0 && m_running) | 128 | while (m_retryRequests.Count > 0 && m_running) |
143 | { | 129 | m_requests.Enqueue(m_retryRequests.Dequeue()); |
144 | List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => | 130 | } |
145 | (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ | 131 | slowCount++; |
146 | (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout | 132 | if (slowCount >= 10) |
147 | ); | 133 | { |
148 | 134 | slowCount = 0; | |
149 | ready.ForEach(req => | ||
150 | { | ||
151 | m_requests.Enqueue(req); | ||
152 | m_longPollRequests.Remove(req); | ||
153 | }); | ||
154 | 135 | ||
136 | lock (m_slowRequests) | ||
137 | { | ||
138 | while (m_slowRequests.Count > 0 && m_running) | ||
139 | m_requests.Enqueue(m_slowRequests.Dequeue()); | ||
155 | } | 140 | } |
156 | |||
157 | } | 141 | } |
158 | } | 142 | } |
159 | } | 143 | } |
@@ -161,18 +145,30 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
161 | public void Stop() | 145 | public void Stop() |
162 | { | 146 | { |
163 | m_running = false; | 147 | m_running = false; |
164 | // m_timeout = -10000; // cause all to expire | ||
165 | Thread.Sleep(1000); // let the world move | 148 | Thread.Sleep(1000); // let the world move |
166 | 149 | ||
167 | foreach (Thread t in m_workerThreads) | 150 | foreach (Thread t in m_workerThreads) |
168 | Watchdog.AbortThread(t.ManagedThreadId); | 151 | Watchdog.AbortThread(t.ManagedThreadId); |
169 | 152 | ||
153 | try | ||
154 | { | ||
155 | foreach (PollServiceHttpRequest req in m_retryRequests) | ||
156 | { | ||
157 | DoHTTPGruntWork(m_server,req, | ||
158 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
159 | } | ||
160 | } | ||
161 | catch | ||
162 | { | ||
163 | } | ||
164 | |||
170 | PollServiceHttpRequest wreq; | 165 | PollServiceHttpRequest wreq; |
166 | m_retryRequests.Clear(); | ||
171 | 167 | ||
172 | lock (m_longPollRequests) | 168 | lock (m_slowRequests) |
173 | { | 169 | { |
174 | if (m_longPollRequests.Count > 0 && m_running) | 170 | while (m_slowRequests.Count > 0 && m_running) |
175 | m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); | 171 | m_requests.Enqueue(m_slowRequests.Dequeue()); |
176 | } | 172 | } |
177 | 173 | ||
178 | while (m_requests.Count() > 0) | 174 | while (m_requests.Count() > 0) |
@@ -180,15 +176,14 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
180 | try | 176 | try |
181 | { | 177 | { |
182 | wreq = m_requests.Dequeue(0); | 178 | wreq = m_requests.Dequeue(0); |
183 | wreq.DoHTTPGruntWork( | 179 | DoHTTPGruntWork(m_server,wreq, |
184 | m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); | 180 | wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); |
185 | } | 181 | } |
186 | catch | 182 | catch |
187 | { | 183 | { |
188 | } | 184 | } |
189 | } | 185 | } |
190 | 186 | ||
191 | m_longPollRequests.Clear(); | ||
192 | m_requests.Clear(); | 187 | m_requests.Clear(); |
193 | } | 188 | } |
194 | 189 | ||
@@ -199,7 +194,6 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
199 | while (m_running) | 194 | while (m_running) |
200 | { | 195 | { |
201 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | 196 | PollServiceHttpRequest req = m_requests.Dequeue(5000); |
202 | //m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); | ||
203 | 197 | ||
204 | Watchdog.UpdateThread(); | 198 | Watchdog.UpdateThread(); |
205 | if (req != null) | 199 | if (req != null) |
@@ -217,7 +211,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
217 | { | 211 | { |
218 | try | 212 | try |
219 | { | 213 | { |
220 | req.DoHTTPGruntWork(m_server, responsedata); | 214 | DoHTTPGruntWork(m_server, req, responsedata); |
221 | } | 215 | } |
222 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | 216 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream |
223 | { | 217 | { |
@@ -230,7 +224,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
230 | { | 224 | { |
231 | try | 225 | try |
232 | { | 226 | { |
233 | req.DoHTTPGruntWork(m_server, responsedata); | 227 | DoHTTPGruntWork(m_server, req, responsedata); |
234 | } | 228 | } |
235 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | 229 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream |
236 | { | 230 | { |
@@ -245,8 +239,8 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
245 | { | 239 | { |
246 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | 240 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) |
247 | { | 241 | { |
248 | req.DoHTTPGruntWork( | 242 | DoHTTPGruntWork(m_server, req, |
249 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | 243 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); |
250 | } | 244 | } |
251 | else | 245 | else |
252 | { | 246 | { |
@@ -261,5 +255,42 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
261 | } | 255 | } |
262 | } | 256 | } |
263 | } | 257 | } |
258 | |||
259 | // DoHTTPGruntWork changed, not sending response | ||
260 | // do the same work around as core | ||
261 | |||
262 | internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata) | ||
263 | { | ||
264 | OSHttpResponse response | ||
265 | = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext); | ||
266 | |||
267 | byte[] buffer = server.DoHTTPGruntWork(responsedata, response); | ||
268 | |||
269 | response.SendChunked = false; | ||
270 | response.ContentLength64 = buffer.Length; | ||
271 | response.ContentEncoding = Encoding.UTF8; | ||
272 | |||
273 | try | ||
274 | { | ||
275 | response.OutputStream.Write(buffer, 0, buffer.Length); | ||
276 | } | ||
277 | catch (Exception ex) | ||
278 | { | ||
279 | m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex)); | ||
280 | } | ||
281 | finally | ||
282 | { | ||
283 | try | ||
284 | { | ||
285 | response.OutputStream.Flush(); | ||
286 | response.Send(); | ||
287 | } | ||
288 | catch (Exception e) | ||
289 | { | ||
290 | m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e)); | ||
291 | } | ||
292 | } | ||
293 | } | ||
264 | } | 294 | } |
265 | } \ No newline at end of file | 295 | } |
296 | |||