From 61371c7c16eea3b856a852b94c98b18b99ccf8fb Mon Sep 17 00:00:00 2001
From: Mic Bowman
Date: Mon, 28 Mar 2011 10:00:53 -0700
Subject: Implements adaptive queue management and fair queueing for improved
 networking performance.

Reprioritization algorithms need to be ported still. One is
in place.
---
 .../Region/ClientStack/LindenUDP/LLClientView.cs   | 353 ++++++++++++++-------
 1 file changed, 235 insertions(+), 118 deletions(-)

(limited to 'OpenSim/Region/ClientStack')

diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
index 2c6795f..db149a1 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
@@ -49,6 +49,8 @@ using Timer = System.Timers.Timer;
 using AssetLandmark = OpenSim.Framework.AssetLandmark;
 using Nini.Config;
 
+using System.IO;
+
 namespace OpenSim.Region.ClientStack.LindenUDP
 {
     public delegate bool PacketMethod(IClientAPI simClient, Packet packet);
@@ -298,6 +300,77 @@ namespace OpenSim.Region.ClientStack.LindenUDP
         /// <summary>Used to adjust Sun Orbit values so Linden based viewers properly position sun</summary>
         private const float m_sunPainDaHalfOrbitalCutoff = 4.712388980384689858f;
 
+                        // First log file or time has expired, start writing to a new log file
+//<MIC>
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+// THIS IS DEBUGGING CODE & SHOULD BE REMOVED
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+        public class QueueLogger
+        {
+            public Int32 start = 0;
+            public StreamWriter Log = null;
+            private Dictionary<UUID,int> m_idMap = new Dictionary<UUID,int>();
+
+            public QueueLogger()
+            {
+                DateTime now = DateTime.Now;
+                String fname = String.Format("queue-{0}.log", now.ToString("yyyyMMddHHmmss"));
+                Log = new StreamWriter(fname);
+
+                start = Util.EnvironmentTickCount();
+            }
+
+            public int LookupID(UUID uuid)
+            {
+                int localid;
+                if (! m_idMap.TryGetValue(uuid,out localid))
+                {
+                    localid = m_idMap.Count + 1;
+                    m_idMap[uuid] = localid;
+                }
+                
+                return localid;                
+            }
+        }
+
+        public static QueueLogger QueueLog = null;
+
+        // -----------------------------------------------------------------
+        public void LogAvatarUpdateEvent(UUID client, UUID avatar, Int32 timeinqueue)
+        {
+            if (QueueLog == null)
+                QueueLog = new QueueLogger();
+
+            Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start);
+            lock(QueueLog)
+            {
+                int cid = QueueLog.LookupID(client);
+                int aid = QueueLog.LookupID(avatar);
+                QueueLog.Log.WriteLine("{0},AU,AV{1:D4},AV{2:D4},{3}",ticks,cid,aid,timeinqueue);
+            }
+        }
+
+        // -----------------------------------------------------------------
+        public void LogQueueProcessEvent(UUID client, PriorityQueue queue, uint maxup)
+        {
+            if (QueueLog == null)
+                QueueLog = new QueueLogger();
+
+            Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start);
+            lock(QueueLog)
+            {
+                int cid = QueueLog.LookupID(client);
+                QueueLog.Log.WriteLine("{0},PQ,AV{1:D4},{2},{3}",ticks,cid,maxup,queue.ToString());
+            }
+        }
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+//</MIC>
+
         private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
         protected static Dictionary<PacketType, PacketMethod> PacketHandlers = new Dictionary<PacketType, PacketMethod>(); //Global/static handlers for all clients
 
@@ -3547,18 +3620,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP
 
         #region Primitive Packet/Data Sending Methods
 
+        
         /// <summary>
         /// Generate one of the object update packets based on PrimUpdateFlags
         /// and broadcast the packet to clients
         /// </summary>
         public void SendPrimUpdate(ISceneEntity entity, PrimUpdateFlags updateFlags)
         {
-            double priority = m_prioritizer.GetUpdatePriority(this, entity);
+            //double priority = m_prioritizer.GetUpdatePriority(this, entity);
+            uint priority = m_prioritizer.GetUpdatePriority(this, entity);
 
             lock (m_entityUpdates.SyncRoot)
-                m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation), entity.LocalId);
+                m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation));
         }
 
