diff options
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 145 |
1 files changed, 89 insertions, 56 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 6aa5907..44f7045 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -47,17 +47,18 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
47 | private readonly BaseHttpServer m_server; | 47 | private readonly BaseHttpServer m_server; |
48 | 48 | ||
49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | 49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); |
50 | private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); | 50 | private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>(); |
51 | private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); | ||
51 | 52 | ||
52 | private uint m_WorkerThreadCount = 0; | 53 | private uint m_WorkerThreadCount = 0; |
53 | private Thread[] m_workerThreads; | 54 | private Thread[] m_workerThreads; |
55 | private Thread m_retrysThread; | ||
54 | 56 | ||
55 | private bool m_running = true; | 57 | private bool m_running = true; |
58 | private int slowCount = 0; | ||
56 | 59 | ||
57 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); | 60 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); |
58 | 61 | ||
59 | // private int m_timeout = 1000; // increase timeout 250; now use the event one | ||
60 | |||
61 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) | 62 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) |
62 | { | 63 | { |
63 | m_server = pSrv; | 64 | m_server = pSrv; |
@@ -81,9 +82,9 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
81 | int.MaxValue); | 82 | int.MaxValue); |
82 | } | 83 | } |
83 | 84 | ||
84 | Watchdog.StartThread( | 85 | m_retrysThread = Watchdog.StartThread( |
85 | this.CheckLongPollThreads, | 86 | this.CheckRetries, |
86 | string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), | 87 | string.Format("PollServiceWatcherThread:{0}", m_server.Port), |
87 | ThreadPriority.Normal, | 88 | ThreadPriority.Normal, |
88 | false, | 89 | false, |
89 | true, | 90 | true, |
@@ -95,15 +96,8 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
95 | { | 96 | { |
96 | if (m_running) | 97 | if (m_running) |
97 | { | 98 | { |
98 | // delay the enqueueing for 100ms. There's no need to have the event | 99 | lock (m_retryRequests) |
99 | // actively on the queue | 100 | m_retryRequests.Enqueue(req); |
100 | Timer t = new Timer(self => { | ||
101 | ((Timer)self).Dispose(); | ||
102 | m_requests.Enqueue(req); | ||
103 | }); | ||
104 | |||
105 | t.Change(100, Timeout.Infinite); | ||
106 | |||
107 | } | 101 | } |
108 | } | 102 | } |
109 | 103 | ||
@@ -111,47 +105,39 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
111 | { | 105 | { |
112 | if (m_running) | 106 | if (m_running) |
113 | { | 107 | { |
114 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) | 108 | if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.LongPoll) |
115 | { | 109 | { |
116 | lock (m_longPollRequests) | 110 | m_requests.Enqueue(req); |
117 | m_longPollRequests.Add(req); | ||
118 | } | 111 | } |
119 | else | 112 | else |
120 | m_requests.Enqueue(req); | 113 | { |
114 | lock (m_slowRequests) | ||
115 | m_slowRequests.Enqueue(req); | ||
116 | } | ||
121 | } | 117 | } |
122 | } | 118 | } |
123 | 119 | ||
124 | private void CheckLongPollThreads() | 120 | private void CheckRetries() |
125 | { | 121 | { |
126 | // The only purpose of this thread is to check the EQs for events. | ||
127 | // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. | ||
128 | // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. | ||
129 | // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, | ||
130 | // so if they aren't ready to be served by a worker thread (no events), they are placed | ||
131 | // directly back in the "ready-to-serve" queue by the worker thread. | ||
132 | while (m_running) | 122 | while (m_running) |
133 | { | 123 | { |
134 | Thread.Sleep(500); | 124 | Thread.Sleep(100); // let the world move .. back to faster rate |
135 | Watchdog.UpdateThread(); | 125 | Watchdog.UpdateThread(); |
136 | 126 | lock (m_retryRequests) | |
137 | // List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>(); | ||
138 | lock (m_longPollRequests) | ||
139 | { | 127 | { |
140 | if (m_longPollRequests.Count > 0 && m_running) | 128 | while (m_retryRequests.Count > 0 && m_running) |
141 | { | 129 | m_requests.Enqueue(m_retryRequests.Dequeue()); |
142 | List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => | 130 | } |
143 | (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ | 131 | slowCount++; |
144 | (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout | 132 | if (slowCount >= 10) |
145 | ); | 133 | { |
146 | 134 | slowCount = 0; | |
147 | ready.ForEach(req => | ||
148 | { | ||
149 | m_requests.Enqueue(req); | ||
150 | m_longPollRequests.Remove(req); | ||
151 | }); | ||
152 | 135 | ||
136 | lock (m_slowRequests) | ||
137 | { | ||
138 | while (m_slowRequests.Count > 0 && m_running) | ||
139 | m_requests.Enqueue(m_slowRequests.Dequeue()); | ||
153 | } | 140 | } |
154 | |||
155 | } | 141 | } |
156 | } | 142 | } |
157 | } | 143 | } |
@@ -159,18 +145,30 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
159 | public void Stop() | 145 | public void Stop() |
160 | { | 146 | { |
161 | m_running = false; | 147 | m_running = false; |
162 | // m_timeout = -10000; // cause all to expire | ||
163 | Thread.Sleep(1000); // let the world move | 148 | Thread.Sleep(1000); // let the world move |
164 | 149 | ||
165 | foreach (Thread t in m_workerThreads) | 150 | foreach (Thread t in m_workerThreads) |
166 | Watchdog.AbortThread(t.ManagedThreadId); | 151 | Watchdog.AbortThread(t.ManagedThreadId); |
167 | 152 | ||
153 | try | ||
154 | { | ||
155 | foreach (PollServiceHttpRequest req in m_retryRequests) | ||
156 | { | ||
157 | DoHTTPGruntWork(m_server,req, | ||
158 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
159 | } | ||
160 | } | ||
161 | catch | ||
162 | { | ||
163 | } | ||
164 | |||
168 | PollServiceHttpRequest wreq; | 165 | PollServiceHttpRequest wreq; |
166 | m_retryRequests.Clear(); | ||
169 | 167 | ||
170 | lock (m_longPollRequests) | 168 | lock (m_slowRequests) |
171 | { | 169 | { |
172 | if (m_longPollRequests.Count > 0 && m_running) | 170 | while (m_slowRequests.Count > 0 && m_running) |
173 | m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); | 171 | m_requests.Enqueue(m_slowRequests.Dequeue()); |
174 | } | 172 | } |
175 | 173 | ||
176 | while (m_requests.Count() > 0) | 174 | while (m_requests.Count() > 0) |
@@ -178,15 +176,14 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
178 | try | 176 | try |
179 | { | 177 | { |
180 | wreq = m_requests.Dequeue(0); | 178 | wreq = m_requests.Dequeue(0); |
181 | wreq.DoHTTPGruntWork( | 179 | DoHTTPGruntWork(m_server,wreq, |
182 | m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); | 180 | wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); |
183 | } | 181 | } |
184 | catch | 182 | catch |
185 | { | 183 | { |
186 | } | 184 | } |
187 | } | 185 | } |
188 | 186 | ||
189 | m_longPollRequests.Clear(); | ||
190 | m_requests.Clear(); | 187 | m_requests.Clear(); |
191 | } | 188 | } |
192 | 189 | ||
@@ -197,7 +194,6 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
197 | while (m_running) | 194 | while (m_running) |
198 | { | 195 | { |
199 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | 196 | PollServiceHttpRequest req = m_requests.Dequeue(5000); |
200 | //m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); | ||
201 | 197 | ||
202 | Watchdog.UpdateThread(); | 198 | Watchdog.UpdateThread(); |
203 | if (req != null) | 199 | if (req != null) |
@@ -215,7 +211,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
215 | { | 211 | { |
216 | try | 212 | try |
217 | { | 213 | { |
218 | req.DoHTTPGruntWork(m_server, responsedata); | 214 | DoHTTPGruntWork(m_server, req, responsedata); |
219 | } | 215 | } |
220 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | 216 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream |
221 | { | 217 | { |
@@ -228,7 +224,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
228 | { | 224 | { |
229 | try | 225 | try |
230 | { | 226 | { |
231 | req.DoHTTPGruntWork(m_server, responsedata); | 227 | DoHTTPGruntWork(m_server, req, responsedata); |
232 | } | 228 | } |
233 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | 229 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream |
234 | { | 230 | { |
@@ -243,8 +239,8 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
243 | { | 239 | { |
244 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | 240 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) |
245 | { | 241 | { |
246 | req.DoHTTPGruntWork( | 242 | DoHTTPGruntWork(m_server, req, |
247 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | 243 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); |
248 | } | 244 | } |
249 | else | 245 | else |
250 | { | 246 | { |
@@ -259,5 +255,42 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
259 | } | 255 | } |
260 | } | 256 | } |
261 | } | 257 | } |
258 | |||
259 | // DoHTTPGruntWork changed, not sending response | ||
260 | // do the same work around as core | ||
261 | |||
262 | internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata) | ||
263 | { | ||
264 | OSHttpResponse response | ||
265 | = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext); | ||
266 | |||
267 | byte[] buffer = server.DoHTTPGruntWork(responsedata, response); | ||
268 | |||
269 | response.SendChunked = false; | ||
270 | response.ContentLength64 = buffer.Length; | ||
271 | response.ContentEncoding = Encoding.UTF8; | ||
272 | |||
273 | try | ||
274 | { | ||
275 | response.OutputStream.Write(buffer, 0, buffer.Length); | ||
276 | } | ||
277 | catch (Exception ex) | ||
278 | { | ||
279 | m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex)); | ||
280 | } | ||
281 | finally | ||
282 | { | ||
283 | try | ||
284 | { | ||
285 | response.OutputStream.Flush(); | ||
286 | response.Send(); | ||
287 | } | ||
288 | catch (Exception e) | ||
289 | { | ||
290 | m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e)); | ||
291 | } | ||
292 | } | ||
293 | } | ||
262 | } | 294 | } |
263 | } \ No newline at end of file | 295 | } |
296 | |||