From 582cb89beb597247ceb6d82cdfc8fc983ffe8496 Mon Sep 17 00:00:00 2001 From: Melanie Date: Wed, 16 Jan 2013 19:29:27 +0100 Subject: Add a way to put things at the front of the queue for any throttle group. Adds a DoubleLocklessQueue and uses it for the outgoing buckets. Added a flag value to the Throttle Type (again) because although it's hacky, it's the best of a bad bunch to get the message through the UDP stack to where it's needed. --- OpenSim/Framework/LocklessQueue.cs | 8 ++-- OpenSim/Framework/ThrottleOutPacketType.cs | 2 + .../Region/ClientStack/Linden/UDP/LLClientView.cs | 8 ++-- .../Region/ClientStack/Linden/UDP/LLUDPClient.cs | 54 ++++++++++++++++++---- .../Region/ClientStack/Linden/UDP/LLUDPServer.cs | 10 +++- 5 files changed, 63 insertions(+), 19 deletions(-) diff --git a/OpenSim/Framework/LocklessQueue.cs b/OpenSim/Framework/LocklessQueue.cs index 84f887c..9bd9baf 100644 --- a/OpenSim/Framework/LocklessQueue.cs +++ b/OpenSim/Framework/LocklessQueue.cs @@ -29,7 +29,7 @@ using System.Threading; namespace OpenSim.Framework { - public sealed class LocklessQueue + public class LocklessQueue { private sealed class SingleLinkNode { @@ -41,7 +41,7 @@ namespace OpenSim.Framework SingleLinkNode tail; int count; - public int Count { get { return count; } } + public virtual int Count { get { return count; } } public LocklessQueue() { @@ -76,7 +76,7 @@ namespace OpenSim.Framework Interlocked.Increment(ref count); } - public bool Dequeue(out T item) + public virtual bool Dequeue(out T item) { item = default(T); SingleLinkNode oldHead = null; @@ -136,4 +136,4 @@ namespace OpenSim.Framework (object)Interlocked.CompareExchange(ref location, newValue, comparand); } } -} \ No newline at end of file +} diff --git a/OpenSim/Framework/ThrottleOutPacketType.cs b/OpenSim/Framework/ThrottleOutPacketType.cs index ca4b126..87899f0 100644 --- a/OpenSim/Framework/ThrottleOutPacketType.cs +++ b/OpenSim/Framework/ThrottleOutPacketType.cs @@ -47,6 +47,8 @@ namespace OpenSim.Framework Texture = 5, /// Non-texture assets Asset = 6, + + HighPriority = 128, } [Flags] diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs index 956c2c9..9550b5a 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs @@ -2788,7 +2788,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP Transfer.TransferInfo.Size = req.AssetInf.Data.Length; Transfer.TransferInfo.TransferID = req.TransferRequestID; Transfer.Header.Zerocoded = true; - OutPacket(Transfer, isWearable ? ThrottleOutPacketType.Task : ThrottleOutPacketType.Asset); + OutPacket(Transfer, isWearable ? ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority : ThrottleOutPacketType.Asset); if (req.NumPackets == 1) { @@ -2799,7 +2799,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP TransferPacket.TransferData.Data = req.AssetInf.Data; TransferPacket.TransferData.Status = 1; TransferPacket.Header.Zerocoded = true; - OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task : ThrottleOutPacketType.Asset); + OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority : ThrottleOutPacketType.Asset); } else { @@ -2832,7 +2832,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP TransferPacket.TransferData.Status = 1; } TransferPacket.Header.Zerocoded = true; - OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task : ThrottleOutPacketType.Asset); + OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority : ThrottleOutPacketType.Asset); processedLength += chunkSize; packetNumber++; @@ -3605,7 +3605,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } } - OutPacket(aw, ThrottleOutPacketType.Task); + OutPacket(aw, ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority); } public void SendAppearance(UUID agentID, byte[] visualParams, byte[] textureEntry) diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs index f1a1812..e52ac37 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs @@ -92,7 +92,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Packets we have sent that need to be ACKed by the client public readonly UnackedPacketCollection NeedAcks = new UnackedPacketCollection(); /// ACKs that are queued up, waiting to be sent to the client - public readonly OpenSim.Framework.LocklessQueue PendingAcks = new OpenSim.Framework.LocklessQueue(); + public readonly DoubleLocklessQueue PendingAcks = new DoubleLocklessQueue(); /// Current packet sequence number public int CurrentSequence; @@ -146,7 +146,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Throttle buckets for each packet category private readonly TokenBucket[] m_throttleCategories; /// Outgoing queues for throttled packets - private readonly OpenSim.Framework.LocklessQueue[] m_packetOutboxes = new OpenSim.Framework.LocklessQueue[THROTTLE_CATEGORY_COUNT]; + private readonly DoubleLocklessQueue[] m_packetOutboxes = new DoubleLocklessQueue[THROTTLE_CATEGORY_COUNT]; /// A container that can hold one packet for each outbox, used to store /// dequeued packets that are being held for throttling private readonly OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT]; @@ -202,7 +202,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP ThrottleOutPacketType type = (ThrottleOutPacketType)i; // Initialize the packet outboxes, where packets sit while they are waiting for tokens - m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue(); + m_packetOutboxes[i] = new DoubleLocklessQueue(); // Initialize the token buckets that control the throttling for each category m_throttleCategories[i] = new TokenBucket(m_throttleCategory, rates.GetRate(type)); } @@ -430,15 +430,20 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue) { + return EnqueueOutgoing(packet, forceQueue, false); + } + + public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue, bool highPriority) + { int category = (int)packet.Category; if (category >= 0 && category < m_packetOutboxes.Length) { - OpenSim.Framework.LocklessQueue queue = m_packetOutboxes[category]; + DoubleLocklessQueue queue = m_packetOutboxes[category]; if (m_deliverPackets == false) { - queue.Enqueue(packet); + queue.Enqueue(packet, highPriority); return true; } @@ -449,7 +454,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP // queued packets if (queue.Count > 0) { - queue.Enqueue(packet); + queue.Enqueue(packet, highPriority); return true; } @@ -462,7 +467,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP else { // Force queue specified or not enough tokens in the bucket, queue this packet - queue.Enqueue(packet); + queue.Enqueue(packet, highPriority); return true; } } @@ -494,7 +499,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (m_deliverPackets == false) return false; OutgoingPacket packet = null; - OpenSim.Framework.LocklessQueue queue; + DoubleLocklessQueue queue; TokenBucket bucket; bool packetSent = false; ThrottleOutPacketTypeFlags emptyCategories = 0; @@ -534,7 +539,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } catch { - m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue(); + m_packetOutboxes[i] = new DoubleLocklessQueue(); } if (success) { @@ -567,7 +572,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } else { - m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue(); + m_packetOutboxes[i] = new DoubleLocklessQueue(); emptyCategories |= CategoryToFlag(i); } } @@ -724,4 +729,33 @@ namespace OpenSim.Region.ClientStack.LindenUDP } } } + + public class DoubleLocklessQueue : OpenSim.Framework.LocklessQueue + { + OpenSim.Framework.LocklessQueue highQueue = new OpenSim.Framework.LocklessQueue(); + + public override int Count + { + get + { + return base.Count + highQueue.Count; + } + } + + public override bool Dequeue(out T item) + { + if (highQueue.Dequeue(out item)) + return true; + + return base.Dequeue(out item); + } + + public void Enqueue(T item, bool highPriority) + { + if (highPriority) + highQueue.Enqueue(item); + else + Enqueue(item); + } + } } diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs index 9a4abd4..6c72edc 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs @@ -803,6 +803,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region Queue or Send + bool highPriority = false; + + if (category != ThrottleOutPacketType.Unknown && (category & ThrottleOutPacketType.HighPriority) != 0) + { + category = (ThrottleOutPacketType)((int)category & 127); + highPriority = true; + } + OutgoingPacket outgoingPacket = new OutgoingPacket(udpClient, buffer, category, null); // If we were not provided a method for handling unacked, use the UDPServer default method outgoingPacket.UnackedMethod = ((method == null) ? delegate(OutgoingPacket oPacket) { ResendUnacked(oPacket); } : method); @@ -811,7 +819,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP // continue to display the deleted object until relog. Therefore, we need to always queue a kill object // packet so that it isn't sent before a queued update packet. bool requestQueue = type == PacketType.KillObject; - if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, requestQueue)) + if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, requestQueue, highPriority)) SendPacketFinal(outgoingPacket); #endregion Queue or Send -- cgit v1.1