+        private Int32 m_LastQueueFill = 0;
+        private uint m_maxUpdates = 0;
+
         private void ProcessEntityUpdates(int maxUpdates)
         {
             OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>> objectUpdateBlocks = new OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>>();
@@ -3566,23 +3644,55 @@ namespace OpenSim.Region.ClientStack.LindenUDP
             OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
             OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
 
-            if (maxUpdates <= 0) maxUpdates = Int32.MaxValue;
+            if (maxUpdates <= 0)
+            {
+                m_maxUpdates = Int32.MaxValue;
+            }
+            else
+            {
+                if (m_maxUpdates == 0 || m_LastQueueFill == 0)
+                {
+                    m_maxUpdates = (uint)maxUpdates;
+                }
+                else
+                {
+                    if (Util.EnvironmentTickCountSubtract(m_LastQueueFill) < 200)
+                        m_maxUpdates += 5;
+                    else
+                        m_maxUpdates = m_maxUpdates >> 1;
+                }
+                m_maxUpdates = Util.Clamp<uint>(m_maxUpdates,10,500);
+            }
+            m_LastQueueFill = Util.EnvironmentTickCount();
+            
             int updatesThisCall = 0;
 
+//<MIC>
+// DEBUGGING CODE... REMOVE
+//            LogQueueProcessEvent(this.m_agentId,m_entityUpdates,m_maxUpdates);
+//</MIC>            
             // We must lock for both manipulating the kill record and sending the packet, in order to avoid a race
             // condition where a kill can be processed before an out-of-date update for the same object.                        
             lock (m_killRecord)
             {
                 float avgTimeDilation = 1.0f;
                 EntityUpdate update;
-                while (updatesThisCall < maxUpdates)
+                Int32 timeinqueue; // this is just debugging code & can be dropped later
+                
+                while (updatesThisCall < m_maxUpdates)
                 {
                     lock (m_entityUpdates.SyncRoot)
-                        if (!m_entityUpdates.TryDequeue(out update))
+                        if (!m_entityUpdates.TryDequeue(out update, out timeinqueue))
                             break;
                     avgTimeDilation += update.TimeDilation;
                     avgTimeDilation *= 0.5f;
 
+// <MIC>
+// DEBUGGING CODE... REMOVE
+//                    if (update.Entity is ScenePresence)
+//                        LogAvatarUpdateEvent(this.m_agentId,update.Entity.UUID,timeinqueue);
+// </MIC>
+
                     if (update.Entity is SceneObjectPart)
                     {
                         SceneObjectPart part = (SceneObjectPart)update.Entity;
@@ -3679,36 +3789,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
                         }
                         else
                         {
-    //                            if (update.Entity is SceneObjectPart && ((SceneObjectPart)update.Entity).IsAttachment)
-    //                            {
-    //                                SceneObjectPart sop = (SceneObjectPart)update.Entity;
-    //                                string text = sop.Text;
-    //                                if (text.IndexOf("\n") >= 0)
-    //                                    text = text.Remove(text.IndexOf("\n"));
-    //
-    //                                if (m_attachmentsSent.Contains(sop.ParentID))
-    //                                {
-    ////                                    m_log.DebugFormat(
-    ////                                        "[CLIENT]: Sending full info about attached prim {0} text {1}",
-    ////                                        sop.LocalId, text);
-    //
-    //                                    objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock(sop, this.m_agentId));
-    //
-    //                                    m_attachmentsSent.Add(sop.LocalId);
-    //                                }
-    //                                else
-    //                                {
-    //                                    m_log.DebugFormat(
-    //                                        "[CLIENT]: Requeueing full update of prim {0} text {1} since we haven't sent its parent {2} yet",
-    //                                        sop.LocalId, text, sop.ParentID);
-    //
-    //                                    m_entityUpdates.Enqueue(double.MaxValue, update, sop.LocalId);
-    //                                }
-    //                            }
-    //                            else
-    //                            {
-                                objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId));
-    //                            }
+                            objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId));
                         }
                     }
                     else if (!canUseImproved)
@@ -3802,26 +3883,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP
 
         public void ReprioritizeUpdates()
         {
-            //m_log.Debug("[CLIENT]: Reprioritizing prim updates for " + m_firstName + " " + m_lastName);
-
             lock (m_entityUpdates.SyncRoot)
                 m_entityUpdates.Reprioritize(UpdatePriorityHandler);
         }
 
