From 2f436875899f432a88f432ab86a6858b3a4cc373 Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Mon, 11 Apr 2011 09:06:28 -0700 Subject: New tokenbucket algorithm. This one provides fair sharing of the queues when client and simulator throttles are set. This algorithm also uses pre-defined burst rate of 150% of the sustained rate for each of the throttles. Removed the "state" queue. The state queue is not a Linden queue and appeared to be used just to get kill packets sent. --- .../Region/ClientStack/LindenUDP/LLClientView.cs | 6 +- .../Region/ClientStack/LindenUDP/LLUDPClient.cs | 96 ++++--- .../Region/ClientStack/LindenUDP/LLUDPServer.cs | 2 +- .../Region/ClientStack/LindenUDP/TokenBucket.cs | 300 ++++++++++++++------- 4 files changed, 259 insertions(+), 145 deletions(-) diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index 8de31d7..934a2d5 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -1610,7 +1610,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP } else { - OutPacket(kill, ThrottleOutPacketType.State); + // OutPacket(kill, ThrottleOutPacketType.State); + OutPacket(kill, ThrottleOutPacketType.Task); } } @@ -2440,7 +2441,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP packet.Effect = effectBlocks; - OutPacket(packet, ThrottleOutPacketType.State); + // OutPacket(packet, ThrottleOutPacketType.State); + OutPacket(packet, ThrottleOutPacketType.Task); } public void SendAvatarProperties(UUID avatarID, string aboutText, string bornOn, Byte[] charterMember, diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs index 9a8bfd3..5a69851 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs @@ -135,7 +135,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP private int m_nextOnQueueEmpty = 1; /// Throttle bucket for this agent's connection - private readonly TokenBucket m_throttle; + private readonly TokenBucket m_throttleClient; + /// Throttle bucket for this agent's connection + private readonly TokenBucket m_throttleCategory; /// Throttle buckets for each packet category private readonly TokenBucket[] m_throttleCategories; /// Outgoing queues for throttled packets @@ -174,7 +176,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_maxRTO = maxRTO; // Create a token bucket throttle for this client that has the scene token bucket as a parent - m_throttle = new TokenBucket(parentThrottle, rates.TotalLimit, rates.Total); + m_throttleClient = new TokenBucket(parentThrottle, rates.TotalLimit); + // Create a token bucket throttle for the total categary with the client bucket as a throttle + m_throttleCategory = new TokenBucket(m_throttleClient, rates.TotalLimit); // Create an array of token buckets for this clients different throttle categories m_throttleCategories = new TokenBucket[THROTTLE_CATEGORY_COUNT]; @@ -185,7 +189,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP // Initialize the packet outboxes, where packets sit while they are waiting for tokens m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue(); // Initialize the token buckets that control the throttling for each category - m_throttleCategories[i] = new TokenBucket(m_throttle, rates.GetLimit(type), rates.GetRate(type)); + m_throttleCategories[i] = new TokenBucket(m_throttleCategory, rates.GetLimit(type)); } // Default the retransmission timeout to three seconds @@ -206,6 +210,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_packetOutboxes[i].Clear(); m_nextPackets[i] = null; } + + // pull the throttle out of the scene throttle + m_throttleClient.Parent.UnregisterRequest(m_throttleClient); OnPacketStats = null; OnQueueEmpty = null; } @@ -216,6 +223,26 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Information about the client connection public ClientInfo GetClientInfo() { +/// + TokenBucket tb; + + tb = m_throttleClient.Parent; + m_log.WarnFormat("[TOKENS] {3}: Actual={0},Request={1},TotalRequest={2}",tb.DripRate,tb.RequestedDripRate,tb.TotalDripRequest,"ROOT"); + + tb = m_throttleClient; + m_log.WarnFormat("[TOKENS] {3}: Actual={0},Request={1},TotalRequest={2}",tb.DripRate,tb.RequestedDripRate,tb.TotalDripRequest," CLIENT"); + + tb = m_throttleCategory; + m_log.WarnFormat("[TOKENS] {3}: Actual={0},Request={1},TotalRequest={2}",tb.DripRate,tb.RequestedDripRate,tb.TotalDripRequest," CATEGORY"); + + for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++) + { + tb = m_throttleCategories[i]; + m_log.WarnFormat("[TOKENS] {4} <{0}:{1}>: Actual={2},Requested={3}",AgentID,i,tb.DripRate,tb.RequestedDripRate," BUCKET"); + } + +/// + // TODO: This data structure is wrong in so many ways. Locking and copying the entire lists // of pending and needed ACKs for every client every time some method wants information about // this connection is a recipe for poor performance @@ -223,13 +250,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP info.pendingAcks = new Dictionary(); info.needAck = new Dictionary(); - info.resendThrottle = m_throttleCategories[(int)ThrottleOutPacketType.Resend].DripRate; - info.landThrottle = m_throttleCategories[(int)ThrottleOutPacketType.Land].DripRate; - info.windThrottle = m_throttleCategories[(int)ThrottleOutPacketType.Wind].DripRate; - info.cloudThrottle = m_throttleCategories[(int)ThrottleOutPacketType.Cloud].DripRate; - info.taskThrottle = m_throttleCategories[(int)ThrottleOutPacketType.State].DripRate + m_throttleCategories[(int)ThrottleOutPacketType.Task].DripRate; - info.assetThrottle = m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate; - info.textureThrottle = m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate; + info.resendThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Resend].DripRate; + info.landThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Land].DripRate; + info.windThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Wind].DripRate; + info.cloudThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].DripRate; + // info.taskThrottle = m_throttleCategories[(int)ThrottleOutPacketType.State].DripRate + m_throttleCategories[(int)ThrottleOutPacketType.Task].DripRate; + info.taskThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Task].DripRate; + info.assetThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate; + info.textureThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate; info.totalThrottle = info.resendThrottle + info.landThrottle + info.windThrottle + info.cloudThrottle + info.taskThrottle + info.assetThrottle + info.textureThrottle; @@ -317,8 +345,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP int texture = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; int asset = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); // State is a subcategory of task that we allocate a percentage to - int state = (int)((float)task * STATE_TASK_PERCENTAGE); - task -= state; + int state = 0; + // int state = (int)((float)task * STATE_TASK_PERCENTAGE); + // task -= state; // Make sure none of the throttles are set below our packet MTU, // otherwise a throttle could become permanently clogged @@ -339,40 +368,32 @@ namespace OpenSim.Region.ClientStack.LindenUDP // Update the token buckets with new throttle values TokenBucket bucket; - bucket = m_throttle; - bucket.MaxBurst = total; + bucket = m_throttleCategory; + bucket.RequestedDripRate = total; bucket = m_throttleCategories[(int)ThrottleOutPacketType.Resend]; - bucket.DripRate = resend; - bucket.MaxBurst = resend; + bucket.RequestedDripRate = resend; bucket = m_throttleCategories[(int)ThrottleOutPacketType.Land]; - bucket.DripRate = land; - bucket.MaxBurst = land; + bucket.RequestedDripRate = land; bucket = m_throttleCategories[(int)ThrottleOutPacketType.Wind]; - bucket.DripRate = wind; - bucket.MaxBurst = wind; + bucket.RequestedDripRate = wind; bucket = m_throttleCategories[(int)ThrottleOutPacketType.Cloud]; - bucket.DripRate = cloud; - bucket.MaxBurst = cloud; + bucket.RequestedDripRate = cloud; bucket = m_throttleCategories[(int)ThrottleOutPacketType.Asset]; - bucket.DripRate = asset; - bucket.MaxBurst = asset; + bucket.RequestedDripRate = asset; bucket = m_throttleCategories[(int)ThrottleOutPacketType.Task]; - bucket.DripRate = task + state; - bucket.MaxBurst = task + state; + bucket.RequestedDripRate = task; bucket = m_throttleCategories[(int)ThrottleOutPacketType.State]; - bucket.DripRate = state; - bucket.MaxBurst = state; + bucket.RequestedDripRate = state; bucket = m_throttleCategories[(int)ThrottleOutPacketType.Texture]; - bucket.DripRate = texture; - bucket.MaxBurst = texture; + bucket.RequestedDripRate = texture; // Reset the packed throttles cached data m_packedThrottles = null; @@ -387,14 +408,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP data = new byte[7 * 4]; int i = 0; - Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Resend].DripRate), 0, data, i, 4); i += 4; - Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Land].DripRate), 0, data, i, 4); i += 4; - Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Wind].DripRate), 0, data, i, 4); i += 4; - Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].DripRate), 0, data, i, 4); i += 4; - Buffer.BlockCopy(Utils.FloatToBytes((float)(m_throttleCategories[(int)ThrottleOutPacketType.Task].DripRate) + - m_throttleCategories[(int)ThrottleOutPacketType.State].DripRate), 0, data, i, 4); i += 4; - Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate), 0, data, i, 4); i += 4; - Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate), 0, data, i, 4); i += 4; + Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Resend].RequestedDripRate), 0, data, i, 4); i += 4; + Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Land].RequestedDripRate), 0, data, i, 4); i += 4; + Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Wind].RequestedDripRate), 0, data, i, 4); i += 4; + Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].RequestedDripRate), 0, data, i, 4); i += 4; + Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Task].RequestedDripRate), 0, data, i, 4); i += 4; + Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Texture].RequestedDripRate), 0, data, i, 4); i += 4; + Buffer.BlockCopy(Utils.FloatToBytes((float)m_throttleCategories[(int)ThrottleOutPacketType.Asset].RequestedDripRate), 0, data, i, 4); i += 4; m_packedThrottles = data; } diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs index 583214c..d08b25f 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs @@ -228,7 +228,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } #endregion BinaryStats - m_throttle = new TokenBucket(null, sceneThrottleBps, sceneThrottleBps); + m_throttle = new TokenBucket(null, sceneThrottleBps); ThrottleRates = new ThrottleRates(configSource); } diff --git a/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs b/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs index 0a8331f..e4d59ff 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs @@ -26,6 +26,10 @@ */ using System; +using System.Collections; +using System.Collections.Generic; +using System.Reflection; +using log4net; namespace OpenSim.Region.ClientStack.LindenUDP { @@ -35,89 +39,126 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// public class TokenBucket { - /// Parent bucket to this bucket, or null if this is a root - /// bucket - TokenBucket parent; - /// Size of the bucket in bytes. If zero, the bucket has - /// infinite capacity - int maxBurst; - /// Rate that the bucket fills, in bytes per millisecond. If - /// zero, the bucket always remains full - int tokensPerMS; - /// Number of tokens currently in the bucket - int content; + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + private static Int32 m_counter = 0; + + private Int32 m_identifier; + + /// + /// Number of ticks (ms) per quantum, drip rate and max burst + /// are defined over this interval. + /// + private const Int32 m_ticksPerQuantum = 1000; + + /// + /// This is the number of quantums worth of packets that can + /// be accommodated during a burst + /// + private const Double m_quantumsPerBurst = 1.5; + + /// + /// + private const Int32 m_minimumDripRate = 1400; + /// Time of the last drip, in system ticks - int lastDrip; + private Int32 m_lastDrip; + + /// + /// The number of bytes that can be sent at this moment. This is the + /// current number of tokens in the bucket + /// + private Int64 m_tokenCount; - #region Properties + /// + /// Map of children buckets and their requested maximum burst rate + /// + private Dictionary m_children = new Dictionary(); + +#region Properties /// /// The parent bucket of this bucket, or null if this bucket has no /// parent. The parent bucket will limit the aggregate bandwidth of all /// of its children buckets /// + private TokenBucket m_parent; public TokenBucket Parent { - get { return parent; } + get { return m_parent; } + set { m_parent = value; } } /// /// Maximum burst rate in bytes per second. This is the maximum number - /// of tokens that can accumulate in the bucket at any one time + /// of tokens that can accumulate in the bucket at any one time. This + /// also sets the total request for leaf nodes /// - public int MaxBurst + private Int64 m_burstRate; + public Int64 RequestedBurstRate { - get { return maxBurst; } - set { maxBurst = (value >= 0 ? value : 0); } + get { return m_burstRate; } + set { m_burstRate = (value < 0 ? 0 : value); } } + public Int64 BurstRate + { + get { + double rate = RequestedBurstRate * BurstRateModifier(); + if (rate < m_minimumDripRate * m_quantumsPerBurst) + rate = m_minimumDripRate * m_quantumsPerBurst; + + return (Int64) rate; + } + } + /// /// The speed limit of this bucket in bytes per second. This is the - /// number of tokens that are added to the bucket per second + /// number of tokens that are added to the bucket per quantum /// /// Tokens are added to the bucket any time /// is called, at the granularity of /// the system tick interval (typically around 15-22ms) - public int DripRate + private Int64 m_dripRate; + public Int64 RequestedDripRate { - get { return tokensPerMS * 1000; } - set - { - if (value == 0) - tokensPerMS = 0; - else - { - int bpms = (int)((float)value / 1000.0f); - - if (bpms <= 0) - tokensPerMS = 1; // 1 byte/ms is the minimum granularity - else - tokensPerMS = bpms; - } + get { return (m_dripRate == 0 ? m_totalDripRequest : m_dripRate); } + set { + m_dripRate = (value < 0 ? 0 : value); + m_burstRate = (Int64)((double)m_dripRate * m_quantumsPerBurst); + m_totalDripRequest = m_dripRate; + if (m_parent != null) + m_parent.RegisterRequest(this,m_dripRate); } } - /// - /// The speed limit of this bucket in bytes per millisecond - /// - public int DripPerMS + public Int64 DripRate { - get { return tokensPerMS; } + get { + if (m_parent == null) + return Math.Min(RequestedDripRate,TotalDripRequest); + + double rate = (double)RequestedDripRate * m_parent.DripRateModifier(); + if (rate < m_minimumDripRate) + rate = m_minimumDripRate; + + return (Int64)rate; + } } /// - /// The number of bytes that can be sent at this moment. This is the - /// current number of tokens in the bucket - /// If this bucket has a parent bucket that does not have - /// enough tokens for a request, will - /// return false regardless of the content of this bucket + /// The current total of the requested maximum burst rates of + /// this bucket's children buckets. /// - public int Content - { - get { return content; } - } + private Int64 m_totalDripRequest; + public Int64 TotalDripRequest + { + get { return m_totalDripRequest; } + set { m_totalDripRequest = value; } + } + +#endregion Properties - #endregion Properties +#region Constructor /// /// Default constructor @@ -128,56 +169,115 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// zero if this bucket has no maximum capacity /// Rate that the bucket fills, in bytes per /// second. If zero, the bucket always remains full - public TokenBucket(TokenBucket parent, int maxBurst, int dripRate) + public TokenBucket(TokenBucket parent, Int64 dripRate) { - this.parent = parent; - MaxBurst = maxBurst; - DripRate = dripRate; - lastDrip = Environment.TickCount & Int32.MaxValue; + m_identifier = m_counter++; + + Parent = parent; + RequestedDripRate = dripRate; + // TotalDripRequest = dripRate; // this will be overwritten when a child node registers + // MaxBurst = (Int64)((double)dripRate * m_quantumsPerBurst); + m_lastDrip = Environment.TickCount & Int32.MaxValue; } +#endregion Constructor + /// - /// Remove a given number of tokens from the bucket + /// Compute a modifier for the MaxBurst rate. This is 1.0, meaning + /// no modification if the requested bandwidth is less than the + /// max burst bandwidth all the way to the root of the throttle + /// hierarchy. However, if any of the parents is over-booked, then + /// the modifier will be less than 1. /// - /// Number of tokens to remove from the bucket - /// True if the requested number of tokens were removed from - /// the bucket, otherwise false - public bool RemoveTokens(int amount) + private double DripRateModifier() + { + Int64 driprate = DripRate; + return driprate >= TotalDripRequest ? 1.0 : (double)driprate / (double)TotalDripRequest; + } + + /// + /// + private double BurstRateModifier() + { + // for now... burst rate is always m_quantumsPerBurst (constant) + // larger than drip rate so the ratio of burst requests is the + // same as the drip ratio + return DripRateModifier(); + } + + /// + /// Register drip rate requested by a child of this throttle. Pass the + /// changes up the hierarchy. + /// + public void RegisterRequest(TokenBucket child, Int64 request) { - bool dummy; - return RemoveTokens(amount, out dummy); + m_children[child] = request; + // m_totalDripRequest = m_children.Values.Sum(); + + m_totalDripRequest = 0; + foreach (KeyValuePair cref in m_children) + m_totalDripRequest += cref.Value; + + // Pass the new values up to the parent + if (m_parent != null) + m_parent.RegisterRequest(this,Math.Min(RequestedDripRate, TotalDripRequest)); } /// + /// Remove the rate requested by a child of this throttle. Pass the + /// changes up the hierarchy. + /// + public void UnregisterRequest(TokenBucket child) + { + m_children.Remove(child); + // m_totalDripRequest = m_children.Values.Sum(); + + m_totalDripRequest = 0; + foreach (KeyValuePair cref in m_children) + m_totalDripRequest += cref.Value; + + // Pass the new values up to the parent + if (m_parent != null) + m_parent.RegisterRequest(this,Math.Min(RequestedDripRate, TotalDripRequest)); + } + + /// /// Remove a given number of tokens from the bucket /// /// Number of tokens to remove from the bucket - /// True if tokens were added to the bucket - /// during this call, otherwise false /// True if the requested number of tokens were removed from /// the bucket, otherwise false - public bool RemoveTokens(int amount, out bool dripSucceeded) + public bool RemoveTokens(Int64 amount) { - if (maxBurst == 0) + // Deposit tokens for this interval + Drip(); + + // If we have enough tokens then remove them and return + if (m_tokenCount - amount >= 0) { - dripSucceeded = true; - return true; + if (m_parent == null || m_parent.RemoveTokens(amount)) + { + m_tokenCount -= amount; + return true; + } } - dripSucceeded = Drip(); + return false; + } - if (content - amount >= 0) - { - if (parent != null && !parent.RemoveTokens(amount)) - return false; + /// + /// Deposit tokens into the bucket from a child bucket that did + /// not use all of its available tokens + /// + private void Deposit(Int64 count) + { + m_tokenCount += count; - content -= amount; - return true; - } - else - { - return false; - } + // Deposit the overflow in the parent bucket, this is how we share + // unused bandwidth + Int64 burstrate = BurstRate; + if (m_tokenCount > burstrate) + m_tokenCount = burstrate; } /// @@ -186,37 +286,29 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// call to Drip /// /// True if tokens were added to the bucket, otherwise false - public bool Drip() + private void Drip() { - if (tokensPerMS == 0) + // This should never happen... means we are a leaf node and were created + // with no drip rate... + if (DripRate == 0) { - content = maxBurst; - return true; + m_log.WarnFormat("[TOKENBUCKET] something odd is happening and drip rate is 0"); + return; } - else - { - int now = Environment.TickCount & Int32.MaxValue; - int deltaMS = now - lastDrip; - - if (deltaMS <= 0) - { - if (deltaMS < 0) - lastDrip = now; - return false; - } + + // Determine the interval over which we are adding tokens, never add + // more than a single quantum of tokens + Int32 now = Environment.TickCount & Int32.MaxValue; + Int32 deltaMS = Math.Min(now - m_lastDrip, m_ticksPerQuantum); - int dripAmount = deltaMS * tokensPerMS; + m_lastDrip = now; - content = Math.Min(content + dripAmount, maxBurst); - lastDrip = now; + // This can be 0 in the very unusual case that the timer wrapped + // It can be 0 if we try add tokens at a sub-tick rate + if (deltaMS <= 0) + return; - if (dripAmount < 0 || content < 0) - // sim has been idle for too long, integer has overflown - // previous calculation is meaningless, let's put it at correct max - content = maxBurst; - - return true; - } + Deposit(deltaMS * DripRate / m_ticksPerQuantum); } } } -- cgit v1.1