aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/CoreModules/Framework/EventQueue/EventQueueGetModule.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Region/CoreModules/Framework/EventQueue/EventQueueGetModule.cs')
-rw-r--r--OpenSim/Region/CoreModules/Framework/EventQueue/EventQueueGetModule.cs630
1 files changed, 630 insertions, 0 deletions
diff --git a/OpenSim/Region/CoreModules/Framework/EventQueue/EventQueueGetModule.cs b/OpenSim/Region/CoreModules/Framework/EventQueue/EventQueueGetModule.cs
new file mode 100644
index 0000000..e81466a
--- /dev/null
+++ b/OpenSim/Region/CoreModules/Framework/EventQueue/EventQueueGetModule.cs
@@ -0,0 +1,630 @@
1/*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSim Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28using System;
29using System.Collections;
30using System.Collections.Generic;
31using System.Net;
32using System.Net.Sockets;
33using System.Reflection;
34using System.Threading;
35using System.Xml;
36using OpenMetaverse;
37using OpenMetaverse.Packets;
38using OpenMetaverse.StructuredData;
39using log4net;
40using Nini.Config;
41using Nwc.XmlRpc;
42using OpenSim.Framework;
43using OpenSim.Framework.Communications.Cache;
44using OpenSim.Framework.Communications.Capabilities;
45using OpenSim.Framework.Servers;
46using OpenSim.Region.Framework.Interfaces;
47using OpenSim.Region.Framework.Scenes;
48
49using OSD = OpenMetaverse.StructuredData.OSD;
50using OSDMap = OpenMetaverse.StructuredData.OSDMap;
51using OSDArray = OpenMetaverse.StructuredData.OSDArray;
52using Caps = OpenSim.Framework.Communications.Capabilities.Caps;
53using BlockingLLSDQueue = OpenSim.Framework.BlockingQueue<OpenMetaverse.StructuredData.OSD>;
54
55namespace OpenSim.Region.CoreModules.Framework.EventQueue
56{
57 public struct QueueItem
58 {
59 public int id;
60 public OSDMap body;
61 }
62
63 public class EventQueueGetModule : IEventQueue, IRegionModule
64 {
65 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
66 private Scene m_scene = null;
67 private IConfigSource m_gConfig;
68 bool enabledYN = false;
69
70 private Dictionary<UUID, int> m_ids = new Dictionary<UUID, int>();
71
72 private Dictionary<UUID, BlockingLLSDQueue> queues = new Dictionary<UUID, BlockingLLSDQueue>();
73 private Dictionary<UUID, UUID> m_QueueUUIDAvatarMapping = new Dictionary<UUID, UUID>();
74 private Dictionary<UUID, UUID> m_AvatarQueueUUIDMapping = new Dictionary<UUID, UUID>();
75
76 #region IRegionModule methods
77 public void Initialise(Scene scene, IConfigSource config)
78 {
79 m_gConfig = config;
80
81 IConfig startupConfig = m_gConfig.Configs["Startup"];
82
83 ReadConfigAndPopulate(scene, startupConfig, "Startup");
84
85 if (enabledYN)
86 {
87 m_scene = scene;
88 scene.RegisterModuleInterface<IEventQueue>(this);
89
90 // Register fallback handler
91 // Why does EQG Fail on region crossings!
92
93 //scene.CommsManager.HttpServer.AddLLSDHandler("/CAPS/EQG/", EventQueueFallBack);
94
95 scene.EventManager.OnNewClient += OnNewClient;
96
97 // TODO: Leaving these open, or closing them when we
98 // become a child is incorrect. It messes up TP in a big
99 // way. CAPS/EQ need to be active as long as the UDP
100 // circuit is there.
101
102 scene.EventManager.OnClientClosed += ClientClosed;
103 scene.EventManager.OnMakeChildAgent += MakeChildAgent;
104 scene.EventManager.OnRegisterCaps += OnRegisterCaps;
105 }
106 else
107 {
108 m_gConfig = null;
109 }
110
111 }
112
113 private void ReadConfigAndPopulate(Scene scene, IConfig startupConfig, string p)
114 {
115 enabledYN = startupConfig.GetBoolean("EventQueue", true);
116 }
117
118 public void PostInitialise()
119 {
120 }
121
122 public void Close()
123 {
124 }
125
126 public string Name
127 {
128 get { return "EventQueueGetModule"; }
129 }
130
131 public bool IsSharedModule
132 {
133 get { return false; }
134 }
135 #endregion
136
137 /// <summary>
138 /// Always returns a valid queue
139 /// </summary>
140 /// <param name="agentId"></param>
141 /// <returns></returns>
142 private BlockingLLSDQueue TryGetQueue(UUID agentId)
143 {
144 lock (queues)
145 {
146 if (!queues.ContainsKey(agentId))
147 {
148 m_log.DebugFormat(
149 "[EVENTQUEUE]: Adding new queue for agent {0} in region {1}",
150 agentId, m_scene.RegionInfo.RegionName);
151
152 queues[agentId] = new BlockingLLSDQueue();
153 }
154
155 return queues[agentId];
156 }
157 }
158
159 /// <summary>
160 /// May return a null queue
161 /// </summary>
162 /// <param name="agentId"></param>
163 /// <returns></returns>
164 private BlockingLLSDQueue GetQueue(UUID agentId)
165 {
166 lock (queues)
167 {
168 if (queues.ContainsKey(agentId))
169 {
170 return queues[agentId];
171 }
172 else
173 return null;
174 }
175 }
176
177 #region IEventQueue Members
178
179 public bool Enqueue(OSD ev, UUID avatarID)
180 {
181 //m_log.DebugFormat("[EVENTQUEUE]: Enqueuing event for {0} in region {1}", avatarID, m_scene.RegionInfo.RegionName);
182 try
183 {
184 BlockingLLSDQueue queue = GetQueue(avatarID);
185 if (queue != null)
186 queue.Enqueue(ev);
187 }
188 catch(NullReferenceException e)
189 {
190 m_log.Error("[EVENTQUEUE] Caught exception: " + e);
191 return false;
192 }
193
194 return true;
195 }
196
197 #endregion
198
199 private void OnNewClient(IClientAPI client)
200 {
201 //client.OnLogout += ClientClosed;
202 }
203
204// private void ClientClosed(IClientAPI client)
205// {
206// ClientClosed(client.AgentId);
207// }
208
209 private void ClientClosed(UUID AgentID)
210 {
211 m_log.DebugFormat("[EVENTQUEUE]: Closed client {0} in region {1}", AgentID, m_scene.RegionInfo.RegionName);
212
213 int count = 0;
214 while (queues.ContainsKey(AgentID) && queues[AgentID].Count() > 0 && count++ < 5)
215 {
216 Thread.Sleep(1000);
217 }
218
219 lock (queues)
220 {
221 queues.Remove(AgentID);
222 }
223 List<UUID> removeitems = new List<UUID>();
224 lock (m_AvatarQueueUUIDMapping)
225 {
226 foreach (UUID ky in m_AvatarQueueUUIDMapping.Keys)
227 {
228 if (ky == AgentID)
229 {
230 removeitems.Add(ky);
231 }
232 }
233
234 foreach (UUID ky in removeitems)
235 {
236 m_AvatarQueueUUIDMapping.Remove(ky);
237 m_scene.CommsManager.HttpServer.RemoveHTTPHandler("","/CAPS/EQG/" + ky.ToString() + "/");
238 }
239
240 }
241 UUID searchval = UUID.Zero;
242
243 removeitems.Clear();
244
245 lock (m_QueueUUIDAvatarMapping)
246 {
247 foreach (UUID ky in m_QueueUUIDAvatarMapping.Keys)
248 {
249 searchval = m_QueueUUIDAvatarMapping[ky];
250
251 if (searchval == AgentID)
252 {
253 removeitems.Add(ky);
254 }
255 }
256
257 foreach (UUID ky in removeitems)
258 m_QueueUUIDAvatarMapping.Remove(ky);
259
260 }
261 }
262
263 private void MakeChildAgent(ScenePresence avatar)
264 {
265 //m_log.DebugFormat("[EVENTQUEUE]: Make Child agent {0} in region {1}.", avatar.UUID, m_scene.RegionInfo.RegionName);
266 //lock (m_ids)
267 // {
268 //if (m_ids.ContainsKey(avatar.UUID))
269 //{
270 // close the event queue.
271 //m_ids[avatar.UUID] = -1;
272 //}
273 //}
274 }
275
276 public void OnRegisterCaps(UUID agentID, Caps caps)
277 {
278 // Register an event queue for the client
279
280 //m_log.DebugFormat(
281 // "[EVENTQUEUE]: OnRegisterCaps: agentID {0} caps {1} region {2}",
282 // agentID, caps, m_scene.RegionInfo.RegionName);
283
284 // Let's instantiate a Queue for this agent right now
285 TryGetQueue(agentID);
286
287 string capsBase = "/CAPS/EQG/";
288 UUID EventQueueGetUUID = UUID.Zero;
289
290 lock (m_AvatarQueueUUIDMapping)
291 {
292 // Reuse open queues. The client does!
293 if (m_AvatarQueueUUIDMapping.ContainsKey(agentID))
294 {
295 m_log.DebugFormat("[EVENTQUEUE]: Found Existing UUID!");
296 EventQueueGetUUID = m_AvatarQueueUUIDMapping[agentID];
297 }
298 else
299 {
300 EventQueueGetUUID = UUID.Random();
301 //m_log.DebugFormat("[EVENTQUEUE]: Using random UUID!");
302 }
303 }
304
305 lock (m_QueueUUIDAvatarMapping)
306 {
307 if (!m_QueueUUIDAvatarMapping.ContainsKey(EventQueueGetUUID))
308 m_QueueUUIDAvatarMapping.Add(EventQueueGetUUID, agentID);
309 }
310
311 lock (m_AvatarQueueUUIDMapping)
312 {
313 if (!m_AvatarQueueUUIDMapping.ContainsKey(agentID))
314 m_AvatarQueueUUIDMapping.Add(agentID, EventQueueGetUUID);
315 }
316
317 // Register this as a caps handler
318 caps.RegisterHandler("EventQueueGet",
319 new RestHTTPHandler("POST", capsBase + EventQueueGetUUID.ToString() + "/",
320 delegate(Hashtable m_dhttpMethod)
321 {
322 return ProcessQueue(m_dhttpMethod, agentID, caps);
323 }));
324
325 // This will persist this beyond the expiry of the caps handlers
326 m_scene.CommsManager.HttpServer.AddHTTPHandler(
327 capsBase + EventQueueGetUUID.ToString() + "/", EventQueuePath2);
328
329 Random rnd = new Random(System.Environment.TickCount);
330 lock (m_ids)
331 {
332 if (!m_ids.ContainsKey(agentID))
333 m_ids.Add(agentID, rnd.Next(30000000));
334 }
335 }
336
337 public Hashtable ProcessQueue(Hashtable request, UUID agentID, Caps caps)
338 {
339 // TODO: this has to be redone to not busy-wait (and block the thread),
340 // TODO: as soon as we have a non-blocking way to handle HTTP-requests.
341
342// if (m_log.IsDebugEnabled)
343// {
344// String debug = "[EVENTQUEUE]: Got request for agent {0} in region {1} from thread {2}: [ ";
345// foreach (object key in request.Keys)
346// {
347// debug += key.ToString() + "=" + request[key].ToString() + " ";
348// }
349// m_log.DebugFormat(debug + " ]", agentID, m_scene.RegionInfo.RegionName, System.Threading.Thread.CurrentThread.Name);
350// }
351
352 BlockingLLSDQueue queue = TryGetQueue(agentID);
353 OSD element = queue.Dequeue(15000); // 15s timeout
354
355 Hashtable responsedata = new Hashtable();
356
357 int thisID = 0;
358 lock (m_ids)
359 thisID = m_ids[agentID];
360
361 if (element == null)
362 {
363 //m_log.ErrorFormat("[EVENTQUEUE]: Nothing to process in " + m_scene.RegionInfo.RegionName);
364 if (thisID == -1) // close-request
365 {
366 m_log.ErrorFormat("[EVENTQUEUE]: 404 in " + m_scene.RegionInfo.RegionName);
367 responsedata["int_response_code"] = 404; //501; //410; //404;
368 responsedata["content_type"] = "text/plain";
369 responsedata["keepalive"] = false;
370 responsedata["str_response_string"] = "Closed EQG";
371 return responsedata;
372 }
373 responsedata["int_response_code"] = 502;
374 responsedata["content_type"] = "text/plain";
375 responsedata["keepalive"] = false;
376 responsedata["str_response_string"] = "Upstream error: ";
377 responsedata["error_status_text"] = "Upstream error:";
378 responsedata["http_protocol_version"] = "HTTP/1.0";
379 return responsedata;
380 }
381
382 OSDArray array = new OSDArray();
383 if (element == null) // didn't have an event in 15s
384 {
385 // Send it a fake event to keep the client polling! It doesn't like 502s like the proxys say!
386 array.Add(EventQueueHelper.KeepAliveEvent());
387 m_log.DebugFormat("[EVENTQUEUE]: adding fake event for {0} in region {1}", agentID, m_scene.RegionInfo.RegionName);
388 }
389 else
390 {
391 array.Add(element);
392 while (queue.Count() > 0)
393 {
394 array.Add(queue.Dequeue(1));
395 thisID++;
396 }
397 }
398
399 OSDMap events = new OSDMap();
400 events.Add("events", array);
401
402 events.Add("id", new OSDInteger(thisID));
403 lock (m_ids)
404 {
405 m_ids[agentID] = thisID + 1;
406 }
407
408 responsedata["int_response_code"] = 200;
409 responsedata["content_type"] = "application/xml";
410 responsedata["keepalive"] = false;
411 responsedata["str_response_string"] = OSDParser.SerializeLLSDXmlString(events);
412 //m_log.DebugFormat("[EVENTQUEUE]: sending response for {0} in region {1}: {2}", agentID, m_scene.RegionInfo.RegionName, responsedata["str_response_string"]);
413
414 return responsedata;
415 }
416
417 public Hashtable EventQueuePath2(Hashtable request)
418 {
419 string capuuid = (string)request["uri"]; //path.Replace("/CAPS/EQG/","");
420 // pull off the last "/" in the path.
421 Hashtable responsedata = new Hashtable();
422 capuuid = capuuid.Substring(0, capuuid.Length - 1);
423 capuuid = capuuid.Replace("/CAPS/EQG/", "");
424 UUID AvatarID = UUID.Zero;
425 UUID capUUID = UUID.Zero;
426
427 // parse the path and search for the avatar with it registered
428 if (UUID.TryParse(capuuid, out capUUID))
429 {
430 lock (m_QueueUUIDAvatarMapping)
431 {
432 if (m_QueueUUIDAvatarMapping.ContainsKey(capUUID))
433 {
434 AvatarID = m_QueueUUIDAvatarMapping[capUUID];
435 }
436 }
437 if (AvatarID != UUID.Zero)
438 {
439 return ProcessQueue(request, AvatarID, m_scene.CapsModule.GetCapsHandlerForUser(AvatarID));
440 }
441 else
442 {
443 responsedata["int_response_code"] = 404;
444 responsedata["content_type"] = "text/plain";
445 responsedata["keepalive"] = false;
446 responsedata["str_response_string"] = "Not Found";
447 responsedata["error_status_text"] = "Not Found";
448 responsedata["http_protocol_version"] = "HTTP/1.0";
449 return responsedata;
450 // return 404
451 }
452 }
453 else
454 {
455 responsedata["int_response_code"] = 404;
456 responsedata["content_type"] = "text/plain";
457 responsedata["keepalive"] = false;
458 responsedata["str_response_string"] = "Not Found";
459 responsedata["error_status_text"] = "Not Found";
460 responsedata["http_protocol_version"] = "HTTP/1.0";
461 return responsedata;
462 // return 404
463 }
464
465 }
466
467 public OSD EventQueueFallBack(string path, OSD request, string endpoint)
468 {
469 // This is a fallback element to keep the client from loosing EventQueueGet
470 // Why does CAPS fail sometimes!?
471 m_log.Warn("[EVENTQUEUE]: In the Fallback handler! We lost the Queue in the rest handler!");
472 string capuuid = path.Replace("/CAPS/EQG/","");
473 capuuid = capuuid.Substring(0, capuuid.Length - 1);
474
475// UUID AvatarID = UUID.Zero;
476 UUID capUUID = UUID.Zero;
477 if (UUID.TryParse(capuuid, out capUUID))
478 {
479/* Don't remove this yet code cleaners!
480 * Still testing this!
481 *
482 lock (m_QueueUUIDAvatarMapping)
483 {
484 if (m_QueueUUIDAvatarMapping.ContainsKey(capUUID))
485 {
486 AvatarID = m_QueueUUIDAvatarMapping[capUUID];
487 }
488 }
489
490
491 if (AvatarID != UUID.Zero)
492 {
493 // Repair the CAP!
494 //OpenSim.Framework.Communications.Capabilities.Caps caps = m_scene.GetCapsHandlerForUser(AvatarID);
495 //string capsBase = "/CAPS/EQG/";
496 //caps.RegisterHandler("EventQueueGet",
497 //new RestHTTPHandler("POST", capsBase + capUUID.ToString() + "/",
498 //delegate(Hashtable m_dhttpMethod)
499 //{
500 // return ProcessQueue(m_dhttpMethod, AvatarID, caps);
501 //}));
502 // start new ID sequence.
503 Random rnd = new Random(System.Environment.TickCount);
504 lock (m_ids)
505 {
506 if (!m_ids.ContainsKey(AvatarID))
507 m_ids.Add(AvatarID, rnd.Next(30000000));
508 }
509
510
511 int thisID = 0;
512 lock (m_ids)
513 thisID = m_ids[AvatarID];
514
515 BlockingLLSDQueue queue = GetQueue(AvatarID);
516 OSDArray array = new OSDArray();
517 LLSD element = queue.Dequeue(15000); // 15s timeout
518 if (element == null)
519 {
520
521 array.Add(EventQueueHelper.KeepAliveEvent());
522 }
523 else
524 {
525 array.Add(element);
526 while (queue.Count() > 0)
527 {
528 array.Add(queue.Dequeue(1));
529 thisID++;
530 }
531 }
532 OSDMap events = new OSDMap();
533 events.Add("events", array);
534
535 events.Add("id", new LLSDInteger(thisID));
536
537 lock (m_ids)
538 {
539 m_ids[AvatarID] = thisID + 1;
540 }
541
542 return events;
543 }
544 else
545 {
546 return new LLSD();
547 }
548*
549*/
550 }
551 else
552 {
553 //return new LLSD();
554 }
555
556 return new OSDString("shutdown404!");
557 }
558
559 public void DisableSimulator(ulong handle, UUID avatarID)
560 {
561 OSD item = EventQueueHelper.DisableSimulator(handle);
562 Enqueue(item, avatarID);
563 }
564
565 public void EnableSimulator(ulong handle, IPEndPoint endPoint, UUID avatarID)
566 {
567 OSD item = EventQueueHelper.EnableSimulator(handle, endPoint);
568 Enqueue(item, avatarID);
569 }
570
571 public void EstablishAgentCommunication(UUID avatarID, IPEndPoint endPoint, string capsPath)
572 {
573 OSD item = EventQueueHelper.EstablishAgentCommunication(avatarID, endPoint.ToString(), capsPath);
574 Enqueue(item, avatarID);
575 }
576
577 public void TeleportFinishEvent(ulong regionHandle, byte simAccess,
578 IPEndPoint regionExternalEndPoint,
579 uint locationID, uint flags, string capsURL,
580 UUID avatarID)
581 {
582 OSD item = EventQueueHelper.TeleportFinishEvent(regionHandle, simAccess, regionExternalEndPoint,
583 locationID, flags, capsURL, avatarID);
584 Enqueue(item, avatarID);
585 }
586
587 public void CrossRegion(ulong handle, Vector3 pos, Vector3 lookAt,
588 IPEndPoint newRegionExternalEndPoint,
589 string capsURL, UUID avatarID, UUID sessionID)
590 {
591 OSD item = EventQueueHelper.CrossRegion(handle, pos, lookAt, newRegionExternalEndPoint,
592 capsURL, avatarID, sessionID);
593 Enqueue(item, avatarID);
594 }
595
596 public void ChatterboxInvitation(UUID sessionID, string sessionName,
597 UUID fromAgent, string message, UUID toAgent, string fromName, byte dialog,
598 uint timeStamp, bool offline, int parentEstateID, Vector3 position,
599 uint ttl, UUID transactionID, bool fromGroup, byte[] binaryBucket)
600 {
601 OSD item = EventQueueHelper.ChatterboxInvitation(sessionID, sessionName, fromAgent, message, toAgent, fromName, dialog,
602 timeStamp, offline, parentEstateID, position, ttl, transactionID,
603 fromGroup, binaryBucket);
604 Enqueue(item, toAgent);
605 m_log.InfoFormat("########### eq ChatterboxInvitation #############\n{0}", item);
606
607 }
608
609 public void ChatterBoxSessionAgentListUpdates(UUID sessionID, UUID fromAgent, UUID toAgent, bool canVoiceChat,
610 bool isModerator, bool textMute)
611 {
612 OSD item = EventQueueHelper.ChatterBoxSessionAgentListUpdates(sessionID, fromAgent, canVoiceChat,
613 isModerator, textMute);
614 Enqueue(item, toAgent);
615 m_log.InfoFormat("########### eq ChatterBoxSessionAgentListUpdates #############\n{0}", item);
616 }
617
618 public void ParcelProperties(ParcelPropertiesPacket parcelPropertiesPacket, UUID avatarID)
619 {
620 OSD item = EventQueueHelper.ParcelProperties(parcelPropertiesPacket);
621 Enqueue(item, avatarID);
622 }
623
624 public void GroupMembership(AgentGroupDataUpdatePacket groupUpdate, UUID avatarID)
625 {
626 OSD item = EventQueueHelper.GroupMembership(groupUpdate);
627 Enqueue(item, avatarID);
628 }
629 }
630}