aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs241
1 files changed, 185 insertions, 56 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3e84c55..5406f00 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,53 +33,54 @@ using log4net;
33using HttpServer; 33using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 35using OpenSim.Framework.Monitoring;
36using Amib.Threading;
37using System.IO;
38using System.Text;
39using System.Collections.Generic;
36 40
37namespace OpenSim.Framework.Servers.HttpServer 41namespace OpenSim.Framework.Servers.HttpServer
38{ 42{
39 public class PollServiceRequestManager 43 public class PollServiceRequestManager
40 { 44 {
41// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
42 46
43 private readonly BaseHttpServer m_server; 47 private readonly BaseHttpServer m_server;
44 private static Queue m_requests = Queue.Synchronized(new Queue()); 48
49 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
50 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
51 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
52
45 private uint m_WorkerThreadCount = 0; 53 private uint m_WorkerThreadCount = 0;
46 private Thread[] m_workerThreads; 54 private Thread[] m_workerThreads;
47 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 55 private Thread m_retrysThread;
48 private volatile bool m_running = true; 56
49 private int m_pollTimeout; 57 private bool m_running = true;
58 private int slowCount = 0;
59
60 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
50 61
51 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) 62 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
52 { 63 {
53 m_server = pSrv; 64 m_server = pSrv;
54 m_WorkerThreadCount = pWorkerThreadCount; 65 m_WorkerThreadCount = pWorkerThreadCount;
55 m_pollTimeout = pTimeout;
56 }
57
58 public void Start()
59 {
60 m_running = true;
61 m_workerThreads = new Thread[m_WorkerThreadCount]; 66 m_workerThreads = new Thread[m_WorkerThreadCount];
62 m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount];
63 67
64 //startup worker threads 68 //startup worker threads
65 for (uint i = 0; i < m_WorkerThreadCount; i++) 69 for (uint i = 0; i < m_WorkerThreadCount; i++)
66 { 70 {
67 m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout);
68 m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent;
69
70 m_workerThreads[i] 71 m_workerThreads[i]
71 = Watchdog.StartThread( 72 = Watchdog.StartThread(
72 m_PollServiceWorkerThreads[i].ThreadStart, 73 PoolWorkerJob,
73 String.Format("PollServiceWorkerThread{0}", i), 74 String.Format("PollServiceWorkerThread{0}", i),
74 ThreadPriority.Normal, 75 ThreadPriority.Normal,
75 false, 76 false,
76 true, 77 false,
77 null, 78 null,
78 int.MaxValue); 79 int.MaxValue);
79 } 80 }
80 81
81 Watchdog.StartThread( 82 m_retrysThread = Watchdog.StartThread(
82 this.ThreadStart, 83 this.CheckRetries,
83 "PollServiceWatcherThread", 84 "PollServiceWatcherThread",
84 ThreadPriority.Normal, 85 ThreadPriority.Normal,
85 false, 86 false,
@@ -88,78 +89,206 @@ namespace OpenSim.Framework.Servers.HttpServer
88 1000 * 60 * 10); 89 1000 * 60 * 10);
89 } 90 }
90 91
91 internal void ReQueueEvent(PollServiceHttpRequest req) 92
93 private void ReQueueEvent(PollServiceHttpRequest req)
92 { 94 {
93 // Do accounting stuff here 95 if (m_running)
94 Enqueue(req); 96 {
97 lock (m_retryRequests)
98 m_retryRequests.Enqueue(req);
99 }
95 } 100 }
96 101
97 public void Enqueue(PollServiceHttpRequest req) 102 public void Enqueue(PollServiceHttpRequest req)
98 { 103 {
99 lock (m_requests) 104 if (m_running)
100 m_requests.Enqueue(req); 105 {
106 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
107 {
108 m_requests.Enqueue(req);
109 }
110 else
111 {
112 lock (m_slowRequests)
113 m_slowRequests.Enqueue(req);
114 }
115 }
101 } 116 }
102 117
103 public void ThreadStart() 118 private void CheckRetries()
104 { 119 {
105 while (m_running) 120 while (m_running)
106 { 121 {
122 Thread.Sleep(100); // let the world move .. back to faster rate
107 Watchdog.UpdateThread(); 123 Watchdog.UpdateThread();
108 ProcessQueuedRequests(); 124 lock (m_retryRequests)
109 Thread.Sleep(1000); 125 {
126 while (m_retryRequests.Count > 0 && m_running)
127 m_requests.Enqueue(m_retryRequests.Dequeue());
128 }
129 slowCount++;
130 if (slowCount >= 10)
131 {
132 slowCount = 0;
133
134 lock (m_slowRequests)
135 {
136 while (m_slowRequests.Count > 0 && m_running)
137 m_requests.Enqueue(m_slowRequests.Dequeue());
138 }
139 }
110 } 140 }
111 } 141 }
112 142
113 private void ProcessQueuedRequests() 143 ~PollServiceRequestManager()
114 { 144 {
115 lock (m_requests) 145 m_running = false;
146 Thread.Sleep(1000); // let the world move
147
148 foreach (Thread t in m_workerThreads)
149 Watchdog.AbortThread(t.ManagedThreadId);
150
151 try
152 {
153 foreach (PollServiceHttpRequest req in m_retryRequests)
154 {
155 DoHTTPGruntWork(m_server,req,
156 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
157 }
158 }
159 catch
160 {
161 }
162
163 PollServiceHttpRequest wreq;
164 m_retryRequests.Clear();
165
166 lock (m_slowRequests)
116 { 167 {
117 if (m_requests.Count == 0) 168 while (m_slowRequests.Count > 0 && m_running)
118 return; 169 m_requests.Enqueue(m_slowRequests.Dequeue());
170 }
171
172 while (m_requests.Count() > 0)
173 {
174 try
175 {
176 wreq = m_requests.Dequeue(0);
177 DoHTTPGruntWork(m_server,wreq,
178 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
179 }
180 catch
181 {
182 }
183 }
119 184
120// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count); 185 m_requests.Clear();
186 }
121 187
122 int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1; 188 // work threads
123 189
124 // For Each WorkerThread 190 private void PoolWorkerJob()
125 for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) 191 {
192 while (m_running)
193 {
194 PollServiceHttpRequest req = m_requests.Dequeue(5000);
195
196 Watchdog.UpdateThread();
197 if (req != null)
126 { 198 {
127 //Loop over number of requests each thread handles. 199 try
128 for (int i = 0; i < reqperthread && m_requests.Count > 0; i++)
129 { 200 {
130 try 201 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
131 { 202 {
132 m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); 203 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
204
205 if (responsedata == null)
206 continue;
207
208 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue
209 {
210 try
211 {
212 DoHTTPGruntWork(m_server, req, responsedata);
213 }
214 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
215 {
216 // Ignore it, no need to reply
217 }
218 }
219 else
220 {
221 m_threadPool.QueueWorkItem(x =>
222 {
223 try
224 {
225 DoHTTPGruntWork(m_server, req, responsedata);
226 }
227 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
228 {
229 // Ignore it, no need to reply
230 }
231
232 return null;
233 }, null);
234 }
133 } 235 }
134 catch (InvalidOperationException) 236 else
135 { 237 {
136 // The queue is empty, we did our calculations wrong! 238 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
137 return; 239 {
240 DoHTTPGruntWork(m_server, req,
241 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
242 }
243 else
244 {
245 ReQueueEvent(req);
246 }
138 } 247 }
139 248 }
249 catch (Exception e)
250 {
251 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
140 } 252 }
141 } 253 }
142 } 254 }
143
144 } 255 }
145 256
146 public void Stop() 257 // DoHTTPGruntWork changed, not sending response
258 // do the same work around as core
259
260 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
147 { 261 {
148 m_running = false; 262 OSHttpResponse response
263 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
149 264
150 foreach (object o in m_requests) 265 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
151 {
152 PollServiceHttpRequest req = (PollServiceHttpRequest) o;
153 PollServiceWorkerThread.DoHTTPGruntWork(
154 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
155 }
156 266
157 m_requests.Clear(); 267 response.SendChunked = false;
268 response.ContentLength64 = buffer.Length;
269 response.ContentEncoding = Encoding.UTF8;
158 270
159 foreach (Thread t in m_workerThreads) 271 try
160 { 272 {
161 t.Abort(); 273 response.OutputStream.Write(buffer, 0, buffer.Length);
274 }
275 catch (Exception ex)
276 {
277 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
278 }
279 finally
280 {
281 try
282 {
283 response.OutputStream.Flush();
284 response.Send();
285 }
286 catch (Exception e)
287 {
288 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
289 }
162 } 290 }
163 } 291 }
164 } 292 }
165} \ No newline at end of file 293}
294