From 4221ec23f93cfb09a20abdcf83fcf46d2d6cd2c5 Mon Sep 17 00:00:00 2001 From: Sean Dague Date: Fri, 7 Dec 2007 19:13:35 +0000 Subject: further screwing around with the PacketQueue data structure. Nearly time to replace a chunk of ClientView with this. --- OpenSim/Region/ClientStack/PacketQueue.cs | 188 +++++++++++++++++++++++------- 1 file changed, 149 insertions(+), 39 deletions(-) (limited to 'OpenSim') diff --git a/OpenSim/Region/ClientStack/PacketQueue.cs b/OpenSim/Region/ClientStack/PacketQueue.cs index 6dfa15c..f2d270c 100644 --- a/OpenSim/Region/ClientStack/PacketQueue.cs +++ b/OpenSim/Region/ClientStack/PacketQueue.cs @@ -44,7 +44,7 @@ namespace OpenSim.Region.ClientStack { public class PacketQueue { - private BlockingQueue SendQueue; + private Queue SendQueue; private Queue IncomingPacketQueue; private Queue OutgoingPacketQueue; @@ -76,7 +76,8 @@ namespace OpenSim.Region.ClientStack private PacketThrottle TextureThrottle; private PacketThrottle TotalThrottle; - private Timer throttleTimer; + private long LastThrottle; + private long ThrottleInterval; public PacketQueue() { @@ -85,7 +86,7 @@ namespace OpenSim.Region.ClientStack // in it to process. it's an on-purpose threadlock though because // without it, the clientloop will suck up all sim resources. - SendQueue = new BlockingQueue(); + SendQueue = new Queue(); IncomingPacketQueue = new Queue(); OutgoingPacketQueue = new Queue(); @@ -111,9 +112,59 @@ namespace OpenSim.Region.ClientStack TotalThrottle = new PacketThrottle(0, 162144, 1536000); // TIMERS needed for this - throttleTimer = new Timer((int)(throttletimems/throttleTimeDivisor)); - throttleTimer.Elapsed += new ElapsedEventHandler(throttleTimer_Elapsed); - throttleTimer.Start(); + LastThrottle = DateTime.Now.Ticks; + ThrottleInterval = (long)(throttletimems/throttleTimeDivisor); + } + + /* STANDARD QUEUE MANIPULATION INTERFACES */ + + + public void Enqueue(QueItem item) + { + // We could micro lock, but that will tend to actually + // probably be worse than just synchronizing on SendQueue + lock (SendQueue) { + switch (item.throttleType) + { + case ThrottleOutPacketType.Resend: + ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Texture: + ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Task: + ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Land: + ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Asset: + ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Cloud: + ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Wind: + ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item); + break; + + default: + // Acknowledgements and other such stuff should go directly to the blocking Queue + // Throttling them may and likely 'will' be problematic + SendQueue.Enqueue(item); + break; + } + } + } + + public QueItem Dequeue() + { + if (ThrottlingTime()) { + ProcessThrottle(); + } + lock (SendQueue) { + return SendQueue.Dequeue(); + } } private void ResetCounters() @@ -138,7 +189,99 @@ namespace OpenSim.Region.ClientStack AssetOutgoingPacketQueue.Count > 0 || TextureOutgoingPacketQueue.Count > 0); } + + // Run through our wait queues and flush out allotted numbers of bytes into the process queue + + private bool ThrottlingTime() + { + if(DateTime.Now.Ticks < (LastThrottle + ThrottleInterval)) { + LastThrottle = DateTime.Now.Ticks; + return true; + } else { + return false; + } + } + public void ProcessThrottle() + { + + // I was considering this.. Will an event fire if the thread it's on is blocked? + + // Then I figured out.. it doesn't really matter.. because this thread won't be blocked for long + // The General overhead of the UDP protocol gets sent to the queue un-throttled by this + // so This'll pick up about around the right time. + + int MaxThrottleLoops = 4550; // 50*7 packets can be dequeued at once. + int throttleLoops = 0; + + // We're going to dequeue all of the saved up packets until + // we've hit the throttle limit or there's no more packets to send + lock (SendQueue) { + ResetCounters(); + while (TotalThrottle.UnderLimit() && PacketsWaiting() && + (throttleLoops <= MaxThrottleLoops)) + { + throttleLoops++; + //Now comes the fun part.. we dump all our elements into PacketQueue that we've saved up. + if (ResendThrottle.UnderLimit() && ResendOutgoingPacketQueue.Count > 0) + { + QueItem qpack = ResendOutgoingPacketQueue.Dequeue(); + + SendQueue.Enqueue(qpack); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); + ResendThrottle.Add(qpack.Packet.ToBytes().Length); + } + if (LandThrottle.UnderLimit() && LandOutgoingPacketQueue.Count > 0) + { + QueItem qpack = LandOutgoingPacketQueue.Dequeue(); + + SendQueue.Enqueue(qpack); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); + LandThrottle.Add(qpack.Packet.ToBytes().Length); + } + if (WindThrottle.UnderLimit() && WindOutgoingPacketQueue.Count > 0) + { + QueItem qpack = WindOutgoingPacketQueue.Dequeue(); + + SendQueue.Enqueue(qpack); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); + WindThrottle.Add(qpack.Packet.ToBytes().Length); + } + if (CloudThrottle.UnderLimit() && CloudOutgoingPacketQueue.Count > 0) + { + QueItem qpack = CloudOutgoingPacketQueue.Dequeue(); + + SendQueue.Enqueue(qpack); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); + CloudThrottle.Add(qpack.Packet.ToBytes().Length); + } + if (TaskThrottle.UnderLimit() && TaskOutgoingPacketQueue.Count > 0) + { + QueItem qpack = TaskOutgoingPacketQueue.Dequeue(); + + SendQueue.Enqueue(qpack); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); + TaskThrottle.Add(qpack.Packet.ToBytes().Length); + } + if (TextureThrottle.UnderLimit() && TextureOutgoingPacketQueue.Count > 0) + { + QueItem qpack = TextureOutgoingPacketQueue.Dequeue(); + + SendQueue.Enqueue(qpack); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); + TextureThrottle.Add(qpack.Packet.ToBytes().Length); + } + if (AssetThrottle.UnderLimit() && AssetOutgoingPacketQueue.Count > 0) + { + QueItem qpack = AssetOutgoingPacketQueue.Dequeue(); + + SendQueue.Enqueue(qpack); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); + AssetThrottle.Add(qpack.Packet.ToBytes().Length); + } + } + } + } private void throttleTimer_Elapsed(object sender, ElapsedEventArgs e) { @@ -239,39 +382,6 @@ namespace OpenSim.Region.ClientStack } } - public void Add(QueItem item) - { - switch (item.throttleType) - { - case ThrottleOutPacketType.Resend: - ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Texture: - ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Task: - ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Land: - ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Asset: - ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Cloud: - ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Wind: - ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item); - break; - - default: - // Acknowledgements and other such stuff should go directly to the blocking Queue - // Throttling them may and likely 'will' be problematic - SendQueue.Enqueue(item); - break; - } - } private int ScaleThrottle(int value, int curmax, int newmax) { -- cgit v1.1