aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
diff options
context:
space:
mode:
authorMelanie2013-06-07 23:43:45 +0100
committerMelanie2013-06-07 23:43:45 +0100
commit7c0bfca7a03584dd65c5659f177b434ee94ddc9d (patch)
tree82ceefae330b2c2b80a20db68abd79ae2439b034 /OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
parentminor: add dr0berts to contributors list (diff)
downloadopensim-SC-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.zip
opensim-SC-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.tar.gz
opensim-SC-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.tar.bz2
opensim-SC-7c0bfca7a03584dd65c5659f177b434ee94ddc9d.tar.xz
Adding Avination's PollService to round out the HTTP inventory changes
Diffstat (limited to '')
-rw-r--r--OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs248
1 files changed, 192 insertions, 56 deletions
diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
index 3e84c55..a5380c1 100644
--- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
+++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs
@@ -33,53 +33,56 @@ using log4net;
33using HttpServer; 33using HttpServer;
34using OpenSim.Framework; 34using OpenSim.Framework;
35using OpenSim.Framework.Monitoring; 35using OpenSim.Framework.Monitoring;
36using Amib.Threading;
37using System.IO;
38using System.Text;
39using System.Collections.Generic;
36 40
37namespace OpenSim.Framework.Servers.HttpServer 41namespace OpenSim.Framework.Servers.HttpServer
38{ 42{
39 public class PollServiceRequestManager 43 public class PollServiceRequestManager
40 { 44 {
41// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); 45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
42 46
43 private readonly BaseHttpServer m_server; 47 private readonly BaseHttpServer m_server;
44 private static Queue m_requests = Queue.Synchronized(new Queue()); 48
49 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
50 private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
51 private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
52
45 private uint m_WorkerThreadCount = 0; 53 private uint m_WorkerThreadCount = 0;
46 private Thread[] m_workerThreads; 54 private Thread[] m_workerThreads;
47 private PollServiceWorkerThread[] m_PollServiceWorkerThreads; 55 private Thread m_retrysThread;
48 private volatile bool m_running = true; 56
49 private int m_pollTimeout; 57 private bool m_running = true;
58 private int slowCount = 0;
59
60 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
61
62// private int m_timeout = 1000; // increase timeout 250; now use the event one
50 63
51 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) 64 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
52 { 65 {
53 m_server = pSrv; 66 m_server = pSrv;
54 m_WorkerThreadCount = pWorkerThreadCount; 67 m_WorkerThreadCount = pWorkerThreadCount;
55 m_pollTimeout = pTimeout;
56 }
57
58 public void Start()
59 {
60 m_running = true;
61 m_workerThreads = new Thread[m_WorkerThreadCount]; 68 m_workerThreads = new Thread[m_WorkerThreadCount];
62 m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount];
63 69
64 //startup worker threads 70 //startup worker threads
65 for (uint i = 0; i < m_WorkerThreadCount; i++) 71 for (uint i = 0; i < m_WorkerThreadCount; i++)
66 { 72 {
67 m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout);
68 m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent;
69
70 m_workerThreads[i] 73 m_workerThreads[i]
71 = Watchdog.StartThread( 74 = Watchdog.StartThread(
72 m_PollServiceWorkerThreads[i].ThreadStart, 75 PoolWorkerJob,
73 String.Format("PollServiceWorkerThread{0}", i), 76 String.Format("PollServiceWorkerThread{0}", i),
74 ThreadPriority.Normal, 77 ThreadPriority.Normal,
75 false, 78 false,
76 true, 79 false,
77 null, 80 null,
78 int.MaxValue); 81 int.MaxValue);
79 } 82 }
80 83
81 Watchdog.StartThread( 84 m_retrysThread = Watchdog.StartThread(
82 this.ThreadStart, 85 this.CheckRetries,
83 "PollServiceWatcherThread", 86 "PollServiceWatcherThread",
84 ThreadPriority.Normal, 87 ThreadPriority.Normal,
85 false, 88 false,
@@ -88,78 +91,211 @@ namespace OpenSim.Framework.Servers.HttpServer
88 1000 * 60 * 10); 91 1000 * 60 * 10);
89 } 92 }
90 93
91 internal void ReQueueEvent(PollServiceHttpRequest req) 94
95 private void ReQueueEvent(PollServiceHttpRequest req)
92 { 96 {
93 // Do accounting stuff here 97 if (m_running)
94 Enqueue(req); 98 {
99 lock (m_retryRequests)
100 m_retryRequests.Enqueue(req);
101 }
95 } 102 }
96 103
97 public void Enqueue(PollServiceHttpRequest req) 104 public void Enqueue(PollServiceHttpRequest req)
98 { 105 {
99 lock (m_requests) 106 if (m_running)
100 m_requests.Enqueue(req); 107 {
108 if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
109 {
110 m_requests.Enqueue(req);
111 }
112 else
113 {
114 lock (m_slowRequests)
115 m_slowRequests.Enqueue(req);
116 }
117 }
101 } 118 }
102 119
103 public void ThreadStart() 120 private void CheckRetries()
104 { 121 {
105 while (m_running) 122 while (m_running)
106 { 123 {
124 Thread.Sleep(100); // let the world move .. back to faster rate
107 Watchdog.UpdateThread(); 125 Watchdog.UpdateThread();
108 ProcessQueuedRequests(); 126 lock (m_retryRequests)
109 Thread.Sleep(1000); 127 {
128 while (m_retryRequests.Count > 0 && m_running)
129 m_requests.Enqueue(m_retryRequests.Dequeue());
130 }
131 slowCount++;
132 if (slowCount >= 10)
133 {
134 slowCount = 0;
135
136 lock (m_slowRequests)
137 {
138 while (m_slowRequests.Count > 0 && m_running)
139 m_requests.Enqueue(m_slowRequests.Dequeue());
140 }
141 }
110 } 142 }
111 } 143 }
112 144
113 private void ProcessQueuedRequests() 145 ~PollServiceRequestManager()
114 { 146 {
115 lock (m_requests) 147 m_running = false;
148// m_timeout = -10000; // cause all to expire
149 Thread.Sleep(1000); // let the world move
150
151 foreach (Thread t in m_workerThreads)
152 Watchdog.AbortThread(t.ManagedThreadId);
153
154 try
155 {
156 foreach (PollServiceHttpRequest req in m_retryRequests)
157 {
158 DoHTTPGruntWork(m_server,req,
159 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
160 }
161 }
162 catch
163 {
164 }
165
166 PollServiceHttpRequest wreq;
167 m_retryRequests.Clear();
168
169 lock (m_slowRequests)
116 { 170 {
117 if (m_requests.Count == 0) 171 while (m_slowRequests.Count > 0 && m_running)
118 return; 172 m_requests.Enqueue(m_slowRequests.Dequeue());
173 }
119 174
120// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count); 175 while (m_requests.Count() > 0)
176 {
177 try
178 {
179 wreq = m_requests.Dequeue(0);
180 DoHTTPGruntWork(m_server,wreq,
181 wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
182 }
183 catch
184 {
185 }
186 }
121 187
122 int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1; 188 m_requests.Clear();
189 }
123 190
124 // For Each WorkerThread 191 // work threads
125 for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++) 192
193 private void PoolWorkerJob()
194 {
195 while (m_running)
196 {
197 PollServiceHttpRequest req = m_requests.Dequeue(5000);
198
199 Watchdog.UpdateThread();
200 if (req != null)
126 { 201 {
127 //Loop over number of requests each thread handles. 202 try
128 for (int i = 0; i < reqperthread && m_requests.Count > 0; i++)
129 { 203 {
130 try 204 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
131 { 205 {
132 m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); 206 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
207
208 if (responsedata == null)
209 continue;
210
211 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue
212 {
213 try
214 {
215 DoHTTPGruntWork(m_server, req, responsedata);
216 }
217 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
218 {
219 // Ignore it, no need to reply
220 }
221 }
222 else
223 {
224 m_threadPool.QueueWorkItem(x =>
225 {
226 try
227 {
228 DoHTTPGruntWork(m_server, req, responsedata);
229 }
230 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
231 {
232 // Ignore it, no need to reply
233 }
234
235 return null;
236 }, null);
237 }
133 } 238 }
134 catch (InvalidOperationException) 239 else
135 { 240 {
136 // The queue is empty, we did our calculations wrong! 241 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
137 return; 242 {
243 DoHTTPGruntWork(m_server, req,
244 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
245 }
246 else
247 {
248 ReQueueEvent(req);
249 }
138 } 250 }
139 251 }
252 catch (Exception e)
253 {
254 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
140 } 255 }
141 } 256 }
142 } 257 }
143
144 } 258 }
145 259
146 public void Stop() 260 // DoHTTPGruntWork changed, not sending response
261 // do the same work around as core
262
263 internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
147 { 264 {
148 m_running = false; 265 OSHttpResponse response
266 = new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
149 267
150 foreach (object o in m_requests) 268 byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
151 {
152 PollServiceHttpRequest req = (PollServiceHttpRequest) o;
153 PollServiceWorkerThread.DoHTTPGruntWork(
154 m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
155 }
156 269
157 m_requests.Clear(); 270 response.SendChunked = false;
271 response.ContentLength64 = buffer.Length;
272 response.ContentEncoding = Encoding.UTF8;
158 273
159 foreach (Thread t in m_workerThreads) 274 try
275 {
276 response.OutputStream.Write(buffer, 0, buffer.Length);
277 }
278 catch (Exception ex)
160 { 279 {
161 t.Abort(); 280 m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
281 }
282 finally
283 {
284 //response.OutputStream.Close();
285 try
286 {
287 response.OutputStream.Flush();
288 response.Send();
289
290 //if (!response.KeepAlive && response.ReuseContext)
291 // response.FreeContext();
292 }
293 catch (Exception e)
294 {
295 m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
296 }
162 } 297 }
163 } 298 }
164 } 299 }
165} \ No newline at end of file 300}
301