diff options
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 318 |
1 files changed, 242 insertions, 76 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 3e84c55..28bba70 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -33,132 +33,298 @@ using log4net; | |||
33 | using HttpServer; | 33 | using HttpServer; |
34 | using OpenSim.Framework; | 34 | using OpenSim.Framework; |
35 | using OpenSim.Framework.Monitoring; | 35 | using OpenSim.Framework.Monitoring; |
36 | using Amib.Threading; | ||
37 | using System.IO; | ||
38 | using System.Text; | ||
39 | using System.Collections.Generic; | ||
36 | 40 | ||
37 | namespace OpenSim.Framework.Servers.HttpServer | 41 | namespace OpenSim.Framework.Servers.HttpServer |
38 | { | 42 | { |
39 | public class PollServiceRequestManager | 43 | public class PollServiceRequestManager |
40 | { | 44 | { |
41 | // private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | 45 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
46 | |||
47 | /// <summary> | ||
48 | /// Is the poll service request manager running? | ||
49 | /// </summary> | ||
50 | /// <remarks> | ||
51 | /// Can be running either synchronously or asynchronously | ||
52 | /// </remarks> | ||
53 | public bool IsRunning { get; private set; } | ||
54 | |||
55 | /// <summary> | ||
56 | /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via | ||
57 | /// external calls)? | ||
58 | /// </summary> | ||
59 | public bool PerformResponsesAsync { get; private set; } | ||
60 | |||
61 | /// <summary> | ||
62 | /// Number of responses actually processed and sent to viewer (or aborted due to error). | ||
63 | /// </summary> | ||
64 | public int ResponsesProcessed { get; private set; } | ||
42 | 65 | ||
43 | private readonly BaseHttpServer m_server; | 66 | private readonly BaseHttpServer m_server; |
44 | private static Queue m_requests = Queue.Synchronized(new Queue()); | 67 | |
68 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | ||
69 | private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>(); | ||
70 | |||
45 | private uint m_WorkerThreadCount = 0; | 71 | private uint m_WorkerThreadCount = 0; |
46 | private Thread[] m_workerThreads; | 72 | private Thread[] m_workerThreads; |
47 | private PollServiceWorkerThread[] m_PollServiceWorkerThreads; | ||
48 | private volatile bool m_running = true; | ||
49 | private int m_pollTimeout; | ||
50 | 73 | ||
51 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) | 74 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); |
75 | |||
76 | // private int m_timeout = 1000; // increase timeout 250; now use the event one | ||
77 | |||
78 | public PollServiceRequestManager( | ||
79 | BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) | ||
52 | { | 80 | { |
53 | m_server = pSrv; | 81 | m_server = pSrv; |
82 | PerformResponsesAsync = performResponsesAsync; | ||
54 | m_WorkerThreadCount = pWorkerThreadCount; | 83 | m_WorkerThreadCount = pWorkerThreadCount; |
55 | m_pollTimeout = pTimeout; | 84 | m_workerThreads = new Thread[m_WorkerThreadCount]; |
85 | |||
86 | StatsManager.RegisterStat( | ||
87 | new Stat( | ||
88 | "QueuedPollResponses", | ||
89 | "Number of poll responses queued for processing.", | ||
90 | "", | ||
91 | "", | ||
92 | "httpserver", | ||
93 | m_server.Port.ToString(), | ||
94 | StatType.Pull, | ||
95 | MeasuresOfInterest.AverageChangeOverTime, | ||
96 | stat => stat.Value = m_requests.Count(), | ||
97 | StatVerbosity.Debug)); | ||
98 | |||
99 | StatsManager.RegisterStat( | ||
100 | new Stat( | ||
101 | "ProcessedPollResponses", | ||
102 | "Number of poll responses processed.", | ||
103 | "", | ||
104 | "", | ||
105 | "httpserver", | ||
106 | m_server.Port.ToString(), | ||
107 | StatType.Pull, | ||
108 | MeasuresOfInterest.AverageChangeOverTime, | ||
109 | stat => stat.Value = ResponsesProcessed, | ||
110 | StatVerbosity.Debug)); | ||
56 | } | 111 | } |
57 | 112 | ||
58 | public void Start() | 113 | public void Start() |
59 | { | 114 | { |
60 | m_running = true; | 115 | IsRunning = true; |
61 | m_workerThreads = new Thread[m_WorkerThreadCount]; | ||
62 | m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount]; | ||
63 | 116 | ||
64 | //startup worker threads | 117 | if (PerformResponsesAsync) |
65 | for (uint i = 0; i < m_WorkerThreadCount; i++) | ||
66 | { | 118 | { |
67 | m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout); | 119 | //startup worker threads |
68 | m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent; | 120 | for (uint i = 0; i < m_WorkerThreadCount; i++) |
69 | 121 | { | |
70 | m_workerThreads[i] | 122 | m_workerThreads[i] |
71 | = Watchdog.StartThread( | 123 | = WorkManager.StartThread( |
72 | m_PollServiceWorkerThreads[i].ThreadStart, | 124 | PoolWorkerJob, |
73 | String.Format("PollServiceWorkerThread{0}", i), | 125 | string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), |
74 | ThreadPriority.Normal, | 126 | ThreadPriority.Normal, |
75 | false, | 127 | false, |
76 | true, | 128 | false, |
77 | null, | 129 | null, |
78 | int.MaxValue); | 130 | int.MaxValue); |
79 | } | 131 | } |
80 | 132 | ||
81 | Watchdog.StartThread( | 133 | WorkManager.StartThread( |
82 | this.ThreadStart, | 134 | this.CheckLongPollThreads, |
83 | "PollServiceWatcherThread", | 135 | string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), |
84 | ThreadPriority.Normal, | 136 | ThreadPriority.Normal, |
85 | false, | 137 | false, |
86 | true, | 138 | true, |
87 | null, | 139 | null, |
88 | 1000 * 60 * 10); | 140 | 1000 * 60 * 10); |
141 | } | ||
89 | } | 142 | } |
90 | 143 | ||
91 | internal void ReQueueEvent(PollServiceHttpRequest req) | 144 | private void ReQueueEvent(PollServiceHttpRequest req) |
92 | { | 145 | { |
93 | // Do accounting stuff here | 146 | if (IsRunning) |
94 | Enqueue(req); | 147 | { |
95 | } | 148 | // delay the enqueueing for 100ms. There's no need to have the event |
149 | // actively on the queue | ||
150 | Timer t = new Timer(self => { | ||
151 | ((Timer)self).Dispose(); | ||
152 | m_requests.Enqueue(req); | ||
153 | }); | ||
96 | 154 | ||
97 | public void Enqueue(PollServiceHttpRequest req) | 155 | t.Change(100, Timeout.Infinite); |
98 | { | 156 | |
99 | lock (m_requests) | 157 | } |
100 | m_requests.Enqueue(req); | ||
101 | } | 158 | } |
102 | 159 | ||
103 | public void ThreadStart() | 160 | public void Enqueue(PollServiceHttpRequest req) |
104 | { | 161 | { |
105 | while (m_running) | 162 | if (IsRunning) |
106 | { | 163 | { |
107 | Watchdog.UpdateThread(); | 164 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) |
108 | ProcessQueuedRequests(); | 165 | { |
109 | Thread.Sleep(1000); | 166 | lock (m_longPollRequests) |
167 | m_longPollRequests.Add(req); | ||
168 | } | ||
169 | else | ||
170 | m_requests.Enqueue(req); | ||
110 | } | 171 | } |
111 | } | 172 | } |
112 | 173 | ||
113 | private void ProcessQueuedRequests() | 174 | private void CheckLongPollThreads() |
114 | { | 175 | { |
115 | lock (m_requests) | 176 | // The only purpose of this thread is to check the EQs for events. |
177 | // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests. | ||
178 | // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests. | ||
179 | // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, | ||
180 | // so if they aren't ready to be served by a worker thread (no events), they are placed | ||
181 | // directly back in the "ready-to-serve" queue by the worker thread. | ||
182 | while (IsRunning) | ||
116 | { | 183 | { |
117 | if (m_requests.Count == 0) | 184 | Thread.Sleep(500); |
118 | return; | 185 | Watchdog.UpdateThread(); |
119 | |||
120 | // m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count); | ||
121 | |||
122 | int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1; | ||
123 | 186 | ||
124 | // For Each WorkerThread | 187 | // List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>(); |
125 | for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) | 188 | lock (m_longPollRequests) |
126 | { | 189 | { |
127 | //Loop over number of requests each thread handles. | 190 | if (m_longPollRequests.Count > 0 && IsRunning) |
128 | for (int i = 0; i < reqperthread && m_requests.Count > 0; i++) | ||
129 | { | 191 | { |
130 | try | 192 | List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => |
131 | { | 193 | (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ |
132 | m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); | 194 | (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout |
133 | } | 195 | ); |
134 | catch (InvalidOperationException) | 196 | |
135 | { | 197 | ready.ForEach(req => |
136 | // The queue is empty, we did our calculations wrong! | 198 | { |
137 | return; | 199 | m_requests.Enqueue(req); |
138 | } | 200 | m_longPollRequests.Remove(req); |
139 | 201 | }); | |
202 | |||
140 | } | 203 | } |
204 | |||
141 | } | 205 | } |
142 | } | 206 | } |
143 | |||
144 | } | 207 | } |
145 | 208 | ||
146 | public void Stop() | 209 | public void Stop() |
147 | { | 210 | { |
148 | m_running = false; | 211 | IsRunning = false; |
212 | // m_timeout = -10000; // cause all to expire | ||
213 | Thread.Sleep(1000); // let the world move | ||
214 | |||
215 | foreach (Thread t in m_workerThreads) | ||
216 | Watchdog.AbortThread(t.ManagedThreadId); | ||
217 | |||
218 | PollServiceHttpRequest wreq; | ||
149 | 219 | ||
150 | foreach (object o in m_requests) | 220 | lock (m_longPollRequests) |
151 | { | 221 | { |
152 | PollServiceHttpRequest req = (PollServiceHttpRequest) o; | 222 | if (m_longPollRequests.Count > 0 && IsRunning) |
153 | PollServiceWorkerThread.DoHTTPGruntWork( | 223 | m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); |
154 | m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
155 | } | 224 | } |
156 | 225 | ||
226 | while (m_requests.Count() > 0) | ||
227 | { | ||
228 | try | ||
229 | { | ||
230 | wreq = m_requests.Dequeue(0); | ||
231 | ResponsesProcessed++; | ||
232 | wreq.DoHTTPGruntWork( | ||
233 | m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); | ||
234 | } | ||
235 | catch | ||
236 | { | ||
237 | } | ||
238 | } | ||
239 | |||
240 | m_longPollRequests.Clear(); | ||
157 | m_requests.Clear(); | 241 | m_requests.Clear(); |
242 | } | ||
158 | 243 | ||
159 | foreach (Thread t in m_workerThreads) | 244 | // work threads |
245 | |||
246 | private void PoolWorkerJob() | ||
247 | { | ||
248 | while (IsRunning) | ||
160 | { | 249 | { |
161 | t.Abort(); | 250 | Watchdog.UpdateThread(); |
251 | WaitPerformResponse(); | ||
252 | } | ||
253 | } | ||
254 | |||
255 | public void WaitPerformResponse() | ||
256 | { | ||
257 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | ||
258 | // m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); | ||
259 | |||
260 | if (req != null) | ||
261 | { | ||
262 | try | ||
263 | { | ||
264 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) | ||
265 | { | ||
266 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); | ||
267 | |||
268 | if (responsedata == null) | ||
269 | return; | ||
270 | |||
271 | // This is the event queue. | ||
272 | // Even if we're not running we can still perform responses by explicit request. | ||
273 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll | ||
274 | || !PerformResponsesAsync) | ||
275 | { | ||
276 | try | ||
277 | { | ||
278 | ResponsesProcessed++; | ||
279 | req.DoHTTPGruntWork(m_server, responsedata); | ||
280 | } | ||
281 | catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream | ||
282 | { | ||
283 | // Ignore it, no need to reply | ||
284 | m_log.Error(e); | ||
285 | } | ||
286 | } | ||
287 | else | ||
288 | { | ||
289 | m_threadPool.QueueWorkItem(x => | ||
290 | { | ||
291 | try | ||
292 | { | ||
293 | ResponsesProcessed++; | ||
294 | req.DoHTTPGruntWork(m_server, responsedata); | ||
295 | } | ||
296 | catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream | ||
297 | { | ||
298 | // Ignore it, no need to reply | ||
299 | m_log.Error(e); | ||
300 | } | ||
301 | catch (Exception e) | ||
302 | { | ||
303 | m_log.Error(e); | ||
304 | } | ||
305 | |||
306 | return null; | ||
307 | }, null); | ||
308 | } | ||
309 | } | ||
310 | else | ||
311 | { | ||
312 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | ||
313 | { | ||
314 | ResponsesProcessed++; | ||
315 | req.DoHTTPGruntWork( | ||
316 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
317 | } | ||
318 | else | ||
319 | { | ||
320 | ReQueueEvent(req); | ||
321 | } | ||
322 | } | ||
323 | } | ||
324 | catch (Exception e) | ||
325 | { | ||
326 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); | ||
327 | } | ||
162 | } | 328 | } |
163 | } | 329 | } |
164 | } | 330 | } |