From 429a84f390212d0f414a08420707fc90aca2a331 Mon Sep 17 00:00:00 2001
From: John Hurliman
Date: Mon, 5 Oct 2009 17:38:14 -0700
Subject: Beginning work on the new LLUDP implementation
---
.../Region/ClientStack/LindenUDP/LLUDPServer.cs | 1003 +++++++++++---------
1 file changed, 557 insertions(+), 446 deletions(-)
(limited to 'OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
index c779b08..7964c50 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
@@ -26,616 +26,727 @@
*/
using System;
-using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
+using System.Threading;
using log4net;
using Nini.Config;
using OpenMetaverse.Packets;
using OpenSim.Framework;
using OpenSim.Region.Framework.Scenes;
+using OpenMetaverse;
namespace OpenSim.Region.ClientStack.LindenUDP
{
- ///
- /// This class handles the initial UDP circuit setup with a client and passes on subsequent packets to the LLPacketServer
- ///
- public class LLUDPServer : ILLClientStackNetworkHandler, IClientNetworkServer
+ public class LLUDPServerShim : IClientNetworkServer
{
- private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- ///
- /// The client circuits established with this UDP server. If a client exists here we can also assume that
- /// it is populated in clientCircuits_reverse and proxyCircuits (if relevant)
- ///
- protected Dictionary clientCircuits = new Dictionary();
- public Hashtable clientCircuits_reverse = Hashtable.Synchronized(new Hashtable());
- protected Dictionary proxyCircuits = new Dictionary();
-
- private Socket m_socket;
- protected IPEndPoint ServerIncoming;
- protected byte[] RecvBuffer = new byte[4096];
- protected byte[] ZeroBuffer = new byte[8192];
-
- ///
- /// This is an endpoint that is reused where we don't need to protect the information from potentially
- /// being stomped on by other threads.
- ///
- protected EndPoint reusedEpSender = new IPEndPoint(IPAddress.Any, 0);
-
- protected int proxyPortOffset;
-
- protected AsyncCallback ReceivedData;
- protected LLPacketServer m_packetServer;
- protected Location m_location;
-
- protected uint listenPort;
- protected bool Allow_Alternate_Port;
- protected IPAddress listenIP = IPAddress.Parse("0.0.0.0");
- protected IScene m_localScene;
- protected int m_clientSocketReceiveBuffer = 0;
+ LLUDPServer m_udpServer;
- ///
- /// Manages authentication for agent circuits
- ///
- protected AgentCircuitManager m_circuitManager;
-
- public IScene LocalScene
+ public LLUDPServerShim()
{
- set
- {
- m_localScene = value;
- m_packetServer.LocalScene = m_localScene;
-
- m_location = new Location(m_localScene.RegionInfo.RegionHandle);
- }
}
- public ulong RegionHandle
+ public void Initialise(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager)
{
- get { return m_location.RegionHandle; }
+ m_udpServer = new LLUDPServer(listenIP, ref port, proxyPortOffsetParm, allow_alternate_port, configSource, circuitManager);
}
- Socket IClientNetworkServer.Server
+ public void NetworkStop()
{
- get { return m_socket; }
+ m_udpServer.Stop();
}
- public bool HandlesRegion(Location x)
+ public void AddScene(IScene scene)
{
- //return x.RegionHandle == m_location.RegionHandle;
- return x == m_location;
+ m_udpServer.AddScene(scene);
}
- public void AddScene(IScene x)
+ public bool HandlesRegion(Location x)
{
- LocalScene = x;
+ return m_udpServer.HandlesRegion(x);
}
public void Start()
{
- ServerListener();
+ m_udpServer.Start();
}
public void Stop()
{
- m_socket.Close();
+ m_udpServer.Stop();
}
+ }
+
+ public class LLUDPServer : UDPBase
+ {
+ private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
- public LLUDPServer()
+ /// Handlers for incoming packets
+ //PacketEventDictionary packetEvents = new PacketEventDictionary();
+ /// Incoming packets that are awaiting handling
+ private OpenMetaverse.BlockingQueue packetInbox = new OpenMetaverse.BlockingQueue();
+ ///
+ private UDPClientCollection clients = new UDPClientCollection();
+ /// Bandwidth throttle for this UDP server
+ private TokenBucket m_throttle;
+ /// Bandwidth throttle rates for this UDP server
+ private ThrottleRates m_throttleRates;
+ /// Manages authentication for agent circuits
+ private AgentCircuitManager m_circuitManager;
+ /// Reference to the scene this UDP server is attached to
+ private IScene m_scene;
+ /// The X/Y coordinates of the scene this UDP server is attached to
+ private Location m_location;
+ /// The measured resolution of Environment.TickCount
+ private float m_tickCountResolution;
+
+ /// The measured resolution of Environment.TickCount
+ public float TickCountResolution { get { return m_tickCountResolution; } }
+ public Socket Server { get { return null; } }
+
+ public LLUDPServer(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager)
+ : base((int)port)
{
+ #region Environment.TickCount Measurement
+
+ // Measure the resolution of Environment.TickCount
+ m_tickCountResolution = 0f;
+ for (int i = 0; i < 5; i++)
+ {
+ int start = Environment.TickCount;
+ int now = start;
+ while (now == start)
+ now = Environment.TickCount;
+ m_tickCountResolution += (float)(now - start) * 0.2f;
+ }
+ m_log.Info("[LLUDPSERVER]: Average Environment.TickCount resolution: " + TickCountResolution + "ms");
+
+ #endregion Environment.TickCount Measurement
+
+ m_circuitManager = circuitManager;
+
+ // TODO: Config support for throttling the entire connection
+ m_throttle = new TokenBucket(null, 0, 0);
+ m_throttleRates = new ThrottleRates(configSource);
}
- public LLUDPServer(
- IPAddress _listenIP, ref uint port, int proxyPortOffset, bool allow_alternate_port, IConfigSource configSource,
- AgentCircuitManager authenticateClass)
+ public new void Start()
{
- Initialise(_listenIP, ref port, proxyPortOffset, allow_alternate_port, configSource, authenticateClass);
+ if (m_scene == null)
+ throw new InvalidOperationException("Cannot LLUDPServer.Start() without an IScene reference");
+
+ base.Start();
+
+ // Start the incoming packet processing thread
+ Thread incomingThread = new Thread(IncomingPacketHandler);
+ incomingThread.Name = "Incoming Packets (" + m_scene.RegionInfo.RegionName + ")";
+ incomingThread.Start();
+
+ Thread outgoingThread = new Thread(OutgoingPacketHandler);
+ outgoingThread.Name = "Outgoing Packets (" + m_scene.RegionInfo.RegionName + ")";
+ outgoingThread.Start();
}
- ///
- /// Initialize the server
- ///
- ///
- ///
- ///
- ///
- ///
- ///
- ///
- public void Initialise(
- IPAddress _listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource,
- AgentCircuitManager circuitManager)
+ public new void Stop()
{
- ClientStackUserSettings userSettings = new ClientStackUserSettings();
-
- IConfig config = configSource.Configs["ClientStack.LindenUDP"];
+ base.Stop();
+ }
- if (config != null)
+ public void AddScene(IScene scene)
+ {
+ if (m_scene == null)
{
- if (config.Contains("client_throttle_max_bps"))
- {
- int maxBPS = config.GetInt("client_throttle_max_bps", 1500000);
- userSettings.TotalThrottleSettings = new ThrottleSettings(0, maxBPS,
- maxBPS > 28000 ? maxBPS : 28000);
- }
-
- if (config.Contains("client_throttle_multiplier"))
- userSettings.ClientThrottleMultipler = config.GetFloat("client_throttle_multiplier");
- if (config.Contains("client_socket_rcvbuf_size"))
- m_clientSocketReceiveBuffer = config.GetInt("client_socket_rcvbuf_size");
- }
-
- m_log.DebugFormat("[CLIENT]: client_throttle_multiplier = {0}", userSettings.ClientThrottleMultipler);
- m_log.DebugFormat("[CLIENT]: client_socket_rcvbuf_size = {0}", (m_clientSocketReceiveBuffer != 0 ?
- m_clientSocketReceiveBuffer.ToString() : "OS default"));
-
- proxyPortOffset = proxyPortOffsetParm;
- listenPort = (uint) (port + proxyPortOffsetParm);
- listenIP = _listenIP;
- Allow_Alternate_Port = allow_alternate_port;
- m_circuitManager = circuitManager;
- CreatePacketServer(userSettings);
-
- // Return new port
- // This because in Grid mode it is not really important what port the region listens to as long as it is correctly registered.
- // So the option allow_alternate_ports="true" was added to default.xml
- port = (uint)(listenPort - proxyPortOffsetParm);
+ m_scene = scene;
+ m_location = new Location(m_scene.RegionInfo.RegionHandle);
+ }
+ else
+ {
+ m_log.Error("[LLUDPSERVER]: AddScene() called on an LLUDPServer that already has a scene");
+ }
}
- protected virtual void CreatePacketServer(ClientStackUserSettings userSettings)
+ public bool HandlesRegion(Location x)
{
- new LLPacketServer(this, userSettings);
+ return x == m_location;
}
- ///
- /// This method is called every time that we receive new UDP data.
- ///
- ///
- protected virtual void OnReceivedData(IAsyncResult result)
+ public void RemoveClient(IClientAPI client)
{
- Packet packet = null;
- int numBytes = 1;
- EndPoint epSender = new IPEndPoint(IPAddress.Any, 0);
- EndPoint epProxy = null;
+ m_scene.ClientManager.Remove(client.CircuitCode);
+ client.Close(false);
- try
+ LLUDPClient udpClient;
+ if (clients.TryGetValue(client.AgentId, out udpClient))
{
- if (EndReceive(out numBytes, result, ref epSender))
- {
- // Make sure we are getting zeroes when running off the
- // end of grab / degrab packets from old clients
- Array.Clear(RecvBuffer, numBytes, RecvBuffer.Length - numBytes);
-
- int packetEnd = numBytes - 1;
- if (proxyPortOffset != 0) packetEnd -= 6;
-
- try
- {
- packet = PacketPool.Instance.GetPacket(RecvBuffer, ref packetEnd, ZeroBuffer);
- }
- catch (MalformedDataException e)
- {
- m_log.DebugFormat("[CLIENT]: Dropped Malformed Packet due to MalformedDataException: {0}", e.StackTrace);
- }
- catch (IndexOutOfRangeException e)
- {
- m_log.DebugFormat("[CLIENT]: Dropped Malformed Packet due to IndexOutOfRangeException: {0}", e.StackTrace);
- }
- catch (Exception e)
- {
- m_log.Debug("[CLIENT]: " + e);
- }
- }
-
-
- if (proxyPortOffset != 0)
- {
- // If we've received a use circuit packet, then we need to decode an endpoint proxy, if one exists,
- // before allowing the RecvBuffer to be overwritten by the next packet.
- if (packet != null && packet.Type == PacketType.UseCircuitCode)
- {
- epProxy = epSender;
- }
-
- // Now decode the message from the proxy server
- epSender = ProxyCodec.DecodeProxyMessage(RecvBuffer, ref numBytes);
- }
+ m_log.Debug("[LLUDPSERVER]: Removing LLUDPClient for " + client.Name);
+ udpClient.Shutdown();
+ clients.Remove(client.AgentId, udpClient.RemoteEndPoint);
}
- catch (Exception ex)
+ else
{
- m_log.ErrorFormat("[CLIENT]: Exception thrown during EndReceive(): {0}", ex);
+ m_log.Warn("[LLUDPSERVER]: Failed to remove LLUDPClient for " + client.Name);
}
+ }
- BeginRobustReceive();
+ public void RemoveClient(LLUDPClient udpClient)
+ {
+ IClientAPI client;
+
+ if (m_scene.ClientManager.TryGetClient(udpClient.CircuitCode, out client))
+ RemoveClient(client);
+ else
+ m_log.Warn("[LLUDPSERVER]: Failed to lookup IClientAPI for LLUDPClient " + udpClient.AgentID);
+ }
- if (packet != null)
+ public void SetClientPaused(UUID agentID, bool paused)
+ {
+ LLUDPClient client;
+ if (clients.TryGetValue(agentID, out client))
{
- if (packet.Type == PacketType.UseCircuitCode)
- AddNewClient((UseCircuitCodePacket)packet, epSender, epProxy);
- else
- ProcessInPacket(packet, epSender);
+ client.IsPaused = paused;
+ }
+ else
+ {
+ m_log.Warn("[LLUDPSERVER]: Attempted to pause/unpause unknown agent " + agentID);
}
}
-
- ///
- /// Process a successfully received packet.
- ///
- ///
- ///
- protected virtual void ProcessInPacket(Packet packet, EndPoint epSender)
+
+ public void BroadcastPacket(Packet packet, ThrottleOutPacketType category, bool sendToPausedAgents, bool allowSplitting)
{
- try
+ if (allowSplitting && packet.HasVariableBlocks)
{
- // do we already have a circuit for this endpoint
- uint circuit;
- bool ret;
-
- lock (clientCircuits)
- {
- ret = clientCircuits.TryGetValue(epSender, out circuit);
- }
+ byte[][] datas = packet.ToBytesMultiple();
+ int packetCount = datas.Length;
- if (ret)
- {
- //if so then send packet to the packetserver
- //m_log.DebugFormat(
- // "[UDPSERVER]: For circuit {0} {1} got packet {2}", circuit, epSender, packet.Type);
+ if (packetCount > 1)
+ m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets");
- m_packetServer.InPacket(circuit, packet);
+ for (int i = 0; i < packetCount; i++)
+ {
+ byte[] data = datas[i];
+ clients.ForEach(
+ delegate(LLUDPClient client)
+ { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); });
}
}
- catch (Exception e)
+ else
{
- m_log.Error("[CLIENT]: Exception in processing packet - ignoring: ", e);
+ byte[] data = packet.ToBytes();
+ clients.ForEach(
+ delegate(LLUDPClient client)
+ { SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category); });
}
}
-
- ///
- /// Begin an asynchronous receive of the next bit of raw data
- ///
- protected virtual void BeginReceive()
+
+ public void SendPacket(UUID agentID, Packet packet, ThrottleOutPacketType category, bool allowSplitting)
{
- m_socket.BeginReceiveFrom(
- RecvBuffer, 0, RecvBuffer.Length, SocketFlags.None, ref reusedEpSender, ReceivedData, null);
+ LLUDPClient client;
+ if (clients.TryGetValue(agentID, out client))
+ SendPacket(client, packet, category, allowSplitting);
+ else
+ m_log.Warn("[LLUDPSERVER]: Attempted to send a packet to unknown agentID " + agentID);
}
- ///
- /// Begin a robust asynchronous receive of the next bit of raw data. Robust means that SocketExceptions are
- /// automatically dealt with until the next set of valid UDP data is received.
- ///
- private void BeginRobustReceive()
+ public void SendPacket(LLUDPClient client, Packet packet, ThrottleOutPacketType category, bool allowSplitting)
{
- bool done = false;
-
- while (!done)
+ if (allowSplitting && packet.HasVariableBlocks)
{
- try
+ byte[][] datas = packet.ToBytesMultiple();
+ int packetCount = datas.Length;
+
+ if (packetCount > 1)
+ m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets");
+
+ for (int i = 0; i < packetCount; i++)
{
- BeginReceive();
- done = true;
+ byte[] data = datas[i];
+ SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category);
}
- catch (SocketException e)
- {
- // ENDLESS LOOP ON PURPOSE!
- // Reset connection and get next UDP packet off the buffer
- // If the UDP packet is part of the same stream, this will happen several hundreds of times before
- // the next set of UDP data is for a valid client.
+ }
+ else
+ {
+ byte[] data = packet.ToBytes();
+ SendPacketData(client, data, data.Length, packet.Type, packet.Header.Zerocoded, category);
+ }
+ }
- try
- {
- CloseCircuit(e);
- }
- catch (Exception e2)
- {
- m_log.ErrorFormat(
- "[CLIENT]: Exception thrown when trying to close the circuit for {0} - {1}", reusedEpSender,
- e2);
- }
- }
- catch (ObjectDisposedException)
- {
- m_log.Info(
- "[UDPSERVER]: UDP Object disposed. No need to worry about this if you're restarting the simulator.");
+ public void SendPacketData(LLUDPClient client, byte[] data, int dataLength, PacketType type, bool doZerocode, ThrottleOutPacketType category)
+ {
+ // Frequency analysis of outgoing packet sizes shows a large clump of packets at each end of the spectrum.
+ // The vast majority of packets are less than 200 bytes, although due to asset transfers and packet splitting
+ // there are a decent number of packets in the 1000-1140 byte range. We allocate one of two sizes of data here
+ // to accomodate for both common scenarios and provide ample room for ACK appending in both
+ int bufferSize = (dataLength > 180) ? Packet.MTU : 200;
+
+ UDPPacketBuffer buffer = new UDPPacketBuffer(client.RemoteEndPoint, bufferSize);
- done = true;
+ // Zerocode if needed
+ if (doZerocode)
+ {
+ try { dataLength = Helpers.ZeroEncode(data, dataLength, buffer.Data); }
+ catch (IndexOutOfRangeException)
+ {
+ // The packet grew larger than the bufferSize while zerocoding.
+ // Remove the MSG_ZEROCODED flag and send the unencoded data
+ // instead
+ m_log.Info("[LLUDPSERVER]: Packet exceeded buffer size during zerocoding. Removing MSG_ZEROCODED flag");
+ data[0] = (byte)(data[0] & ~Helpers.MSG_ZEROCODED);
+ Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength);
}
- catch (Exception ex)
+ }
+ else
+ {
+ Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength);
+ }
+ buffer.DataLength = dataLength;
+
+ #region Queue or Send
+
+ // Look up the UDPClient this is going to
+ OutgoingPacket outgoingPacket = new OutgoingPacket(client, buffer, category);
+
+ if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket))
+ SendPacketFinal(outgoingPacket);
+
+ #endregion Queue or Send
+ }
+
+ public void SendAcks(LLUDPClient client)
+ {
+ uint ack;
+
+ if (client.PendingAcks.Dequeue(out ack))
+ {
+ List blocks = new List();
+ PacketAckPacket.PacketsBlock block = new PacketAckPacket.PacketsBlock();
+ block.ID = ack;
+ blocks.Add(block);
+
+ while (client.PendingAcks.Dequeue(out ack))
{
- m_log.ErrorFormat("[CLIENT]: Exception thrown during BeginReceive(): {0}", ex);
+ block = new PacketAckPacket.PacketsBlock();
+ block.ID = ack;
+ blocks.Add(block);
}
+
+ PacketAckPacket packet = new PacketAckPacket();
+ packet.Header.Reliable = false;
+ packet.Packets = blocks.ToArray();
+
+ SendPacket(client, packet, ThrottleOutPacketType.Unknown, true);
}
}
- ///
- /// Close a client circuit. This is done in response to an exception on receive, and should not be called
- /// normally.
- ///
- /// The exception that caused the close. Can be null if there was no exception
- private void CloseCircuit(Exception e)
+ public void ResendUnacked(LLUDPClient client)
{
- uint circuit;
- lock (clientCircuits)
+ if (client.NeedAcks.Count > 0)
{
- if (clientCircuits.TryGetValue(reusedEpSender, out circuit))
+ List expiredPackets = client.NeedAcks.GetExpiredPackets(client.RTO);
+
+ if (expiredPackets != null)
{
- m_packetServer.CloseCircuit(circuit);
-
- if (e != null)
- m_log.ErrorFormat(
- "[CLIENT]: Closed circuit {0} {1} due to exception {2}", circuit, reusedEpSender, e);
+ // Resend packets
+ for (int i = 0; i < expiredPackets.Count; i++)
+ {
+ OutgoingPacket outgoingPacket = expiredPackets[i];
+
+ // FIXME: Make this an .ini setting
+ if (outgoingPacket.ResendCount < 3)
+ {
+ //Logger.Debug(String.Format("Resending packet #{0} (attempt {1}), {2}ms have passed",
+ // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount));
+
+ // Set the resent flag
+ outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT);
+ outgoingPacket.Category = ThrottleOutPacketType.Resend;
+
+ // The TickCount will be set to the current time when the packet
+ // is actually sent out again
+ outgoingPacket.TickCount = 0;
+
+ Interlocked.Increment(ref outgoingPacket.ResendCount);
+ //Interlocked.Increment(ref Stats.ResentPackets);
+
+ // Queue or (re)send the packet
+ if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket))
+ SendPacketFinal(outgoingPacket);
+ }
+ else
+ {
+ m_log.DebugFormat("[LLUDPSERVER]: Dropping packet #{0} for agent {1} after {2} failed attempts",
+ outgoingPacket.SequenceNumber, outgoingPacket.Client.RemoteEndPoint, outgoingPacket.ResendCount);
+
+ lock (client.NeedAcks.SyncRoot)
+ client.NeedAcks.RemoveUnsafe(outgoingPacket.SequenceNumber);
+
+ //Interlocked.Increment(ref Stats.DroppedPackets);
+
+ // Disconnect an agent if no packets are received for some time
+ //FIXME: Make 60 an .ini setting
+ if (Environment.TickCount - client.TickLastPacketReceived > 1000 * 60)
+ {
+ m_log.Warn("[LLUDPSERVER]: Ack timeout, disconnecting " + client.RemoteEndPoint);
+
+ RemoveClient(client);
+ return;
+ }
+ }
+ }
}
}
}
-
- ///
- /// Finish the process of asynchronously receiving the next bit of raw data
- ///
- /// The number of bytes received. Will return 0 if no bytes were recieved
- ///
- /// The sender of the data
- ///
- protected virtual bool EndReceive(out int numBytes, IAsyncResult result, ref EndPoint epSender)
+
+ public void Flush()
+ {
+ }
+
+ protected override void PacketReceived(UDPPacketBuffer buffer)
{
- bool hasReceivedOkay = false;
- numBytes = 0;
-
+ // Debugging/Profiling
+ //try { Thread.CurrentThread.Name = "PacketReceived (" + scene.RegionName + ")"; }
+ //catch (Exception) { }
+
+ LLUDPClient client = null;
+ Packet packet = null;
+ int packetEnd = buffer.DataLength - 1;
+ IPEndPoint address = (IPEndPoint)buffer.RemoteEndPoint;
+
+ #region Decoding
+
try
{
- numBytes = m_socket.EndReceiveFrom(result, ref epSender);
- hasReceivedOkay = true;
+ packet = Packet.BuildPacket(buffer.Data, ref packetEnd,
+ // Only allocate a buffer for zerodecoding if the packet is zerocoded
+ ((buffer.Data[0] & Helpers.MSG_ZEROCODED) != 0) ? new byte[4096] : null);
}
- catch (SocketException e)
+ catch (MalformedDataException)
{
- // TODO : Actually only handle those states that we have control over, re-throw everything else,
- // TODO: implement cases as we encounter them.
- //m_log.Error("[CLIENT]: Connection Error! - " + e.ToString());
- switch (e.SocketErrorCode)
- {
- case SocketError.AlreadyInProgress:
- return hasReceivedOkay;
+ m_log.ErrorFormat("[LLUDPSERVER]: Malformed data, cannot parse packet:\n{0}",
+ Utils.BytesToHexString(buffer.Data, buffer.DataLength, null));
+ }
+
+ // Fail-safe check
+ if (packet == null)
+ {
+ m_log.Warn("[LLUDPSERVER]: Couldn't build a message from the incoming data");
+ return;
+ }
- case SocketError.NetworkReset:
- case SocketError.ConnectionReset:
- case SocketError.OperationAborted:
- break;
+ //Stats.RecvBytes += (ulong)buffer.DataLength;
+ //++Stats.RecvPackets;
- default:
- throw;
+ #endregion Decoding
+
+ #region UseCircuitCode Handling
+
+ if (packet.Type == PacketType.UseCircuitCode)
+ {
+ UseCircuitCodePacket useCircuitCode = (UseCircuitCodePacket)packet;
+ IClientAPI newuser;
+ uint circuitCode = useCircuitCode.CircuitCode.Code;
+
+ // Check if the client is already established
+ if (!m_scene.ClientManager.TryGetClient(circuitCode, out newuser))
+ {
+ AddNewClient(useCircuitCode, (IPEndPoint)buffer.RemoteEndPoint);
}
}
- catch (ObjectDisposedException e)
+
+ // Determine which agent this packet came from
+ if (!clients.TryGetValue(address, out client))
{
- m_log.DebugFormat("[CLIENT]: ObjectDisposedException: Object {0} disposed.", e.ObjectName);
- // Uhh, what object, and why? this needs better handling.
+ m_log.Warn("[LLUDPSERVER]: Received a " + packet.Type + " packet from an unrecognized source: " + address);
+ return;
}
-
- return hasReceivedOkay;
- }
- ///
- /// Add a new client circuit.
- ///
- ///
- ///
- ///
- protected virtual void AddNewClient(UseCircuitCodePacket useCircuit, EndPoint epSender, EndPoint epProxy)
- {
- //Slave regions don't accept new clients
- if (m_localScene.RegionStatus != RegionStatus.SlaveScene)
+ #endregion UseCircuitCode Handling
+
+ //if (packet.Header.Resent)
+ // Interlocked.Increment(ref Stats.ReceivedResends);
+
+ #region ACK Receiving
+
+ int now = Environment.TickCount;
+ client.TickLastPacketReceived = now;
+
+ // Handle appended ACKs
+ if (packet.Header.AppendedAcks && packet.Header.AckList != null)
{
- AuthenticateResponse sessionInfo;
- bool isNewCircuit = false;
-
- if (!m_packetServer.IsClientAuthorized(useCircuit, m_circuitManager, out sessionInfo))
+ lock (client.NeedAcks.SyncRoot)
{
- m_log.WarnFormat(
- "[CONNECTION FAILURE]: Connection request for client {0} connecting with unnotified circuit code {1} from {2}",
- useCircuit.CircuitCode.ID, useCircuit.CircuitCode.Code, epSender);
-
- return;
+ for (int i = 0; i < packet.Header.AckList.Length; i++)
+ AcknowledgePacket(client, packet.Header.AckList[i], now, packet.Header.Resent);
}
-
- lock (clientCircuits)
+ }
+
+ // Handle PacketAck packets
+ if (packet.Type == PacketType.PacketAck)
+ {
+ PacketAckPacket ackPacket = (PacketAckPacket)packet;
+
+ lock (client.NeedAcks.SyncRoot)
{
- if (!clientCircuits.ContainsKey(epSender))
- {
- clientCircuits.Add(epSender, useCircuit.CircuitCode.Code);
- isNewCircuit = true;
- }
+ for (int i = 0; i < ackPacket.Packets.Length; i++)
+ AcknowledgePacket(client, ackPacket.Packets[i].ID, now, packet.Header.Resent);
}
+ }
- if (isNewCircuit)
- {
- // This doesn't need locking as it's synchronized data
- clientCircuits_reverse[useCircuit.CircuitCode.Code] = epSender;
+ #endregion ACK Receiving
- lock (proxyCircuits)
- {
- proxyCircuits[useCircuit.CircuitCode.Code] = epProxy;
- }
-
- m_packetServer.AddNewClient(epSender, useCircuit, sessionInfo, epProxy);
-
- //m_log.DebugFormat(
- // "[CONNECTION SUCCESS]: Incoming client {0} (circuit code {1}) received and authenticated for {2}",
- // useCircuit.CircuitCode.ID, useCircuit.CircuitCode.Code, m_localScene.RegionInfo.RegionName);
- }
+ #region ACK Sending
+
+ if (packet.Header.Reliable)
+ client.PendingAcks.Enqueue((uint)packet.Header.Sequence);
+
+ // This is a somewhat odd sequence of steps to pull the client.BytesSinceLastACK value out,
+ // add the current received bytes to it, test if 2*MTU bytes have been sent, if so remove
+ // 2*MTU bytes from the value and send ACKs, and finally add the local value back to
+ // client.BytesSinceLastACK. Lockless thread safety
+ int bytesSinceLastACK = Interlocked.Exchange(ref client.BytesSinceLastACK, 0);
+ bytesSinceLastACK += buffer.DataLength;
+ if (bytesSinceLastACK > Packet.MTU * 2)
+ {
+ bytesSinceLastACK -= Packet.MTU * 2;
+ SendAcks(client);
+ }
+ Interlocked.Add(ref client.BytesSinceLastACK, bytesSinceLastACK);
+
+ #endregion ACK Sending
+
+ #region Incoming Packet Accounting
+
+ // Check the archive of received reliable packet IDs to see whether we already received this packet
+ if (packet.Header.Reliable && !client.PacketArchive.TryEnqueue(packet.Header.Sequence))
+ {
+ if (packet.Header.Resent)
+ m_log.Debug("[LLUDPSERVER]: Received a resend of already processed packet #" + packet.Header.Sequence + ", type: " + packet.Type);
+ else
+ m_log.Warn("[LLUDPSERVER]: Received a duplicate (not marked as resend) of packet #" + packet.Header.Sequence + ", type: " + packet.Type);
+
+ // Avoid firing a callback twice for the same packet
+ return;
}
-
- // Ack the UseCircuitCode packet
- PacketAckPacket ack_it = (PacketAckPacket)PacketPool.Instance.GetPacket(PacketType.PacketAck);
- // TODO: don't create new blocks if recycling an old packet
- ack_it.Packets = new PacketAckPacket.PacketsBlock[1];
- ack_it.Packets[0] = new PacketAckPacket.PacketsBlock();
- ack_it.Packets[0].ID = useCircuit.Header.Sequence;
- // ((useCircuit.Header.Sequence < uint.MaxValue) ? useCircuit.Header.Sequence : 0) is just a failsafe to ensure that we don't overflow.
- ack_it.Header.Sequence = ((useCircuit.Header.Sequence < uint.MaxValue) ? useCircuit.Header.Sequence : 0) + 1;
- ack_it.Header.Reliable = false;
- byte[] ackmsg = ack_it.ToBytes();
+ #endregion Incoming Packet Accounting
- // Need some extra space in case we need to add proxy
- // information to the message later
- byte[] msg = new byte[4096];
- Buffer.BlockCopy(ackmsg, 0, msg, 0, ackmsg.Length);
+ // Don't bother clogging up the queue with PacketAck packets that are already handled here
+ if (packet.Type != PacketType.PacketAck)
+ {
+ // Inbox insertion
+ IncomingPacket incomingPacket;
+ incomingPacket.Client = client;
+ incomingPacket.Packet = packet;
- SendPacketTo(msg, ackmsg.Length, SocketFlags.None, useCircuit.CircuitCode.Code);
+ packetInbox.Enqueue(incomingPacket);
+ }
+ }
- PacketPool.Instance.ReturnPacket(useCircuit);
+ protected override void PacketSent(UDPPacketBuffer buffer, int bytesSent)
+ {
}
- public void ServerListener()
+ private bool IsClientAuthorized(UseCircuitCodePacket useCircuitCode, out AuthenticateResponse sessionInfo)
{
- uint newPort = listenPort;
- m_log.Info("[UDPSERVER]: Opening UDP socket on " + listenIP + " " + newPort + ".");
+ UUID agentID = useCircuitCode.CircuitCode.ID;
+ UUID sessionID = useCircuitCode.CircuitCode.SessionID;
+ uint circuitCode = useCircuitCode.CircuitCode.Code;
- ServerIncoming = new IPEndPoint(listenIP, (int)newPort);
- m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
- if (0 != m_clientSocketReceiveBuffer)
- m_socket.ReceiveBufferSize = m_clientSocketReceiveBuffer;
- m_socket.Bind(ServerIncoming);
- // Add flags to the UDP socket to prevent "Socket forcibly closed by host"
- // uint IOC_IN = 0x80000000;
- // uint IOC_VENDOR = 0x18000000;
- // uint SIO_UDP_CONNRESET = IOC_IN | IOC_VENDOR | 12;
- // TODO: this apparently works in .NET but not in Mono, need to sort out the right flags here.
- // m_socket.IOControl((int)SIO_UDP_CONNRESET, new byte[] { Convert.ToByte(false) }, null);
+ sessionInfo = m_circuitManager.AuthenticateSession(sessionID, agentID, circuitCode);
+ return sessionInfo.Authorised;
+ }
- listenPort = newPort;
+ private void AddNewClient(UseCircuitCodePacket useCircuitCode, IPEndPoint remoteEndPoint)
+ {
+ //Slave regions don't accept new clients
+ if (m_scene.RegionStatus != RegionStatus.SlaveScene)
+ {
+ AuthenticateResponse sessionInfo;
+ bool isNewCircuit = !clients.ContainsKey(remoteEndPoint);
- m_log.Info("[UDPSERVER]: UDP socket bound, getting ready to listen");
+ if (!IsClientAuthorized(useCircuitCode, out sessionInfo))
+ {
+ m_log.WarnFormat(
+ "[CONNECTION FAILURE]: Connection request for client {0} connecting with unnotified circuit code {1} from {2}",
+ useCircuitCode.CircuitCode.ID, useCircuitCode.CircuitCode.Code, remoteEndPoint);
+ return;
+ }
- ReceivedData = OnReceivedData;
- BeginReceive();
+ if (isNewCircuit)
+ {
+ UUID agentID = useCircuitCode.CircuitCode.ID;
+ UUID sessionID = useCircuitCode.CircuitCode.SessionID;
+ uint circuitCode = useCircuitCode.CircuitCode.Code;
- m_log.Info("[UDPSERVER]: Listening on port " + newPort);
+ AddClient(circuitCode, agentID, sessionID, remoteEndPoint, sessionInfo);
+ }
+ }
}
- public virtual void RegisterPacketServer(LLPacketServer server)
+ private void AddClient(uint circuitCode, UUID agentID, UUID sessionID, IPEndPoint remoteEndPoint, AuthenticateResponse sessionInfo)
{
- m_packetServer = server;
+ // Create the LLUDPClient
+ LLUDPClient client = new LLUDPClient(this, m_throttleRates, m_throttle, circuitCode, agentID, remoteEndPoint);
+ clients.Add(agentID, client.RemoteEndPoint, client);
+
+ // Create the IClientAPI
+ IClientAPI clientApi = new LLClientView(remoteEndPoint, m_scene, this, client, sessionInfo, agentID, sessionID, circuitCode);
+ clientApi.OnViewerEffect += m_scene.ClientManager.ViewerEffectHandler;
+ clientApi.OnLogout += LogoutHandler;
+ clientApi.OnConnectionClosed += RemoveClient;
+
+ // Give LLUDPClient a reference to IClientAPI
+ client.ClientAPI = clientApi;
+
+ // Start the IClientAPI
+ m_scene.ClientManager.Add(circuitCode, clientApi);
+ clientApi.Start();
}
- public virtual void SendPacketTo(byte[] buffer, int size, SocketFlags flags, uint circuitcode)
- //EndPoint packetSender)
+ private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend)
{
- // find the endpoint for this circuit
- EndPoint sendto;
- try
- {
- sendto = (EndPoint)clientCircuits_reverse[circuitcode];
- }
- catch
+ OutgoingPacket ackedPacket;
+ if (client.NeedAcks.RemoveUnsafe(ack, out ackedPacket) && !fromResend)
{
- // Exceptions here mean there is no circuit
- m_log.Warn("[CLIENT]: Circuit not found, not sending packet");
- return;
+ // Calculate the round-trip time for this packet and its ACK
+ int rtt = currentTime - ackedPacket.TickCount;
+ if (rtt > 0)
+ client.UpdateRoundTrip(rtt);
}
+ }
+
+ private void IncomingPacketHandler()
+ {
+ IncomingPacket incomingPacket = new IncomingPacket();
+ Packet packet = null;
+ LLUDPClient client = null;
- if (sendto != null)
+ while (base.IsRunning)
{
- //we found the endpoint so send the packet to it
- if (proxyPortOffset != 0)
- {
- //MainLog.Instance.Verbose("UDPSERVER", "SendPacketTo proxy " + proxyCircuits[circuitcode].ToString() + ": client " + sendto.ToString());
- ProxyCodec.EncodeProxyMessage(buffer, ref size, sendto);
- m_socket.SendTo(buffer, size, flags, proxyCircuits[circuitcode]);
- }
- else
+ // Reset packet to null for the check below
+ packet = null;
+
+ if (packetInbox.Dequeue(100, ref incomingPacket))
{
- //MainLog.Instance.Verbose("UDPSERVER", "SendPacketTo : client " + sendto.ToString());
- try
- {
- m_socket.SendTo(buffer, size, flags, sendto);
- }
- catch (SocketException SockE)
- {
- m_log.ErrorFormat("[UDPSERVER]: Caught Socket Error in the send buffer!. {0}",SockE.ToString());
- }
+ packet = incomingPacket.Packet;
+ client = incomingPacket.Client;
+
+ if (packet != null && client != null)
+ client.ClientAPI.ProcessInPacket(packet);
}
}
+
+ if (packetInbox.Count > 0)
+ m_log.Warn("[LLUDPSERVER]: IncomingPacketHandler is shutting down, dropping " + packetInbox.Count + " packets");
+ packetInbox.Clear();
}
- public virtual void RemoveClientCircuit(uint circuitcode)
+ private void OutgoingPacketHandler()
{
- EndPoint sendto;
- if (clientCircuits_reverse.Contains(circuitcode))
+ int now = Environment.TickCount;
+ int elapsedMS = 0;
+ int elapsed100MS = 0;
+
+ while (base.IsRunning)
{
- sendto = (EndPoint)clientCircuits_reverse[circuitcode];
+ bool resendUnacked = false;
+ bool sendAcks = false;
+ bool packetSent = false;
- clientCircuits_reverse.Remove(circuitcode);
+ elapsedMS += Environment.TickCount - now;
- lock (clientCircuits)
+ // Check for packets that need to be resent every 100ms
+ if (elapsedMS >= 100)
{
- if (sendto != null)
- {
- clientCircuits.Remove(sendto);
- }
- else
- {
- m_log.DebugFormat(
- "[CLIENT]: endpoint for circuit code {0} in RemoveClientCircuit() was unexpectedly null!", circuitcode);
- }
+ resendUnacked = true;
+ elapsedMS -= 100;
+ ++elapsed100MS;
}
- lock (proxyCircuits)
+ // Check for ACKs that need to be sent out every 500ms
+ if (elapsed100MS >= 5)
{
- proxyCircuits.Remove(circuitcode);
+ sendAcks = true;
+ elapsed100MS = 0;
}
+
+ clients.ForEach(
+ delegate(LLUDPClient client)
+ {
+ if (client.DequeueOutgoing())
+ packetSent = true;
+ if (resendUnacked)
+ ResendUnacked(client);
+ if (sendAcks)
+ SendAcks(client);
+ }
+ );
+
+ if (!packetSent)
+ Thread.Sleep(20);
}
}
- public void RestoreClient(AgentCircuitData circuit, EndPoint userEP, EndPoint proxyEP)
+ private void LogoutHandler(IClientAPI client)
{
- //MainLog.Instance.Verbose("UDPSERVER", "RestoreClient");
+ client.SendLogoutPacket();
+ RemoveClient(client);
+ }
+
+ internal void SendPacketFinal(OutgoingPacket outgoingPacket)
+ {
+ UDPPacketBuffer buffer = outgoingPacket.Buffer;
+ byte flags = buffer.Data[0];
+ bool isResend = (flags & Helpers.MSG_RESENT) != 0;
+ bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0;
+ LLUDPClient client = outgoingPacket.Client;
+
+ // Keep track of when this packet was sent out (right now)
+ outgoingPacket.TickCount = Environment.TickCount;
- UseCircuitCodePacket useCircuit = new UseCircuitCodePacket();
- useCircuit.CircuitCode.Code = circuit.circuitcode;
- useCircuit.CircuitCode.ID = circuit.AgentID;
- useCircuit.CircuitCode.SessionID = circuit.SessionID;
-
- AuthenticateResponse sessionInfo;
-
- if (!m_packetServer.IsClientAuthorized(useCircuit, m_circuitManager, out sessionInfo))
+ #region ACK Appending
+
+ int dataLength = buffer.DataLength;
+
+ // Keep appending ACKs until there is no room left in the packet or there are
+ // no more ACKs to append
+ uint ackCount = 0;
+ uint ack;
+ while (dataLength + 5 < buffer.Data.Length && client.PendingAcks.Dequeue(out ack))
{
- m_log.WarnFormat(
- "[CLIENT]: Restore request denied to avatar {0} connecting with unauthorized circuit code {1}",
- useCircuit.CircuitCode.ID, useCircuit.CircuitCode.Code);
-
- return;
+ Utils.UIntToBytesBig(ack, buffer.Data, dataLength);
+ dataLength += 4;
+ ++ackCount;
}
- lock (clientCircuits)
+ if (ackCount > 0)
{
- if (!clientCircuits.ContainsKey(userEP))
- clientCircuits.Add(userEP, useCircuit.CircuitCode.Code);
- else
- m_log.Error("[CLIENT]: clientCircuits already contains entry for user " + useCircuit.CircuitCode.Code + ". NOT adding.");
+ // Set the last byte of the packet equal to the number of appended ACKs
+ buffer.Data[dataLength++] = (byte)ackCount;
+ // Set the appended ACKs flag on this packet
+ buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS);
}
- // This data structure is synchronized, so we don't need the lock
- if (!clientCircuits_reverse.ContainsKey(useCircuit.CircuitCode.Code))
- clientCircuits_reverse.Add(useCircuit.CircuitCode.Code, userEP);
- else
- m_log.Error("[CLIENT]: clientCurcuits_reverse already contains entry for user " + useCircuit.CircuitCode.Code + ". NOT adding.");
+ buffer.DataLength = dataLength;
- lock (proxyCircuits)
+ #endregion ACK Appending
+
+ if (!isResend)
{
- if (!proxyCircuits.ContainsKey(useCircuit.CircuitCode.Code))
- {
- proxyCircuits.Add(useCircuit.CircuitCode.Code, proxyEP);
- }
- else
+ // Not a resend, assign a new sequence number
+ uint sequenceNumber = (uint)Interlocked.Increment(ref client.CurrentSequence);
+ Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1);
+ outgoingPacket.SequenceNumber = sequenceNumber;
+
+ if (isReliable)
{
- // re-set proxy endpoint
- proxyCircuits.Remove(useCircuit.CircuitCode.Code);
- proxyCircuits.Add(useCircuit.CircuitCode.Code, proxyEP);
+ // Add this packet to the list of ACK responses we are waiting on from the server
+ client.NeedAcks.Add(outgoingPacket);
}
}
- m_packetServer.AddNewClient(userEP, useCircuit, sessionInfo, proxyEP);
+ // Put the UDP payload on the wire
+ AsyncBeginSend(buffer);
}
}
}
--
cgit v1.1
From e7c877407f2a72a9519eb53debca5aeef20cded9 Mon Sep 17 00:00:00 2001
From: John Hurliman
Date: Tue, 6 Oct 2009 02:38:00 -0700
Subject: * Continued work on the new LLUDP implementation. Appears to be
functioning, although not everything is reimplemented yet * Replaced logic in
ThreadTracker with a call to System.Diagnostics that does the same thing *
Added Util.StringToBytes256() and Util.StringToBytes1024() to clamp output at
byte[256] and byte[1024], respectively * Fixed formatting for a
MySQLAssetData error logging line
---
.../Region/ClientStack/LindenUDP/LLUDPServer.cs | 138 ++++++++++++++-------
1 file changed, 96 insertions(+), 42 deletions(-)
(limited to 'OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
index 7964c50..348615e 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
@@ -35,6 +35,7 @@ using log4net;
using Nini.Config;
using OpenMetaverse.Packets;
using OpenSim.Framework;
+using OpenSim.Framework.Statistics;
using OpenSim.Region.Framework.Scenes;
using OpenMetaverse;
@@ -190,31 +191,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
}
- public void RemoveClient(LLUDPClient udpClient)
- {
- IClientAPI client;
-
- if (m_scene.ClientManager.TryGetClient(udpClient.CircuitCode, out client))
- RemoveClient(client);
- else
- m_log.Warn("[LLUDPSERVER]: Failed to lookup IClientAPI for LLUDPClient " + udpClient.AgentID);
- }
-
- public void SetClientPaused(UUID agentID, bool paused)
- {
- LLUDPClient client;
- if (clients.TryGetValue(agentID, out client))
- {
- client.IsPaused = paused;
- }
- else
- {
- m_log.Warn("[LLUDPSERVER]: Attempted to pause/unpause unknown agent " + agentID);
- }
- }
-
public void BroadcastPacket(Packet packet, ThrottleOutPacketType category, bool sendToPausedAgents, bool allowSplitting)
{
+ // CoarseLocationUpdate packets cannot be split in an automated way
+ if (packet.Type == PacketType.CoarseLocationUpdate && allowSplitting)
+ allowSplitting = false;
+
if (allowSplitting && packet.HasVariableBlocks)
{
byte[][] datas = packet.ToBytesMultiple();
@@ -251,6 +233,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public void SendPacket(LLUDPClient client, Packet packet, ThrottleOutPacketType category, bool allowSplitting)
{
+ // CoarseLocationUpdate packets cannot be split in an automated way
+ if (packet.Type == PacketType.CoarseLocationUpdate && allowSplitting)
+ allowSplitting = false;
+
if (allowSplitting && packet.HasVariableBlocks)
{
byte[][] datas = packet.ToBytesMultiple();
@@ -339,6 +325,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
}
+ public void SendPing(LLUDPClient client)
+ {
+ IClientAPI api = client.ClientAPI;
+ if (api != null)
+ api.SendStartPingCheck(client.CurrentPingSequence++);
+ }
+
public void ResendUnacked(LLUDPClient client)
{
if (client.NeedAcks.Count > 0)
@@ -387,9 +380,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
//FIXME: Make 60 an .ini setting
if (Environment.TickCount - client.TickLastPacketReceived > 1000 * 60)
{
- m_log.Warn("[LLUDPSERVER]: Ack timeout, disconnecting " + client.RemoteEndPoint);
+ m_log.Warn("[LLUDPSERVER]: Ack timeout, disconnecting " + client.ClientAPI.Name);
- RemoveClient(client);
+ RemoveClient(client.ClientAPI);
return;
}
}
@@ -590,8 +583,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
LLUDPClient client = new LLUDPClient(this, m_throttleRates, m_throttle, circuitCode, agentID, remoteEndPoint);
clients.Add(agentID, client.RemoteEndPoint, client);
- // Create the IClientAPI
- IClientAPI clientApi = new LLClientView(remoteEndPoint, m_scene, this, client, sessionInfo, agentID, sessionID, circuitCode);
+ // Create the LLClientView
+ LLClientView clientApi = new LLClientView(remoteEndPoint, m_scene, this, client, sessionInfo, agentID, sessionID, circuitCode);
clientApi.OnViewerEffect += m_scene.ClientManager.ViewerEffectHandler;
clientApi.OnLogout += LogoutHandler;
clientApi.OnConnectionClosed += RemoveClient;
@@ -618,23 +611,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP
private void IncomingPacketHandler()
{
- IncomingPacket incomingPacket = new IncomingPacket();
- Packet packet = null;
- LLUDPClient client = null;
+ // Set this culture for the thread that incoming packets are received
+ // on to en-US to avoid number parsing issues
+ Culture.SetCurrentCulture();
+
+ IncomingPacket incomingPacket = default(IncomingPacket);
while (base.IsRunning)
{
- // Reset packet to null for the check below
- packet = null;
-
if (packetInbox.Dequeue(100, ref incomingPacket))
- {
- packet = incomingPacket.Packet;
- client = incomingPacket.Client;
-
- if (packet != null && client != null)
- client.ClientAPI.ProcessInPacket(packet);
- }
+ Util.FireAndForget(ProcessInPacket, incomingPacket);
}
if (packetInbox.Count > 0)
@@ -642,32 +628,98 @@ namespace OpenSim.Region.ClientStack.LindenUDP
packetInbox.Clear();
}
+ private void ProcessInPacket(object state)
+ {
+ IncomingPacket incomingPacket = (IncomingPacket)state;
+ Packet packet = incomingPacket.Packet;
+ LLUDPClient client = incomingPacket.Client;
+
+ if (packet != null && client != null)
+ {
+ try
+ {
+ client.ClientAPI.ProcessInPacket(packet);
+ }
+ catch (ThreadAbortException)
+ {
+ throw;
+ }
+ catch (Exception e)
+ {
+ if (StatsManager.SimExtraStats != null)
+ StatsManager.SimExtraStats.AddAbnormalClientThreadTermination();
+
+ // Don't let a failure in an individual client thread crash the whole sim.
+ m_log.ErrorFormat("[LLUDPSERVER]: Client thread for {0} {1} crashed. Logging them out", client.ClientAPI.Name, client.AgentID);
+ m_log.Error(e.Message, e);
+
+ try
+ {
+ // Make an attempt to alert the user that their session has crashed
+ AgentAlertMessagePacket alert = client.ClientAPI.BuildAgentAlertPacket(
+ "Unfortunately the session for this client on the server has crashed.\n" +
+ "Any further actions taken will not be processed.\n" +
+ "Please relog", true);
+
+ SendPacket(client, alert, ThrottleOutPacketType.Unknown, false);
+
+ // TODO: There may be a better way to do this. Perhaps kick? Not sure this propogates notifications to
+ // listeners yet, though.
+ client.ClientAPI.SendLogoutPacket();
+ RemoveClient(client.ClientAPI);
+ }
+ catch (ThreadAbortException)
+ {
+ throw;
+ }
+ catch (Exception e2)
+ {
+ m_log.Error("[LLUDPSERVER]: Further exception thrown on forced session logout for " + client.ClientAPI.Name);
+ m_log.Error(e2.Message, e2);
+ }
+ }
+ }
+ }
+
private void OutgoingPacketHandler()
{
+ // Set this culture for the thread that outgoing packets are sent
+ // on to en-US to avoid number parsing issues
+ Culture.SetCurrentCulture();
+
int now = Environment.TickCount;
int elapsedMS = 0;
int elapsed100MS = 0;
+ int elapsed500MS = 0;
while (base.IsRunning)
{
bool resendUnacked = false;
bool sendAcks = false;
+ bool sendPings = false;
bool packetSent = false;
elapsedMS += Environment.TickCount - now;
- // Check for packets that need to be resent every 100ms
+ // Check for pending outgoing resends every 100ms
if (elapsedMS >= 100)
{
resendUnacked = true;
elapsedMS -= 100;
++elapsed100MS;
}
- // Check for ACKs that need to be sent out every 500ms
+ // Check for pending outgoing ACKs every 500ms
if (elapsed100MS >= 5)
{
sendAcks = true;
elapsed100MS = 0;
+ ++elapsed500MS;
+ }
+ // Send pings to clients every 2000ms
+ if (elapsed500MS >= 4)
+ {
+ sendPings = true;
+ elapsed500MS = 0;
}
clients.ForEach(
@@ -679,6 +731,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
ResendUnacked(client);
if (sendAcks)
SendAcks(client);
+ if (sendPings)
+ SendPing(client);
}
);
--
cgit v1.1
From fb19d1ca0a7c6e82c540c4e8d22c82c09b7bec98 Mon Sep 17 00:00:00 2001
From: John Hurliman
Date: Tue, 6 Oct 2009 10:12:59 -0700
Subject: * Try/catch around EndInvoke() when Util.FireAndForget() returns to
catch exceptions thrown in the async method * Added packet stats handling to
the new LLUDP implementation * Attempting to avoid a race condition when
creating a new LLUDPClient
---
.../Region/ClientStack/LindenUDP/LLUDPServer.cs | 147 ++++++++++++---------
1 file changed, 81 insertions(+), 66 deletions(-)
(limited to 'OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
index 348615e..38890da 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
@@ -359,6 +359,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// is actually sent out again
outgoingPacket.TickCount = 0;
+ // Bump up the resend count on this packet
Interlocked.Increment(ref outgoingPacket.ResendCount);
//Interlocked.Increment(ref Stats.ResentPackets);
@@ -393,6 +394,68 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public void Flush()
{
+ // FIXME: Implement?
+ }
+
+ internal void SendPacketFinal(OutgoingPacket outgoingPacket)
+ {
+ UDPPacketBuffer buffer = outgoingPacket.Buffer;
+ byte flags = buffer.Data[0];
+ bool isResend = (flags & Helpers.MSG_RESENT) != 0;
+ bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0;
+ LLUDPClient client = outgoingPacket.Client;
+
+ // Keep track of when this packet was sent out (right now)
+ outgoingPacket.TickCount = Environment.TickCount;
+
+ #region ACK Appending
+
+ int dataLength = buffer.DataLength;
+
+ // Keep appending ACKs until there is no room left in the packet or there are
+ // no more ACKs to append
+ uint ackCount = 0;
+ uint ack;
+ while (dataLength + 5 < buffer.Data.Length && client.PendingAcks.Dequeue(out ack))
+ {
+ Utils.UIntToBytesBig(ack, buffer.Data, dataLength);
+ dataLength += 4;
+ ++ackCount;
+ }
+
+ if (ackCount > 0)
+ {
+ // Set the last byte of the packet equal to the number of appended ACKs
+ buffer.Data[dataLength++] = (byte)ackCount;
+ // Set the appended ACKs flag on this packet
+ buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS);
+ }
+
+ buffer.DataLength = dataLength;
+
+ #endregion ACK Appending
+
+ if (!isResend)
+ {
+ // Not a resend, assign a new sequence number
+ uint sequenceNumber = (uint)Interlocked.Increment(ref client.CurrentSequence);
+ Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1);
+ outgoingPacket.SequenceNumber = sequenceNumber;
+
+ if (isReliable)
+ {
+ // Add this packet to the list of ACK responses we are waiting on from the server
+ client.NeedAcks.Add(outgoingPacket);
+ }
+ }
+
+ // Stats tracking
+ Interlocked.Increment(ref client.PacketsSent);
+ if (isReliable)
+ Interlocked.Add(ref client.UnackedBytes, outgoingPacket.Buffer.DataLength);
+
+ // Put the UDP payload on the wire
+ AsyncBeginSend(buffer);
}
protected override void PacketReceived(UDPPacketBuffer buffer)
@@ -456,8 +519,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#endregion UseCircuitCode Handling
- //if (packet.Header.Resent)
- // Interlocked.Increment(ref Stats.ReceivedResends);
+ // Stats tracking
+ Interlocked.Increment(ref client.PacketsReceived);
#region ACK Receiving
@@ -581,7 +644,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{
// Create the LLUDPClient
LLUDPClient client = new LLUDPClient(this, m_throttleRates, m_throttle, circuitCode, agentID, remoteEndPoint);
- clients.Add(agentID, client.RemoteEndPoint, client);
// Create the LLClientView
LLClientView clientApi = new LLClientView(remoteEndPoint, m_scene, this, client, sessionInfo, agentID, sessionID, circuitCode);
@@ -589,12 +651,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP
clientApi.OnLogout += LogoutHandler;
clientApi.OnConnectionClosed += RemoveClient;
- // Give LLUDPClient a reference to IClientAPI
- client.ClientAPI = clientApi;
-
// Start the IClientAPI
m_scene.ClientManager.Add(circuitCode, clientApi);
clientApi.Start();
+
+ // Give LLUDPClient a reference to IClientAPI
+ client.ClientAPI = clientApi;
+
+ // Add the new client to our list of tracked clients
+ clients.Add(agentID, client.RemoteEndPoint, client);
}
private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend)
@@ -602,6 +667,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OutgoingPacket ackedPacket;
if (client.NeedAcks.RemoveUnsafe(ack, out ackedPacket) && !fromResend)
{
+ // Update stats
+ Interlocked.Add(ref client.UnackedBytes, -ackedPacket.Buffer.DataLength);
+
// Calculate the round-trip time for this packet and its ACK
int rtt = currentTime - ackedPacket.TickCount;
if (rtt > 0)
@@ -650,7 +718,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
StatsManager.SimExtraStats.AddAbnormalClientThreadTermination();
// Don't let a failure in an individual client thread crash the whole sim.
- m_log.ErrorFormat("[LLUDPSERVER]: Client thread for {0} {1} crashed. Logging them out", client.ClientAPI.Name, client.AgentID);
+ m_log.ErrorFormat("[LLUDPSERVER]: Client thread for {0} crashed. Logging them out", client.AgentID);
m_log.Error(e.Message, e);
try
@@ -674,7 +742,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
catch (Exception e2)
{
- m_log.Error("[LLUDPSERVER]: Further exception thrown on forced session logout for " + client.ClientAPI.Name);
+ m_log.Error("[LLUDPSERVER]: Further exception thrown on forced session logout for " + client.AgentID);
m_log.Error(e2.Message, e2);
}
}
@@ -715,8 +783,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
elapsed100MS = 0;
++elapsed500MS;
}
- // Send pings to clients every 2000ms
- if (elapsed500MS >= 4)
+ // Send pings to clients every 5000ms
+ if (elapsed500MS >= 10)
{
sendPings = true;
elapsed500MS = 0;
@@ -730,7 +798,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (resendUnacked)
ResendUnacked(client);
if (sendAcks)
+ {
SendAcks(client);
+ client.SendPacketStats();
+ }
if (sendPings)
SendPing(client);
}
@@ -746,61 +817,5 @@ namespace OpenSim.Region.ClientStack.LindenUDP
client.SendLogoutPacket();
RemoveClient(client);
}
-
- internal void SendPacketFinal(OutgoingPacket outgoingPacket)
- {
- UDPPacketBuffer buffer = outgoingPacket.Buffer;
- byte flags = buffer.Data[0];
- bool isResend = (flags & Helpers.MSG_RESENT) != 0;
- bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0;
- LLUDPClient client = outgoingPacket.Client;
-
- // Keep track of when this packet was sent out (right now)
- outgoingPacket.TickCount = Environment.TickCount;
-
- #region ACK Appending
-
- int dataLength = buffer.DataLength;
-
- // Keep appending ACKs until there is no room left in the packet or there are
- // no more ACKs to append
- uint ackCount = 0;
- uint ack;
- while (dataLength + 5 < buffer.Data.Length && client.PendingAcks.Dequeue(out ack))
- {
- Utils.UIntToBytesBig(ack, buffer.Data, dataLength);
- dataLength += 4;
- ++ackCount;
- }
-
- if (ackCount > 0)
- {
- // Set the last byte of the packet equal to the number of appended ACKs
- buffer.Data[dataLength++] = (byte)ackCount;
- // Set the appended ACKs flag on this packet
- buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS);
- }
-
- buffer.DataLength = dataLength;
-
- #endregion ACK Appending
-
- if (!isResend)
- {
- // Not a resend, assign a new sequence number
- uint sequenceNumber = (uint)Interlocked.Increment(ref client.CurrentSequence);
- Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1);
- outgoingPacket.SequenceNumber = sequenceNumber;
-
- if (isReliable)
- {
- // Add this packet to the list of ACK responses we are waiting on from the server
- client.NeedAcks.Add(outgoingPacket);
- }
- }
-
- // Put the UDP payload on the wire
- AsyncBeginSend(buffer);
- }
}
}
--
cgit v1.1
From 61b537215328499155c58f46e6338d459aba87ec Mon Sep 17 00:00:00 2001
From: John Hurliman
Date: Tue, 6 Oct 2009 12:13:16 -0700
Subject: * Added missing references to prebuild.xml and commented out the
LindenUDP tests until a new test harness is written * Clients are no longer
disconnected when a packet handler crashes. We'll see how this works out in
practice * Added documentation and cleanup, getting ready for the first
public push * Deleted an old LLUDP file
---
.../Region/ClientStack/LindenUDP/LLUDPServer.cs | 103 +++++++++------------
1 file changed, 43 insertions(+), 60 deletions(-)
(limited to 'OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
index 38890da..c0a84a8 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
@@ -41,7 +41,10 @@ using OpenMetaverse;
namespace OpenSim.Region.ClientStack.LindenUDP
{
- public class LLUDPServerShim : IClientNetworkServer
+ ///
+ /// A shim around LLUDPServer that implements the IClientNetworkServer interface
+ ///
+ public sealed class LLUDPServerShim : IClientNetworkServer
{
LLUDPServer m_udpServer;
@@ -80,6 +83,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
}
+ ///
+ /// The LLUDP server for a region. This handles incoming and outgoing
+ /// packets for all UDP connections to the region
+ ///
public class LLUDPServer : UDPBase
{
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
@@ -152,6 +159,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public new void Stop()
{
+ m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + m_scene.RegionInfo.RegionName);
base.Stop();
}
@@ -591,11 +599,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (packet.Type != PacketType.PacketAck)
{
// Inbox insertion
- IncomingPacket incomingPacket;
- incomingPacket.Client = client;
- incomingPacket.Packet = packet;
-
- packetInbox.Enqueue(incomingPacket);
+ packetInbox.Enqueue(new IncomingPacket(client, packet));
}
}
@@ -683,7 +687,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// on to en-US to avoid number parsing issues
Culture.SetCurrentCulture();
- IncomingPacket incomingPacket = default(IncomingPacket);
+ IncomingPacket incomingPacket = null;
while (base.IsRunning)
{
@@ -696,59 +700,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
packetInbox.Clear();
}
- private void ProcessInPacket(object state)
- {
- IncomingPacket incomingPacket = (IncomingPacket)state;
- Packet packet = incomingPacket.Packet;
- LLUDPClient client = incomingPacket.Client;
-
- if (packet != null && client != null)
- {
- try
- {
- client.ClientAPI.ProcessInPacket(packet);
- }
- catch (ThreadAbortException)
- {
- throw;
- }
- catch (Exception e)
- {
- if (StatsManager.SimExtraStats != null)
- StatsManager.SimExtraStats.AddAbnormalClientThreadTermination();
-
- // Don't let a failure in an individual client thread crash the whole sim.
- m_log.ErrorFormat("[LLUDPSERVER]: Client thread for {0} crashed. Logging them out", client.AgentID);
- m_log.Error(e.Message, e);
-
- try
- {
- // Make an attempt to alert the user that their session has crashed
- AgentAlertMessagePacket alert = client.ClientAPI.BuildAgentAlertPacket(
- "Unfortunately the session for this client on the server has crashed.\n" +
- "Any further actions taken will not be processed.\n" +
- "Please relog", true);
-
- SendPacket(client, alert, ThrottleOutPacketType.Unknown, false);
-
- // TODO: There may be a better way to do this. Perhaps kick? Not sure this propogates notifications to
- // listeners yet, though.
- client.ClientAPI.SendLogoutPacket();
- RemoveClient(client.ClientAPI);
- }
- catch (ThreadAbortException)
- {
- throw;
- }
- catch (Exception e2)
- {
- m_log.Error("[LLUDPSERVER]: Further exception thrown on forced session logout for " + client.AgentID);
- m_log.Error(e2.Message, e2);
- }
- }
- }
- }
-
private void OutgoingPacketHandler()
{
// Set this culture for the thread that outgoing packets are sent
@@ -812,6 +763,38 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
}
+ private void ProcessInPacket(object state)
+ {
+ IncomingPacket incomingPacket = (IncomingPacket)state;
+ Packet packet = incomingPacket.Packet;
+ LLUDPClient client = incomingPacket.Client;
+
+ // Sanity check
+ if (packet == null || client == null || client.ClientAPI == null)
+ {
+ m_log.WarnFormat("[LLUDPSERVER]: Processing a packet with incomplete state. Packet=\"{0}\", Client=\"{1}\", Client.ClientAPI=\"{2}\"",
+ packet, client, (client != null) ? client.ClientAPI : null);
+ }
+
+ try
+ {
+ // Process this packet
+ client.ClientAPI.ProcessInPacket(packet);
+ }
+ catch (ThreadAbortException)
+ {
+ // If something is trying to abort the packet processing thread, take that as a hint that it's time to shut down
+ m_log.Info("[LLUDPSERVER]: Caught a thread abort, shutting down the LLUDP server");
+ Stop();
+ }
+ catch (Exception e)
+ {
+ // Don't let a failure in an individual client thread crash the whole sim.
+ m_log.ErrorFormat("[LLUDPSERVER]: Client packet handler for {0} for packet {1} threw an exception", client.AgentID, packet.Type);
+ m_log.Error(e.Message, e);
+ }
+ }
+
private void LogoutHandler(IClientAPI client)
{
client.SendLogoutPacket();
--
cgit v1.1
From 312438f145aa7286df365e37df423e743dd85db0 Mon Sep 17 00:00:00 2001
From: John Hurliman
Date: Tue, 6 Oct 2009 15:55:39 -0700
Subject: Commented noisy debugging about packet splitting
---
OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
(limited to 'OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
index c0a84a8..2c5ad85 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
@@ -210,8 +210,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
byte[][] datas = packet.ToBytesMultiple();
int packetCount = datas.Length;
- if (packetCount > 1)
- m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets");
+ //if (packetCount > 1)
+ // m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets");
for (int i = 0; i < packetCount; i++)
{
@@ -250,8 +250,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
byte[][] datas = packet.ToBytesMultiple();
int packetCount = datas.Length;
- if (packetCount > 1)
- m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets");
+ //if (packetCount > 1)
+ // m_log.Debug("[LLUDPSERVER]: Split " + packet.Type + " packet into " + packetCount + " packets");
for (int i = 0; i < packetCount; i++)
{
--
cgit v1.1