From 429a84f390212d0f414a08420707fc90aca2a331 Mon Sep 17 00:00:00 2001 From: John Hurliman Date: Mon, 5 Oct 2009 17:38:14 -0700 Subject: Beginning work on the new LLUDP implementation --- .../Region/ClientStack/LindenUDP/LLUDPServer.cs | 1003 +++++++++++--------- 1 file changed, 557 insertions(+), 446 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs index c779b08..7964c50 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs @@ -26,616 +26,727 @@ */ using System; -using System.Collections; using System.Collections.Generic; using System.Net; using System.Net.Sockets; using System.Reflection; +using System.Threading; using log4net; using Nini.Config; using OpenMetaverse.Packets; using OpenSim.Framework; using OpenSim.Region.Framework.Scenes; +using OpenMetaverse; namespace OpenSim.Region.ClientStack.LindenUDP { - /// - /// This class handles the initial UDP circuit setup with a client and passes on subsequent packets to the LLPacketServer - /// - public class LLUDPServer : ILLClientStackNetworkHandler, IClientNetworkServer + public class LLUDPServerShim : IClientNetworkServer { - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - /// - /// The client circuits established with this UDP server. If a client exists here we can also assume that - /// it is populated in clientCircuits_reverse and proxyCircuits (if relevant) - /// - protected Dictionary clientCircuits = new Dictionary(); - public Hashtable clientCircuits_reverse = Hashtable.Synchronized(new Hashtable()); - protected Dictionary proxyCircuits = new Dictionary(); - - private Socket m_socket; - protected IPEndPoint ServerIncoming; - protected byte[] RecvBuffer = new byte[4096]; - protected byte[] ZeroBuffer = new byte[8192]; - - /// - /// This is an endpoint that is reused where we don't need to protect the information from potentially - /// being stomped on by other threads. - /// - protected EndPoint reusedEpSender = new IPEndPoint(IPAddress.Any, 0); - - protected int proxyPortOffset; - - protected AsyncCallback ReceivedData; - protected LLPacketServer m_packetServer; - protected Location m_location; - - protected uint listenPort; - protected bool Allow_Alternate_Port; - protected IPAddress listenIP = IPAddress.Parse("0.0.0.0"); - protected IScene m_localScene; - protected int m_clientSocketReceiveBuffer = 0; + LLUDPServer m_udpServer; - /// - /// Manages authentication for agent circuits - /// - protected AgentCircuitManager m_circuitManager; - - public IScene LocalScene + public LLUDPServerShim() { - set - { - m_localScene = value; - m_packetServer.LocalScene = m_localScene; - - m_location = new Location(m_localScene.RegionInfo.RegionHandle); - } } - public ulong RegionHandle + public void Initialise(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager) { - get { return m_location.RegionHandle; } + m_udpServer = new LLUDPServer(listenIP, ref port, proxyPortOffsetParm, allow_alternate_port, configSource, circuitManager); } - Socket IClientNetworkServer.Server + public void NetworkStop() { - get { return m_socket; } + m_udpServer.Stop(); } - public bool HandlesRegion(Location x) + public void AddScene(IScene scene) { - //return x.RegionHandle == m_location.RegionHandle; - return x == m_location; + m_udpServer.AddScene(scene); } - public void AddScene(IScene x) + public bool HandlesRegion(Location x) { - LocalScene = x; + return m_udpServer.HandlesRegion(x); } public void Start() { - ServerListener(); + m_udpServer.Start(); } public void Stop() { - m_socket.Close(); + m_udpServer.Stop(); } + } + + public class LLUDPServer : UDPBase + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - public LLUDPServer() + /// Handlers for incoming packets + //PacketEventDictionary packetEvents = new PacketEventDictionary(); + /// Incoming packets that are awaiting handling + private OpenMetaverse.BlockingQueue packetInbox = new OpenMetaverse.BlockingQueue(); + /// + private UDPClientCollection clients = new UDPClientCollection(); + /// Bandwidth throttle for this UDP server + private TokenBucket m_throttle; + /// Bandwidth throttle rates for this UDP server + private ThrottleRates m_throttleRates; + /// Manages authentication for agent circuits + private AgentCircuitManager m_circuitManager; + /// Reference to the scene this UDP server is attached to + private IScene m_scene; + /// The X/Y coordinates of the scene this UDP server is attached to + private Location m_location; + /// The measured resolution of Environment.TickCount + private float m_tickCountResolution; + + /// The measured resolution of Environment.TickCount + public float TickCountResolution { get { return m_tickCountResolution; } } + public Socket Server { get { return null; } } + + public LLUDPServer(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager) + : base((int)port) { + #region Environment.TickCount Measurement + + // Measure the resolution of Environment.TickCount + m_tickCountResolution = 0f; + for (int i = 0; i < 5; i++) + { + int start = Environment.TickCount; + int now = start; + while (now == start) + now = Environment.TickCount; + m_tickCountResolution += (float)(now - start) * 0.2f; + } + m_log.Info("[LLUDPSERVER]: Average Environment.TickCount resolution: " + TickCountResolution + "ms"); + + #endregion Environment.TickCount Measurement + + m_circuitManager = circuitManager; + + // TODO: Config support for throttling the entire connection + m_throttle = new TokenBucket(null, 0, 0); + m_throttleRates = new ThrottleRates(configSource); } - public LLUDPServer( - IPAddress _listenIP, ref uint port, int proxyPortOffset, bool allow_alternate_port, IConfigSource configSource, - AgentCircuitManager authenticateClass) + public new void Start() { - Initialise(_listenIP, ref port, proxyPortOffset, allow_alternate_port, configSource, authenticateClass); + if (m_scene == null) + throw new InvalidOperationException("Cannot LLUDPServer.Start() without an IScene reference"); + + base.Start(); + + // Start the incoming packet processing thread + Thread incomingThread = new Thread(IncomingPacketHandler); + incomingThread.Name = "Incoming Packets (" + m_scene.RegionInfo.RegionName + ")"; + incomingThread.Start(); + + Thread outgoingThread = new Thread(OutgoingPacketHandler); + outgoingThread.Name = "Outgoing Packets (" + m_scene.RegionInfo.RegionName + ")"; + outgoingThread.Start(); } - /// - /// Initialize the server - /// - /// - /// - /// - /// - /// - /// - /// - public void Initialise( - IPAddress _listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, - AgentCircuitManager circuitManager) + public new void Stop() { - ClientStackUserSettings userSettings = new ClientStackUserSettings(); - - IConfig config = configSource.Configs["ClientStack.LindenUDP"]; + base.Stop(); + } - if (config != null) + public void AddScene(IScene scene) + { + if (m_scene == null) { - if (config.Contains("client_throttle_max_bps")) - { - int maxBPS = config.GetInt("client_throttle_max_bps", 1500000); - userSettings.TotalThrottleSettings = new ThrottleSettings(0, maxBPS, - maxBPS > 28000 ? maxBPS : 28000); - } - - if (config.Contains("client_throttle_multiplier")) - userSettings.ClientThrottleMultipler = config.GetFloat("client_throttle_multiplier"); - if (config.Contains("client_socket_rcvbuf_size")) - m_clientSocketReceiveBuffer = config.GetInt("client_socket_rcvbuf_size"); - } - - m_log.DebugFormat("[CLIENT]: client_throttle_multiplier = {0}", userSettings.ClientThrottleMultipler); - m_log.DebugFormat("[CLIENT]: client_socket_rcvbuf_size = {0}", (m_clientSocketReceiveBuffer != 0 ? - m_clientSocketReceiveBuffer.ToString() : "OS default")); - - proxyPortOffset = proxyPortOffsetParm; - listenPort = (uint) (port + proxyPortOffsetParm); - listenIP = _listenIP; - Allow_Alternate_Port = allow_alternate_port; - m_circuitManager = circuitManager; - CreatePacketServer(userSettings); - - // Return new port - // This because in Grid mode it is not really important what port the region listens to as long as it is correctly registered. - // So the option allow_alternate_ports="true" was added to default.xml - port = (uint)(listenPort - proxyPortOffsetParm); + m_scene = scene; + m_location = new Location(m_scene.RegionInfo.RegionHandle); + } + else + { + m_log.Error("[LLUDPSERVER]: AddScene() called on an LLUDPServer that already has a scene"); + } } - protected virtual void CreatePacketServer(ClientStackUserSettings userSettings) + public bool HandlesRegion(Location x) { - new LLPacketServer(this, userSettings); + return x == m_location; } - /// - /// This method is called every time that we receive new UDP data. - /// - /// - protected virtual void OnReceivedData(IAsyncResult result) + public void RemoveClient(IClientAPI client) { - Packet packet = null; - int numBytes = 1; - EndPoint epSender = new IPEndPoint(IPAddress.Any, 0); - EndPoint epProxy = null; + m_scene.ClientManager.Remove(client.CircuitCode); + client.Close(false); - try + LLUDPClient udpClient; + if (clients.TryGetValue(client.AgentId, out udpClient)) { - if (EndReceive(out numBytes, result, ref epSender)) - { - // Make sure we are getting zeroes when running off the - // end of grab / degrab packets from old clients - Array.Clear(RecvBuffer, numBytes, RecvBuffer.Length - numBytes); - - int packetEnd = numBytes - 1; - if (proxyPortOffset != 0) packetEnd -= 6; - - try - { - packet = PacketPool.Instance.GetPacket(RecvBuffer, ref packetEnd, ZeroBuffer); - } - catch (MalformedDataException e) - { - m_log.DebugFormat("[CLIENT]: Dropped Malformed Packet due to MalformedDataException: {0}", e.StackTrace); - } - catch (IndexOutOfRangeException e) - { - m_log.DebugFormat("[CLIENT]: Dropped Malformed Packet due to IndexOutOfRangeException: {0}", e.StackTrace); - } - catch (Exception e) - { - m_log.Debug("[CLIENT]: " + e); - } - } - - - if (proxyPortOffset != 0) - { - // If we've received a use circuit packet, then we need to decode an endpoint proxy, if one exists, - // before allowing the RecvBuffer to be overwritten by the next packet. - if (packet != null && packet.Type == PacketType.UseCircuitCode) - { - epProxy = epSender; - } - - // Now decode the message from the proxy server - epSender = ProxyCodec.DecodeProxyMessage(RecvBuffer, ref numBytes); - } + m_log.Debug("[LLUDPSERVER]: Removing LLUDPClient for " + client.Name); + udpClient.Shutdown(); + clients.Remove(client.AgentId, udpClient.RemoteEndPoint); } - catch (Exception ex) + else { - m_log.ErrorFormat("[CLIENT]: Exception thrown during EndReceive(): {0}", ex); + m_log.Warn("[LLUDPSERVER]: Failed to remove LLUDPClient for " + client.Name); } + } - BeginRobustReceive(); + public void RemoveClient(LLUDPClient udpClient) + { + IClientAPI client; + + if (m_scene.ClientManager.TryGetClient(udpClient.CircuitCode, out client)) + RemoveClient(client); + else + m_log.Warn("[LLUDPSERVER]: Failed to lookup IClientAPI for LLUDPClient " + udpClient.AgentID); + } - if (packet != null) + public void SetClientPaused(UUID agentID, bool paused) + { + LLUDPClient client; + if (clients.TryGetValue(agentID, out client)) { - if (packet.Type == PacketType.UseCircuitCode) - AddNewClient((UseCircuitCodePacket)packet, epSender, epProxy); - else - ProcessInPacket(packet, epSender); + client.IsPaused = paused; + } + else + { + m_log.Warn("[LLUDPSERVER]: Attempted to pause/unpause unknown agent " + agentID); } } - - /// - /// Process a successfully received packet. - /// - /// - /// - protected virtual void ProcessInPacket(Packet packet, EndPoint epSender) + + public void BroadcastPacket(Packet packet, ThrottleOutPacketType category, bool sendToPausedAgents, bool allowSplitting) { - try + if (allowSplitting && packet.HasVariableBlocks) { - // do we already have a circuit for this endpoint - uint circuit; - bool ret; - - lock (clientCircuits) - { - ret = clientCircuits.TryGetValue(epSender, out circuit); - } + byte[][] datas = packet.ToBytesMultiple(); + int packetCount = datas.Length; - if (ret) - { - //if so then send packet to the packetserver - //m_log.DebugFormat( - // "[UDPSERVER]: For circuit {0} {1} got packet {2}", circuit, epSender, packet.Type); + if (packetCount > 1) + m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets"); - m_packetServer.InPacket(circuit, packet); + for (int i = 0; i < packetCount; i++) + { + byte[] data = datas[i]; + clients.ForEach( + delegate(LLUDPClient client) + { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); }); } } - catch (Exception e) + else { - m_log.Error("[CLIENT]: Exception in processing packet - ignoring: ", e); + byte[] data = packet.ToBytes(); + clients.ForEach( + delegate(LLUDPClient client) + { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); }); } } - - /// - /// Begin an asynchronous receive of the next bit of raw data - /// - protected virtual void BeginReceive() + + public void SendPacket(UUID agentID, Packet packet, ThrottleOutPacketType category, bool allowSplitting) { - m_socket.BeginReceiveFrom( - RecvBuffer, 0, RecvBuffer.Length, SocketFlags.None, ref reusedEpSender, ReceivedData, null); + LLUDPClient client; + if (clients.TryGetValue(agentID, out client)) + SendPacket(client, packet, category, allowSplitting); + else + m_log.Warn("[LLUDPSERVER]: Attempted to send a packet to unknown agentID " + agentID); } - /// - /// Begin a robust asynchronous receive of the next bit of raw data. Robust means that SocketExceptions are - /// automatically dealt with until the next set of valid UDP data is received. - /// - private void BeginRobustReceive() + public void SendPacket(LLUDPClient client, Packet packet, ThrottleOutPacketType category, bool allowSplitting) { - bool done = false; - - while (!done) + if (allowSplitting && packet.HasVariableBlocks) { - try + byte[][] datas = packet.ToBytesMultiple(); + int packetCount = datas.Length; + + if (packetCount > 1) + m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets"); + + for (int i = 0; i < packetCount; i++) { - BeginReceive(); - done = true; + byte[] data = datas[i]; + SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); } - catch (SocketException e) - { - // ENDLESS LOOP ON PURPOSE! - // Reset connection and get next UDP packet off the buffer - // If the UDP packet is part of the same stream, this will happen several hundreds of times before - // the next set of UDP data is for a valid client. + } + else + { + byte[] data = packet.ToBytes(); + SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); + } + } - try - { - CloseCircuit(e); - } - catch (Exception e2) - { - m_log.ErrorFormat( - "[CLIENT]: Exception thrown when trying to close the circuit for {0} - {1}", reusedEpSender, - e2); - } - } - catch (ObjectDisposedException) - { - m_log.Info( - "[UDPSERVER]: UDP Object disposed. No need to worry about this if you're restarting the simulator."); + public void SendPacketData(LLUDPClient client, byte[] data, int dataLength, PacketType type, bool doZerocode, ThrottleOutPacketType category) + { + // Frequency analysis of outgoing packet sizes shows a large clump of packets at each end of the spectrum. + // The vast majority of packets are less than 200 bytes, although due to asset transfers and packet splitting + // there are a decent number of packets in the 1000-1140 byte range. We allocate one of two sizes of data here + // to accomodate for both common scenarios and provide ample room for ACK appending in both + int bufferSize = (dataLength > 180) ? Packet.MTU : 200; + + UDPPacketBuffer buffer = new UDPPacketBuffer(client.RemoteEndPoint, bufferSize); - done = true; + // Zerocode if needed + if (doZerocode) + { + try { dataLength = Helpers.ZeroEncode(data, dataLength, buffer.Data); } + catch (IndexOutOfRangeException) + { + // The packet grew larger than the bufferSize while zerocoding. + // Remove the MSG_ZEROCODED flag and send the unencoded data + // instead + m_log.Info("[LLUDPSERVER]: Packet exceeded buffer size during zerocoding. Removing MSG_ZEROCODED flag"); + data[0] = (byte)(data[0] & ~Helpers.MSG_ZEROCODED); + Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength); } - catch (Exception ex) + } + else + { + Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength); + } + buffer.DataLength = dataLength; + + #region Queue or Send + + // Look up the UDPClient this is going to + OutgoingPacket outgoingPacket = new OutgoingPacket(client, buffer, category); + + if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket)) + SendPacketFinal(outgoingPacket); + + #endregion Queue or Send + } + + public void SendAcks(LLUDPClient client) + { + uint ack; + + if (client.PendingAcks.Dequeue(out ack)) + { + List blocks = new List(); + PacketAckPacket.PacketsBlock block = new PacketAckPacket.PacketsBlock(); + block.ID = ack; + blocks.Add(block); + + while (client.PendingAcks.Dequeue(out ack)) { - m_log.ErrorFormat("[CLIENT]: Exception thrown during BeginReceive(): {0}", ex); + block = new PacketAckPacket.PacketsBlock(); + block.ID = ack; + blocks.Add(block); } + + PacketAckPacket packet = new PacketAckPacket(); + packet.Header.Reliable = false; + packet.Packets = blocks.ToArray(); + + SendPacket(client, packet, ThrottleOutPacketType.Unknown, true); } } - /// - /// Close a client circuit. This is done in response to an exception on receive, and should not be called - /// normally. - /// - /// The exception that caused the close. Can be null if there was no exception - private void CloseCircuit(Exception e) + public void ResendUnacked(LLUDPClient client) { - uint circuit; - lock (clientCircuits) + if (client.NeedAcks.Count > 0) { - if (clientCircuits.TryGetValue(reusedEpSender, out circuit)) + List expiredPackets = client.NeedAcks.GetExpiredPackets(client.RTO); + + if (expiredPackets != null) { - m_packetServer.CloseCircuit(circuit); - - if (e != null) - m_log.ErrorFormat( - "[CLIENT]: Closed circuit {0} {1} due to exception {2}", circuit, reusedEpSender, e); + // Resend packets + for (int i = 0; i < expiredPackets.Count; i++) + { + OutgoingPacket outgoingPacket = expiredPackets[i]; + + // FIXME: Make this an .ini setting + if (outgoingPacket.ResendCount < 3) + { + //Logger.Debug(String.Format("Resending packet #{0} (attempt {1}), {2}ms have passed", + // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount)); + + // Set the resent flag + outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT); + outgoingPacket.Category = ThrottleOutPacketType.Resend; + + // The TickCount will be set to the current time when the packet + // is actually sent out again + outgoingPacket.TickCount = 0; + + Interlocked.Increment(ref outgoingPacket.ResendCount); + //Interlocked.Increment(ref Stats.ResentPackets); + + // Queue or (re)send the packet + if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket)) + SendPacketFinal(outgoingPacket); + } + else + { + m_log.DebugFormat("[LLUDPSERVER]: Dropping packet #{0} for agent {1} after {2} failed attempts", + outgoingPacket.SequenceNumber, outgoingPacket.Client.RemoteEndPoint, outgoingPacket.ResendCount); + + lock (client.NeedAcks.SyncRoot) + client.NeedAcks.RemoveUnsafe(outgoingPacket.SequenceNumber); + + //Interlocked.Increment(ref Stats.DroppedPackets); + + // Disconnect an agent if no packets are received for some time + //FIXME: Make 60 an .ini setting + if (Environment.TickCount - client.TickLastPacketReceived > 1000 * 60) + { + m_log.Warn("[LLUDPSERVER]: Ack timeout, disconnecting " + client.RemoteEndPoint); + + RemoveClient(client); + return; + } + } + } } } } - - /// - /// Finish the process of asynchronously receiving the next bit of raw data - /// - /// The number of bytes received. Will return 0 if no bytes were recieved - /// - /// The sender of the data - /// - protected virtual bool EndReceive(out int numBytes, IAsyncResult result, ref EndPoint epSender) + + public void Flush() + { + } + + protected override void PacketReceived(UDPPacketBuffer buffer) { - bool hasReceivedOkay = false; - numBytes = 0; - + // Debugging/Profiling + //try { Thread.CurrentThread.Name = "PacketReceived (" + scene.RegionName + ")"; } + //catch (Exception) { } + + LLUDPClient client = null; + Packet packet = null; + int packetEnd = buffer.DataLength - 1; + IPEndPoint address = (IPEndPoint)buffer.RemoteEndPoint; + + #region Decoding + try { - numBytes = m_socket.EndReceiveFrom(result, ref epSender); - hasReceivedOkay = true; + packet = Packet.BuildPacket(buffer.Data, ref packetEnd, + // Only allocate a buffer for zerodecoding if the packet is zerocoded + ((buffer.Data[0] & Helpers.MSG_ZEROCODED) != 0) ? new byte[4096] : null); } - catch (SocketException e) + catch (MalformedDataException) { - // TODO : Actually only handle those states that we have control over, re-throw everything else, - // TODO: implement cases as we encounter them. - //m_log.Error("[CLIENT]: Connection Error! - " + e.ToString()); - switch (e.SocketErrorCode) - { - case SocketError.AlreadyInProgress: - return hasReceivedOkay; + m_log.ErrorFormat("[LLUDPSERVER]: Malformed data, cannot parse packet:\n{0}", + Utils.BytesToHexString(buffer.Data, buffer.DataLength, null)); + } + + // Fail-safe check + if (packet == null) + { + m_log.Warn("[LLUDPSERVER]: Couldn't build a message from the incoming data"); + return; + } - case SocketError.NetworkReset: - case SocketError.ConnectionReset: - case SocketError.OperationAborted: - break; + //Stats.RecvBytes += (ulong)buffer.DataLength; + //++Stats.RecvPackets; - default: - throw; + #endregion Decoding + + #region UseCircuitCode Handling + + if (packet.Type == PacketType.UseCircuitCode) + { + UseCircuitCodePacket useCircuitCode = (UseCircuitCodePacket)packet; + IClientAPI newuser; + uint circuitCode = useCircuitCode.CircuitCode.Code; + + // Check if the client is already established + if (!m_scene.ClientManager.TryGetClient(circuitCode, out newuser)) + { + AddNewClient(useCircuitCode, (IPEndPoint)buffer.RemoteEndPoint); } } - catch (ObjectDisposedException e) + + // Determine which agent this packet came from + if (!clients.TryGetValue(address, out client)) { - m_log.DebugFormat("[CLIENT]: ObjectDisposedException: Object {0} disposed.", e.ObjectName); - // Uhh, what object, and why? this needs better handling. + m_log.Warn("[LLUDPSERVER]: Received a " + packet.Type + " packet from an unrecognized source: " + address); + return; } - - return hasReceivedOkay; - } - /// - /// Add a new client circuit. - /// - /// - /// - /// - protected virtual void AddNewClient(UseCircuitCodePacket useCircuit, EndPoint epSender, EndPoint epProxy) - { - //Slave regions don't accept new clients - if (m_localScene.RegionStatus != RegionStatus.SlaveScene) + #endregion UseCircuitCode Handling + + //if (packet.Header.Resent) + // Interlocked.Increment(ref Stats.ReceivedResends); + + #region ACK Receiving + + int now = Environment.TickCount; + client.TickLastPacketReceived = now; + + // Handle appended ACKs + if (packet.Header.AppendedAcks && packet.Header.AckList != null) { - AuthenticateResponse sessionInfo; - bool isNewCircuit = false; - - if (!m_packetServer.IsClientAuthorized(useCircuit, m_circuitManager, out sessionInfo)) + lock (client.NeedAcks.SyncRoot) { - m_log.WarnFormat( - "[CONNECTION FAILURE]: Connection request for client {0} connecting with unnotified circuit code {1} from {2}", - useCircuit.CircuitCode.ID, useCircuit.CircuitCode.Code, epSender); - - return; + for (int i = 0; i < packet.Header.AckList.Length; i++) + AcknowledgePacket(client, packet.Header.AckList[i], now, packet.Header.Resent); } - - lock (clientCircuits) + } + + // Handle PacketAck packets + if (packet.Type == PacketType.PacketAck) + { + PacketAckPacket ackPacket = (PacketAckPacket)packet; + + lock (client.NeedAcks.SyncRoot) { - if (!clientCircuits.ContainsKey(epSender)) - { - clientCircuits.Add(epSender, useCircuit.CircuitCode.Code); - isNewCircuit = true; - } + for (int i = 0; i < ackPacket.Packets.Length; i++) + AcknowledgePacket(client, ackPacket.Packets[i].ID, now, packet.Header.Resent); } + } - if (isNewCircuit) - { - // This doesn't need locking as it's synchronized data - clientCircuits_reverse[useCircuit.CircuitCode.Code] = epSender; + #endregion ACK Receiving - lock (proxyCircuits) - { - proxyCircuits[useCircuit.CircuitCode.Code] = epProxy; - } - - m_packetServer.AddNewClient(epSender, useCircuit, sessionInfo, epProxy); - - //m_log.DebugFormat( - // "[CONNECTION SUCCESS]: Incoming client {0} (circuit code {1}) received and authenticated for {2}", - // useCircuit.CircuitCode.ID, useCircuit.CircuitCode.Code, m_localScene.RegionInfo.RegionName); - } + #region ACK Sending + + if (packet.Header.Reliable) + client.PendingAcks.Enqueue((uint)packet.Header.Sequence); + + // This is a somewhat odd sequence of steps to pull the client.BytesSinceLastACK value out, + // add the current received bytes to it, test if 2*MTU bytes have been sent, if so remove + // 2*MTU bytes from the value and send ACKs, and finally add the local value back to + // client.BytesSinceLastACK. Lockless thread safety + int bytesSinceLastACK = Interlocked.Exchange(ref client.BytesSinceLastACK, 0); + bytesSinceLastACK += buffer.DataLength; + if (bytesSinceLastACK > Packet.MTU * 2) + { + bytesSinceLastACK -= Packet.MTU * 2; + SendAcks(client); + } + Interlocked.Add(ref client.BytesSinceLastACK, bytesSinceLastACK); + + #endregion ACK Sending + + #region Incoming Packet Accounting + + // Check the archive of received reliable packet IDs to see whether we already received this packet + if (packet.Header.Reliable && !client.PacketArchive.TryEnqueue(packet.Header.Sequence)) + { + if (packet.Header.Resent) + m_log.Debug("[LLUDPSERVER]: Received a resend of already processed packet #" + packet.Header.Sequence + ", type: " + packet.Type); + else + m_log.Warn("[LLUDPSERVER]: Received a duplicate (not marked as resend) of packet #" + packet.Header.Sequence + ", type: " + packet.Type); + + // Avoid firing a callback twice for the same packet + return; } - - // Ack the UseCircuitCode packet - PacketAckPacket ack_it = (PacketAckPacket)PacketPool.Instance.GetPacket(PacketType.PacketAck); - // TODO: don't create new blocks if recycling an old packet - ack_it.Packets = new PacketAckPacket.PacketsBlock[1]; - ack_it.Packets[0] = new PacketAckPacket.PacketsBlock(); - ack_it.Packets[0].ID = useCircuit.Header.Sequence; - // ((useCircuit.Header.Sequence < uint.MaxValue) ? useCircuit.Header.Sequence : 0) is just a failsafe to ensure that we don't overflow. - ack_it.Header.Sequence = ((useCircuit.Header.Sequence < uint.MaxValue) ? useCircuit.Header.Sequence : 0) + 1; - ack_it.Header.Reliable = false; - byte[] ackmsg = ack_it.ToBytes(); + #endregion Incoming Packet Accounting - // Need some extra space in case we need to add proxy - // information to the message later - byte[] msg = new byte[4096]; - Buffer.BlockCopy(ackmsg, 0, msg, 0, ackmsg.Length); + // Don't bother clogging up the queue with PacketAck packets that are already handled here + if (packet.Type != PacketType.PacketAck) + { + // Inbox insertion + IncomingPacket incomingPacket; + incomingPacket.Client = client; + incomingPacket.Packet = packet; - SendPacketTo(msg, ackmsg.Length, SocketFlags.None, useCircuit.CircuitCode.Code); + packetInbox.Enqueue(incomingPacket); + } + } - PacketPool.Instance.ReturnPacket(useCircuit); + protected override void PacketSent(UDPPacketBuffer buffer, int bytesSent) + { } - public void ServerListener() + private bool IsClientAuthorized(UseCircuitCodePacket useCircuitCode, out AuthenticateResponse sessionInfo) { - uint newPort = listenPort; - m_log.Info("[UDPSERVER]: Opening UDP socket on " + listenIP + " " + newPort + "."); + UUID agentID = useCircuitCode.CircuitCode.ID; + UUID sessionID = useCircuitCode.CircuitCode.SessionID; + uint circuitCode = useCircuitCode.CircuitCode.Code; - ServerIncoming = new IPEndPoint(listenIP, (int)newPort); - m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - if (0 != m_clientSocketReceiveBuffer) - m_socket.ReceiveBufferSize = m_clientSocketReceiveBuffer; - m_socket.Bind(ServerIncoming); - // Add flags to the UDP socket to prevent "Socket forcibly closed by host" - // uint IOC_IN = 0x80000000; - // uint IOC_VENDOR = 0x18000000; - // uint SIO_UDP_CONNRESET = IOC_IN | IOC_VENDOR | 12; - // TODO: this apparently works in .NET but not in Mono, need to sort out the right flags here. - // m_socket.IOControl((int)SIO_UDP_CONNRESET, new byte[] { Convert.ToByte(false) }, null); + sessionInfo = m_circuitManager.AuthenticateSession(sessionID, agentID, circuitCode); + return sessionInfo.Authorised; + } - listenPort = newPort; + private void AddNewClient(UseCircuitCodePacket useCircuitCode, IPEndPoint remoteEndPoint) + { + //Slave regions don't accept new clients + if (m_scene.RegionStatus != RegionStatus.SlaveScene) + { + AuthenticateResponse sessionInfo; + bool isNewCircuit = !clients.ContainsKey(remoteEndPoint); - m_log.Info("[UDPSERVER]: UDP socket bound, getting ready to listen"); + if (!IsClientAuthorized(useCircuitCode, out sessionInfo)) + { + m_log.WarnFormat( + "[CONNECTION FAILURE]: Connection request for client {0} connecting with unnotified circuit code {1} from {2}", + useCircuitCode.CircuitCode.ID, useCircuitCode.CircuitCode.Code, remoteEndPoint); + return; + } - ReceivedData = OnReceivedData; - BeginReceive(); + if (isNewCircuit) + { + UUID agentID = useCircuitCode.CircuitCode.ID; + UUID sessionID = useCircuitCode.CircuitCode.SessionID; + uint circuitCode = useCircuitCode.CircuitCode.Code; - m_log.Info("[UDPSERVER]: Listening on port " + newPort); + AddClient(circuitCode, agentID, sessionID, remoteEndPoint, sessionInfo); + } + } } - public virtual void RegisterPacketServer(LLPacketServer server) + private void AddClient(uint circuitCode, UUID agentID, UUID sessionID, IPEndPoint remoteEndPoint, AuthenticateResponse sessionInfo) { - m_packetServer = server; + // Create the LLUDPClient + LLUDPClient client = new LLUDPClient(this, m_throttleRates, m_throttle, circuitCode, agentID, remoteEndPoint); + clients.Add(agentID, client.RemoteEndPoint, client); + + // Create the IClientAPI + IClientAPI clientApi = new LLClientView(remoteEndPoint, m_scene, this, client, sessionInfo, agentID, sessionID, circuitCode); + clientApi.OnViewerEffect += m_scene.ClientManager.ViewerEffectHandler; + clientApi.OnLogout += LogoutHandler; + clientApi.OnConnectionClosed += RemoveClient; + + // Give LLUDPClient a reference to IClientAPI + client.ClientAPI = clientApi; + + // Start the IClientAPI + m_scene.ClientManager.Add(circuitCode, clientApi); + clientApi.Start(); } - public virtual void SendPacketTo(byte[] buffer, int size, SocketFlags flags, uint circuitcode) - //EndPoint packetSender) + private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend) { - // find the endpoint for this circuit - EndPoint sendto; - try - { - sendto = (EndPoint)clientCircuits_reverse[circuitcode]; - } - catch + OutgoingPacket ackedPacket; + if (client.NeedAcks.RemoveUnsafe(ack, out ackedPacket) && !fromResend) { - // Exceptions here mean there is no circuit - m_log.Warn("[CLIENT]: Circuit not found, not sending packet"); - return; + // Calculate the round-trip time for this packet and its ACK + int rtt = currentTime - ackedPacket.TickCount; + if (rtt > 0) + client.UpdateRoundTrip(rtt); } + } + + private void IncomingPacketHandler() + { + IncomingPacket incomingPacket = new IncomingPacket(); + Packet packet = null; + LLUDPClient client = null; - if (sendto != null) + while (base.IsRunning) { - //we found the endpoint so send the packet to it - if (proxyPortOffset != 0) - { - //MainLog.Instance.Verbose("UDPSERVER", "SendPacketTo proxy " + proxyCircuits[circuitcode].ToString() + ": client " + sendto.ToString()); - ProxyCodec.EncodeProxyMessage(buffer, ref size, sendto); - m_socket.SendTo(buffer, size, flags, proxyCircuits[circuitcode]); - } - else + // Reset packet to null for the check below + packet = null; + + if (packetInbox.Dequeue(100, ref incomingPacket)) { - //MainLog.Instance.Verbose("UDPSERVER", "SendPacketTo : client " + sendto.ToString()); - try - { - m_socket.SendTo(buffer, size, flags, sendto); - } - catch (SocketException SockE) - { - m_log.ErrorFormat("[UDPSERVER]: Caught Socket Error in the send buffer!. {0}",SockE.ToString()); - } + packet = incomingPacket.Packet; + client = incomingPacket.Client; + + if (packet != null && client != null) + client.ClientAPI.ProcessInPacket(packet); } } + + if (packetInbox.Count > 0) + m_log.Warn("[LLUDPSERVER]: IncomingPacketHandler is shutting down, dropping " + packetInbox.Count + " packets"); + packetInbox.Clear(); } - public virtual void RemoveClientCircuit(uint circuitcode) + private void OutgoingPacketHandler() { - EndPoint sendto; - if (clientCircuits_reverse.Contains(circuitcode)) + int now = Environment.TickCount; + int elapsedMS = 0; + int elapsed100MS = 0; + + while (base.IsRunning) { - sendto = (EndPoint)clientCircuits_reverse[circuitcode]; + bool resendUnacked = false; + bool sendAcks = false; + bool packetSent = false; - clientCircuits_reverse.Remove(circuitcode); + elapsedMS += Environment.TickCount - now; - lock (clientCircuits) + // Check for packets that need to be resent every 100ms + if (elapsedMS >= 100) { - if (sendto != null) - { - clientCircuits.Remove(sendto); - } - else - { - m_log.DebugFormat( - "[CLIENT]: endpoint for circuit code {0} in RemoveClientCircuit() was unexpectedly null!", circuitcode); - } + resendUnacked = true; + elapsedMS -= 100; + ++elapsed100MS; } - lock (proxyCircuits) + // Check for ACKs that need to be sent out every 500ms + if (elapsed100MS >= 5) { - proxyCircuits.Remove(circuitcode); + sendAcks = true; + elapsed100MS = 0; } + + clients.ForEach( + delegate(LLUDPClient client) + { + if (client.DequeueOutgoing()) + packetSent = true; + if (resendUnacked) + ResendUnacked(client); + if (sendAcks) + SendAcks(client); + } + ); + + if (!packetSent) + Thread.Sleep(20); } } - public void RestoreClient(AgentCircuitData circuit, EndPoint userEP, EndPoint proxyEP) + private void LogoutHandler(IClientAPI client) { - //MainLog.Instance.Verbose("UDPSERVER", "RestoreClient"); + client.SendLogoutPacket(); + RemoveClient(client); + } + + internal void SendPacketFinal(OutgoingPacket outgoingPacket) + { + UDPPacketBuffer buffer = outgoingPacket.Buffer; + byte flags = buffer.Data[0]; + bool isResend = (flags & Helpers.MSG_RESENT) != 0; + bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0; + LLUDPClient client = outgoingPacket.Client; + + // Keep track of when this packet was sent out (right now) + outgoingPacket.TickCount = Environment.TickCount; - UseCircuitCodePacket useCircuit = new UseCircuitCodePacket(); - useCircuit.CircuitCode.Code = circuit.circuitcode; - useCircuit.CircuitCode.ID = circuit.AgentID; - useCircuit.CircuitCode.SessionID = circuit.SessionID; - - AuthenticateResponse sessionInfo; - - if (!m_packetServer.IsClientAuthorized(useCircuit, m_circuitManager, out sessionInfo)) + #region ACK Appending + + int dataLength = buffer.DataLength; + + // Keep appending ACKs until there is no room left in the packet or there are + // no more ACKs to append + uint ackCount = 0; + uint ack; + while (dataLength + 5 < buffer.Data.Length && client.PendingAcks.Dequeue(out ack)) { - m_log.WarnFormat( - "[CLIENT]: Restore request denied to avatar {0} connecting with unauthorized circuit code {1}", - useCircuit.CircuitCode.ID, useCircuit.CircuitCode.Code); - - return; + Utils.UIntToBytesBig(ack, buffer.Data, dataLength); + dataLength += 4; + ++ackCount; } - lock (clientCircuits) + if (ackCount > 0) { - if (!clientCircuits.ContainsKey(userEP)) - clientCircuits.Add(userEP, useCircuit.CircuitCode.Code); - else - m_log.Error("[CLIENT]: clientCircuits already contains entry for user " + useCircuit.CircuitCode.Code + ". NOT adding."); + // Set the last byte of the packet equal to the number of appended ACKs + buffer.Data[dataLength++] = (byte)ackCount; + // Set the appended ACKs flag on this packet + buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS); } - // This data structure is synchronized, so we don't need the lock - if (!clientCircuits_reverse.ContainsKey(useCircuit.CircuitCode.Code)) - clientCircuits_reverse.Add(useCircuit.CircuitCode.Code, userEP); - else - m_log.Error("[CLIENT]: clientCurcuits_reverse already contains entry for user " + useCircuit.CircuitCode.Code + ". NOT adding."); + buffer.DataLength = dataLength; - lock (proxyCircuits) + #endregion ACK Appending + + if (!isResend) { - if (!proxyCircuits.ContainsKey(useCircuit.CircuitCode.Code)) - { - proxyCircuits.Add(useCircuit.CircuitCode.Code, proxyEP); - } - else + // Not a resend, assign a new sequence number + uint sequenceNumber = (uint)Interlocked.Increment(ref client.CurrentSequence); + Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1); + outgoingPacket.SequenceNumber = sequenceNumber; + + if (isReliable) { - // re-set proxy endpoint - proxyCircuits.Remove(useCircuit.CircuitCode.Code); - proxyCircuits.Add(useCircuit.CircuitCode.Code, proxyEP); + // Add this packet to the list of ACK responses we are waiting on from the server + client.NeedAcks.Add(outgoingPacket); } } - m_packetServer.AddNewClient(userEP, useCircuit, sessionInfo, proxyEP); + // Put the UDP payload on the wire + AsyncBeginSend(buffer); } } } -- cgit v1.1