-        private bool UpdatePriorityHandler(ref double priority, uint localID)
+        private bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity)
         {
-            EntityBase entity;
-            if (m_scene.Entities.TryGetValue(localID, out entity))
+            if (entity != null)
             {
                 priority = m_prioritizer.GetUpdatePriority(this, entity);
+                return true;
             }
 
-            return priority != double.NaN;
+            return false;
         }
 
         public void FlushPrimUpdates()
         {
-            m_log.Debug("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName);
+            m_log.WarnFormat("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName);
 
             while (m_entityUpdates.Count > 0)
                 ProcessEntityUpdates(-1);
@@ -11704,86 +11783,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP
         #region PriorityQueue
         public class PriorityQueue
         {
-            internal delegate bool UpdatePriorityHandler(ref double priority, uint local_id);
+            internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity);
+
+            // Heap[0] for self updates
+            // Heap[1..12] for entity updates
 
-            private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[1];
+            internal const uint m_numberOfQueues = 12;
+            private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[m_numberOfQueues];
             private Dictionary<uint, LookupItem> m_lookupTable;
-            private Comparison<double> m_comparison;
             private object m_syncRoot = new object();
-
+            private uint m_nextQueue = 0;
+            private UInt64 m_nextRequest = 0;
+            
             internal PriorityQueue() :
-                this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY, Comparer<double>.Default) { }
-            internal PriorityQueue(int capacity) :
-                this(capacity, Comparer<double>.Default) { }
-            internal PriorityQueue(IComparer<double> comparer) :
-                this(new Comparison<double>(comparer.Compare)) { }
-            internal PriorityQueue(Comparison<double> comparison) :
-                this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY, comparison) { }
-            internal PriorityQueue(int capacity, IComparer<double> comparer) :
-                this(capacity, new Comparison<double>(comparer.Compare)) { }
-            internal PriorityQueue(int capacity, Comparison<double> comparison)
+                this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY) { }
+            internal PriorityQueue(int capacity)
             {
                 m_lookupTable = new Dictionary<uint, LookupItem>(capacity);
 
                 for (int i = 0; i < m_heaps.Length; ++i)
                     m_heaps[i] = new MinHeap<MinHeapItem>(capacity);
-                this.m_comparison = comparison;
             }
 
             public object SyncRoot { get { return this.m_syncRoot; } }
+
             internal int Count
             {
                 get
                 {
                     int count = 0;
                     for (int i = 0; i < m_heaps.Length; ++i)
-                        count = m_heaps[i].Count;
+                        count += m_heaps[i].Count;
                     return count;
                 }
             }
 
-            public bool Enqueue(double priority, EntityUpdate value, uint local_id)
+            public bool Enqueue(uint pqueue, EntityUpdate value)
             {
-                LookupItem item;
+                LookupItem lookup;
 
-                if (m_lookupTable.TryGetValue(local_id, out item))
+                uint localid = value.Entity.LocalId;
+                UInt64 entry = m_nextRequest++;
+                if (m_lookupTable.TryGetValue(localid, out lookup))
                 {
-                    // Combine flags
-                    value.Flags |= item.Heap[item.Handle].Value.Flags;
-
-                    item.Heap[item.Handle] = new MinHeapItem(priority, value, local_id, this.m_comparison);
-                    return false;
-                }
-                else
-                {
-                    item.Heap = m_heaps[0];
-                    item.Heap.Add(new MinHeapItem(priority, value, local_id, this.m_comparison), ref item.Handle);
-                    m_lookupTable.Add(local_id, item);
-                    return true;
+                    entry = lookup.Heap[lookup.Handle].EntryOrder;
+                    value.Flags |=  lookup.Heap[lookup.Handle].Value.Flags;
+                    lookup.Heap.Remove(lookup.Handle);
                 }
-            }
 
-            internal EntityUpdate Peek()
-            {
-                for (int i = 0; i < m_heaps.Length; ++i)
-                    if (m_heaps[i].Count > 0)
-                        return m_heaps[i].Min().Value;
-                throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString()));
+                pqueue = Util.Clamp<uint>(pqueue, 0, m_numberOfQueues - 1);
+                lookup.Heap = m_heaps[pqueue];
+                lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle);
+                m_lookupTable[localid] = lookup;
+
+                return true;
             }
 
-            internal bool TryDequeue(out EntityUpdate value)
+            internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue)
             {
-                for (int i = 0; i < m_heaps.Length; ++i)
+                for (int i = 0; i < m_numberOfQueues; ++i)
                 {
-                    if (m_heaps[i].Count > 0)
+                    // To get the fair queing, we cycle through each of the
+                    // queues when finding an element to dequeue, this code
+                    // assumes that the distribution of updates in the queues
+                    // is polynomial, probably quadractic (eg distance of PI * R^2)
+                    uint h = (uint)((m_nextQueue + i) % m_numberOfQueues);
+                    if (m_heaps[h].Count > 0)
                     {
-                        MinHeapItem item = m_heaps[i].RemoveMin();
-                        m_lookupTable.Remove(item.LocalID);
+                        m_nextQueue = (uint)((h + 1) % m_numberOfQueues);
+
+                        MinHeapItem item = m_heaps[h].RemoveMin();
+                        m_lookupTable.Remove(item.Value.Entity.LocalId);
+                        timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime);
                         value = item.Value;
+                        
                         return true;
                     }
                 }
 
+                timeinqueue = 0;
                 value = default(EntityUpdate);
                 return false;
             }
