From 4221ec23f93cfb09a20abdcf83fcf46d2d6cd2c5 Mon Sep 17 00:00:00 2001
From: Sean Dague
Date: Fri, 7 Dec 2007 19:13:35 +0000
Subject: further screwing around with the PacketQueue data structure. Nearly
 time to replace a chunk of ClientView with this.

---
 OpenSim/Region/ClientStack/PacketQueue.cs | 188 +++++++++++++++++++++++-------
 1 file changed, 149 insertions(+), 39 deletions(-)

diff --git a/OpenSim/Region/ClientStack/PacketQueue.cs b/OpenSim/Region/ClientStack/PacketQueue.cs
index 6dfa15c..f2d270c 100644
--- a/OpenSim/Region/ClientStack/PacketQueue.cs
+++ b/OpenSim/Region/ClientStack/PacketQueue.cs
@@ -44,7 +44,7 @@ namespace OpenSim.Region.ClientStack
 {
     public class PacketQueue
     {
-        private BlockingQueue<QueItem> SendQueue;
+        private Queue<QueItem> SendQueue;
         
         private Queue<QueItem> IncomingPacketQueue;
         private Queue<QueItem> OutgoingPacketQueue;
@@ -76,7 +76,8 @@ namespace OpenSim.Region.ClientStack
         private PacketThrottle TextureThrottle;
         private PacketThrottle TotalThrottle;
 
-        private Timer throttleTimer;
+        private long LastThrottle;
+        private long ThrottleInterval;
 
         public PacketQueue() 
         {
@@ -85,7 +86,7 @@ namespace OpenSim.Region.ClientStack
             // in it to process.  it's an on-purpose threadlock though because 
             // without it, the clientloop will suck up all sim resources.
             
-            SendQueue = new BlockingQueue<QueItem>();
+            SendQueue = new Queue<QueItem>();
             
             IncomingPacketQueue = new Queue<QueItem>();
             OutgoingPacketQueue = new Queue<QueItem>();
@@ -111,9 +112,59 @@ namespace OpenSim.Region.ClientStack
             TotalThrottle = new PacketThrottle(0, 162144, 1536000);
             
             // TIMERS needed for this
-            throttleTimer = new Timer((int)(throttletimems/throttleTimeDivisor));
-            throttleTimer.Elapsed += new ElapsedEventHandler(throttleTimer_Elapsed);
-            throttleTimer.Start();
+            LastThrottle = DateTime.Now.Ticks;
+            ThrottleInterval = (long)(throttletimems/throttleTimeDivisor);
+        }
+
+        /* STANDARD QUEUE MANIPULATION INTERFACES */
+
+
+        public void Enqueue(QueItem item)
+        {
+            // We could micro lock, but that will tend to actually
+            // probably be worse than just synchronizing on SendQueue
+            lock (SendQueue) {
+                switch (item.throttleType)
+                {
+                case ThrottleOutPacketType.Resend:
+                    ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item);
+                    break;
+                case ThrottleOutPacketType.Texture:
+                    ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item);
+                    break;
+                case ThrottleOutPacketType.Task:
+                    ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item);
+                    break;
+                case ThrottleOutPacketType.Land:
+                    ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item);
+                    break;
+                case ThrottleOutPacketType.Asset:
+                    ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item);
+                    break;
+                case ThrottleOutPacketType.Cloud:
+                    ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item);
+                    break;
+                case ThrottleOutPacketType.Wind:
+                    ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item);
+                    break;
+                    
+                default:
+                    // Acknowledgements and other such stuff should go directly to the blocking Queue
+                    // Throttling them may and likely 'will' be problematic
+                    SendQueue.Enqueue(item); 
+                    break;
+                }
+            }
+        }
+        
+        public QueItem Dequeue()
+        {
+            if (ThrottlingTime()) {
+                ProcessThrottle();
+            }
+            lock (SendQueue) {
+                return SendQueue.Dequeue();
+            }
         }
 
         private void ResetCounters()
@@ -138,7 +189,99 @@ namespace OpenSim.Region.ClientStack
                     AssetOutgoingPacketQueue.Count > 0 ||
                     TextureOutgoingPacketQueue.Count > 0);
         }
+
+        // Run through our wait queues and flush out allotted numbers of bytes into the process queue
+
+        private bool ThrottlingTime()
+        {
+            if(DateTime.Now.Ticks < (LastThrottle + ThrottleInterval)) {
+                LastThrottle = DateTime.Now.Ticks;
+                return true;
+            } else {
+                return false;
+            }
+        }
         
