From 279e0061c515ee0a03036bef68eea9738273d785 Mon Sep 17 00:00:00 2001 From: Johan Berntsson Date: Tue, 4 Mar 2008 05:31:54 +0000 Subject: Merged 3Di code that provides scene and avatar serialization, and plugin support for region move/split/merge. See ThirdParty/3Di/README.txt. Unless the new modules are used there should be no noticeable changes when running OpenSim. --- ThirdParty/3Di/LoadBalancer/LoadBalancerPlugin.cs | 1101 +++++++++++++++++++++ ThirdParty/3Di/LoadBalancer/TcpClient.cs | 240 +++++ ThirdParty/3Di/LoadBalancer/TcpServer.cs | 219 ++++ 3 files changed, 1560 insertions(+) create mode 100644 ThirdParty/3Di/LoadBalancer/LoadBalancerPlugin.cs create mode 100644 ThirdParty/3Di/LoadBalancer/TcpClient.cs create mode 100644 ThirdParty/3Di/LoadBalancer/TcpServer.cs (limited to 'ThirdParty/3Di/LoadBalancer') diff --git a/ThirdParty/3Di/LoadBalancer/LoadBalancerPlugin.cs b/ThirdParty/3Di/LoadBalancer/LoadBalancerPlugin.cs new file mode 100644 index 0000000..6812777 --- /dev/null +++ b/ThirdParty/3Di/LoadBalancer/LoadBalancerPlugin.cs @@ -0,0 +1,1101 @@ +/* +* Copyright (c) Contributors, http://opensimulator.org/ +* See CONTRIBUTORS.TXT for a full list of copyright holders. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of the OpenSim Project nor the +* names of its contributors may be used to endorse or promote products +* derived from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY +* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +* DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY +* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +* +*/ + +using System; +using System.IO; +using System.Net; +using System.Xml; +using System.Text; +using System.Xml.Serialization; +using System.Net.Sockets; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; + +using OpenSim.Framework; +using OpenSim.Framework.Console; +using OpenSim.Framework.Servers; +using OpenSim.Region.Environment; +using OpenSim.Region.Environment.Scenes; +using OpenSim.Region.ClientStack; + +using Nwc.XmlRpc; +using Nini.Config; + +using Mono.Addins; + +using libsecondlife; +using libsecondlife.Packets; + +[assembly:Addin] +[assembly:AddinDependency ("OpenSim", "0.5")] + +namespace OpenSim.ApplicationPlugins.LoadBalancer +{ + [Extension("/OpenSim/Startup")] + public class LoadBalancerPlugin : IApplicationPlugin + { + private static readonly log4net.ILog m_log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + + private OpenSimMain simMain; + private BaseHttpServer commandServer; + + private List udpServers; + private List regionData; + + private int proxyOffset; + private string proxyURL; + private SceneManager sceneManager; + private string serializeDir; + + private TcpServer mTcpServer; + private TcpClient mTcpClient; + + public void Initialise(OpenSimMain openSim) + { + m_log.Info("[BALANCER] "+"Entering Initialize()"); + + StartTcpServer(); + ClientView.SynchronizeClient = new ClientView.SynchronizeClientHandler(SynchronizePackets); + AsynchronousSocketListener.PacketHandler = new AsynchronousSocketListener.PacketRecieveHandler(SynchronizePacketRecieve); + + this.sceneManager = openSim.SceneManager; + this.udpServers = openSim.UdpServers; + this.regionData = openSim.RegionData; + this.simMain = openSim; + this.commandServer = openSim.HttpServer; + + proxyOffset = Int32.Parse(openSim.ConfigSource.Configs["Network"].GetString("proxy_offset", "0")); + proxyURL = openSim.ConfigSource.Configs["Network"].GetString("proxy_url", ""); + if(proxyURL.Length==0) return; + + serializeDir = openSim.ConfigSource.Configs["Network"].GetString("serialize_dir", "/tmp/"); + + commandServer.AddXmlRPCHandler("SerializeRegion", SerializeRegion); + commandServer.AddXmlRPCHandler("DeserializeRegion_Move", DeserializeRegion_Move); + commandServer.AddXmlRPCHandler("DeserializeRegion_Clone", DeserializeRegion_Clone); + commandServer.AddXmlRPCHandler("TerminateRegion", TerminateRegion); + + commandServer.AddXmlRPCHandler("SplitRegion", SplitRegion); + commandServer.AddXmlRPCHandler("MergeRegions", MergeRegions); + commandServer.AddXmlRPCHandler("UpdatePhysics", UpdatePhysics); + commandServer.AddXmlRPCHandler("GetStatus", GetStatus); + + m_log.Info("[BALANCER] "+"Exiting Initialize()"); + } + + private void StartTcpServer() + { + Thread server_thread = new Thread(new ThreadStart( + delegate { + mTcpServer = new TcpServer(10001); + mTcpServer.start(); + })); + server_thread.Start(); + } + + public void Close() + { + } + + private XmlRpcResponse GetStatus(XmlRpcRequest request) + { + XmlRpcResponse response = new XmlRpcResponse(); + try + { + m_log.Info("[BALANCER] "+"Entering RegionStatus()"); + + int src_port = (int)request.Params[0]; + Scene scene = null; + // try to get the scene object + RegionInfo src_region = SearchRegionFromPortNum(src_port); + if (sceneManager.TryGetScene(src_region.RegionID, out scene) == false) + { + m_log.Error("[BALANCER] "+"The Scene is not found"); + return response; + } + // serialization of client's informations + List presences = scene.GetScenePresences(); + int get_scene_presence = presences.Count; + int get_scene_presence_filter = 0; + foreach (ScenePresence pre in presences) + { + ClientView client = (ClientView) pre.ControllingClient; + //if(pre.MovementFlag!=0 && client.PacketProcessingEnabled==true) { + if(client.PacketProcessingEnabled==true) { + get_scene_presence_filter++; + } + } + List avatars = scene.GetAvatars(); + int get_avatar = avatars.Count; + int get_avatar_filter = 0; + string avatar_names = ""; + foreach (ScenePresence pre in avatars) + { + ClientView client = (ClientView) pre.ControllingClient; + //if(pre.MovementFlag!=0 && client.PacketProcessingEnabled==true) { + if(client.PacketProcessingEnabled==true) { + get_avatar_filter++; + avatar_names += pre.Firstname + " " + pre.Lastname + "; "; + } + } + + Hashtable responseData = new Hashtable(); + responseData["get_scene_presence_filter"] = get_scene_presence_filter; + responseData["get_scene_presence"] = get_scene_presence; + responseData["get_avatar_filter"] = get_avatar_filter; + responseData["get_avatar"] = get_avatar; + responseData["avatar_names"] = avatar_names; + response.Value = responseData; + + m_log.Info("[BALANCER] "+"Exiting RegionStatus()"); + } + catch (Exception e) + { + m_log.Error("[BALANCER] "+e.ToString()); + m_log.Error("[BALANCER] "+e.StackTrace); + } + return response; + } + + private XmlRpcResponse SerializeRegion(XmlRpcRequest request) + { + try + { + m_log.Info("[BALANCER] "+"Entering SerializeRegion()"); + + string src_url = (string)request.Params[0]; + int src_port = (int)request.Params[1]; + + SerializeRegion(src_url, src_port); + + m_log.Info("[BALANCER] "+"Exiting SerializeRegion()"); + } + catch (Exception e) + { + m_log.Error("[BALANCER] "+e.ToString()); + m_log.Error("[BALANCER] "+e.StackTrace); + } + + return new XmlRpcResponse(); + } + + private XmlRpcResponse DeserializeRegion_Move(XmlRpcRequest request) + { + try + { + m_log.Info("[BALANCER] "+"Entering DeserializeRegion_Move()"); + + string src_url = (string)request.Params[0]; + int src_port = (int)request.Params[1]; + string dst_url = (string)request.Params[2]; + int dst_port = (int)request.Params[3]; + + DeserializeRegion_Move(src_port, dst_port, src_url, dst_url); + + m_log.Info("[BALANCER] "+"Exiting DeserializeRegion_Move()"); + } + catch (Exception e) + { + m_log.Error("[BALANCER] "+e.ToString()); + m_log.Error("[BALANCER] "+e.StackTrace); + } + + return new XmlRpcResponse(); + } + + private XmlRpcResponse DeserializeRegion_Clone(XmlRpcRequest request) + { + try + { + m_log.Info("[BALANCER] "+"Entering DeserializeRegion_Clone()"); + + string src_url = (string)request.Params[0]; + int src_port = (int)request.Params[1]; + string dst_url = (string)request.Params[2]; + int dst_port = (int)request.Params[3]; + + DeserializeRegion_Clone(src_port, dst_port, src_url, dst_url); + + m_log.Info("[BALANCER] "+"Exiting DeserializeRegion_Clone()"); + } + catch (Exception e) + { + m_log.Error("[BALANCER] "+e.ToString()); + m_log.Error("[BALANCER] "+e.StackTrace); + throw e; + } + + return new XmlRpcResponse(); + } + + private XmlRpcResponse TerminateRegion(XmlRpcRequest request) + { + try + { + m_log.Info("[BALANCER] "+"Entering TerminateRegion()"); + + int src_port = (int)request.Params[0]; + + // backgroud + WaitCallback callback = new WaitCallback(TerminateRegion); + ThreadPool.QueueUserWorkItem(callback, src_port); + + m_log.Info("[BALANCER] "+"Exiting TerminateRegion()"); + } + catch (Exception e) + { + m_log.Error("[BALANCER] "+e.ToString()); + m_log.Error("[BALANCER] "+e.StackTrace); + } + + return new XmlRpcResponse(); + } + + // internal functions + + private void SerializeRegion(string src_url, int src_port) + { + RegionInfo src_region = null; + + //------------------------------------------ + // Processing of origin region + //------------------------------------------ + + // search origin region + src_region = SearchRegionFromPortNum(src_port); + + if (src_region == null) + { + m_log.Error("[BALANCER] "+"Region not found"); + return; + } + + simMain.ProxyCommand(src_region.proxyUrl, "BlockClientMessages", src_url, src_port + proxyOffset); + + // serialization of origin region's data + SerializeRegion(src_region, serializeDir); + } + + private void DeserializeRegion_Move(int src_port, int dst_port, string src_url, string dst_url) + { + RegionInfo dst_region = null; + + //------------------------------------------ + // Processing of destination region + //------------------------------------------ + + // import the source region's data + dst_region = DeserializeRegion(dst_port, true, serializeDir); + + simMain.ProxyCommand(dst_region.proxyUrl, "ChangeRegion", src_port + proxyOffset, src_url, dst_port + proxyOffset, dst_url); + simMain.ProxyCommand(dst_region.proxyUrl, "UnblockClientMessages", dst_url, dst_port + proxyOffset); + } + + private void DeserializeRegion_Clone(int src_port, int dst_port, string src_url, string dst_url) + { + RegionInfo dst_region = null; + + //------------------------------------------ + // Processing of destination region + //------------------------------------------ + + // import the source region's data + dst_region = DeserializeRegion(dst_port, false, serializeDir); + + // Decide who is in charge for each section + int[] port = new int[] { src_port, dst_port }; + string[] url = new string[] { "http://" + src_url + ":" + commandServer.Port, "http://" + dst_url + ":" + commandServer.Port }; + for(int i=0; i<2; i++) simMain.XmlRpcCommand(url[i], "SplitRegion", i, 2, port[0], port[1], url[0], url[1]); + + // Enable the proxy + simMain.ProxyCommand(dst_region.proxyUrl, "AddRegion", src_port + proxyOffset, src_url, dst_port + proxyOffset, dst_url); + simMain.ProxyCommand(dst_region.proxyUrl, "UnblockClientMessages", dst_url, dst_port + proxyOffset); + } + + private void TerminateRegion(object param) + { + RegionInfo src_region = null; + int src_port = (int)param; + + //------------------------------------------ + // Processing of remove region + //------------------------------------------ + + // search origin region + src_region = SearchRegionFromPortNum(src_port); + + if (src_region == null) + { + m_log.Error("[BALANCER] "+"Region not found"); + return; + } + + isSplit = false; + + // remove client resources + RemoveAllClientResource(src_region); + // remove old region + RemoveRegion(src_region.RegionID, src_region.InternalEndPoint.Port); + + m_log.Info("[BALANCER] "+"Region terminated"); + } + + private RegionInfo SearchRegionFromPortNum(int portnum) + { + RegionInfo result = null; + + foreach (RegionInfo rinfo in regionData) + { + if (rinfo.InternalEndPoint.Port == portnum) + { +// m_log.Info("BALANCER", +// "Region found. Internal Port = {0}, Handle={1}", +// rinfo.InternalEndPoint.Port, rinfo.RegionHandle); + result = rinfo; + break; + } + } + + return result; + } + + private UDPServer SearchUDPServerFromPortNum(int portnum) + { + return udpServers.Find( delegate(UDPServer server) { return (portnum + proxyOffset == ((IPEndPoint) server.Server.LocalEndPoint).Port); }); + } + + private void SerializeRegion(RegionInfo src_region, string export_dir) + { + Scene scene = null; + List presences; + string filename; + int i = 0; + + // try to get the scene object + if (sceneManager.TryGetScene(src_region.RegionID, out scene) == false) + { + m_log.Error("[BALANCER] "+"The Scene is not found"); + return; + } + + // create export directory + DirectoryInfo dirinfo = new DirectoryInfo(export_dir); + if (!dirinfo.Exists) + { + dirinfo.Create(); + } + + // serialization of client's informations + presences = scene.GetScenePresences(); + + foreach (ScenePresence pre in presences) + { + SerializeClient(i, scene, pre, export_dir); + i++; + } + + // serialization of region data + SearializableRegionInfo dst_region = new SearializableRegionInfo(src_region); + + filename = export_dir + "RegionInfo_" + src_region.RegionID.ToString() + ".bin"; + Util.SerializeToFile(filename, dst_region); + + // backup current scene's entities + //scene.Backup(); + + m_log.InfoFormat("[BALANCER] "+"region serialization completed [{0}]", + src_region.RegionID.ToString()); + } + + private void SerializeClient(int idx, Scene scene, ScenePresence pre, string export_dir) + { + string filename; + IClientAPI controller = null; + + m_log.InfoFormat("[BALANCER] "+"agent id : {0}", pre.m_uuid); + + uint[] circuits = scene.ClientManager.GetAllCircuits(pre.m_uuid); + + foreach (uint code in circuits) + { + m_log.InfoFormat("[BALANCER] "+"circuit code : {0}", code); + + if (scene.ClientManager.TryGetClient(code, out controller)) + { + ClientInfo info = controller.GetClientInfo(); + + filename = export_dir + "ClientInfo-" + String.Format("{0:0000}", idx) + "_" + controller.CircuitCode.ToString() + ".bin"; + + Util.SerializeToFile(filename, info); + + m_log.InfoFormat("[BALANCER] "+"client info serialized [filename={0}]", filename); + } + } + + //filename = export_dir + "Presence_" + controller.AgentId.ToString() + ".bin"; + filename = export_dir + "Presence_" + String.Format("{0:0000}", idx) + ".bin"; + + Util.SerializeToFile(filename, pre); + + m_log.InfoFormat("[BALANCER] "+"scene presence serialized [filename={0}]", filename); + } + + private RegionInfo DeserializeRegion(int dst_port, bool move_flag, string import_dir) + { + string[] files = null; + RegionInfo dst_region = null; + + try + { + // deserialization of region data + files = Directory.GetFiles(import_dir, "RegionInfo_*.bin"); + + foreach (string filename in files) + { + m_log.InfoFormat("[BALANCER] RegionInfo filename = [{0}]", filename); + + dst_region = new RegionInfo((SearializableRegionInfo)Util.DeserializeFromFile(filename)); + + m_log.InfoFormat("[BALANCER] "+"RegionID = [{0}]", dst_region.RegionID.ToString()); + m_log.InfoFormat("[BALANCER] "+"RegionHandle = [{0}]", dst_region.RegionHandle); + m_log.InfoFormat("[BALANCER] "+"ProxyUrl = [{0}]", dst_region.proxyUrl); + m_log.InfoFormat("[BALANCER] "+"OriginRegionID = [{0}]", dst_region.originRegionID.ToString()); + + CreateCloneRegion(dst_region, dst_port, true); + + File.Delete(filename); + + m_log.InfoFormat("[BALANCER] "+"region deserialized [{0}]", dst_region.RegionID); + } + + // deserialization of client data + DeserializeClient(dst_region, import_dir); + + m_log.InfoFormat("[BALANCER] "+"region deserialization completed [{0}]", + dst_region.ToString()); + } + catch (Exception e) + { + m_log.Error("[BALANCER] "+e.ToString()); + m_log.Error("[BALANCER] "+e.StackTrace); + throw e; + } + + return dst_region; + } + + private void DeserializeClient(RegionInfo dst_region, string import_dir) + { + ScenePresence sp = null; + ClientInfo data = null; + Scene scene = null; + string[] files = null; + IClientAPI controller = null; + UDPServer udpserv = null; + + if (sceneManager.TryGetScene(dst_region.RegionID, out scene)) + { + // search udpserver + udpserv = SearchUDPServerFromPortNum(scene.RegionInfo.InternalEndPoint.Port); + + // restore the scene presence +/* + files = Directory.GetFiles(import_dir, "Presence_*.bin"); + Array.Sort(files); + + foreach (string filename in files) + { + sp = (ScenePresence)Util.DeserializeFromFile(filename); + Console.WriteLine("agent id = {0}", sp.m_uuid); + + scene.m_restorePresences.Add(sp.m_uuid, sp); + File.Delete(filename); + + m_log.InfoFormat("[BALANCER] "+"scene presence deserialized [{0}]", sp.m_uuid); + } +*/ + for (int i = 0; ; i++) + { + string filename = import_dir + "Presence_" + String.Format("{0:0000}", i) + ".bin"; + + if (!File.Exists(filename)) + { + break; + } + + sp = (ScenePresence)Util.DeserializeFromFile(filename); + Console.WriteLine("agent id = {0}", sp.m_uuid); + + scene.m_restorePresences.Add(sp.m_uuid, sp); + File.Delete(filename); + + m_log.InfoFormat("[BALANCER] " + "scene presence deserialized [{0}]", sp.m_uuid); + + // restore the ClientView + + files = Directory.GetFiles(import_dir, "ClientInfo-" + String.Format("{0:0000}", i) + "_*.bin"); + + foreach (string fname in files) + { + int start = fname.IndexOf('_'); + int end = fname.LastIndexOf('.'); + uint circuit_code = uint.Parse(fname.Substring(start + 1, end - start - 1)); + m_log.InfoFormat("[BALANCER] " + "client circuit code = {0}", circuit_code); + + data = (ClientInfo)Util.DeserializeFromFile(fname); + + AgentCircuitData agentdata = new AgentCircuitData(data.agentcircuit); + scene.AuthenticateHandler.AddNewCircuit(circuit_code, agentdata); + + udpserv.RestoreClient(agentdata, data.userEP, data.proxyEP); + + // waiting for the scene-presense restored + lock (scene.m_restorePresences) + { + Monitor.Wait(scene.m_restorePresences, 3000); + } + + if (scene.ClientManager.TryGetClient(circuit_code, out controller)) + { + m_log.InfoFormat("[BALANCER] " + "get client [{0}]", circuit_code); + controller.SetClientInfo(data); + } + + File.Delete(fname); + + m_log.InfoFormat("[BALANCER] " + "client info deserialized [{0}]", circuit_code); + } + + // backup new scene's entities + //scene.Backup(); + } + } + } + + private void CreateCloneRegion(RegionInfo dst_region, int dst_port, bool createID_flag) + { + if (createID_flag) + { + dst_region.RegionID = LLUUID.Random(); + } + + // change RegionInfo (memory only) + dst_region.InternalEndPoint.Port = dst_port; + dst_region.ExternalHostName = proxyURL.Split(new char[] { '/', ':' })[3]; + + // Create new region + simMain.CreateRegion(dst_region, false); + } + + private void RemoveRegion(LLUUID regionID, int port) + { + Scene killScene; + if (sceneManager.TryGetScene(regionID, out killScene)) + { + Console.WriteLine("scene found."); + + if ((sceneManager.CurrentScene != null) + && (sceneManager.CurrentScene.RegionInfo.RegionID == killScene.RegionInfo.RegionID)) + { + sceneManager.TrySetCurrentScene(".."); + } + + m_log.Info("Removing region : " + killScene.RegionInfo.RegionName); + regionData.Remove(killScene.RegionInfo); + sceneManager.CloseScene(killScene); + } + + // Shutting down the UDP server + UDPServer udpsvr = SearchUDPServerFromPortNum(port); + + if (udpsvr != null) + { + udpsvr.Server.Close(); + udpServers.Remove(udpsvr); + } + } + + private void RemoveAllClientResource(RegionInfo src_region) + { + Scene scene = null; + List presences; + IClientAPI controller = null; + + // try to get the scene object + if (sceneManager.TryGetScene(src_region.RegionID, out scene) == false) + { + m_log.Error("[BALANCER] "+"The Scene is not found"); + return; + } + + // serialization of client's informations + presences = scene.GetScenePresences(); + + // remove all scene presences + foreach (ScenePresence pre in presences) + { + uint[] circuits = scene.ClientManager.GetAllCircuits(pre.m_uuid); + + foreach (uint code in circuits) + { + m_log.InfoFormat("[BALANCER] "+"circuit code : {0}", code); + + if (scene.ClientManager.TryGetClient(code, out controller)) + { + // stopping clientview thread + if (((ClientView)controller).PacketProcessingEnabled) + { + controller.Stop(); + ((ClientView)controller).PacketProcessingEnabled = false; + } + // teminateing clientview thread + controller.Terminate(); + m_log.Info("[BALANCER] "+"client thread stopped"); + } + } + + // remove scene presence + scene.RemoveClient(pre.m_uuid); + } + } + + /* + * This section implements scene splitting and synchronization + */ + + private bool[] isLocalNeighbour; + private string[] sceneURL; + private int[] regionPortList; + private TcpClient[] tcpClientList; + private bool isSplit = false; + + private XmlRpcResponse SplitRegion(XmlRpcRequest request) + { + try + { + int myID = (int) request.Params[0]; + int numRegions = (int) request.Params[1]; + regionPortList = new int[numRegions]; + sceneURL = new string[numRegions]; + tcpClientList = new TcpClient[numRegions]; + + for(int i=0; i presences = scene.GetScenePresences(); +presences.Sort(); + foreach (ScenePresence pre in presences) + { + // Divide the presences evenly over the set of subscenes + ClientView client = (ClientView) pre.ControllingClient; + client.PacketProcessingEnabled = (( (i + myID) % sceneURL.Length) == 0); + + m_log.InfoFormat("[SPLITSCENE] === SplitRegion {0}: SP.PacketEnabled {1}", region.RegionID, client.PacketProcessingEnabled); + + if (!client.PacketProcessingEnabled) + { + // stopping clientview thread + client.Stop(); + } + + ++i; + } + + scene.splitID = myID; + scene.SynchronizeScene = new Scene.SynchronizeSceneHandler(SynchronizeScenes); + isSplit = true; + } + else + { + m_log.Error("[SPLITSCENE] "+String.Format("Scene not found {0}", region.RegionID)); + } + } + catch (Exception e) + { + m_log.Error("[SPLITSCENE] "+e.ToString()); + m_log.Error("[SPLITSCENE] "+e.StackTrace); + } + + return new XmlRpcResponse(); + } + + private XmlRpcResponse MergeRegions(XmlRpcRequest request) + { + // This should only be called for the master scene + try + { + m_log.Info("[BALANCER] "+"Entering MergeRegions()"); + + string src_url = (string) request.Params[0]; + int src_port = (int) request.Params[1]; + + RegionInfo region = SearchRegionFromPortNum(src_port); + + simMain.ProxyCommand(region.proxyUrl, "BlockClientMessages", src_url, src_port + proxyOffset); + + Scene scene; + if (sceneManager.TryGetScene(region.RegionID, out scene)) + { + isSplit = false; + + scene.SynchronizeScene = null; + scene.Region_Status = RegionStatus.Up; + + List presences = scene.GetScenePresences(); + foreach (ScenePresence pre in presences) + { + ClientView client = (ClientView) pre.ControllingClient; + if (!client.PacketProcessingEnabled) + { + client.Restart(); + client.PacketProcessingEnabled = true; + } + } + } + + // Delete the slave scenes + for(int i=1; i presences = scene.GetScenePresences(); + foreach (ScenePresence pre in presences) + { + ClientView client = (ClientView) pre.ControllingClient; + + // Because data changes by the physics simulation when the client doesn't move, + // if MovementFlag is false, It is necessary to synchronize. + //if(pre.MovementFlag!=0 && client.PacketProcessingEnabled==true) + if(client.PacketProcessingEnabled==true) + { + //m_log.Info("[SPLITSCENE] "+String.Format("Client moving in {0} {1}", scene.RegionInfo.RegionID, pre.AbsolutePosition)); + + for (int i = 0; i < sceneURL.Length; i++) + { + if (i == scene.splitID) + { + continue; + } + + if(isLocalNeighbour[i]) + { + //m_log.Info("[SPLITSCENE] "+"Synchronize ScenePresence (Local) [region:{0}=>{1}, client:{2}]", + // scene.RegionInfo.RegionID, regionPortList[i], pre.UUID.ToString()); + LocalUpdatePhysics(regionPortList[i], pre.UUID, pre.AbsolutePosition, pre.Velocity, pre.PhysicsActor.Flying); + } + else + { + //m_log.Info("[SPLITSCENE] "+"Synchronize ScenePresence (Remote) [region port:{0}, client:{1}, position:{2}, velocity:{3}, flying:{4}]", + // regionPortList[i], pre.UUID.ToString(), pre.AbsolutePosition.ToString(), + // pre.Velocity.ToString(), pre.PhysicsActor.Flying); + + + simMain.XmlRpcCommand(sceneURL[i], "UpdatePhysics", + regionPortList[i], pre.UUID.GetBytes(), + pre.AbsolutePosition.GetBytes(), pre.Velocity.GetBytes(), + pre.PhysicsActor.Flying); + +/* + byte[] buff = new byte[12+12+1]; + + Buffer.BlockCopy(pre.AbsolutePosition.GetBytes(), 0, buff, 0, 12); + Buffer.BlockCopy(pre.Velocity.GetBytes(), 0, buff, 12, 12); + buff[24] = (byte)((pre.PhysicsActor.Flying)?1:0); + + // create header + InternalPacketHeader header = new InternalPacketHeader(); + + header.type = 1; + header.throttlePacketType = 0; + header.numbytes = buff.Length; + header.agent_id = pre.UUID.UUID; + header.region_port = regionPortList[i]; + + //Send + tcpClientList[i].send(header, buff); +*/ + } + } + } +// ++i; + } + } + } + + public bool SynchronizePackets(IScene scene, Packet packet, LLUUID agentID, ThrottleOutPacketType throttlePacketType) + { + if (!isSplit) + { + return false; + } + + Scene localScene = (Scene)scene; + + for (int i = 0; i < sceneURL.Length; i++) + { + if (i == localScene.splitID) + { + continue; + } + + if(isLocalNeighbour[i]) + { + //m_log.Info("[SPLITSCENE] "+"Synchronize Packet (Local) [type:{0}, client:{1}]", + // packet.Type.ToString(), agentID.ToString()); + LocalUpdatePacket(regionPortList[i], agentID, packet, throttlePacketType); + } + else + { + //m_log.Info("[SPLITSCENE] "+"Synchronize Packet (Remote) [type:{0}, client:{1}]", + // packet.Type.ToString(), agentID.ToString()); + // to bytes + byte[] buff = packet.ToBytes(); + + // create header + InternalPacketHeader header = new InternalPacketHeader(); + + header.type = 0; + header.throttlePacketType = (int)throttlePacketType; + header.numbytes = buff.Length; + header.agent_id = agentID.UUID; + header.region_port = regionPortList[i]; + + //Send + tcpClientList[i].send(header, buff); + + PacketPool.Instance.ReturnPacket(packet); + } + } + + return true; + } + + private void LocalUpdatePacket(int regionPort, LLUUID agentID, Packet packet, ThrottleOutPacketType throttlePacketType) + { + Scene scene; + + RegionInfo region = SearchRegionFromPortNum(regionPort); + +// m_log.Info("[SPLITSCENE] "+"LocalUpdatePacket [region port:{0}, client:{1}, packet type:{2}]", +// regionPort, agentID.ToString(), packet.GetType().ToString()); + + if (sceneManager.TryGetScene(region.RegionID, out scene)) + { + ScenePresence pre = scene.GetScenePresences().Find(delegate(ScenePresence x) { return x.UUID == agentID; }); + + if (pre == null) + { + m_log.ErrorFormat("[SPLITSCENE] [LocalUpdatePacket] ScenePresence is missing... ({0})", agentID.ToString()); + return; + } + + if (((ClientView)pre.ControllingClient).PacketProcessingEnabled==true) + { + pre.ControllingClient.OutPacket(packet, throttlePacketType); + } + else + { + PacketPool.Instance.ReturnPacket(packet); + } + } + } + + public void SynchronizePacketRecieve(InternalPacketHeader header, byte[] buff) + { +// m_log.Info("[SPLITSCENE] "+"entering SynchronizePacketRecieve[type={0}]", header.type); + + if (!isSplit) + { + return; + } + + switch (header.type) + { + case 0: + + Packet packet = null; + byte[] zero = new byte[3000]; + int packetEnd = 0; + + // deserialize packet + packetEnd = buff.Length - 1; +// packetEnd = buff.Length; + + try + { + //m_log.Info("[SPLITSCENE] "+"PacketPool.Instance : {0}", (PacketPool.Instance == null)?"null":"not null"); + //m_log.Info("[SPLITSCENE] "+"buff length={0}", buff.Length); + + packet = PacketPool.Instance.GetPacket(buff, ref packetEnd, zero); + + LocalUpdatePacket(header.region_port, new LLUUID(header.agent_id), + packet, (ThrottleOutPacketType)header.throttlePacketType); + } + catch (Exception e) + { + m_log.Error("[SPLITSCENE] "+e.ToString()); + m_log.Error("[SPLITSCENE] "+e.StackTrace); + } + + break; + + case 1: + + int regionPort = header.region_port; + LLUUID scenePresenceID = new LLUUID(header.agent_id); + LLVector3 position = new LLVector3(buff, 0); + LLVector3 velocity = new LLVector3(buff, 12); + bool flying = ((buff[24] == (byte)1)?true:false); + + LocalUpdatePhysics(regionPort, scenePresenceID, position, velocity, flying); + + break; + + default: + m_log.Info("[SPLITSCENE] "+"Invalid type"); + break; + } + +// m_log.Info("[SPLITSCENE] "+"exiting SynchronizePacketRecieve"); + } + } +} diff --git a/ThirdParty/3Di/LoadBalancer/TcpClient.cs b/ThirdParty/3Di/LoadBalancer/TcpClient.cs new file mode 100644 index 0000000..9f62d33 --- /dev/null +++ b/ThirdParty/3Di/LoadBalancer/TcpClient.cs @@ -0,0 +1,240 @@ +/* +* Copyright (c) Contributors, http://opensimulator.org/ +* See CONTRIBUTORS.TXT for a full list of copyright holders. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of the OpenSim Project nor the +* names of its contributors may be used to endorse or promote products +* derived from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY +* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +* DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY +* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +* +*/ + +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Text; +using System.Runtime.Serialization.Formatters.Binary; + +namespace OpenSim.ApplicationPlugins.LoadBalancer { + public class AsynchronousClient { + private static ManualResetEvent connectDone = new ManualResetEvent(false); + private static ManualResetEvent sendDone = new ManualResetEvent(false); + private static ManualResetEvent receiveDone = new ManualResetEvent(false); + private static String response = String.Empty; + + public static Socket StartClient(string hostname, int port) { + try { + IPHostEntry ipHostInfo = Dns.GetHostEntry(hostname); + IPAddress ipAddress = ipHostInfo.AddressList[0]; + IPEndPoint remoteEP = new IPEndPoint(ipAddress, port); + + Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + client.BeginConnect( remoteEP, new AsyncCallback(ConnectCallback), client); + connectDone.WaitOne(); + /* + Send(client,"This is a test"); + sendDone.WaitOne(); + Receive(client); + receiveDone.WaitOne(); + client.Shutdown(SocketShutdown.Both); + client.Close(); + */ + return client; + } catch (Exception e) { + Console.WriteLine(e.ToString()); + throw new Exception("socket error !!"); + } + } + + private static void ConnectCallback(IAsyncResult ar) { + try { + Socket client = (Socket) ar.AsyncState; + client.EndConnect(ar); + Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString()); + connectDone.Set(); + } catch (Exception e) { + Console.WriteLine(e.ToString()); + } + } + +/* + public static void Receive(Socket client) { + try { + StateObject state = new StateObject(); + state.workSocket = client; + client.BeginReceive( state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state); + } catch (Exception e) { + Console.WriteLine(e.ToString()); + } + } + + private static void ReceiveCallback( IAsyncResult ar ) { + try { + StateObject state = (StateObject) ar.AsyncState; + Socket client = state.workSocket; + + int bytesRead = client.EndReceive(ar); + if (bytesRead > 0) { + state.sb.Append(Encoding.ASCII.GetString(state.buffer,0,bytesRead)); + client.BeginReceive(state.buffer,0,StateObject.BufferSize,0, new AsyncCallback(ReceiveCallback), state); + } else { + if (state.sb.Length > 1) { + response = state.sb.ToString(); + } + receiveDone.Set(); + } + } catch (Exception e) { + Console.WriteLine(e.ToString()); + } + } +*/ + public static void Send(Socket client, byte[] byteData) { + client.BeginSend(byteData, 0, byteData.Length, 0, new AsyncCallback(SendCallback), client); + } + + private static void SendCallback(IAsyncResult ar) { + try { + Socket client = (Socket) ar.AsyncState; + int bytesSent = client.EndSend(ar); + //Console.WriteLine("Sent {0} bytes to server.", bytesSent); + sendDone.Set(); + } catch (Exception e) { + Console.WriteLine(e.ToString()); + } + } + } + +public class InternalPacketHeader +{ + private byte[] buffer = new byte[32]; + public int type; + public int throttlePacketType; + public int numbytes; + public Guid agent_id; + public int region_port; + + public void FromBytes(byte[] bytes) + { + int i = 0; // offset + try + { + this.type = (int)(bytes[i++] + (bytes[i++] << 8) + (bytes[i++] << 16) + (bytes[i++] << 24)); + this.throttlePacketType = (int)(bytes[i++] + (bytes[i++] << 8) + (bytes[i++] << 16) + (bytes[i++] << 24)); + this.numbytes = (int)(bytes[i++] + (bytes[i++] << 8) + (bytes[i++] << 16) + (bytes[i++] << 24)); + this.agent_id = new Guid( + bytes[i++] | (bytes[i++] << 8) | (bytes[i++] << 16) | bytes[i++] << 24, + (short)(bytes[i++] | (bytes[i++] << 8)), + (short)(bytes[i++] | (bytes[i++] << 8)), + bytes[i++], bytes[i++], bytes[i++], bytes[i++], + bytes[i++], bytes[i++], bytes[i++], bytes[i++]); + this.region_port = (int)(bytes[i++] + (bytes[i++] << 8) + (bytes[i++] << 16) + (bytes[i++] << 24)); + } + catch (Exception) + { + throw new Exception("bad format!!!"); + } + } + + public byte[] ToBytes() + { + int i = 0; + this.buffer[i++] = (byte)(this.type % 256); + this.buffer[i++] = (byte)((this.type >> 8) % 256); + this.buffer[i++] = (byte)((this.type >> 16) % 256); + this.buffer[i++] = (byte)((this.type >> 24) % 256); + + this.buffer[i++] = (byte)(this.throttlePacketType % 256); + this.buffer[i++] = (byte)((this.throttlePacketType >> 8) % 256); + this.buffer[i++] = (byte)((this.throttlePacketType >> 16) % 256); + this.buffer[i++] = (byte)((this.throttlePacketType >> 24) % 256); + + this.buffer[i++] = (byte)(this.numbytes % 256); + this.buffer[i++] = (byte)((this.numbytes >> 8) % 256); + this.buffer[i++] = (byte)((this.numbytes >> 16) % 256); + this.buffer[i++] = (byte)((this.numbytes >> 24) % 256); + + // no endian care + Buffer.BlockCopy(agent_id.ToByteArray(), 0, this.buffer, i, 16); i += 16; + + this.buffer[i++] = (byte)(this.region_port % 256); + this.buffer[i++] = (byte)((this.region_port >> 8) % 256); + this.buffer[i++] = (byte)((this.region_port >> 16) % 256); + this.buffer[i++] = (byte)((this.region_port >> 24) % 256); + + return this.buffer; + } +} + public class TcpClient { + + public static int internalPacketHeaderSize = 4*4 + 16*1; + + private string mHostname; + private int mPort; + private Socket mConnection; + public TcpClient(string hostname, int port) { + this.mHostname = hostname; + this.mPort = port; + this.mConnection = null; + } + public void connect() { + this.mConnection = AsynchronousClient.StartClient(mHostname, mPort); + } +/* + public void recevie() { + if (mConnection == null) { + throw new Exception("client not initialized"); + } + try + { + AsynchronousClient.Receive(this.mConnection); + } + catch (Exception e) + { + Console.WriteLine(e.ToString()); + mConnection = null; + } + } +*/ + public void send(InternalPacketHeader header, byte[] packet) { + + lock (this) + { + + if (mConnection == null) { +// throw new Exception("client not initialized"); + connect(); + } + + AsynchronousClient.Send(this.mConnection, header.ToBytes()); + +/* +for (int i = 0; i < 10; i++) +{ + Console.Write(packet[i] + " "); +} +Console.WriteLine(""); +*/ + AsynchronousClient.Send(this.mConnection, packet); + } + } + } +} diff --git a/ThirdParty/3Di/LoadBalancer/TcpServer.cs b/ThirdParty/3Di/LoadBalancer/TcpServer.cs new file mode 100644 index 0000000..ee8bcba --- /dev/null +++ b/ThirdParty/3Di/LoadBalancer/TcpServer.cs @@ -0,0 +1,219 @@ +/* +* Copyright (c) Contributors, http://opensimulator.org/ +* See CONTRIBUTORS.TXT for a full list of copyright holders. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of the OpenSim Project nor the +* names of its contributors may be used to endorse or promote products +* derived from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY +* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +* DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY +* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +* +*/ + +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Runtime.Serialization.Formatters.Binary; + +using OpenSim.Framework.Console; + +namespace OpenSim.ApplicationPlugins.LoadBalancer { + + public class StateObject { + public Socket workSocket = null; + public const int BufferSize = 2048; + public byte[] buffer = new byte[BufferSize]; + public MemoryStream ms_ptr = new MemoryStream(); + public InternalPacketHeader header = null; + } + + public class AsynchronousSocketListener { + public static string data = null; + public static ManualResetEvent allDone = new ManualResetEvent(false); + +#region KIRYU + public delegate void PacketRecieveHandler(InternalPacketHeader header, byte[] buff); + public static PacketRecieveHandler PacketHandler = null; +#endregion + + public AsynchronousSocketListener() { } + + public static void StartListening(int port) { + IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName()); + IPAddress ipAddress = ipHostInfo.AddressList[0]; + IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port); + + Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp ); + try { + listener.Bind(localEndPoint); + listener.Listen(100); + while (true) { + allDone.Reset(); + listener.BeginAccept( new AsyncCallback(AcceptCallback), listener ); + allDone.WaitOne(); + } + } catch (Exception e) { + Console.WriteLine(e.ToString()); + } + /* + Console.WriteLine("\nPress ENTER to continue..."); + Console.Read(); + */ + } + + public static void AcceptCallback(IAsyncResult ar) { + allDone.Set(); + Socket listener = (Socket) ar.AsyncState; + Socket handler = listener.EndAccept(ar); + StateObject state = new StateObject(); + state.workSocket = handler; + handler.BeginReceive( state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), state); + } + + public static void ReadCallback(IAsyncResult ar) { + String content = String.Empty; + StateObject state = (StateObject) ar.AsyncState; + Socket handler = state.workSocket; + + try + { + + int bytesRead = handler.EndReceive(ar); + + //MainLog.Instance.Verbose("TCPSERVER", "Received packet [{0}]", bytesRead); + + if (bytesRead > 0) { + state.ms_ptr.Write(state.buffer, 0, bytesRead); + } + else + { + //MainLog.Instance.Verbose("TCPSERVER", "Connection terminated"); + return; + } + + long rest_size = state.ms_ptr.Length; + long current_pos = 0; + while (rest_size > TcpClient.internalPacketHeaderSize) { + + if ((state.header == null) && (rest_size >= TcpClient.internalPacketHeaderSize)) + { + //MainLog.Instance.Verbose("TCPSERVER", "Processing header"); + + // reading header + state.header = new InternalPacketHeader(); + + byte[] headerbytes = new byte[TcpClient.internalPacketHeaderSize]; + state.ms_ptr.Position = current_pos; + state.ms_ptr.Read(headerbytes, 0, TcpClient.internalPacketHeaderSize); + state.ms_ptr.Seek(0, SeekOrigin.End); + state.header.FromBytes(headerbytes); + } + + if ((state.header != null) && (rest_size >= state.header.numbytes + TcpClient.internalPacketHeaderSize)) + { + //MainLog.Instance.Verbose("TCPSERVER", "Processing body"); + + // reading body + byte[] packet = new byte[state.header.numbytes]; + state.ms_ptr.Position = current_pos + TcpClient.internalPacketHeaderSize; + state.ms_ptr.Read(packet, 0, state.header.numbytes); + +/* + for(int i=0; i