diff options
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 175 |
1 files changed, 104 insertions, 71 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 6aa5907..456acb0 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -44,6 +44,20 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
44 | { | 44 | { |
45 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | 45 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
46 | 46 | ||
47 | /// <summary> | ||
48 | /// Is the poll service request manager running? | ||
49 | /// </summary> | ||
50 | /// <remarks> | ||
51 | /// Can be running either synchronously or asynchronously | ||
52 | /// </remarks> | ||
53 | public bool IsRunning { get; private set; } | ||
54 | |||
55 | /// <summary> | ||
56 | /// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via | ||
57 | /// external calls)? | ||
58 | /// </summary> | ||
59 | public bool PerformResponsesAsync { get; private set; } | ||
60 | |||
47 | private readonly BaseHttpServer m_server; | 61 | private readonly BaseHttpServer m_server; |
48 | 62 | ||
49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | 63 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); |
@@ -52,48 +66,53 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
52 | private uint m_WorkerThreadCount = 0; | 66 | private uint m_WorkerThreadCount = 0; |
53 | private Thread[] m_workerThreads; | 67 | private Thread[] m_workerThreads; |
54 | 68 | ||
55 | private bool m_running = true; | ||
56 | |||
57 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); | 69 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); |
58 | 70 | ||
59 | // private int m_timeout = 1000; // increase timeout 250; now use the event one | 71 | // private int m_timeout = 1000; // increase timeout 250; now use the event one |
60 | 72 | ||
61 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) | 73 | public PollServiceRequestManager( |
74 | BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout) | ||
62 | { | 75 | { |
63 | m_server = pSrv; | 76 | m_server = pSrv; |
77 | PerformResponsesAsync = performResponsesAsync; | ||
64 | m_WorkerThreadCount = pWorkerThreadCount; | 78 | m_WorkerThreadCount = pWorkerThreadCount; |
65 | m_workerThreads = new Thread[m_WorkerThreadCount]; | 79 | m_workerThreads = new Thread[m_WorkerThreadCount]; |
66 | } | 80 | } |
67 | 81 | ||
68 | public void Start() | 82 | public void Start() |
69 | { | 83 | { |
70 | //startup worker threads | 84 | IsRunning = true; |
71 | for (uint i = 0; i < m_WorkerThreadCount; i++) | 85 | |
86 | if (PerformResponsesAsync) | ||
72 | { | 87 | { |
73 | m_workerThreads[i] | 88 | //startup worker threads |
74 | = Watchdog.StartThread( | 89 | for (uint i = 0; i < m_WorkerThreadCount; i++) |
75 | PoolWorkerJob, | 90 | { |
76 | string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), | 91 | m_workerThreads[i] |
77 | ThreadPriority.Normal, | 92 | = Watchdog.StartThread( |
78 | false, | 93 | PoolWorkerJob, |
79 | false, | 94 | string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port), |
80 | null, | 95 | ThreadPriority.Normal, |
81 | int.MaxValue); | 96 | false, |
82 | } | 97 | false, |
98 | null, | ||
99 | int.MaxValue); | ||
100 | } | ||
83 | 101 | ||
84 | Watchdog.StartThread( | 102 | Watchdog.StartThread( |
85 | this.CheckLongPollThreads, | 103 | this.CheckLongPollThreads, |
86 | string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), | 104 | string.Format("LongPollServiceWatcherThread:{0}", m_server.Port), |
87 | ThreadPriority.Normal, | 105 | ThreadPriority.Normal, |
88 | false, | 106 | false, |
89 | true, | 107 | true, |
90 | null, | 108 | null, |
91 | 1000 * 60 * 10); | 109 | 1000 * 60 * 10); |
110 | } | ||
92 | } | 111 | } |
93 | 112 | ||
94 | private void ReQueueEvent(PollServiceHttpRequest req) | 113 | private void ReQueueEvent(PollServiceHttpRequest req) |
95 | { | 114 | { |
96 | if (m_running) | 115 | if (IsRunning) |
97 | { | 116 | { |
98 | // delay the enqueueing for 100ms. There's no need to have the event | 117 | // delay the enqueueing for 100ms. There's no need to have the event |
99 | // actively on the queue | 118 | // actively on the queue |
@@ -109,7 +128,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
109 | 128 | ||
110 | public void Enqueue(PollServiceHttpRequest req) | 129 | public void Enqueue(PollServiceHttpRequest req) |
111 | { | 130 | { |
112 | if (m_running) | 131 | if (IsRunning) |
113 | { | 132 | { |
114 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) | 133 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) |
115 | { | 134 | { |
@@ -129,7 +148,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
129 | // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, | 148 | // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature, |
130 | // so if they aren't ready to be served by a worker thread (no events), they are placed | 149 | // so if they aren't ready to be served by a worker thread (no events), they are placed |
131 | // directly back in the "ready-to-serve" queue by the worker thread. | 150 | // directly back in the "ready-to-serve" queue by the worker thread. |
132 | while (m_running) | 151 | while (IsRunning) |
133 | { | 152 | { |
134 | Thread.Sleep(500); | 153 | Thread.Sleep(500); |
135 | Watchdog.UpdateThread(); | 154 | Watchdog.UpdateThread(); |
@@ -137,7 +156,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
137 | // List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>(); | 156 | // List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>(); |
138 | lock (m_longPollRequests) | 157 | lock (m_longPollRequests) |
139 | { | 158 | { |
140 | if (m_longPollRequests.Count > 0 && m_running) | 159 | if (m_longPollRequests.Count > 0 && IsRunning) |
141 | { | 160 | { |
142 | List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => | 161 | List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req => |
143 | (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ | 162 | (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ |
@@ -158,7 +177,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
158 | 177 | ||
159 | public void Stop() | 178 | public void Stop() |
160 | { | 179 | { |
161 | m_running = false; | 180 | IsRunning = false; |
162 | // m_timeout = -10000; // cause all to expire | 181 | // m_timeout = -10000; // cause all to expire |
163 | Thread.Sleep(1000); // let the world move | 182 | Thread.Sleep(1000); // let the world move |
164 | 183 | ||
@@ -169,7 +188,7 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
169 | 188 | ||
170 | lock (m_longPollRequests) | 189 | lock (m_longPollRequests) |
171 | { | 190 | { |
172 | if (m_longPollRequests.Count > 0 && m_running) | 191 | if (m_longPollRequests.Count > 0 && IsRunning) |
173 | m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); | 192 | m_longPollRequests.ForEach(req => m_requests.Enqueue(req)); |
174 | } | 193 | } |
175 | 194 | ||
@@ -194,68 +213,82 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
194 | 213 | ||
195 | private void PoolWorkerJob() | 214 | private void PoolWorkerJob() |
196 | { | 215 | { |
197 | while (m_running) | 216 | while (IsRunning) |
198 | { | 217 | { |
199 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | ||
200 | //m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); | ||
201 | |||
202 | Watchdog.UpdateThread(); | 218 | Watchdog.UpdateThread(); |
203 | if (req != null) | 219 | WaitPerformResponse(); |
220 | } | ||
221 | } | ||
222 | |||
223 | public void WaitPerformResponse() | ||
224 | { | ||
225 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | ||
226 | // m_log.DebugFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString())); | ||
227 | |||
228 | if (req != null) | ||
229 | { | ||
230 | try | ||
204 | { | 231 | { |
205 | try | 232 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) |
206 | { | 233 | { |
207 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) | 234 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); |
208 | { | ||
209 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); | ||
210 | 235 | ||
211 | if (responsedata == null) | 236 | if (responsedata == null) |
212 | continue; | 237 | return; |
213 | 238 | ||
214 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue | 239 | // This is the event queue. |
240 | // Even if we're not running we can still perform responses by explicit request. | ||
241 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll | ||
242 | || !PerformResponsesAsync) | ||
243 | { | ||
244 | try | ||
245 | { | ||
246 | req.DoHTTPGruntWork(m_server, responsedata); | ||
247 | } | ||
248 | catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream | ||
249 | { | ||
250 | // Ignore it, no need to reply | ||
251 | m_log.Error(e); | ||
252 | } | ||
253 | } | ||
254 | else | ||
255 | { | ||
256 | m_threadPool.QueueWorkItem(x => | ||
215 | { | 257 | { |
216 | try | 258 | try |
217 | { | 259 | { |
218 | req.DoHTTPGruntWork(m_server, responsedata); | 260 | req.DoHTTPGruntWork(m_server, responsedata); |
219 | } | 261 | } |
220 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | 262 | catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream |
221 | { | 263 | { |
222 | // Ignore it, no need to reply | 264 | // Ignore it, no need to reply |
265 | m_log.Error(e); | ||
223 | } | 266 | } |
224 | } | 267 | catch (Exception e) |
225 | else | ||
226 | { | ||
227 | m_threadPool.QueueWorkItem(x => | ||
228 | { | 268 | { |
229 | try | 269 | m_log.Error(e); |
230 | { | 270 | } |
231 | req.DoHTTPGruntWork(m_server, responsedata); | 271 | |
232 | } | 272 | return null; |
233 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | 273 | }, null); |
234 | { | 274 | } |
235 | // Ignore it, no need to reply | 275 | } |
236 | } | 276 | else |
237 | 277 | { | |
238 | return null; | 278 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) |
239 | }, null); | 279 | { |
240 | } | 280 | req.DoHTTPGruntWork( |
281 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
241 | } | 282 | } |
242 | else | 283 | else |
243 | { | 284 | { |
244 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) | 285 | ReQueueEvent(req); |
245 | { | ||
246 | req.DoHTTPGruntWork( | ||
247 | m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
248 | } | ||
249 | else | ||
250 | { | ||
251 | ReQueueEvent(req); | ||
252 | } | ||
253 | } | 286 | } |
254 | } | 287 | } |
255 | catch (Exception e) | 288 | } |
256 | { | 289 | catch (Exception e) |
257 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); | 290 | { |
258 | } | 291 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); |
259 | } | 292 | } |
260 | } | 293 | } |
261 | } | 294 | } |