From 6885b7220c6f782034d7c0220762244adf00e3d3 Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Mon, 28 Mar 2011 10:00:53 -0700 Subject: Implements adaptive queue management and fair queueing for improved networking performance. Reprioritization algorithms need to be ported still. One is in place. --- .../Region/ClientStack/LindenUDP/LLClientView.cs | 353 ++++++++++++++------- 1 file changed, 235 insertions(+), 118 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index 2faffae..e9e1fa3 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -49,6 +49,8 @@ using Timer = System.Timers.Timer; using AssetLandmark = OpenSim.Framework.AssetLandmark; using Nini.Config; +using System.IO; + namespace OpenSim.Region.ClientStack.LindenUDP { public delegate bool PacketMethod(IClientAPI simClient, Packet packet); @@ -298,6 +300,77 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Used to adjust Sun Orbit values so Linden based viewers properly position sun private const float m_sunPainDaHalfOrbitalCutoff = 4.712388980384689858f; + // First log file or time has expired, start writing to a new log file +// +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// THIS IS DEBUGGING CODE & SHOULD BE REMOVED +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- + public class QueueLogger + { + public Int32 start = 0; + public StreamWriter Log = null; + private Dictionary m_idMap = new Dictionary(); + + public QueueLogger() + { + DateTime now = DateTime.Now; + String fname = String.Format("queue-{0}.log", now.ToString("yyyyMMddHHmmss")); + Log = new StreamWriter(fname); + + start = Util.EnvironmentTickCount(); + } + + public int LookupID(UUID uuid) + { + int localid; + if (! m_idMap.TryGetValue(uuid,out localid)) + { + localid = m_idMap.Count + 1; + m_idMap[uuid] = localid; + } + + return localid; + } + } + + public static QueueLogger QueueLog = null; + + // ----------------------------------------------------------------- + public void LogAvatarUpdateEvent(UUID client, UUID avatar, Int32 timeinqueue) + { + if (QueueLog == null) + QueueLog = new QueueLogger(); + + Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); + lock(QueueLog) + { + int cid = QueueLog.LookupID(client); + int aid = QueueLog.LookupID(avatar); + QueueLog.Log.WriteLine("{0},AU,AV{1:D4},AV{2:D4},{3}",ticks,cid,aid,timeinqueue); + } + } + + // ----------------------------------------------------------------- + public void LogQueueProcessEvent(UUID client, PriorityQueue queue, uint maxup) + { + if (QueueLog == null) + QueueLog = new QueueLogger(); + + Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); + lock(QueueLog) + { + int cid = QueueLog.LookupID(client); + QueueLog.Log.WriteLine("{0},PQ,AV{1:D4},{2},{3}",ticks,cid,maxup,queue.ToString()); + } + } +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); protected static Dictionary PacketHandlers = new Dictionary(); //Global/static handlers for all clients @@ -3547,18 +3620,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region Primitive Packet/Data Sending Methods + /// /// Generate one of the object update packets based on PrimUpdateFlags /// and broadcast the packet to clients /// public void SendPrimUpdate(ISceneEntity entity, PrimUpdateFlags updateFlags) { - double priority = m_prioritizer.GetUpdatePriority(this, entity); + //double priority = m_prioritizer.GetUpdatePriority(this, entity); + uint priority = m_prioritizer.GetUpdatePriority(this, entity); lock (m_entityUpdates.SyncRoot) - m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation), entity.LocalId); + m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation)); } + private Int32 m_LastQueueFill = 0; + private uint m_maxUpdates = 0; + private void ProcessEntityUpdates(int maxUpdates) { OpenSim.Framework.Lazy> objectUpdateBlocks = new OpenSim.Framework.Lazy>(); @@ -3566,23 +3644,55 @@ namespace OpenSim.Region.ClientStack.LindenUDP OpenSim.Framework.Lazy> terseUpdateBlocks = new OpenSim.Framework.Lazy>(); OpenSim.Framework.Lazy> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy>(); - if (maxUpdates <= 0) maxUpdates = Int32.MaxValue; + if (maxUpdates <= 0) + { + m_maxUpdates = Int32.MaxValue; + } + else + { + if (m_maxUpdates == 0 || m_LastQueueFill == 0) + { + m_maxUpdates = (uint)maxUpdates; + } + else + { + if (Util.EnvironmentTickCountSubtract(m_LastQueueFill) < 200) + m_maxUpdates += 5; + else + m_maxUpdates = m_maxUpdates >> 1; + } + m_maxUpdates = Util.Clamp(m_maxUpdates,10,500); + } + m_LastQueueFill = Util.EnvironmentTickCount(); + int updatesThisCall = 0; +// +// DEBUGGING CODE... REMOVE +// LogQueueProcessEvent(this.m_agentId,m_entityUpdates,m_maxUpdates); +// // We must lock for both manipulating the kill record and sending the packet, in order to avoid a race // condition where a kill can be processed before an out-of-date update for the same object. lock (m_killRecord) { float avgTimeDilation = 1.0f; EntityUpdate update; - while (updatesThisCall < maxUpdates) + Int32 timeinqueue; // this is just debugging code & can be dropped later + + while (updatesThisCall < m_maxUpdates) { lock (m_entityUpdates.SyncRoot) - if (!m_entityUpdates.TryDequeue(out update)) + if (!m_entityUpdates.TryDequeue(out update, out timeinqueue)) break; avgTimeDilation += update.TimeDilation; avgTimeDilation *= 0.5f; +// +// DEBUGGING CODE... REMOVE +// if (update.Entity is ScenePresence) +// LogAvatarUpdateEvent(this.m_agentId,update.Entity.UUID,timeinqueue); +// + if (update.Entity is SceneObjectPart) { SceneObjectPart part = (SceneObjectPart)update.Entity; @@ -3679,36 +3789,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } else { - // if (update.Entity is SceneObjectPart && ((SceneObjectPart)update.Entity).IsAttachment) - // { - // SceneObjectPart sop = (SceneObjectPart)update.Entity; - // string text = sop.Text; - // if (text.IndexOf("\n") >= 0) - // text = text.Remove(text.IndexOf("\n")); - // - // if (m_attachmentsSent.Contains(sop.ParentID)) - // { - //// m_log.DebugFormat( - //// "[CLIENT]: Sending full info about attached prim {0} text {1}", - //// sop.LocalId, text); - // - // objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock(sop, this.m_agentId)); - // - // m_attachmentsSent.Add(sop.LocalId); - // } - // else - // { - // m_log.DebugFormat( - // "[CLIENT]: Requeueing full update of prim {0} text {1} since we haven't sent its parent {2} yet", - // sop.LocalId, text, sop.ParentID); - // - // m_entityUpdates.Enqueue(double.MaxValue, update, sop.LocalId); - // } - // } - // else - // { - objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId)); - // } + objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId)); } } else if (!canUseImproved) @@ -3802,26 +3883,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP public void ReprioritizeUpdates() { - //m_log.Debug("[CLIENT]: Reprioritizing prim updates for " + m_firstName + " " + m_lastName); - lock (m_entityUpdates.SyncRoot) m_entityUpdates.Reprioritize(UpdatePriorityHandler); } - private bool UpdatePriorityHandler(ref double priority, uint localID) + private bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity) { - EntityBase entity; - if (m_scene.Entities.TryGetValue(localID, out entity)) + if (entity != null) { priority = m_prioritizer.GetUpdatePriority(this, entity); + return true; } - return priority != double.NaN; + return false; } public void FlushPrimUpdates() { - m_log.Debug("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName); + m_log.WarnFormat("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName); while (m_entityUpdates.Count > 0) ProcessEntityUpdates(-1); @@ -11713,86 +11792,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region PriorityQueue public class PriorityQueue { - internal delegate bool UpdatePriorityHandler(ref double priority, uint local_id); + internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity); + + // Heap[0] for self updates + // Heap[1..12] for entity updates - private MinHeap[] m_heaps = new MinHeap[1]; + internal const uint m_numberOfQueues = 12; + private MinHeap[] m_heaps = new MinHeap[m_numberOfQueues]; private Dictionary m_lookupTable; - private Comparison m_comparison; private object m_syncRoot = new object(); - + private uint m_nextQueue = 0; + private UInt64 m_nextRequest = 0; + internal PriorityQueue() : - this(MinHeap.DEFAULT_CAPACITY, Comparer.Default) { } - internal PriorityQueue(int capacity) : - this(capacity, Comparer.Default) { } - internal PriorityQueue(IComparer comparer) : - this(new Comparison(comparer.Compare)) { } - internal PriorityQueue(Comparison comparison) : - this(MinHeap.DEFAULT_CAPACITY, comparison) { } - internal PriorityQueue(int capacity, IComparer comparer) : - this(capacity, new Comparison(comparer.Compare)) { } - internal PriorityQueue(int capacity, Comparison comparison) + this(MinHeap.DEFAULT_CAPACITY) { } + internal PriorityQueue(int capacity) { m_lookupTable = new Dictionary(capacity); for (int i = 0; i < m_heaps.Length; ++i) m_heaps[i] = new MinHeap(capacity); - this.m_comparison = comparison; } public object SyncRoot { get { return this.m_syncRoot; } } + internal int Count { get { int count = 0; for (int i = 0; i < m_heaps.Length; ++i) - count = m_heaps[i].Count; + count += m_heaps[i].Count; return count; } } - public bool Enqueue(double priority, EntityUpdate value, uint local_id) + public bool Enqueue(uint pqueue, EntityUpdate value) { - LookupItem item; + LookupItem lookup; - if (m_lookupTable.TryGetValue(local_id, out item)) + uint localid = value.Entity.LocalId; + UInt64 entry = m_nextRequest++; + if (m_lookupTable.TryGetValue(localid, out lookup)) { - // Combine flags - value.Flags |= item.Heap[item.Handle].Value.Flags; - - item.Heap[item.Handle] = new MinHeapItem(priority, value, local_id, this.m_comparison); - return false; - } - else - { - item.Heap = m_heaps[0]; - item.Heap.Add(new MinHeapItem(priority, value, local_id, this.m_comparison), ref item.Handle); - m_lookupTable.Add(local_id, item); - return true; + entry = lookup.Heap[lookup.Handle].EntryOrder; + value.Flags |= lookup.Heap[lookup.Handle].Value.Flags; + lookup.Heap.Remove(lookup.Handle); } - } - internal EntityUpdate Peek() - { - for (int i = 0; i < m_heaps.Length; ++i) - if (m_heaps[i].Count > 0) - return m_heaps[i].Min().Value; - throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString())); + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + lookup.Heap = m_heaps[pqueue]; + lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle); + m_lookupTable[localid] = lookup; + + return true; } - internal bool TryDequeue(out EntityUpdate value) + internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue) { - for (int i = 0; i < m_heaps.Length; ++i) + for (int i = 0; i < m_numberOfQueues; ++i) { - if (m_heaps[i].Count > 0) + // To get the fair queing, we cycle through each of the + // queues when finding an element to dequeue, this code + // assumes that the distribution of updates in the queues + // is polynomial, probably quadractic (eg distance of PI * R^2) + uint h = (uint)((m_nextQueue + i) % m_numberOfQueues); + if (m_heaps[h].Count > 0) { - MinHeapItem item = m_heaps[i].RemoveMin(); - m_lookupTable.Remove(item.LocalID); + m_nextQueue = (uint)((h + 1) % m_numberOfQueues); + + MinHeapItem item = m_heaps[h].RemoveMin(); + m_lookupTable.Remove(item.Value.Entity.LocalId); + timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime); value = item.Value; + return true; } } + timeinqueue = 0; value = default(EntityUpdate); return false; } @@ -11800,68 +11878,107 @@ namespace OpenSim.Region.ClientStack.LindenUDP internal void Reprioritize(UpdatePriorityHandler handler) { MinHeapItem item; - double priority; - foreach (LookupItem lookup in new List(this.m_lookupTable.Values)) { if (lookup.Heap.TryGetValue(lookup.Handle, out item)) { - priority = item.Priority; - if (handler(ref priority, item.LocalID)) + uint pqueue = item.PriorityQueue; + uint localid = item.Value.Entity.LocalId; + + if (handler(ref pqueue, item.Value.Entity)) { - if (lookup.Heap.ContainsHandle(lookup.Handle)) - lookup.Heap[lookup.Handle] = - new MinHeapItem(priority, item.Value, item.LocalID, this.m_comparison); + // unless the priority queue has changed, there is no need to modify + // the entry + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + if (pqueue != item.PriorityQueue) + { + lookup.Heap.Remove(lookup.Handle); + + LookupItem litem = lookup; + litem.Heap = m_heaps[pqueue]; + litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle); + m_lookupTable[localid] = litem; + } } else { - m_log.Warn("[LLCLIENTVIEW]: UpdatePriorityHandler returned false, dropping update"); + m_log.WarnFormat("[LLCLIENTVIEW]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID); lookup.Heap.Remove(lookup.Handle); - this.m_lookupTable.Remove(item.LocalID); + this.m_lookupTable.Remove(localid); } } } } + public override string ToString() + { + string s = ""; + for (int i = 0; i < m_numberOfQueues; i++) + { + if (s != "") s += ","; + s += m_heaps[i].Count.ToString(); + } + return s; + } + #region MinHeapItem private struct MinHeapItem : IComparable { - private double priority; private EntityUpdate value; - private uint local_id; - private Comparison comparison; + internal EntityUpdate Value { + get { + return this.value; + } + } + + private uint pqueue; + internal uint PriorityQueue { + get { + return this.pqueue; + } + } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id) : - this(priority, value, local_id, Comparer.Default) { } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id, IComparer comparer) : - this(priority, value, local_id, new Comparison(comparer.Compare)) { } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id, Comparison comparison) + private Int32 entrytime; + internal Int32 EntryTime { + get { + return this.entrytime; + } + } + + private UInt64 entryorder; + internal UInt64 EntryOrder { - this.priority = priority; + get { + return this.entryorder; + } + } + + internal MinHeapItem(uint pqueue, MinHeapItem other) + { + this.entrytime = other.entrytime; + this.entryorder = other.entryorder; + this.value = other.value; + this.pqueue = pqueue; + } + + internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value) + { + this.entrytime = Util.EnvironmentTickCount(); + this.entryorder = entryorder; this.value = value; - this.local_id = local_id; - this.comparison = comparison; + this.pqueue = pqueue; } - internal double Priority { get { return this.priority; } } - internal EntityUpdate Value { get { return this.value; } } - internal uint LocalID { get { return this.local_id; } } - public override string ToString() { - StringBuilder sb = new StringBuilder(); - sb.Append("["); - sb.Append(this.priority.ToString()); - sb.Append(","); - if (this.value != null) - sb.Append(this.value.ToString()); - sb.Append("]"); - return sb.ToString(); + return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId); } public int CompareTo(MinHeapItem other) { - return this.comparison(this.priority, other.priority); + // I'm assuming that the root part of an SOG is added to the update queue + // before the component parts + return Comparer.Default.Compare(this.EntryOrder, other.EntryOrder); } } #endregion -- cgit v1.1