aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/Environment/Modules/Framework/EventQueue/EventQueueGetModule.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Region/Environment/Modules/Framework/EventQueue/EventQueueGetModule.cs')
-rw-r--r--OpenSim/Region/Environment/Modules/Framework/EventQueue/EventQueueGetModule.cs565
1 files changed, 565 insertions, 0 deletions
diff --git a/OpenSim/Region/Environment/Modules/Framework/EventQueue/EventQueueGetModule.cs b/OpenSim/Region/Environment/Modules/Framework/EventQueue/EventQueueGetModule.cs
new file mode 100644
index 0000000..dc2a70c
--- /dev/null
+++ b/OpenSim/Region/Environment/Modules/Framework/EventQueue/EventQueueGetModule.cs
@@ -0,0 +1,565 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSim Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27using System;
28using System.Collections;
29using System.Collections.Generic;
30using System.Net;
31using System.Net.Sockets;
32using System.Reflection;
33using System.Threading;
34using System.Xml;
35using OpenMetaverse;
36using OpenMetaverse.StructuredData;
37using log4net;
38using Nini.Config;
39using Nwc.XmlRpc;
40using OpenSim.Framework;
41using OpenSim.Framework.Communications.Cache;
42using OpenSim.Framework.Communications.Capabilities;
43using OpenSim.Framework.Servers;
44using OpenSim.Region.Environment.Interfaces;
45using OpenSim.Region.Interfaces;
46using OpenSim.Region.Environment.Scenes;
47
48using OSD = OpenMetaverse.StructuredData.OSD;
49using OSDMap = OpenMetaverse.StructuredData.OSDMap;
50using OSDArray = OpenMetaverse.StructuredData.OSDArray;
51using Caps = OpenSim.Framework.Communications.Capabilities.Caps;
52using BlockingLLSDQueue = OpenSim.Framework.BlockingQueue<OpenMetaverse.StructuredData.OSD>;
53
54namespace OpenSim.Region.Environment.Modules.Framework
55{
56 public struct QueueItem
57 {
58 public int id;
59 public OSDMap body;
60 }
61
62 public class EventQueueGetModule : IEventQueue, IRegionModule
63 {
64 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
65 private Scene m_scene = null;
66 private IConfigSource m_gConfig;
67 bool enabledYN = false;
68
69 private Dictionary<UUID, int> m_ids = new Dictionary<UUID, int>();
70
71 private Dictionary<UUID, BlockingLLSDQueue> queues = new Dictionary<UUID, BlockingLLSDQueue>();
72 private Dictionary<UUID, UUID> m_QueueUUIDAvatarMapping = new Dictionary<UUID, UUID>();
73 private Dictionary<UUID, UUID> m_AvatarQueueUUIDMapping = new Dictionary<UUID, UUID>();
74
75
76 #region IRegionModule methods
77 public void Initialise(Scene scene, IConfigSource config)
78 {
79 m_gConfig = config;
80
81 IConfig startupConfig = m_gConfig.Configs["Startup"];
82
83 ReadConfigAndPopulate(scene, startupConfig, "Startup");
84
85 if (enabledYN)
86 {
87 m_scene = scene;
88 scene.RegisterModuleInterface<IEventQueue>(this);
89
90 // Register fallback handler
91 // Why does EQG Fail on region crossings!
92
93 //scene.AddLLSDHandler("/CAPS/EQG/", EventQueueFallBack);
94
95 scene.EventManager.OnNewClient += OnNewClient;
96
97 // TODO: Leaving these open, or closing them when we
98 // become a child is incorrect. It messes up TP in a big
99 // way. CAPS/EQ need to be active as long as the UDP
100 // circuit is there.
101
102 scene.EventManager.OnClientClosed += ClientClosed;
103 scene.EventManager.OnAvatarEnteringNewParcel += AvatarEnteringParcel;
104 scene.EventManager.OnMakeChildAgent += MakeChildAgent;
105 scene.EventManager.OnRegisterCaps += OnRegisterCaps;
106
107 m_log.DebugFormat("[EVENTQUEUE]: Enabled EventQueueGetModule for region {0}", scene.RegionInfo.RegionName);
108 }
109 else
110 {
111 m_gConfig = null;
112 }
113
114 }
115
116 private void ReadConfigAndPopulate(Scene scene, IConfig startupConfig, string p)
117 {
118 enabledYN = startupConfig.GetBoolean("EventQueue", true);
119 }
120
121 public void PostInitialise()
122 {
123 }
124
125 public void Close()
126 {
127 }
128
129 public string Name
130 {
131 get { return "EventQueueGetModule"; }
132 }
133
134 public bool IsSharedModule
135 {
136 get { return false; }
137 }
138 #endregion
139
140 /// <summary>
141 /// Always returns a valid queue
142 /// </summary>
143 /// <param name="agentId"></param>
144 /// <returns></returns>
145 private BlockingLLSDQueue TryGetQueue(UUID agentId)
146 {
147 lock (queues)
148 {
149 if (!queues.ContainsKey(agentId))
150 {
151 m_log.DebugFormat("[EVENTQUEUE]: Adding new queue for agent {0} in region {1}", agentId,
152 m_scene.RegionInfo.RegionName);
153 queues[agentId] = new BlockingLLSDQueue();
154 }
155 return queues[agentId];
156 }
157 }
158
159 /// <summary>
160 /// May return a null queue
161 /// </summary>
162 /// <param name="agentId"></param>
163 /// <returns></returns>
164 private BlockingLLSDQueue GetQueue(UUID agentId)
165 {
166 lock (queues)
167 {
168 if (queues.ContainsKey(agentId))
169 {
170 return queues[agentId];
171 }
172 else
173 return null;
174 }
175 }
176
177 #region IEventQueue Members
178
179 public bool Enqueue(OSD ev, UUID avatarID)
180 {
181 m_log.DebugFormat("[EVENTQUEUE]: Enqueuing event for {0} in region {1}", avatarID, m_scene.RegionInfo.RegionName);
182 try
183 {
184 BlockingLLSDQueue queue = GetQueue(avatarID);
185 if (queue != null)
186 queue.Enqueue(ev);
187 } catch(NullReferenceException e)
188 {
189 m_log.Debug("[EVENTQUEUE] Caught exception: " + e);
190 return false;
191 }
192 return true;
193 }
194
195 #endregion
196
197 private void OnNewClient(IClientAPI client)
198 {
199 //client.OnLogout += ClientClosed;
200 }
201
202// private void ClientClosed(IClientAPI client)
203// {
204// ClientClosed(client.AgentId);
205// }
206
207 private void ClientClosed(UUID AgentID)
208 {
209 m_log.DebugFormat("[EVENTQUEUE]: Closed client {0} in region {1}", AgentID, m_scene.RegionInfo.RegionName);
210
211 int count = 0;
212 while (queues.ContainsKey(AgentID) && queues[AgentID].Count() > 0 && count++ < 5)
213 {
214 Thread.Sleep(1000);
215 }
216
217 lock (queues)
218 {
219 queues.Remove(AgentID);
220 }
221 List<UUID> removeitems = new List<UUID>();
222 lock (m_AvatarQueueUUIDMapping)
223 {
224 foreach (UUID ky in m_AvatarQueueUUIDMapping.Keys)
225 {
226 if (ky == AgentID)
227 {
228 removeitems.Add(ky);
229 }
230 }
231
232 foreach (UUID ky in removeitems)
233 {
234 m_AvatarQueueUUIDMapping.Remove(ky);
235 m_scene.RemoveHTTPHandler("","/CAPS/EQG/" + ky.ToString() + "/");
236 m_log.Debug("[EVENTQUEUE]: Removing " + "/CAPS/EQG/" + ky.ToString() + "/");
237 }
238
239 }
240 UUID searchval = UUID.Zero;
241
242 removeitems.Clear();
243
244 lock (m_QueueUUIDAvatarMapping)
245 {
246 foreach (UUID ky in m_QueueUUIDAvatarMapping.Keys)
247 {
248 searchval = m_QueueUUIDAvatarMapping[ky];
249
250 if (searchval == AgentID)
251 {
252 removeitems.Add(ky);
253 }
254 }
255
256 foreach (UUID ky in removeitems)
257 m_QueueUUIDAvatarMapping.Remove(ky);
258
259 }
260
261 m_log.DebugFormat("[EVENTQUEUE]: Client {0} deregistered in region {1}.", AgentID, m_scene.RegionInfo.RegionName);
262 }
263
264 private void AvatarEnteringParcel(ScenePresence avatar, int localLandID, UUID regionID)
265 {
266 m_log.DebugFormat("[EVENTQUEUE]: Avatar {0} entering parcel {1} in region {2}.",
267 avatar.UUID, localLandID, m_scene.RegionInfo.RegionName);
268 }
269
270 private void MakeChildAgent(ScenePresence avatar)
271 {
272 //m_log.DebugFormat("[EVENTQUEUE]: Make Child agent {0} in region {1}.", avatar.UUID, m_scene.RegionInfo.RegionName);
273 //lock (m_ids)
274 // {
275 //if (m_ids.ContainsKey(avatar.UUID))
276 //{
277 // close the event queue.
278 //m_ids[avatar.UUID] = -1;
279 //}
280 //}
281 }
282
283 public void OnRegisterCaps(UUID agentID, Caps caps)
284 {
285 m_log.DebugFormat("[EVENTQUEUE] OnRegisterCaps: agentID {0} caps {1} region {2}", agentID, caps, m_scene.RegionInfo.RegionName);
286
287 // Let's instantiate a Queue for this agent right now
288 TryGetQueue(agentID);
289
290 string capsBase = "/CAPS/EQG/";
291 UUID EventQueueGetUUID = UUID.Zero;
292
293 lock (m_AvatarQueueUUIDMapping)
294 {
295 // Reuse open queues. The client does!
296 if (m_AvatarQueueUUIDMapping.ContainsKey(agentID))
297 {
298 m_log.DebugFormat("[EVENTQUEUE]: Found Existing UUID!");
299 EventQueueGetUUID = m_AvatarQueueUUIDMapping[agentID];
300 }
301 else
302 {
303 EventQueueGetUUID = UUID.Random();
304 m_log.DebugFormat("[EVENTQUEUE]: Using random UUID!");
305 }
306 }
307
308 lock (m_QueueUUIDAvatarMapping)
309 {
310 if (!m_QueueUUIDAvatarMapping.ContainsKey(EventQueueGetUUID))
311 m_QueueUUIDAvatarMapping.Add(EventQueueGetUUID, agentID);
312 }
313
314 lock (m_AvatarQueueUUIDMapping)
315 {
316 if (!m_AvatarQueueUUIDMapping.ContainsKey(agentID))
317 m_AvatarQueueUUIDMapping.Add(agentID, EventQueueGetUUID);
318 }
319
320 m_log.DebugFormat("[EVENTQUEUE]: CAPS URL: {0}", capsBase + EventQueueGetUUID.ToString() + "/");
321 // Register this as a caps handler
322 caps.RegisterHandler("EventQueueGet",
323 new RestHTTPHandler("POST", capsBase + EventQueueGetUUID.ToString() + "/",
324 delegate(Hashtable m_dhttpMethod)
325 {
326 return ProcessQueue(m_dhttpMethod,agentID, caps);
327 }));
328
329 // This will persist this beyond the expiry of the caps handlers
330 m_scene.AddHTTPHandler(capsBase + EventQueueGetUUID.ToString() + "/", EventQueuePath2);
331
332 Random rnd = new Random(System.Environment.TickCount);
333 lock (m_ids)
334 {
335 if (!m_ids.ContainsKey(agentID))
336 m_ids.Add(agentID, rnd.Next(30000000));
337 }
338 }
339
340 public Hashtable ProcessQueue(Hashtable request,UUID agentID, Caps caps)
341 {
342 // TODO: this has to be redone to not busy-wait (and block the thread),
343 // TODO: as soon as we have a non-blocking way to handle HTTP-requests.
344
345// if (m_log.IsDebugEnabled)
346// {
347// String debug = "[EVENTQUEUE]: Got request for agent {0} in region {1} from thread {2}: [ ";
348// foreach (object key in request.Keys)
349// {
350// debug += key.ToString() + "=" + request[key].ToString() + " ";
351// }
352// m_log.DebugFormat(debug + " ]", agentID, m_scene.RegionInfo.RegionName, System.Threading.Thread.CurrentThread.Name);
353// }
354
355 BlockingLLSDQueue queue = TryGetQueue(agentID);
356 OSD element = queue.Dequeue(15000); // 15s timeout
357
358 Hashtable responsedata = new Hashtable();
359
360 int thisID = 0;
361 lock (m_ids)
362 thisID = m_ids[agentID];
363
364 if (element == null)
365 {
366 //m_log.ErrorFormat("[EVENTQUEUE]: Nothing to process in " + m_scene.RegionInfo.RegionName);
367 if (thisID == -1) // close-request
368 {
369 m_log.ErrorFormat("[EVENTQUEUE]: 404 in " + m_scene.RegionInfo.RegionName);
370 responsedata["int_response_code"] = 404; //501; //410; //404;
371 responsedata["content_type"] = "text/plain";
372 responsedata["keepalive"] = false;
373 responsedata["str_response_string"] = "Closed EQG";
374 return responsedata;
375 }
376 responsedata["int_response_code"] = 502;
377 responsedata["content_type"] = "text/plain";
378 responsedata["keepalive"] = false;
379 responsedata["str_response_string"] = "Upstream error: ";
380 responsedata["error_status_text"] = "Upstream error:";
381 responsedata["http_protocol_version"] = "HTTP/1.0";
382 return responsedata;
383 }
384
385
386
387 OSDArray array = new OSDArray();
388 if (element == null) // didn't have an event in 15s
389 {
390 // Send it a fake event to keep the client polling! It doesn't like 502s like the proxys say!
391 array.Add(EventQueueHelper.KeepAliveEvent());
392 m_log.DebugFormat("[EVENTQUEUE]: adding fake event for {0} in region {1}", agentID, m_scene.RegionInfo.RegionName);
393 }
394 else
395 {
396 array.Add(element);
397 while (queue.Count() > 0)
398 {
399 array.Add(queue.Dequeue(1));
400 thisID++;
401 }
402 }
403
404 OSDMap events = new OSDMap();
405 events.Add("events", array);
406
407 events.Add("id", new OSDInteger(thisID));
408 lock (m_ids)
409 {
410 m_ids[agentID] = thisID + 1;
411 }
412
413 responsedata["int_response_code"] = 200;
414 responsedata["content_type"] = "application/xml";
415 responsedata["keepalive"] = false;
416 responsedata["str_response_string"] = OSDParser.SerializeLLSDXmlString(events);
417 m_log.DebugFormat("[EVENTQUEUE]: sending response for {0} in region {1}: {2}", agentID, m_scene.RegionInfo.RegionName, responsedata["str_response_string"]);
418
419 return responsedata;
420 }
421
422 public Hashtable EventQueuePath2(Hashtable request)
423 {
424 string capuuid = (string)request["uri"]; //path.Replace("/CAPS/EQG/","");
425 // pull off the last "/" in the path.
426 Hashtable responsedata = new Hashtable();
427 capuuid = capuuid.Substring(0, capuuid.Length - 1);
428 capuuid = capuuid.Replace("/CAPS/EQG/", "");
429 UUID AvatarID = UUID.Zero;
430 UUID capUUID = UUID.Zero;
431
432 // parse the path and search for the avatar with it registered
433 if (UUID.TryParse(capuuid, out capUUID))
434 {
435 lock (m_QueueUUIDAvatarMapping)
436 {
437 if (m_QueueUUIDAvatarMapping.ContainsKey(capUUID))
438 {
439 AvatarID = m_QueueUUIDAvatarMapping[capUUID];
440 }
441 }
442 if (AvatarID != UUID.Zero)
443 {
444 // m_scene.GetCapsHandlerForUser will return null if the agent doesn't have a caps handler
445 // registered
446 return ProcessQueue(request, AvatarID, m_scene.GetCapsHandlerForUser(AvatarID));
447 }
448 else
449 {
450 responsedata["int_response_code"] = 404;
451 responsedata["content_type"] = "text/plain";
452 responsedata["keepalive"] = false;
453 responsedata["str_response_string"] = "Not Found";
454 responsedata["error_status_text"] = "Not Found";
455 responsedata["http_protocol_version"] = "HTTP/1.0";
456 return responsedata;
457 // return 404
458 }
459 }
460 else
461 {
462 responsedata["int_response_code"] = 404;
463 responsedata["content_type"] = "text/plain";
464 responsedata["keepalive"] = false;
465 responsedata["str_response_string"] = "Not Found";
466 responsedata["error_status_text"] = "Not Found";
467 responsedata["http_protocol_version"] = "HTTP/1.0";
468 return responsedata;
469 // return 404
470 }
471
472 }
473
474 public OSD EventQueueFallBack(string path, OSD request, string endpoint)
475 {
476 // This is a fallback element to keep the client from loosing EventQueueGet
477 // Why does CAPS fail sometimes!?
478 m_log.Warn("[EVENTQUEUE]: In the Fallback handler! We lost the Queue in the rest handler!");
479 string capuuid = path.Replace("/CAPS/EQG/","");
480 capuuid = capuuid.Substring(0, capuuid.Length - 1);
481
482// UUID AvatarID = UUID.Zero;
483 UUID capUUID = UUID.Zero;
484 if (UUID.TryParse(capuuid, out capUUID))
485 {
486/* Don't remove this yet code cleaners!
487 * Still testing this!
488 *
489 lock (m_QueueUUIDAvatarMapping)
490 {
491 if (m_QueueUUIDAvatarMapping.ContainsKey(capUUID))
492 {
493 AvatarID = m_QueueUUIDAvatarMapping[capUUID];
494 }
495 }
496
497
498 if (AvatarID != UUID.Zero)
499 {
500 // Repair the CAP!
501 //OpenSim.Framework.Communications.Capabilities.Caps caps = m_scene.GetCapsHandlerForUser(AvatarID);
502 //string capsBase = "/CAPS/EQG/";
503 //caps.RegisterHandler("EventQueueGet",
504 //new RestHTTPHandler("POST", capsBase + capUUID.ToString() + "/",
505 //delegate(Hashtable m_dhttpMethod)
506 //{
507 // return ProcessQueue(m_dhttpMethod, AvatarID, caps);
508 //}));
509 // start new ID sequence.
510 Random rnd = new Random(System.Environment.TickCount);
511 lock (m_ids)
512 {
513 if (!m_ids.ContainsKey(AvatarID))
514 m_ids.Add(AvatarID, rnd.Next(30000000));
515 }
516
517
518 int thisID = 0;
519 lock (m_ids)
520 thisID = m_ids[AvatarID];
521
522 BlockingLLSDQueue queue = GetQueue(AvatarID);
523 OSDArray array = new OSDArray();
524 LLSD element = queue.Dequeue(15000); // 15s timeout
525 if (element == null)
526 {
527
528 array.Add(EventQueueHelper.KeepAliveEvent());
529 }
530 else
531 {
532 array.Add(element);
533 while (queue.Count() > 0)
534 {
535 array.Add(queue.Dequeue(1));
536 thisID++;
537 }
538 }
539 OSDMap events = new OSDMap();
540 events.Add("events", array);
541
542 events.Add("id", new LLSDInteger(thisID));
543
544 lock (m_ids)
545 {
546 m_ids[AvatarID] = thisID + 1;
547 }
548
549 return events;
550 }
551 else
552 {
553 return new LLSD();
554 }
555*
556*/
557 }
558 else
559 {
560 //return new LLSD();
561 }
562 return new OSDString("shutdown404!");
563 }
564 }
565}