diff options
author | Melanie | 2013-06-07 23:43:45 +0100 |
---|---|---|
committer | Melanie | 2013-06-07 23:43:45 +0100 |
commit | 7c0bfca7a03584dd65c5659f177b434ee94ddc9d (patch) | |
tree | 82ceefae330b2c2b80a20db68abd79ae2439b034 /OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |
parent | minor: add dr0berts to contributors list (diff) | |
download | opensim-SC_OLD-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.zip opensim-SC_OLD-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.tar.gz opensim-SC_OLD-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.tar.bz2 opensim-SC_OLD-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.tar.xz |
Adding Avination's PollService to round out the HTTP inventory changes
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r-- | OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | 248 |
1 files changed, 192 insertions, 56 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index 3e84c55..a5380c1 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs | |||
@@ -33,53 +33,56 @@ using log4net; | |||
33 | using HttpServer; | 33 | using HttpServer; |
34 | using OpenSim.Framework; | 34 | using OpenSim.Framework; |
35 | using OpenSim.Framework.Monitoring; | 35 | using OpenSim.Framework.Monitoring; |
36 | using Amib.Threading; | ||
37 | using System.IO; | ||
38 | using System.Text; | ||
39 | using System.Collections.Generic; | ||
36 | 40 | ||
37 | namespace OpenSim.Framework.Servers.HttpServer | 41 | namespace OpenSim.Framework.Servers.HttpServer |
38 | { | 42 | { |
39 | public class PollServiceRequestManager | 43 | public class PollServiceRequestManager |
40 | { | 44 | { |
41 | // private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | 45 | private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
42 | 46 | ||
43 | private readonly BaseHttpServer m_server; | 47 | private readonly BaseHttpServer m_server; |
44 | private static Queue m_requests = Queue.Synchronized(new Queue()); | 48 | |
49 | private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); | ||
50 | private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>(); | ||
51 | private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>(); | ||
52 | |||
45 | private uint m_WorkerThreadCount = 0; | 53 | private uint m_WorkerThreadCount = 0; |
46 | private Thread[] m_workerThreads; | 54 | private Thread[] m_workerThreads; |
47 | private PollServiceWorkerThread[] m_PollServiceWorkerThreads; | 55 | private Thread m_retrysThread; |
48 | private volatile bool m_running = true; | 56 | |
49 | private int m_pollTimeout; | 57 | private bool m_running = true; |
58 | private int slowCount = 0; | ||
59 | |||
60 | private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2); | ||
61 | |||
62 | // private int m_timeout = 1000; // increase timeout 250; now use the event one | ||
50 | 63 | ||
51 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) | 64 | public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) |
52 | { | 65 | { |
53 | m_server = pSrv; | 66 | m_server = pSrv; |
54 | m_WorkerThreadCount = pWorkerThreadCount; | 67 | m_WorkerThreadCount = pWorkerThreadCount; |
55 | m_pollTimeout = pTimeout; | ||
56 | } | ||
57 | |||
58 | public void Start() | ||
59 | { | ||
60 | m_running = true; | ||
61 | m_workerThreads = new Thread[m_WorkerThreadCount]; | 68 | m_workerThreads = new Thread[m_WorkerThreadCount]; |
62 | m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount]; | ||
63 | 69 | ||
64 | //startup worker threads | 70 | //startup worker threads |
65 | for (uint i = 0; i < m_WorkerThreadCount; i++) | 71 | for (uint i = 0; i < m_WorkerThreadCount; i++) |
66 | { | 72 | { |
67 | m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout); | ||
68 | m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent; | ||
69 | |||
70 | m_workerThreads[i] | 73 | m_workerThreads[i] |
71 | = Watchdog.StartThread( | 74 | = Watchdog.StartThread( |
72 | m_PollServiceWorkerThreads[i].ThreadStart, | 75 | PoolWorkerJob, |
73 | String.Format("PollServiceWorkerThread{0}", i), | 76 | String.Format("PollServiceWorkerThread{0}", i), |
74 | ThreadPriority.Normal, | 77 | ThreadPriority.Normal, |
75 | false, | 78 | false, |
76 | true, | 79 | false, |
77 | null, | 80 | null, |
78 | int.MaxValue); | 81 | int.MaxValue); |
79 | } | 82 | } |
80 | 83 | ||
81 | Watchdog.StartThread( | 84 | m_retrysThread = Watchdog.StartThread( |
82 | this.ThreadStart, | 85 | this.CheckRetries, |
83 | "PollServiceWatcherThread", | 86 | "PollServiceWatcherThread", |
84 | ThreadPriority.Normal, | 87 | ThreadPriority.Normal, |
85 | false, | 88 | false, |
@@ -88,78 +91,211 @@ namespace OpenSim.Framework.Servers.HttpServer | |||
88 | 1000 * 60 * 10); | 91 | 1000 * 60 * 10); |
89 | } | 92 | } |
90 | 93 | ||
91 | internal void ReQueueEvent(PollServiceHttpRequest req) | 94 | |
95 | private void ReQueueEvent(PollServiceHttpRequest req) | ||
92 | { | 96 | { |
93 | // Do accounting stuff here | 97 | if (m_running) |
94 | Enqueue(req); | 98 | { |
99 | lock (m_retryRequests) | ||
100 | m_retryRequests.Enqueue(req); | ||
101 | } | ||
95 | } | 102 | } |
96 | 103 | ||
97 | public void Enqueue(PollServiceHttpRequest req) | 104 | public void Enqueue(PollServiceHttpRequest req) |
98 | { | 105 | { |
99 | lock (m_requests) | 106 | if (m_running) |
100 | m_requests.Enqueue(req); | 107 | { |
108 | if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal) | ||
109 | { | ||
110 | m_requests.Enqueue(req); | ||
111 | } | ||
112 | else | ||
113 | { | ||
114 | lock (m_slowRequests) | ||
115 | m_slowRequests.Enqueue(req); | ||
116 | } | ||
117 | } | ||
101 | } | 118 | } |
102 | 119 | ||
103 | public void ThreadStart() | 120 | private void CheckRetries() |
104 | { | 121 | { |
105 | while (m_running) | 122 | while (m_running) |
106 | { | 123 | { |
124 | Thread.Sleep(100); // let the world move .. back to faster rate | ||
107 | Watchdog.UpdateThread(); | 125 | Watchdog.UpdateThread(); |
108 | ProcessQueuedRequests(); | 126 | lock (m_retryRequests) |
109 | Thread.Sleep(1000); | 127 | { |
128 | while (m_retryRequests.Count > 0 && m_running) | ||
129 | m_requests.Enqueue(m_retryRequests.Dequeue()); | ||
130 | } | ||
131 | slowCount++; | ||
132 | if (slowCount >= 10) | ||
133 | { | ||
134 | slowCount = 0; | ||
135 | |||
136 | lock (m_slowRequests) | ||
137 | { | ||
138 | while (m_slowRequests.Count > 0 && m_running) | ||
139 | m_requests.Enqueue(m_slowRequests.Dequeue()); | ||
140 | } | ||
141 | } | ||
110 | } | 142 | } |
111 | } | 143 | } |
112 | 144 | ||
113 | private void ProcessQueuedRequests() | 145 | ~PollServiceRequestManager() |
114 | { | 146 | { |
115 | lock (m_requests) | 147 | m_running = false; |
148 | // m_timeout = -10000; // cause all to expire | ||
149 | Thread.Sleep(1000); // let the world move | ||
150 | |||
151 | foreach (Thread t in m_workerThreads) | ||
152 | Watchdog.AbortThread(t.ManagedThreadId); | ||
153 | |||
154 | try | ||
155 | { | ||
156 | foreach (PollServiceHttpRequest req in m_retryRequests) | ||
157 | { | ||
158 | DoHTTPGruntWork(m_server,req, | ||
159 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
160 | } | ||
161 | } | ||
162 | catch | ||
163 | { | ||
164 | } | ||
165 | |||
166 | PollServiceHttpRequest wreq; | ||
167 | m_retryRequests.Clear(); | ||
168 | |||
169 | lock (m_slowRequests) | ||
116 | { | 170 | { |
117 | if (m_requests.Count == 0) | 171 | while (m_slowRequests.Count > 0 && m_running) |
118 | return; | 172 | m_requests.Enqueue(m_slowRequests.Dequeue()); |
173 | } | ||
119 | 174 | ||
120 | // m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count); | 175 | while (m_requests.Count() > 0) |
176 | { | ||
177 | try | ||
178 | { | ||
179 | wreq = m_requests.Dequeue(0); | ||
180 | DoHTTPGruntWork(m_server,wreq, | ||
181 | wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id)); | ||
182 | } | ||
183 | catch | ||
184 | { | ||
185 | } | ||
186 | } | ||
121 | 187 | ||
122 | int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1; | 188 | m_requests.Clear(); |
189 | } | ||
123 | 190 | ||
124 | // For Each WorkerThread | 191 | // work threads |
125 | for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) | 192 | |
193 | private void PoolWorkerJob() | ||
194 | { | ||
195 | while (m_running) | ||
196 | { | ||
197 | PollServiceHttpRequest req = m_requests.Dequeue(5000); | ||
198 | |||
199 | Watchdog.UpdateThread(); | ||
200 | if (req != null) | ||
126 | { | 201 | { |
127 | //Loop over number of requests each thread handles. | 202 | try |
128 | for (int i = 0; i < reqperthread && m_requests.Count > 0; i++) | ||
129 | { | 203 | { |
130 | try | 204 | if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) |
131 | { | 205 | { |
132 | m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); | 206 | Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); |
207 | |||
208 | if (responsedata == null) | ||
209 | continue; | ||
210 | |||
211 | if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue | ||
212 | { | ||
213 | try | ||
214 | { | ||
215 | DoHTTPGruntWork(m_server, req, responsedata); | ||
216 | } | ||
217 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | ||
218 | { | ||
219 | // Ignore it, no need to reply | ||
220 | } | ||
221 | } | ||
222 | else | ||
223 | { | ||
224 | m_threadPool.QueueWorkItem(x => | ||
225 | { | ||
226 | try | ||
227 | { | ||
228 | DoHTTPGruntWork(m_server, req, responsedata); | ||
229 | } | ||
230 | catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream | ||
231 | { | ||
232 | // Ignore it, no need to reply | ||
233 | } | ||
234 | |||
235 | return null; | ||
236 | }, null); | ||
237 | } | ||
133 | } | 238 | } |
134 | catch (InvalidOperationException) | 239 | else |
135 | { | 240 | { |
136 | // The queue is empty, we did our calculations wrong! | 241 | if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) |
137 | return; | 242 | { |
243 | DoHTTPGruntWork(m_server, req, | ||
244 | req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
245 | } | ||
246 | else | ||
247 | { | ||
248 | ReQueueEvent(req); | ||
249 | } | ||
138 | } | 250 | } |
139 | 251 | } | |
252 | catch (Exception e) | ||
253 | { | ||
254 | m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); | ||
140 | } | 255 | } |
141 | } | 256 | } |
142 | } | 257 | } |
143 | |||
144 | } | 258 | } |
145 | 259 | ||
146 | public void Stop() | 260 | // DoHTTPGruntWork changed, not sending response |
261 | // do the same work around as core | ||
262 | |||
263 | internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata) | ||
147 | { | 264 | { |
148 | m_running = false; | 265 | OSHttpResponse response |
266 | = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext); | ||
149 | 267 | ||
150 | foreach (object o in m_requests) | 268 | byte[] buffer = server.DoHTTPGruntWork(responsedata, response); |
151 | { | ||
152 | PollServiceHttpRequest req = (PollServiceHttpRequest) o; | ||
153 | PollServiceWorkerThread.DoHTTPGruntWork( | ||
154 | m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); | ||
155 | } | ||
156 | 269 | ||
157 | m_requests.Clear(); | 270 | response.SendChunked = false; |
271 | response.ContentLength64 = buffer.Length; | ||
272 | response.ContentEncoding = Encoding.UTF8; | ||
158 | 273 | ||
159 | foreach (Thread t in m_workerThreads) | 274 | try |
275 | { | ||
276 | response.OutputStream.Write(buffer, 0, buffer.Length); | ||
277 | } | ||
278 | catch (Exception ex) | ||
160 | { | 279 | { |
161 | t.Abort(); | 280 | m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex)); |
281 | } | ||
282 | finally | ||
283 | { | ||
284 | //response.OutputStream.Close(); | ||
285 | try | ||
286 | { | ||
287 | response.OutputStream.Flush(); | ||
288 | response.Send(); | ||
289 | |||
290 | //if (!response.KeepAlive && response.ReuseContext) | ||
291 | // response.FreeContext(); | ||
292 | } | ||
293 | catch (Exception e) | ||
294 | { | ||
295 | m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e)); | ||
296 | } | ||
162 | } | 297 | } |
163 | } | 298 | } |
164 | } | 299 | } |
165 | } \ No newline at end of file | 300 | } |
301 | |||