+        public void ProcessThrottle()
+        {
+            
+            // I was considering this..   Will an event fire if the thread it's on is blocked?
+            
+            // Then I figured out..  it doesn't really matter..  because this thread won't be blocked for long
+            // The General overhead of the UDP protocol gets sent to the queue un-throttled by this
+            // so This'll pick up about around the right time.
+            
+            int MaxThrottleLoops = 4550; // 50*7 packets can be dequeued at once.
+            int throttleLoops = 0;
+            
+            // We're going to dequeue all of the saved up packets until 
+            // we've hit the throttle limit or there's no more packets to send
+            lock (SendQueue) {
+                ResetCounters(); 
+                while (TotalThrottle.UnderLimit() && PacketsWaiting() && 
+                       (throttleLoops <= MaxThrottleLoops))
+                {
+                    throttleLoops++;
+                    //Now comes the fun part..   we dump all our elements into PacketQueue that we've saved up.
+                    if (ResendThrottle.UnderLimit() && ResendOutgoingPacketQueue.Count > 0)
+                    {
+                        QueItem qpack = ResendOutgoingPacketQueue.Dequeue();
+                        
+                        SendQueue.Enqueue(qpack);
+                        TotalThrottle.Add(qpack.Packet.ToBytes().Length);
+                        ResendThrottle.Add(qpack.Packet.ToBytes().Length);
+                    }
+                    if (LandThrottle.UnderLimit() && LandOutgoingPacketQueue.Count > 0)
+                    {
+                        QueItem qpack = LandOutgoingPacketQueue.Dequeue();
+                        
+                        SendQueue.Enqueue(qpack);
+                        TotalThrottle.Add(qpack.Packet.ToBytes().Length);
+                        LandThrottle.Add(qpack.Packet.ToBytes().Length);
+                    }
+                    if (WindThrottle.UnderLimit() && WindOutgoingPacketQueue.Count > 0)
+                    {
+                        QueItem qpack = WindOutgoingPacketQueue.Dequeue();
+                        
+                        SendQueue.Enqueue(qpack);
+                        TotalThrottle.Add(qpack.Packet.ToBytes().Length);
+                        WindThrottle.Add(qpack.Packet.ToBytes().Length);
+                    }
+                    if (CloudThrottle.UnderLimit() && CloudOutgoingPacketQueue.Count > 0)
+                    {
+                        QueItem qpack = CloudOutgoingPacketQueue.Dequeue();
+                        
+                        SendQueue.Enqueue(qpack);
+                        TotalThrottle.Add(qpack.Packet.ToBytes().Length);
+                        CloudThrottle.Add(qpack.Packet.ToBytes().Length);
+                    }
+                    if (TaskThrottle.UnderLimit() && TaskOutgoingPacketQueue.Count > 0)
+                    {
+                        QueItem qpack = TaskOutgoingPacketQueue.Dequeue();
+                        
+                        SendQueue.Enqueue(qpack);
+                        TotalThrottle.Add(qpack.Packet.ToBytes().Length);
+                        TaskThrottle.Add(qpack.Packet.ToBytes().Length);
+                    }
+                    if (TextureThrottle.UnderLimit() && TextureOutgoingPacketQueue.Count > 0)
+                    {
+                        QueItem qpack = TextureOutgoingPacketQueue.Dequeue();
+                        
+                        SendQueue.Enqueue(qpack);
+                        TotalThrottle.Add(qpack.Packet.ToBytes().Length);
+                        TextureThrottle.Add(qpack.Packet.ToBytes().Length);
+                    }
+                    if (AssetThrottle.UnderLimit() && AssetOutgoingPacketQueue.Count > 0)
+                    {
+                        QueItem qpack = AssetOutgoingPacketQueue.Dequeue();
+                        
+                        SendQueue.Enqueue(qpack);
+                        TotalThrottle.Add(qpack.Packet.ToBytes().Length);
+                        AssetThrottle.Add(qpack.Packet.ToBytes().Length);
+                    }
+                }
+            }
+        }
        
         private void throttleTimer_Elapsed(object sender, ElapsedEventArgs e)
         {   
@@ -239,39 +382,6 @@ namespace OpenSim.Region.ClientStack
             }
         }
 
-        public void Add(QueItem item)
-        {
-            switch (item.throttleType)
-            {
-                case ThrottleOutPacketType.Resend:
-                    ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item);
-                    break;
-                case ThrottleOutPacketType.Texture:
-                    ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item);
-                    break;
-                case ThrottleOutPacketType.Task:
-                    ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item);
-                    break;
-                case ThrottleOutPacketType.Land:
-                    ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item);
-                    break;
-                case ThrottleOutPacketType.Asset:
-                    ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item);
-                    break;
-                case ThrottleOutPacketType.Cloud:
-                    ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item);
-                    break;
-                case ThrottleOutPacketType.Wind:
-                    ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item);
-                    break;
-
-                default:
-                    // Acknowledgements and other such stuff should go directly to the blocking Queue
-                    // Throttling them may and likely 'will' be problematic
-                    SendQueue.Enqueue(item); 
-                    break;
-            }
-        }
 
         private int ScaleThrottle(int value, int curmax, int newmax)
         {
-- 
cgit v1.1