diff options
4 files changed, 89 insertions, 95 deletions
diff --git a/OpenSim/Framework/BlockingQueue.cs b/OpenSim/Framework/BlockingQueue.cs index 3e90fac..aef1192 100644 --- a/OpenSim/Framework/BlockingQueue.cs +++ b/OpenSim/Framework/BlockingQueue.cs | |||
@@ -76,10 +76,9 @@ namespace OpenSim.Framework | |||
76 | { | 76 | { |
77 | lock (m_queueSync) | 77 | lock (m_queueSync) |
78 | { | 78 | { |
79 | bool success = true; | 79 | if (m_queue.Count < 1 && m_pqueue.Count < 1) |
80 | while (m_queue.Count < 1 && m_pqueue.Count < 1 && success) | ||
81 | { | 80 | { |
82 | success = Monitor.Wait(m_queueSync, msTimeout); | 81 | Monitor.Wait(m_queueSync, msTimeout); |
83 | } | 82 | } |
84 | 83 | ||
85 | if (m_pqueue.Count > 0) | 84 | if (m_pqueue.Count > 0) |
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs index 9477100..020bfd5 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceEventArgs.cs | |||
@@ -50,7 +50,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
50 | 50 | ||
51 | public enum EventType : int | 51 | public enum EventType : int |
52 | { | 52 | { |
53 | LongPoll = 0, | 53 | Normal = 0, |
54 | LslHttp = 1, | 54 | LslHttp = 1, |
55 | Inventory = 2 | 55 | Inventory = 2 |
56 | } | 56 | } |
@@ -80,7 +80,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
80 | NoEvents = pNoEvents; | 80 | NoEvents = pNoEvents; |
81 | Id = pId; | 81 | Id = pId; |
82 | TimeOutms = pTimeOutms; | 82 | TimeOutms = pTimeOutms; |
83 | Type = EventType.LongPoll; | 83 | Type = EventType.Normal; |
84 | } | 84 | } |
85 | } | 85 | } |
86 | } | 86 | } |
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index b8f5743..1b9010a 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -47,11 +47,12 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
47 | private readonly BaseHttpServer m_server; | 47 | private readonly BaseHttpServer m_server; |
48 | 48 | ||
49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | 49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); |
50 | private static Queue<PollServiceHttpRequest> m_longPollRequests = new Queue<PollServiceHttpRequest>(); | 50 | private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>(); |
51 | private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); | ||
51 | 52 | ||
52 | private uint m_WorkerThreadCount = 0; | 53 | private uint m_WorkerThreadCount = 0; |
53 | private Thread[] m_workerThreads; | 54 | private Thread[] m_workerThreads; |
54 | private Thread m_longPollThread; | 55 | private Thread m_retrysThread; |
55 | 56 | ||
56 | private bool m_running = true; | 57 | private bool m_running = true; |
57 | private int slowCount = 0; | 58 | private int slowCount = 0; |
@@ -83,9 +84,9 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
83 | int.MaxValue); | 84 | int.MaxValue); |
84 | } | 85 | } |
85 | 86 | ||
86 | m_longPollThread = Watchdog.StartThread( | 87 | m_retrysThread = Watchdog.StartThread( |
87 | this.CheckLongPollThreads, | 88 | this.CheckRetries, |
88 | string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), | 89 | string.Format("PollServiceWatcherThread:{0}", m_server.Port), |
89 | ThreadPriority.Normal, | 90 | ThreadPriority.Normal, |
90 | false, | 91 | false, |
91 | true, | 92 | true, |
@@ -96,52 +97,49 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
96 | private void ReQueueEvent(PollServiceHttpRequest req) | 97 | private void ReQueueEvent(PollServiceHttpRequest req) |
97 | { | 98 | { |
98 | if (m_running) | 99 | if (m_running) |
99 | m_requests.Enqueue(req); | 100 | { |
101 | lock (m_retryRequests) | ||
102 | m_retryRequests.Enqueue(req); | ||
103 | } | ||
100 | } | 104 | } |
101 | 105 | ||
102 | public void Enqueue(PollServiceHttpRequest req) | 106 | public void Enqueue(PollServiceHttpRequest req) |
103 | { | 107 | { |
104 | if (m_running) | 108 | if (m_running) |
105 | { | 109 | { |
106 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) | 110 | if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal) |
107 | { | 111 | { |
108 | lock (m_longPollRequests) | 112 | m_requests.Enqueue(req); |
109 | m_longPollRequests.Enqueue(req); | ||
110 | } | 113 | } |
111 | else | 114 | else |
112 | m_requests.Enqueue(req); | 115 | { |
116 | lock (m_slowRequests) | ||
117 | m_slowRequests.Enqueue(req); | ||
118 | } | ||
113 | } | 119 | } |
114 | } | 120 | } |
115 | 121 | ||
116 | private void CheckLongPollThreads() | 122 | private void CheckRetries() |
117 | { | 123 | { |
118 | // The only purpose of this thread is to check the EQs for events. | ||
119 | // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. | ||
120 | // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. | ||
121 | // All other types of tasks (Inventory handlers) don't have the long-poll nature, | ||
122 | // so if they aren't ready to be served by a worker thread (no events), they are placed | ||
123 | // directly back in the "ready-to-serve" queue by the worker thread. | ||
124 | while (m_running) | 124 | while (m_running) |
125 | { | 125 | { |
126 | Thread.Sleep(1000); | 126 | Thread.Sleep(100); // let the world move .. back to faster rate |
127 | Watchdog.UpdateThread(); | 127 | Watchdog.UpdateThread(); |
128 | 128 | lock (m_retryRequests) | |
129 | List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>(); | ||
130 | lock (m_longPollRequests) | ||
131 | { | 129 | { |
132 | while (m_longPollRequests.Count > 0 && m_running) | 130 | while (m_retryRequests.Count > 0 && m_running) |
131 | m_requests.Enqueue(m_retryRequests.Dequeue()); | ||
132 | } | ||
133 | slowCount++; | ||
134 | if (slowCount >= 10) | ||
135 | { | ||
136 | slowCount = 0; | ||
137 | |||
138 | lock (m_slowRequests) | ||
133 | { | 139 | { |
134 | PollServiceHttpRequest req = m_longPollRequests.Dequeue(); | 140 | while (m_slowRequests.Count > 0 && m_running) |
135 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ | 141 | m_requests.Enqueue(m_slowRequests.Dequeue()); |
136 | (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout | ||
137 | m_requests.Enqueue(req); | ||
138 | else | ||
139 | not_ready.Add(req); | ||
140 | } | 142 | } |
141 | |||
142 | foreach (PollServiceHttpRequest req in not_ready) | ||
143 | m_longPollRequests.Enqueue(req); | ||
144 | |||
145 | } | 143 | } |
146 | } | 144 | } |
147 | } | 145 | } |
@@ -155,12 +153,24 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
155 | foreach (Thread t in m_workerThreads) | 153 | foreach (Thread t in m_workerThreads) |
156 | Watchdog.AbortThread(t.ManagedThreadId); | 154 | Watchdog.AbortThread(t.ManagedThreadId); |
157 | 155 | ||
156 | try | ||
157 | { | ||
158 | foreach (PollServiceHttpRequest req in m_retryRequests) | ||
159 | { | ||
160 | req.DoHTTPGruntWork(m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
161 | } | ||
162 | } | ||
163 | catch | ||
164 | { | ||
165 | } | ||
166 | |||
158 | PollServiceHttpRequest wreq; | 167 | PollServiceHttpRequest wreq; |
168 | m_retryRequests.Clear(); | ||
159 | 169 | ||
160 | lock (m_longPollRequests) | 170 | lock (m_slowRequests) |
161 | { | 171 | { |
162 | while (m_longPollRequests.Count > 0 && m_running) | 172 | while (m_slowRequests.Count > 0 && m_running) |
163 | m_requests.Enqueue(m_longPollRequests.Dequeue()); | 173 | m_requests.Enqueue(m_slowRequests.Dequeue()); |
164 | } | 174 | } |
165 | 175 | ||
166 | while (m_requests.Count() > 0) | 176 | while (m_requests.Count() > 0) |
@@ -185,33 +195,34 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
185 | { | 195 | { |
186 | while (m_running) | 196 | while (m_running) |
187 | { | 197 | { |
188 | Watchdog.UpdateThread(); | 198 | PollServiceHttpRequest req = m_requests.Dequeue(5000); |
189 | 199 | ||
190 | PollServiceHttpRequest req = null; | 200 | Watchdog.UpdateThread(); |
191 | lock (m_requests) | 201 | if (req != null) |
192 | { | ||
193 | if (m_requests.Count() > 0) | ||
194 | req = m_requests.Dequeue(); | ||
195 | } | ||
196 | if (req == null) | ||
197 | Thread.Sleep(100); | ||
198 | else | ||
199 | { | 202 | { |
200 | //PollServiceHttpRequest req = m_requests.Dequeue(5000); | 203 | try |
201 | //m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); | ||
202 | |||
203 | if (req != null) | ||
204 | { | 204 | { |
205 | try | 205 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) |
206 | { | 206 | { |
207 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) | 207 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); |
208 | { | ||
209 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); | ||
210 | 208 | ||
211 | if (responsedata == null) | 209 | if (responsedata == null) |
212 | continue; | 210 | continue; |
213 | 211 | ||
214 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue | 212 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue |
213 | { | ||
214 | try | ||
215 | { | ||
216 | req.DoHTTPGruntWork(m_server, responsedata); | ||
217 | } | ||
218 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | ||
219 | { | ||
220 | // Ignore it, no need to reply | ||
221 | } | ||
222 | } | ||
223 | else | ||
224 | { | ||
225 | m_threadPool.QueueWorkItem(x => | ||
215 | { | 226 | { |
216 | try | 227 | try |
217 | { | 228 | { |
@@ -221,41 +232,27 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
221 | { | 232 | { |
222 | // Ignore it, no need to reply | 233 | // Ignore it, no need to reply |
223 | } | 234 | } |
224 | } | ||
225 | else | ||
226 | { | ||
227 | m_threadPool.QueueWorkItem(x => | ||
228 | { | ||
229 | try | ||
230 | { | ||
231 | req.DoHTTPGruntWork(m_server, responsedata); | ||
232 | } | ||
233 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | ||
234 | { | ||
235 | // Ignore it, no need to reply | ||
236 | } | ||
237 | 235 | ||
238 | return null; | 236 | return null; |
239 | }, null); | 237 | }, null); |
240 | } | 238 | } |
239 | } | ||
240 | else | ||
241 | { | ||
242 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | ||
243 | { | ||
244 | req.DoHTTPGruntWork( | ||
245 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
241 | } | 246 | } |
242 | else | 247 | else |
243 | { | 248 | { |
244 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | 249 | ReQueueEvent(req); |
245 | { | ||
246 | req.DoHTTPGruntWork( | ||
247 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
248 | } | ||
249 | else | ||
250 | { | ||
251 | ReQueueEvent(req); | ||
252 | } | ||
253 | } | 250 | } |
254 | } | 251 | } |
255 | catch (Exception e) | 252 | } |
256 | { | 253 | catch (Exception e) |
257 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); | 254 | { |
258 | } | 255 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); |
259 | } | 256 | } |
260 | } | 257 | } |
261 | } | 258 | } |
diff --git a/OpenSim/Region/ClientStack/Linden/Caps/EventQueue/EventQueueGetModule.cs b/OpenSim/Region/ClientStack/Linden/Caps/EventQueue/EventQueueGetModule.cs index f0445ff..1835a72 100644 --- a/OpenSim/Region/ClientStack/Linden/Caps/EventQueue/EventQueueGetModule.cs +++ b/OpenSim/Region/ClientStack/Linden/Caps/EventQueue/EventQueueGetModule.cs | |||
@@ -364,7 +364,8 @@ namespace OpenSim.Region.ClientStack.Linden | |||
364 | 364 | ||
365 | caps.RegisterPollHandler( | 365 | caps.RegisterPollHandler( |
366 | "EventQueueGet", | 366 | "EventQueueGet", |
367 | new PollServiceEventArgs(null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, 40000)); | 367 | new PollServiceEventArgs( |
368 | null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, 40000)); | ||
368 | 369 | ||
369 | Random rnd = new Random(Environment.TickCount); | 370 | Random rnd = new Random(Environment.TickCount); |
370 | lock (m_ids) | 371 | lock (m_ids) |
@@ -382,10 +383,7 @@ namespace OpenSim.Region.ClientStack.Linden | |||
382 | Queue<OSD> queue = GetQueue(agentID); | 383 | Queue<OSD> queue = GetQueue(agentID); |
383 | if (queue != null) | 384 | if (queue != null) |
384 | lock (queue) | 385 | lock (queue) |
385 | { | ||
386 | //m_log.WarnFormat("POLLED FOR EVENTS BY {0} in {1} -- {2}", agentID, m_scene.RegionInfo.RegionName, queue.Count); | ||
387 | return queue.Count > 0; | 386 | return queue.Count > 0; |
388 | } | ||
389 | 387 | ||
390 | return false; | 388 | return false; |
391 | } | 389 | } |
@@ -408,7 +406,7 @@ namespace OpenSim.Region.ClientStack.Linden | |||
408 | public Hashtable GetEvents(UUID requestID, UUID pAgentId) | 406 | public Hashtable GetEvents(UUID requestID, UUID pAgentId) |
409 | { | 407 | { |
410 | if (DebugLevel >= 2) | 408 | if (DebugLevel >= 2) |
411 | m_log.WarnFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName); | 409 | m_log.DebugFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName); |
412 | 410 | ||
413 | Queue<OSD> queue = TryGetQueue(pAgentId); | 411 | Queue<OSD> queue = TryGetQueue(pAgentId); |
414 | OSD element; | 412 | OSD element; |