From e7c877407f2a72a9519eb53debca5aeef20cded9 Mon Sep 17 00:00:00 2001
From: John Hurliman
Date: Tue, 6 Oct 2009 02:38:00 -0700
Subject: * Continued work on the new LLUDP implementation. Appears to be
functioning, although not everything is reimplemented yet * Replaced logic in
ThreadTracker with a call to System.Diagnostics that does the same thing *
Added Util.StringToBytes256() and Util.StringToBytes1024() to clamp output at
byte[256] and byte[1024], respectively * Fixed formatting for a
MySQLAssetData error logging line
---
.../Region/ClientStack/LindenUDP/LLUDPClient.cs | 370 +++++++++++++++++++++
1 file changed, 370 insertions(+)
create mode 100644 OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs
(limited to 'OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs
new file mode 100644
index 0000000..ad01135
--- /dev/null
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs
@@ -0,0 +1,370 @@
+/*
+ * Copyright (c) Contributors, http://opensimulator.org/
+ * See CONTRIBUTORS.TXT for a full list of copyright holders.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of the OpenSimulator Project nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using OpenSim.Framework;
+using OpenMetaverse;
+
+namespace OpenSim.Region.ClientStack.LindenUDP
+{
+ public delegate void QueueEmpty(ThrottleOutPacketType category);
+
+ public class LLUDPClient
+ {
+ /// The number of packet categories to throttle on. If a throttle category is added
+ /// or removed, this number must also change
+ const int THROTTLE_CATEGORY_COUNT = 7;
+
+ public event QueueEmpty OnQueueEmpty;
+
+ /// AgentID for this client
+ public readonly UUID AgentID;
+ /// The remote address of the connected client
+ public readonly IPEndPoint RemoteEndPoint;
+ /// Circuit code that this client is connected on
+ public readonly uint CircuitCode;
+ /// Sequence numbers of packets we've received (for duplicate checking)
+ public readonly IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(200);
+ /// Packets we have sent that need to be ACKed by the client
+ public readonly UnackedPacketCollection NeedAcks = new UnackedPacketCollection();
+ /// ACKs that are queued up, waiting to be sent to the client
+ public readonly LocklessQueue PendingAcks = new LocklessQueue();
+
+ /// Reference to the IClientAPI for this client
+ public LLClientView ClientAPI;
+ /// Current packet sequence number
+ public int CurrentSequence;
+ /// Current ping sequence number
+ public byte CurrentPingSequence;
+ /// True when this connection is alive, otherwise false
+ public bool IsConnected = true;
+ /// True when this connection is paused, otherwise false
+ public bool IsPaused = true;
+ /// Environment.TickCount when the last packet was received for this client
+ public int TickLastPacketReceived;
+
+ /// Timer granularity. This is set to the measured resolution of Environment.TickCount
+ public readonly float G;
+ /// Smoothed round-trip time. A smoothed average of the round-trip time for sending a
+ /// reliable packet to the client and receiving an ACK
+ public float SRTT;
+ /// Round-trip time variance. Measures the consistency of round-trip times
+ public float RTTVAR;
+ /// Retransmission timeout. Packets that have not been acknowledged in this number of
+ /// milliseconds or longer will be resent
+ /// Calculated from and using the
+ /// guidelines in RFC 2988
+ public int RTO;
+ /// Number of bytes received since the last acknowledgement was sent out. This is used
+ /// to loosely follow the TCP delayed ACK algorithm in RFC 1122 (4.2.3.2)
+ public int BytesSinceLastACK;
+
+ /// Throttle bucket for this agent's connection
+ private readonly TokenBucket throttle;
+ /// Throttle buckets for each packet category
+ private readonly TokenBucket[] throttleCategories;
+ /// Throttle rate defaults and limits
+ private readonly ThrottleRates defaultThrottleRates;
+ /// Outgoing queues for throttled packets
+ private readonly LocklessQueue[] packetOutboxes = new LocklessQueue[THROTTLE_CATEGORY_COUNT];
+ /// A container that can hold one packet for each outbox, used to store
+ /// dequeued packets that are being held for throttling
+ private readonly OutgoingPacket[] nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT];
+ /// An optimization to store the length of dequeued packets being held
+ /// for throttling. This avoids expensive calls to Packet.Length
+ private readonly int[] nextPacketLengths = new int[THROTTLE_CATEGORY_COUNT];
+ /// A reference to the LLUDPServer that is managing this client
+ private readonly LLUDPServer udpServer;
+
+ public LLUDPClient(LLUDPServer server, ThrottleRates rates, TokenBucket parentThrottle, uint circuitCode, UUID agentID, IPEndPoint remoteEndPoint)
+ {
+ udpServer = server;
+ AgentID = agentID;
+ RemoteEndPoint = remoteEndPoint;
+ CircuitCode = circuitCode;
+ defaultThrottleRates = rates;
+
+ for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
+ packetOutboxes[i] = new LocklessQueue();
+
+ throttle = new TokenBucket(parentThrottle, 0, 0);
+ throttleCategories = new TokenBucket[THROTTLE_CATEGORY_COUNT];
+ throttleCategories[(int)ThrottleOutPacketType.Resend] = new TokenBucket(throttle, rates.ResendLimit, rates.Resend);
+ throttleCategories[(int)ThrottleOutPacketType.Land] = new TokenBucket(throttle, rates.LandLimit, rates.Land);
+ throttleCategories[(int)ThrottleOutPacketType.Wind] = new TokenBucket(throttle, rates.WindLimit, rates.Wind);
+ throttleCategories[(int)ThrottleOutPacketType.Cloud] = new TokenBucket(throttle, rates.CloudLimit, rates.Cloud);
+ throttleCategories[(int)ThrottleOutPacketType.Task] = new TokenBucket(throttle, rates.TaskLimit, rates.Task);
+ throttleCategories[(int)ThrottleOutPacketType.Texture] = new TokenBucket(throttle, rates.TextureLimit, rates.Texture);
+ throttleCategories[(int)ThrottleOutPacketType.Asset] = new TokenBucket(throttle, rates.AssetLimit, rates.Asset);
+
+ // Set the granularity variable used for retransmission calculations to
+ // the measured resolution of Environment.TickCount
+ G = server.TickCountResolution;
+
+ // Default the retransmission timeout to three seconds
+ RTO = 3000;
+ }
+
+ public void Shutdown()
+ {
+ IsConnected = false;
+ }
+
+ public ClientInfo GetClientInfo()
+ {
+ // TODO: This data structure is wrong in so many ways
+ ClientInfo info = new ClientInfo();
+ info.pendingAcks = new Dictionary();
+ info.needAck = new Dictionary();
+
+ info.resendThrottle = throttleCategories[(int)ThrottleOutPacketType.Resend].DripRate;
+ info.landThrottle = throttleCategories[(int)ThrottleOutPacketType.Land].DripRate;
+ info.windThrottle = throttleCategories[(int)ThrottleOutPacketType.Wind].DripRate;
+ info.cloudThrottle = throttleCategories[(int)ThrottleOutPacketType.Cloud].DripRate;
+ info.taskThrottle = throttleCategories[(int)ThrottleOutPacketType.Task].DripRate;
+ info.assetThrottle = throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate;
+ info.textureThrottle = throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate;
+ info.totalThrottle = info.resendThrottle + info.landThrottle + info.windThrottle + info.cloudThrottle +
+ info.taskThrottle + info.assetThrottle + info.textureThrottle;
+
+ return info;
+ }
+
+ public void SetClientInfo(ClientInfo info)
+ {
+ }
+
+ public string GetStats()
+ {
+ return string.Format("{0,7} {1,7} {2,7} {3,7} {4,7} {5,7} {6,7} {7,7} {8,7} {9,7}",
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0);
+ }
+
+ public void SetThrottles(byte[] throttleData)
+ {
+ byte[] adjData;
+ int pos = 0;
+
+ if (!BitConverter.IsLittleEndian)
+ {
+ byte[] newData = new byte[7 * 4];
+ Buffer.BlockCopy(throttleData, 0, newData, 0, 7 * 4);
+
+ for (int i = 0; i < 7; i++)
+ Array.Reverse(newData, i * 4, 4);
+
+ adjData = newData;
+ }
+ else
+ {
+ adjData = throttleData;
+ }
+
+ int resend = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4;
+ int land = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4;
+ int wind = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4;
+ int cloud = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4;
+ int task = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4;
+ int texture = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4;
+ int asset = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f);
+
+ resend = (resend <= defaultThrottleRates.ResendLimit) ? resend : defaultThrottleRates.ResendLimit;
+ land = (land <= defaultThrottleRates.LandLimit) ? land : defaultThrottleRates.LandLimit;
+ wind = (wind <= defaultThrottleRates.WindLimit) ? wind : defaultThrottleRates.WindLimit;
+ cloud = (cloud <= defaultThrottleRates.CloudLimit) ? cloud : defaultThrottleRates.CloudLimit;
+ task = (task <= defaultThrottleRates.TaskLimit) ? task : defaultThrottleRates.TaskLimit;
+ texture = (texture <= defaultThrottleRates.TextureLimit) ? texture : defaultThrottleRates.TextureLimit;
+ asset = (asset <= defaultThrottleRates.AssetLimit) ? asset : defaultThrottleRates.AssetLimit;
+
+ SetThrottle(ThrottleOutPacketType.Resend, resend);
+ SetThrottle(ThrottleOutPacketType.Land, land);
+ SetThrottle(ThrottleOutPacketType.Wind, wind);
+ SetThrottle(ThrottleOutPacketType.Cloud, cloud);
+ SetThrottle(ThrottleOutPacketType.Task, task);
+ SetThrottle(ThrottleOutPacketType.Texture, texture);
+ SetThrottle(ThrottleOutPacketType.Asset, asset);
+ }
+
+ public byte[] GetThrottlesPacked()
+ {
+ byte[] data = new byte[7 * 4];
+ int i = 0;
+
+ Buffer.BlockCopy(Utils.FloatToBytes((float)throttleCategories[(int)ThrottleOutPacketType.Resend].DripRate), 0, data, i, 4); i += 4;
+ Buffer.BlockCopy(Utils.FloatToBytes((float)throttleCategories[(int)ThrottleOutPacketType.Land].DripRate), 0, data, i, 4); i += 4;
+ Buffer.BlockCopy(Utils.FloatToBytes((float)throttleCategories[(int)ThrottleOutPacketType.Wind].DripRate), 0, data, i, 4); i += 4;
+ Buffer.BlockCopy(Utils.FloatToBytes((float)throttleCategories[(int)ThrottleOutPacketType.Cloud].DripRate), 0, data, i, 4); i += 4;
+ Buffer.BlockCopy(Utils.FloatToBytes((float)throttleCategories[(int)ThrottleOutPacketType.Task].DripRate), 0, data, i, 4); i += 4;
+ Buffer.BlockCopy(Utils.FloatToBytes((float)throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate), 0, data, i, 4); i += 4;
+ Buffer.BlockCopy(Utils.FloatToBytes((float)throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate), 0, data, i, 4); i += 4;
+
+ return data;
+ }
+
+ public void SetThrottle(ThrottleOutPacketType category, int rate)
+ {
+ int i = (int)category;
+ if (i >= 0 && i < throttleCategories.Length)
+ {
+ TokenBucket bucket = throttleCategories[(int)category];
+ bucket.MaxBurst = rate;
+ bucket.DripRate = rate;
+ }
+ }
+
+ public bool EnqueueOutgoing(OutgoingPacket packet)
+ {
+ int category = (int)packet.Category;
+
+ if (category >= 0 && category < packetOutboxes.Length)
+ {
+ LocklessQueue queue = packetOutboxes[category];
+ TokenBucket bucket = throttleCategories[category];
+
+ if (throttleCategories[category].RemoveTokens(packet.Buffer.DataLength))
+ {
+ // Enough tokens were removed from the bucket, the packet will not be queued
+ return false;
+ }
+ else
+ {
+ // Not enough tokens in the bucket, queue this packet
+ queue.Enqueue(packet);
+ return true;
+ }
+ }
+ else
+ {
+ // We don't have a token bucket for this category, so it will not be queued
+ return false;
+ }
+ }
+
+ ///
+ /// Loops through all of the packet queues for this client and tries to send
+ /// any outgoing packets, obeying the throttling bucket limits
+ ///
+ /// This function is only called from a synchronous loop in the
+ /// UDPServer so we don't need to bother making this thread safe
+ /// True if any packets were sent, otherwise false
+ public bool DequeueOutgoing()
+ {
+ OutgoingPacket packet;
+ LocklessQueue queue;
+ TokenBucket bucket;
+ bool packetSent = false;
+
+ for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
+ {
+ bucket = throttleCategories[i];
+
+ if (nextPackets[i] != null)
+ {
+ // This bucket was empty the last time we tried to send a packet,
+ // leaving a dequeued packet still waiting to be sent out. Try to
+ // send it again
+ if (bucket.RemoveTokens(nextPacketLengths[i]))
+ {
+ // Send the packet
+ udpServer.SendPacketFinal(nextPackets[i]);
+ nextPackets[i] = null;
+ packetSent = true;
+ }
+ }
+ else
+ {
+ // No dequeued packet waiting to be sent, try to pull one off
+ // this queue
+ queue = packetOutboxes[i];
+ if (queue.Dequeue(out packet))
+ {
+ // A packet was pulled off the queue. See if we have
+ // enough tokens in the bucket to send it out
+ if (bucket.RemoveTokens(packet.Buffer.DataLength))
+ {
+ // Send the packet
+ udpServer.SendPacketFinal(packet);
+ packetSent = true;
+ }
+ else
+ {
+ // Save the dequeued packet and the length calculation for
+ // the next iteration
+ nextPackets[i] = packet;
+ nextPacketLengths[i] = packet.Buffer.DataLength;
+ }
+ }
+ else
+ {
+ // No packets in this queue. Fire the queue empty callback
+ QueueEmpty callback = OnQueueEmpty;
+ if (callback != null)
+ callback((ThrottleOutPacketType)i);
+ }
+ }
+ }
+
+ return packetSent;
+ }
+
+ public void UpdateRoundTrip(float r)
+ {
+ const float ALPHA = 0.125f;
+ const float BETA = 0.25f;
+ const float K = 4.0f;
+
+ if (RTTVAR == 0.0f)
+ {
+ // First RTT measurement
+ SRTT = r;
+ RTTVAR = r * 0.5f;
+ }
+ else
+ {
+ // Subsequence RTT measurement
+ RTTVAR = (1.0f - BETA) * RTTVAR + BETA * Math.Abs(SRTT - r);
+ SRTT = (1.0f - ALPHA) * SRTT + ALPHA * r;
+ }
+
+ // Always round retransmission timeout up to two seconds
+ RTO = Math.Max(2000, (int)(SRTT + Math.Max(G, K * RTTVAR)));
+ //Logger.Debug("Setting agent " + this.Agent.FullName + "'s RTO to " + RTO + "ms with an RTTVAR of " +
+ // RTTVAR + " based on new RTT of " + r + "ms");
+ }
+ }
+}
--
cgit v1.1