@@ -11791,68 +11869,107 @@ namespace OpenSim.Region.ClientStack.LindenUDP
             internal void Reprioritize(UpdatePriorityHandler handler)
             {
                 MinHeapItem item;
-                double priority;
-
                 foreach (LookupItem lookup in new List<LookupItem>(this.m_lookupTable.Values))
                 {
                     if (lookup.Heap.TryGetValue(lookup.Handle, out item))
                     {
-                        priority = item.Priority;
-                        if (handler(ref priority, item.LocalID))
+                        uint pqueue = item.PriorityQueue;
+                        uint localid = item.Value.Entity.LocalId;
+
+                        if (handler(ref pqueue, item.Value.Entity))
                         {
-                            if (lookup.Heap.ContainsHandle(lookup.Handle))
-                                lookup.Heap[lookup.Handle] =
-                                    new MinHeapItem(priority, item.Value, item.LocalID, this.m_comparison);
+                            // unless the priority queue has changed, there is no need to modify
+                            // the entry
+                            pqueue = Util.Clamp<uint>(pqueue, 0, m_numberOfQueues - 1);
+                            if (pqueue != item.PriorityQueue)
+                            {
+                                lookup.Heap.Remove(lookup.Handle);
+
+                                LookupItem litem = lookup;
+                                litem.Heap = m_heaps[pqueue];
+                                litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle);
+                                m_lookupTable[localid] = litem;
+                            }
                         }
                         else
                         {
-                            m_log.Warn("[LLCLIENTVIEW]: UpdatePriorityHandler returned false, dropping update");
+                            m_log.WarnFormat("[LLCLIENTVIEW]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID);
                             lookup.Heap.Remove(lookup.Handle);
-                            this.m_lookupTable.Remove(item.LocalID);
+                            this.m_lookupTable.Remove(localid);
                         }
                     }
                 }
             }
 
+            public override string ToString()
+            {
+                string s = "";
+                for (int i = 0; i < m_numberOfQueues; i++)
+                {
+                    if (s != "") s += ",";
+                    s += m_heaps[i].Count.ToString();
+                }
+                return s;
+            }
+            
             #region MinHeapItem
             private struct MinHeapItem : IComparable<MinHeapItem>
             {
-                private double priority;
                 private EntityUpdate value;
-                private uint local_id;
-                private Comparison<double> comparison;
+                internal EntityUpdate Value {
+                    get {
+                        return this.value;
+                    }
+                }
+
+                private uint pqueue;
+                internal uint PriorityQueue {
+                    get {
+                        return this.pqueue;
+                    }
+                }
 
-                internal MinHeapItem(double priority, EntityUpdate value, uint local_id) :
-                    this(priority, value, local_id, Comparer<double>.Default) { }
-                internal MinHeapItem(double priority, EntityUpdate value, uint local_id, IComparer<double> comparer) :
-                    this(priority, value, local_id, new Comparison<double>(comparer.Compare)) { }
-                internal MinHeapItem(double priority, EntityUpdate value, uint local_id, Comparison<double> comparison)
+                private Int32 entrytime;
+                internal Int32 EntryTime {
+                    get {
+                        return this.entrytime;
+                    }
+                }
+
+                private UInt64 entryorder;
+                internal UInt64 EntryOrder 
                 {
-                    this.priority = priority;
+                    get {
+                        return this.entryorder;
+                    }
+                }
+                
+                internal MinHeapItem(uint pqueue, MinHeapItem other)
+                {
+                    this.entrytime = other.entrytime;
+                    this.entryorder = other.entryorder;
+                    this.value = other.value;
+                    this.pqueue = pqueue;
+                }
+                
+                internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value)
+                {
+                    this.entrytime = Util.EnvironmentTickCount();
+                    this.entryorder = entryorder;
                     this.value = value;
-                    this.local_id = local_id;
-                    this.comparison = comparison;
+                    this.pqueue = pqueue;
                 }
 
-                internal double Priority { get { return this.priority; } }
-                internal EntityUpdate Value { get { return this.value; } }
-                internal uint LocalID { get { return this.local_id; } }
-
                 public override string ToString()
                 {
-                    StringBuilder sb = new StringBuilder();
-                    sb.Append("[");
-                    sb.Append(this.priority.ToString());
-                    sb.Append(",");
-                    if (this.value != null)
-                        sb.Append(this.value.ToString());
-                    sb.Append("]");
-                    return sb.ToString();
+                    return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId);
                 }
 
                 public int CompareTo(MinHeapItem other)
                 {
-                    return this.comparison(this.priority, other.priority);
+                    // I'm assuming that the root part of an SOG is added to the update queue
+                    // before the component parts
+                    return Comparer<UInt64>.Default.Compare(this.EntryOrder, other.EntryOrder);
                 }
             }
             #endregion
-- 
cgit v1.1