From 6885b7220c6f782034d7c0220762244adf00e3d3 Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Mon, 28 Mar 2011 10:00:53 -0700 Subject: Implements adaptive queue management and fair queueing for improved networking performance. Reprioritization algorithms need to be ported still. One is in place. --- .../Region/ClientStack/LindenUDP/LLClientView.cs | 353 ++++++++++++++------- 1 file changed, 235 insertions(+), 118 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index 2faffae..e9e1fa3 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -49,6 +49,8 @@ using Timer = System.Timers.Timer; using AssetLandmark = OpenSim.Framework.AssetLandmark; using Nini.Config; +using System.IO; + namespace OpenSim.Region.ClientStack.LindenUDP { public delegate bool PacketMethod(IClientAPI simClient, Packet packet); @@ -298,6 +300,77 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Used to adjust Sun Orbit values so Linden based viewers properly position sun private const float m_sunPainDaHalfOrbitalCutoff = 4.712388980384689858f; + // First log file or time has expired, start writing to a new log file +// +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// THIS IS DEBUGGING CODE & SHOULD BE REMOVED +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- + public class QueueLogger + { + public Int32 start = 0; + public StreamWriter Log = null; + private Dictionary m_idMap = new Dictionary(); + + public QueueLogger() + { + DateTime now = DateTime.Now; + String fname = String.Format("queue-{0}.log", now.ToString("yyyyMMddHHmmss")); + Log = new StreamWriter(fname); + + start = Util.EnvironmentTickCount(); + } + + public int LookupID(UUID uuid) + { + int localid; + if (! m_idMap.TryGetValue(uuid,out localid)) + { + localid = m_idMap.Count + 1; + m_idMap[uuid] = localid; + } + + return localid; + } + } + + public static QueueLogger QueueLog = null; + + // ----------------------------------------------------------------- + public void LogAvatarUpdateEvent(UUID client, UUID avatar, Int32 timeinqueue) + { + if (QueueLog == null) + QueueLog = new QueueLogger(); + + Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); + lock(QueueLog) + { + int cid = QueueLog.LookupID(client); + int aid = QueueLog.LookupID(avatar); + QueueLog.Log.WriteLine("{0},AU,AV{1:D4},AV{2:D4},{3}",ticks,cid,aid,timeinqueue); + } + } + + // ----------------------------------------------------------------- + public void LogQueueProcessEvent(UUID client, PriorityQueue queue, uint maxup) + { + if (QueueLog == null) + QueueLog = new QueueLogger(); + + Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); + lock(QueueLog) + { + int cid = QueueLog.LookupID(client); + QueueLog.Log.WriteLine("{0},PQ,AV{1:D4},{2},{3}",ticks,cid,maxup,queue.ToString()); + } + } +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); protected static Dictionary PacketHandlers = new Dictionary(); //Global/static handlers for all clients @@ -3547,18 +3620,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region Primitive Packet/Data Sending Methods + /// /// Generate one of the object update packets based on PrimUpdateFlags /// and broadcast the packet to clients /// public void SendPrimUpdate(ISceneEntity entity, PrimUpdateFlags updateFlags) { - double priority = m_prioritizer.GetUpdatePriority(this, entity); + //double priority = m_prioritizer.GetUpdatePriority(this, entity); + uint priority = m_prioritizer.GetUpdatePriority(this, entity); lock (m_entityUpdates.SyncRoot) - m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation), entity.LocalId); + m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation)); } + private Int32 m_LastQueueFill = 0; + private uint m_maxUpdates = 0; + private void ProcessEntityUpdates(int maxUpdates) { OpenSim.Framework.Lazy> objectUpdateBlocks = new OpenSim.Framework.Lazy>(); @@ -3566,23 +3644,55 @@ namespace OpenSim.Region.ClientStack.LindenUDP OpenSim.Framework.Lazy> terseUpdateBlocks = new OpenSim.Framework.Lazy>(); OpenSim.Framework.Lazy> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy>(); - if (maxUpdates <= 0) maxUpdates = Int32.MaxValue; + if (maxUpdates <= 0) + { + m_maxUpdates = Int32.MaxValue; + } + else + { + if (m_maxUpdates == 0 || m_LastQueueFill == 0) + { + m_maxUpdates = (uint)maxUpdates; + } + else + { + if (Util.EnvironmentTickCountSubtract(m_LastQueueFill) < 200) + m_maxUpdates += 5; + else + m_maxUpdates = m_maxUpdates >> 1; + } + m_maxUpdates = Util.Clamp(m_maxUpdates,10,500); + } + m_LastQueueFill = Util.EnvironmentTickCount(); + int updatesThisCall = 0; +// +// DEBUGGING CODE... REMOVE +// LogQueueProcessEvent(this.m_agentId,m_entityUpdates,m_maxUpdates); +// // We must lock for both manipulating the kill record and sending the packet, in order to avoid a race // condition where a kill can be processed before an out-of-date update for the same object. lock (m_killRecord) { float avgTimeDilation = 1.0f; EntityUpdate update; - while (updatesThisCall < maxUpdates) + Int32 timeinqueue; // this is just debugging code & can be dropped later + + while (updatesThisCall < m_maxUpdates) { lock (m_entityUpdates.SyncRoot) - if (!m_entityUpdates.TryDequeue(out update)) + if (!m_entityUpdates.TryDequeue(out update, out timeinqueue)) break; avgTimeDilation += update.TimeDilation; avgTimeDilation *= 0.5f; +// +// DEBUGGING CODE... REMOVE +// if (update.Entity is ScenePresence) +// LogAvatarUpdateEvent(this.m_agentId,update.Entity.UUID,timeinqueue); +// + if (update.Entity is SceneObjectPart) { SceneObjectPart part = (SceneObjectPart)update.Entity; @@ -3679,36 +3789,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } else { - // if (update.Entity is SceneObjectPart && ((SceneObjectPart)update.Entity).IsAttachment) - // { - // SceneObjectPart sop = (SceneObjectPart)update.Entity; - // string text = sop.Text; - // if (text.IndexOf("\n") >= 0) - // text = text.Remove(text.IndexOf("\n")); - // - // if (m_attachmentsSent.Contains(sop.ParentID)) - // { - //// m_log.DebugFormat( - //// "[CLIENT]: Sending full info about attached prim {0} text {1}", - //// sop.LocalId, text); - // - // objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock(sop, this.m_agentId)); - // - // m_attachmentsSent.Add(sop.LocalId); - // } - // else - // { - // m_log.DebugFormat( - // "[CLIENT]: Requeueing full update of prim {0} text {1} since we haven't sent its parent {2} yet", - // sop.LocalId, text, sop.ParentID); - // - // m_entityUpdates.Enqueue(double.MaxValue, update, sop.LocalId); - // } - // } - // else - // { - objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId)); - // } + objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId)); } } else if (!canUseImproved) @@ -3802,26 +3883,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP public void ReprioritizeUpdates() { - //m_log.Debug("[CLIENT]: Reprioritizing prim updates for " + m_firstName + " " + m_lastName); - lock (m_entityUpdates.SyncRoot) m_entityUpdates.Reprioritize(UpdatePriorityHandler); } - private bool UpdatePriorityHandler(ref double priority, uint localID) + private bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity) { - EntityBase entity; - if (m_scene.Entities.TryGetValue(localID, out entity)) + if (entity != null) { priority = m_prioritizer.GetUpdatePriority(this, entity); + return true; } - return priority != double.NaN; + return false; } public void FlushPrimUpdates() { - m_log.Debug("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName); + m_log.WarnFormat("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName); while (m_entityUpdates.Count > 0) ProcessEntityUpdates(-1); @@ -11713,86 +11792,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region PriorityQueue public class PriorityQueue { - internal delegate bool UpdatePriorityHandler(ref double priority, uint local_id); + internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity); + + // Heap[0] for self updates + // Heap[1..12] for entity updates - private MinHeap[] m_heaps = new MinHeap[1]; + internal const uint m_numberOfQueues = 12; + private MinHeap[] m_heaps = new MinHeap[m_numberOfQueues]; private Dictionary m_lookupTable; - private Comparison m_comparison; private object m_syncRoot = new object(); - + private uint m_nextQueue = 0; + private UInt64 m_nextRequest = 0; + internal PriorityQueue() : - this(MinHeap.DEFAULT_CAPACITY, Comparer.Default) { } - internal PriorityQueue(int capacity) : - this(capacity, Comparer.Default) { } - internal PriorityQueue(IComparer comparer) : - this(new Comparison(comparer.Compare)) { } - internal PriorityQueue(Comparison comparison) : - this(MinHeap.DEFAULT_CAPACITY, comparison) { } - internal PriorityQueue(int capacity, IComparer comparer) : - this(capacity, new Comparison(comparer.Compare)) { } - internal PriorityQueue(int capacity, Comparison comparison) + this(MinHeap.DEFAULT_CAPACITY) { } + internal PriorityQueue(int capacity) { m_lookupTable = new Dictionary(capacity); for (int i = 0; i < m_heaps.Length; ++i) m_heaps[i] = new MinHeap(capacity); - this.m_comparison = comparison; } public object SyncRoot { get { return this.m_syncRoot; } } + internal int Count { get { int count = 0; for (int i = 0; i < m_heaps.Length; ++i) - count = m_heaps[i].Count; + count += m_heaps[i].Count; return count; } } - public bool Enqueue(double priority, EntityUpdate value, uint local_id) + public bool Enqueue(uint pqueue, EntityUpdate value) { - LookupItem item; + LookupItem lookup; - if (m_lookupTable.TryGetValue(local_id, out item)) + uint localid = value.Entity.LocalId; + UInt64 entry = m_nextRequest++; + if (m_lookupTable.TryGetValue(localid, out lookup)) { - // Combine flags - value.Flags |= item.Heap[item.Handle].Value.Flags; - - item.Heap[item.Handle] = new MinHeapItem(priority, value, local_id, this.m_comparison); - return false; - } - else - { - item.Heap = m_heaps[0]; - item.Heap.Add(new MinHeapItem(priority, value, local_id, this.m_comparison), ref item.Handle); - m_lookupTable.Add(local_id, item); - return true; + entry = lookup.Heap[lookup.Handle].EntryOrder; + value.Flags |= lookup.Heap[lookup.Handle].Value.Flags; + lookup.Heap.Remove(lookup.Handle); } - } - internal EntityUpdate Peek() - { - for (int i = 0; i < m_heaps.Length; ++i) - if (m_heaps[i].Count > 0) - return m_heaps[i].Min().Value; - throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString())); + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + lookup.Heap = m_heaps[pqueue]; + lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle); + m_lookupTable[localid] = lookup; + + return true; } - internal bool TryDequeue(out EntityUpdate value) + internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue) { - for (int i = 0; i < m_heaps.Length; ++i) + for (int i = 0; i < m_numberOfQueues; ++i) { - if (m_heaps[i].Count > 0) + // To get the fair queing, we cycle through each of the + // queues when finding an element to dequeue, this code + // assumes that the distribution of updates in the queues + // is polynomial, probably quadractic (eg distance of PI * R^2) + uint h = (uint)((m_nextQueue + i) % m_numberOfQueues); + if (m_heaps[h].Count > 0) { - MinHeapItem item = m_heaps[i].RemoveMin(); - m_lookupTable.Remove(item.LocalID); + m_nextQueue = (uint)((h + 1) % m_numberOfQueues); + + MinHeapItem item = m_heaps[h].RemoveMin(); + m_lookupTable.Remove(item.Value.Entity.LocalId); + timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime); value = item.Value; + return true; } } + timeinqueue = 0; value = default(EntityUpdate); return false; } @@ -11800,68 +11878,107 @@ namespace OpenSim.Region.ClientStack.LindenUDP internal void Reprioritize(UpdatePriorityHandler handler) { MinHeapItem item; - double priority; - foreach (LookupItem lookup in new List(this.m_lookupTable.Values)) { if (lookup.Heap.TryGetValue(lookup.Handle, out item)) { - priority = item.Priority; - if (handler(ref priority, item.LocalID)) + uint pqueue = item.PriorityQueue; + uint localid = item.Value.Entity.LocalId; + + if (handler(ref pqueue, item.Value.Entity)) { - if (lookup.Heap.ContainsHandle(lookup.Handle)) - lookup.Heap[lookup.Handle] = - new MinHeapItem(priority, item.Value, item.LocalID, this.m_comparison); + // unless the priority queue has changed, there is no need to modify + // the entry + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + if (pqueue != item.PriorityQueue) + { + lookup.Heap.Remove(lookup.Handle); + + LookupItem litem = lookup; + litem.Heap = m_heaps[pqueue]; + litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle); + m_lookupTable[localid] = litem; + } } else { - m_log.Warn("[LLCLIENTVIEW]: UpdatePriorityHandler returned false, dropping update"); + m_log.WarnFormat("[LLCLIENTVIEW]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID); lookup.Heap.Remove(lookup.Handle); - this.m_lookupTable.Remove(item.LocalID); + this.m_lookupTable.Remove(localid); } } } } + public override string ToString() + { + string s = ""; + for (int i = 0; i < m_numberOfQueues; i++) + { + if (s != "") s += ","; + s += m_heaps[i].Count.ToString(); + } + return s; + } + #region MinHeapItem private struct MinHeapItem : IComparable { - private double priority; private EntityUpdate value; - private uint local_id; - private Comparison comparison; + internal EntityUpdate Value { + get { + return this.value; + } + } + + private uint pqueue; + internal uint PriorityQueue { + get { + return this.pqueue; + } + } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id) : - this(priority, value, local_id, Comparer.Default) { } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id, IComparer comparer) : - this(priority, value, local_id, new Comparison(comparer.Compare)) { } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id, Comparison comparison) + private Int32 entrytime; + internal Int32 EntryTime { + get { + return this.entrytime; + } + } + + private UInt64 entryorder; + internal UInt64 EntryOrder { - this.priority = priority; + get { + return this.entryorder; + } + } + + internal MinHeapItem(uint pqueue, MinHeapItem other) + { + this.entrytime = other.entrytime; + this.entryorder = other.entryorder; + this.value = other.value; + this.pqueue = pqueue; + } + + internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value) + { + this.entrytime = Util.EnvironmentTickCount(); + this.entryorder = entryorder; this.value = value; - this.local_id = local_id; - this.comparison = comparison; + this.pqueue = pqueue; } - internal double Priority { get { return this.priority; } } - internal EntityUpdate Value { get { return this.value; } } - internal uint LocalID { get { return this.local_id; } } - public override string ToString() { - StringBuilder sb = new StringBuilder(); - sb.Append("["); - sb.Append(this.priority.ToString()); - sb.Append(","); - if (this.value != null) - sb.Append(this.value.ToString()); - sb.Append("]"); - return sb.ToString(); + return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId); } public int CompareTo(MinHeapItem other) { - return this.comparison(this.priority, other.priority); + // I'm assuming that the root part of an SOG is added to the update queue + // before the component parts + return Comparer.Default.Compare(this.EntryOrder, other.EntryOrder); } } #endregion -- cgit v1.1 From 8b134f37f2c8cc7895153af2fdc79e785f3b93e2 Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Mon, 4 Apr 2011 14:18:26 -0700 Subject: Fix a bug in the computation of the RTO. Basically... the RTO (the time to wait to retransmit packets) always maxed out (no retransmissions for 24 or 48 seconds. Note that this is going to cause faster (and more) retransmissions. Fix for dynamic throttling needs to go with this. --- OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs index 65a8fe3..9a8bfd3 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs @@ -149,7 +149,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Caches packed throttle information private byte[] m_packedThrottles; - private int m_defaultRTO = 3000; + private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC private int m_maxRTO = 60000; /// @@ -557,7 +557,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP int rto = (int)(SRTT + Math.Max(m_udpServer.TickCountResolution, K * RTTVAR)); // Clamp the retransmission timeout to manageable values - rto = Utils.Clamp(RTO, m_defaultRTO, m_maxRTO); + rto = Utils.Clamp(rto, m_defaultRTO, m_maxRTO); RTO = rto; -- cgit v1.1 From 77cf9405de10f016d85d67b6016ed27d28ed898f Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Mon, 28 Mar 2011 10:00:53 -0700 Subject: Implements adaptive queue management and fair queueing for improved networking performance. Reprioritization algorithms need to be ported still. One is in place. --- .../Region/ClientStack/LindenUDP/LLClientView.cs | 353 ++++++++++++++------- 1 file changed, 235 insertions(+), 118 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index 34d72ac..a3f2c15 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -49,6 +49,8 @@ using Timer = System.Timers.Timer; using AssetLandmark = OpenSim.Framework.AssetLandmark; using Nini.Config; +using System.IO; + namespace OpenSim.Region.ClientStack.LindenUDP { public delegate bool PacketMethod(IClientAPI simClient, Packet packet); @@ -298,6 +300,77 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Used to adjust Sun Orbit values so Linden based viewers properly position sun private const float m_sunPainDaHalfOrbitalCutoff = 4.712388980384689858f; + // First log file or time has expired, start writing to a new log file +// +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// THIS IS DEBUGGING CODE & SHOULD BE REMOVED +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- + public class QueueLogger + { + public Int32 start = 0; + public StreamWriter Log = null; + private Dictionary m_idMap = new Dictionary(); + + public QueueLogger() + { + DateTime now = DateTime.Now; + String fname = String.Format("queue-{0}.log", now.ToString("yyyyMMddHHmmss")); + Log = new StreamWriter(fname); + + start = Util.EnvironmentTickCount(); + } + + public int LookupID(UUID uuid) + { + int localid; + if (! m_idMap.TryGetValue(uuid,out localid)) + { + localid = m_idMap.Count + 1; + m_idMap[uuid] = localid; + } + + return localid; + } + } + + public static QueueLogger QueueLog = null; + + // ----------------------------------------------------------------- + public void LogAvatarUpdateEvent(UUID client, UUID avatar, Int32 timeinqueue) + { + if (QueueLog == null) + QueueLog = new QueueLogger(); + + Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); + lock(QueueLog) + { + int cid = QueueLog.LookupID(client); + int aid = QueueLog.LookupID(avatar); + QueueLog.Log.WriteLine("{0},AU,AV{1:D4},AV{2:D4},{3}",ticks,cid,aid,timeinqueue); + } + } + + // ----------------------------------------------------------------- + public void LogQueueProcessEvent(UUID client, PriorityQueue queue, uint maxup) + { + if (QueueLog == null) + QueueLog = new QueueLogger(); + + Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); + lock(QueueLog) + { + int cid = QueueLog.LookupID(client); + QueueLog.Log.WriteLine("{0},PQ,AV{1:D4},{2},{3}",ticks,cid,maxup,queue.ToString()); + } + } +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// ----------------------------------------------------------------- +// + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); protected static Dictionary PacketHandlers = new Dictionary(); //Global/static handlers for all clients @@ -3547,18 +3620,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region Primitive Packet/Data Sending Methods + /// /// Generate one of the object update packets based on PrimUpdateFlags /// and broadcast the packet to clients /// public void SendPrimUpdate(ISceneEntity entity, PrimUpdateFlags updateFlags) { - double priority = m_prioritizer.GetUpdatePriority(this, entity); + //double priority = m_prioritizer.GetUpdatePriority(this, entity); + uint priority = m_prioritizer.GetUpdatePriority(this, entity); lock (m_entityUpdates.SyncRoot) - m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation), entity.LocalId); + m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation)); } + private Int32 m_LastQueueFill = 0; + private uint m_maxUpdates = 0; + private void ProcessEntityUpdates(int maxUpdates) { OpenSim.Framework.Lazy> objectUpdateBlocks = new OpenSim.Framework.Lazy>(); @@ -3566,23 +3644,55 @@ namespace OpenSim.Region.ClientStack.LindenUDP OpenSim.Framework.Lazy> terseUpdateBlocks = new OpenSim.Framework.Lazy>(); OpenSim.Framework.Lazy> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy>(); - if (maxUpdates <= 0) maxUpdates = Int32.MaxValue; + if (maxUpdates <= 0) + { + m_maxUpdates = Int32.MaxValue; + } + else + { + if (m_maxUpdates == 0 || m_LastQueueFill == 0) + { + m_maxUpdates = (uint)maxUpdates; + } + else + { + if (Util.EnvironmentTickCountSubtract(m_LastQueueFill) < 200) + m_maxUpdates += 5; + else + m_maxUpdates = m_maxUpdates >> 1; + } + m_maxUpdates = Util.Clamp(m_maxUpdates,10,500); + } + m_LastQueueFill = Util.EnvironmentTickCount(); + int updatesThisCall = 0; +// +// DEBUGGING CODE... REMOVE +// LogQueueProcessEvent(this.m_agentId,m_entityUpdates,m_maxUpdates); +// // We must lock for both manipulating the kill record and sending the packet, in order to avoid a race // condition where a kill can be processed before an out-of-date update for the same object. lock (m_killRecord) { float avgTimeDilation = 1.0f; EntityUpdate update; - while (updatesThisCall < maxUpdates) + Int32 timeinqueue; // this is just debugging code & can be dropped later + + while (updatesThisCall < m_maxUpdates) { lock (m_entityUpdates.SyncRoot) - if (!m_entityUpdates.TryDequeue(out update)) + if (!m_entityUpdates.TryDequeue(out update, out timeinqueue)) break; avgTimeDilation += update.TimeDilation; avgTimeDilation *= 0.5f; +// +// DEBUGGING CODE... REMOVE +// if (update.Entity is ScenePresence) +// LogAvatarUpdateEvent(this.m_agentId,update.Entity.UUID,timeinqueue); +// + if (update.Entity is SceneObjectPart) { SceneObjectPart part = (SceneObjectPart)update.Entity; @@ -3679,36 +3789,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } else { - // if (update.Entity is SceneObjectPart && ((SceneObjectPart)update.Entity).IsAttachment) - // { - // SceneObjectPart sop = (SceneObjectPart)update.Entity; - // string text = sop.Text; - // if (text.IndexOf("\n") >= 0) - // text = text.Remove(text.IndexOf("\n")); - // - // if (m_attachmentsSent.Contains(sop.ParentID)) - // { - //// m_log.DebugFormat( - //// "[CLIENT]: Sending full info about attached prim {0} text {1}", - //// sop.LocalId, text); - // - // objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock(sop, this.m_agentId)); - // - // m_attachmentsSent.Add(sop.LocalId); - // } - // else - // { - // m_log.DebugFormat( - // "[CLIENT]: Requeueing full update of prim {0} text {1} since we haven't sent its parent {2} yet", - // sop.LocalId, text, sop.ParentID); - // - // m_entityUpdates.Enqueue(double.MaxValue, update, sop.LocalId); - // } - // } - // else - // { - objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId)); - // } + objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId)); } } else if (!canUseImproved) @@ -3802,26 +3883,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP public void ReprioritizeUpdates() { - //m_log.Debug("[CLIENT]: Reprioritizing prim updates for " + m_firstName + " " + m_lastName); - lock (m_entityUpdates.SyncRoot) m_entityUpdates.Reprioritize(UpdatePriorityHandler); } - private bool UpdatePriorityHandler(ref double priority, uint localID) + private bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity) { - EntityBase entity; - if (m_scene.Entities.TryGetValue(localID, out entity)) + if (entity != null) { priority = m_prioritizer.GetUpdatePriority(this, entity); + return true; } - return priority != double.NaN; + return false; } public void FlushPrimUpdates() { - m_log.Debug("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName); + m_log.WarnFormat("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName); while (m_entityUpdates.Count > 0) ProcessEntityUpdates(-1); @@ -11729,86 +11808,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region PriorityQueue public class PriorityQueue { - internal delegate bool UpdatePriorityHandler(ref double priority, uint local_id); + internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity); + + // Heap[0] for self updates + // Heap[1..12] for entity updates - private MinHeap[] m_heaps = new MinHeap[1]; + internal const uint m_numberOfQueues = 12; + private MinHeap[] m_heaps = new MinHeap[m_numberOfQueues]; private Dictionary m_lookupTable; - private Comparison m_comparison; private object m_syncRoot = new object(); - + private uint m_nextQueue = 0; + private UInt64 m_nextRequest = 0; + internal PriorityQueue() : - this(MinHeap.DEFAULT_CAPACITY, Comparer.Default) { } - internal PriorityQueue(int capacity) : - this(capacity, Comparer.Default) { } - internal PriorityQueue(IComparer comparer) : - this(new Comparison(comparer.Compare)) { } - internal PriorityQueue(Comparison comparison) : - this(MinHeap.DEFAULT_CAPACITY, comparison) { } - internal PriorityQueue(int capacity, IComparer comparer) : - this(capacity, new Comparison(comparer.Compare)) { } - internal PriorityQueue(int capacity, Comparison comparison) + this(MinHeap.DEFAULT_CAPACITY) { } + internal PriorityQueue(int capacity) { m_lookupTable = new Dictionary(capacity); for (int i = 0; i < m_heaps.Length; ++i) m_heaps[i] = new MinHeap(capacity); - this.m_comparison = comparison; } public object SyncRoot { get { return this.m_syncRoot; } } + internal int Count { get { int count = 0; for (int i = 0; i < m_heaps.Length; ++i) - count = m_heaps[i].Count; + count += m_heaps[i].Count; return count; } } - public bool Enqueue(double priority, EntityUpdate value, uint local_id) + public bool Enqueue(uint pqueue, EntityUpdate value) { - LookupItem item; + LookupItem lookup; - if (m_lookupTable.TryGetValue(local_id, out item)) + uint localid = value.Entity.LocalId; + UInt64 entry = m_nextRequest++; + if (m_lookupTable.TryGetValue(localid, out lookup)) { - // Combine flags - value.Flags |= item.Heap[item.Handle].Value.Flags; - - item.Heap[item.Handle] = new MinHeapItem(priority, value, local_id, this.m_comparison); - return false; - } - else - { - item.Heap = m_heaps[0]; - item.Heap.Add(new MinHeapItem(priority, value, local_id, this.m_comparison), ref item.Handle); - m_lookupTable.Add(local_id, item); - return true; + entry = lookup.Heap[lookup.Handle].EntryOrder; + value.Flags |= lookup.Heap[lookup.Handle].Value.Flags; + lookup.Heap.Remove(lookup.Handle); } - } - internal EntityUpdate Peek() - { - for (int i = 0; i < m_heaps.Length; ++i) - if (m_heaps[i].Count > 0) - return m_heaps[i].Min().Value; - throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString())); + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + lookup.Heap = m_heaps[pqueue]; + lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle); + m_lookupTable[localid] = lookup; + + return true; } - internal bool TryDequeue(out EntityUpdate value) + internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue) { - for (int i = 0; i < m_heaps.Length; ++i) + for (int i = 0; i < m_numberOfQueues; ++i) { - if (m_heaps[i].Count > 0) + // To get the fair queing, we cycle through each of the + // queues when finding an element to dequeue, this code + // assumes that the distribution of updates in the queues + // is polynomial, probably quadractic (eg distance of PI * R^2) + uint h = (uint)((m_nextQueue + i) % m_numberOfQueues); + if (m_heaps[h].Count > 0) { - MinHeapItem item = m_heaps[i].RemoveMin(); - m_lookupTable.Remove(item.LocalID); + m_nextQueue = (uint)((h + 1) % m_numberOfQueues); + + MinHeapItem item = m_heaps[h].RemoveMin(); + m_lookupTable.Remove(item.Value.Entity.LocalId); + timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime); value = item.Value; + return true; } } + timeinqueue = 0; value = default(EntityUpdate); return false; } @@ -11816,68 +11894,107 @@ namespace OpenSim.Region.ClientStack.LindenUDP internal void Reprioritize(UpdatePriorityHandler handler) { MinHeapItem item; - double priority; - foreach (LookupItem lookup in new List(this.m_lookupTable.Values)) { if (lookup.Heap.TryGetValue(lookup.Handle, out item)) { - priority = item.Priority; - if (handler(ref priority, item.LocalID)) + uint pqueue = item.PriorityQueue; + uint localid = item.Value.Entity.LocalId; + + if (handler(ref pqueue, item.Value.Entity)) { - if (lookup.Heap.ContainsHandle(lookup.Handle)) - lookup.Heap[lookup.Handle] = - new MinHeapItem(priority, item.Value, item.LocalID, this.m_comparison); + // unless the priority queue has changed, there is no need to modify + // the entry + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + if (pqueue != item.PriorityQueue) + { + lookup.Heap.Remove(lookup.Handle); + + LookupItem litem = lookup; + litem.Heap = m_heaps[pqueue]; + litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle); + m_lookupTable[localid] = litem; + } } else { - m_log.Warn("[LLCLIENTVIEW]: UpdatePriorityHandler returned false, dropping update"); + m_log.WarnFormat("[LLCLIENTVIEW]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID); lookup.Heap.Remove(lookup.Handle); - this.m_lookupTable.Remove(item.LocalID); + this.m_lookupTable.Remove(localid); } } } } + public override string ToString() + { + string s = ""; + for (int i = 0; i < m_numberOfQueues; i++) + { + if (s != "") s += ","; + s += m_heaps[i].Count.ToString(); + } + return s; + } + #region MinHeapItem private struct MinHeapItem : IComparable { - private double priority; private EntityUpdate value; - private uint local_id; - private Comparison comparison; + internal EntityUpdate Value { + get { + return this.value; + } + } + + private uint pqueue; + internal uint PriorityQueue { + get { + return this.pqueue; + } + } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id) : - this(priority, value, local_id, Comparer.Default) { } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id, IComparer comparer) : - this(priority, value, local_id, new Comparison(comparer.Compare)) { } - internal MinHeapItem(double priority, EntityUpdate value, uint local_id, Comparison comparison) + private Int32 entrytime; + internal Int32 EntryTime { + get { + return this.entrytime; + } + } + + private UInt64 entryorder; + internal UInt64 EntryOrder { - this.priority = priority; + get { + return this.entryorder; + } + } + + internal MinHeapItem(uint pqueue, MinHeapItem other) + { + this.entrytime = other.entrytime; + this.entryorder = other.entryorder; + this.value = other.value; + this.pqueue = pqueue; + } + + internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value) + { + this.entrytime = Util.EnvironmentTickCount(); + this.entryorder = entryorder; this.value = value; - this.local_id = local_id; - this.comparison = comparison; + this.pqueue = pqueue; } - internal double Priority { get { return this.priority; } } - internal EntityUpdate Value { get { return this.value; } } - internal uint LocalID { get { return this.local_id; } } - public override string ToString() { - StringBuilder sb = new StringBuilder(); - sb.Append("["); - sb.Append(this.priority.ToString()); - sb.Append(","); - if (this.value != null) - sb.Append(this.value.ToString()); - sb.Append("]"); - return sb.ToString(); + return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId); } public int CompareTo(MinHeapItem other) { - return this.comparison(this.priority, other.priority); + // I'm assuming that the root part of an SOG is added to the update queue + // before the component parts + return Comparer.Default.Compare(this.EntryOrder, other.EntryOrder); } } #endregion -- cgit v1.1 From 83dc2470f2e815f6f9d469104be3a2806438a29a Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Mon, 4 Apr 2011 14:18:26 -0700 Subject: Fix a bug in the computation of the RTO. Basically... the RTO (the time to wait to retransmit packets) always maxed out (no retransmissions for 24 or 48 seconds. Note that this is going to cause faster (and more) retransmissions. Fix for dynamic throttling needs to go with this. --- OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs index 65a8fe3..9a8bfd3 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs @@ -149,7 +149,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Caches packed throttle information private byte[] m_packedThrottles; - private int m_defaultRTO = 3000; + private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC private int m_maxRTO = 60000; /// @@ -557,7 +557,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP int rto = (int)(SRTT + Math.Max(m_udpServer.TickCountResolution, K * RTTVAR)); // Clamp the retransmission timeout to manageable values - rto = Utils.Clamp(RTO, m_defaultRTO, m_maxRTO); + rto = Utils.Clamp(rto, m_defaultRTO, m_maxRTO); RTO = rto; -- cgit v1.1 From 19c6d1d569e081418e139d139c15ea66640288ba Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Wed, 6 Apr 2011 09:26:38 -0700 Subject: Split the priority queue class into a seperate file. LLClientView is big enough. --- .../Region/ClientStack/LindenUDP/LLClientView.cs | 205 ----------------- .../Region/ClientStack/LindenUDP/PriorityQueue.cs | 245 +++++++++++++++++++++ 2 files changed, 245 insertions(+), 205 deletions(-) create mode 100644 OpenSim/Region/ClientStack/LindenUDP/PriorityQueue.cs (limited to 'OpenSim/Region/ClientStack/LindenUDP') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index a3f2c15..a724099 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -11805,209 +11805,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP OutPacket(pack, ThrottleOutPacketType.Task); } - #region PriorityQueue - public class PriorityQueue - { - internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity); - - // Heap[0] for self updates - // Heap[1..12] for entity updates - - internal const uint m_numberOfQueues = 12; - private MinHeap[] m_heaps = new MinHeap[m_numberOfQueues]; - private Dictionary m_lookupTable; - private object m_syncRoot = new object(); - private uint m_nextQueue = 0; - private UInt64 m_nextRequest = 0; - - internal PriorityQueue() : - this(MinHeap.DEFAULT_CAPACITY) { } - internal PriorityQueue(int capacity) - { - m_lookupTable = new Dictionary(capacity); - - for (int i = 0; i < m_heaps.Length; ++i) - m_heaps[i] = new MinHeap(capacity); - } - - public object SyncRoot { get { return this.m_syncRoot; } } - - internal int Count - { - get - { - int count = 0; - for (int i = 0; i < m_heaps.Length; ++i) - count += m_heaps[i].Count; - return count; - } - } - - public bool Enqueue(uint pqueue, EntityUpdate value) - { - LookupItem lookup; - - uint localid = value.Entity.LocalId; - UInt64 entry = m_nextRequest++; - if (m_lookupTable.TryGetValue(localid, out lookup)) - { - entry = lookup.Heap[lookup.Handle].EntryOrder; - value.Flags |= lookup.Heap[lookup.Handle].Value.Flags; - lookup.Heap.Remove(lookup.Handle); - } - - pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); - lookup.Heap = m_heaps[pqueue]; - lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle); - m_lookupTable[localid] = lookup; - - return true; - } - - internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue) - { - for (int i = 0; i < m_numberOfQueues; ++i) - { - // To get the fair queing, we cycle through each of the - // queues when finding an element to dequeue, this code - // assumes that the distribution of updates in the queues - // is polynomial, probably quadractic (eg distance of PI * R^2) - uint h = (uint)((m_nextQueue + i) % m_numberOfQueues); - if (m_heaps[h].Count > 0) - { - m_nextQueue = (uint)((h + 1) % m_numberOfQueues); - - MinHeapItem item = m_heaps[h].RemoveMin(); - m_lookupTable.Remove(item.Value.Entity.LocalId); - timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime); - value = item.Value; - - return true; - } - } - - timeinqueue = 0; - value = default(EntityUpdate); - return false; - } - - internal void Reprioritize(UpdatePriorityHandler handler) - { - MinHeapItem item; - foreach (LookupItem lookup in new List(this.m_lookupTable.Values)) - { - if (lookup.Heap.TryGetValue(lookup.Handle, out item)) - { - uint pqueue = item.PriorityQueue; - uint localid = item.Value.Entity.LocalId; - - if (handler(ref pqueue, item.Value.Entity)) - { - // unless the priority queue has changed, there is no need to modify - // the entry - pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); - if (pqueue != item.PriorityQueue) - { - lookup.Heap.Remove(lookup.Handle); - - LookupItem litem = lookup; - litem.Heap = m_heaps[pqueue]; - litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle); - m_lookupTable[localid] = litem; - } - } - else - { - m_log.WarnFormat("[LLCLIENTVIEW]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID); - lookup.Heap.Remove(lookup.Handle); - this.m_lookupTable.Remove(localid); - } - } - } - } - - public override string ToString() - { - string s = ""; - for (int i = 0; i < m_numberOfQueues; i++) - { - if (s != "") s += ","; - s += m_heaps[i].Count.ToString(); - } - return s; - } - - #region MinHeapItem - private struct MinHeapItem : IComparable - { - private EntityUpdate value; - internal EntityUpdate Value { - get { - return this.value; - } - } - - private uint pqueue; - internal uint PriorityQueue { - get { - return this.pqueue; - } - } - - private Int32 entrytime; - internal Int32 EntryTime { - get { - return this.entrytime; - } - } - - private UInt64 entryorder; - internal UInt64 EntryOrder - { - get { - return this.entryorder; - } - } - - internal MinHeapItem(uint pqueue, MinHeapItem other) - { - this.entrytime = other.entrytime; - this.entryorder = other.entryorder; - this.value = other.value; - this.pqueue = pqueue; - } - - internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value) - { - this.entrytime = Util.EnvironmentTickCount(); - this.entryorder = entryorder; - this.value = value; - this.pqueue = pqueue; - } - - public override string ToString() - { - return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId); - } - - public int CompareTo(MinHeapItem other) - { - // I'm assuming that the root part of an SOG is added to the update queue - // before the component parts - return Comparer.Default.Compare(this.EntryOrder, other.EntryOrder); - } - } - #endregion - - #region LookupItem - private struct LookupItem - { - internal MinHeap Heap; - internal IHandle Handle; - } - #endregion - } - public struct PacketProcessor { public PacketMethod method; @@ -12028,8 +11825,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP } } - #endregion - public static OSD BuildEvent(string eventName, OSD eventBody) { OSDMap osdEvent = new OSDMap(2); diff --git a/OpenSim/Region/ClientStack/LindenUDP/PriorityQueue.cs b/OpenSim/Region/ClientStack/LindenUDP/PriorityQueue.cs new file mode 100644 index 0000000..364ce4b --- /dev/null +++ b/OpenSim/Region/ClientStack/LindenUDP/PriorityQueue.cs @@ -0,0 +1,245 @@ +/* + * Copyright (c) Contributors, http://opensimulator.org/ + * See CONTRIBUTORS.TXT for a full list of copyright holders. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the OpenSimulator Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Reflection; + +using OpenSim.Framework; +using OpenSim.Framework.Client; +using log4net; + +namespace OpenSim.Region.ClientStack.LindenUDP +{ + public class PriorityQueue + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity); + + // Heap[0] for self updates + // Heap[1..12] for entity updates + + internal const uint m_numberOfQueues = 12; + + private MinHeap[] m_heaps = new MinHeap[m_numberOfQueues]; + private Dictionary m_lookupTable; + private uint m_nextQueue = 0; + private UInt64 m_nextRequest = 0; + + private object m_syncRoot = new object(); + public object SyncRoot { + get { return this.m_syncRoot; } + } + + internal PriorityQueue() : this(MinHeap.DEFAULT_CAPACITY) { } + + internal PriorityQueue(int capacity) + { + m_lookupTable = new Dictionary(capacity); + + for (int i = 0; i < m_heaps.Length; ++i) + m_heaps[i] = new MinHeap(capacity); + } + + internal int Count + { + get + { + int count = 0; + for (int i = 0; i < m_heaps.Length; ++i) + count += m_heaps[i].Count; + return count; + } + } + + public bool Enqueue(uint pqueue, EntityUpdate value) + { + LookupItem lookup; + + uint localid = value.Entity.LocalId; + UInt64 entry = m_nextRequest++; + if (m_lookupTable.TryGetValue(localid, out lookup)) + { + entry = lookup.Heap[lookup.Handle].EntryOrder; + value.Flags |= lookup.Heap[lookup.Handle].Value.Flags; + lookup.Heap.Remove(lookup.Handle); + } + + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + lookup.Heap = m_heaps[pqueue]; + lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle); + m_lookupTable[localid] = lookup; + + return true; + } + + internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue) + { + for (int i = 0; i < m_numberOfQueues; ++i) + { + // To get the fair queing, we cycle through each of the + // queues when finding an element to dequeue, this code + // assumes that the distribution of updates in the queues + // is polynomial, probably quadractic (eg distance of PI * R^2) + uint h = (uint)((m_nextQueue + i) % m_numberOfQueues); + if (m_heaps[h].Count > 0) + { + m_nextQueue = (uint)((h + 1) % m_numberOfQueues); + + MinHeapItem item = m_heaps[h].RemoveMin(); + m_lookupTable.Remove(item.Value.Entity.LocalId); + timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime); + value = item.Value; + + return true; + } + } + + timeinqueue = 0; + value = default(EntityUpdate); + return false; + } + + internal void Reprioritize(UpdatePriorityHandler handler) + { + MinHeapItem item; + foreach (LookupItem lookup in new List(this.m_lookupTable.Values)) + { + if (lookup.Heap.TryGetValue(lookup.Handle, out item)) + { + uint pqueue = item.PriorityQueue; + uint localid = item.Value.Entity.LocalId; + + if (handler(ref pqueue, item.Value.Entity)) + { + // unless the priority queue has changed, there is no need to modify + // the entry + pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + if (pqueue != item.PriorityQueue) + { + lookup.Heap.Remove(lookup.Handle); + + LookupItem litem = lookup; + litem.Heap = m_heaps[pqueue]; + litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle); + m_lookupTable[localid] = litem; + } + } + else + { + // m_log.WarnFormat("[PQUEUE]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID); + lookup.Heap.Remove(lookup.Handle); + this.m_lookupTable.Remove(localid); + } + } + } + } + + public override string ToString() + { + string s = ""; + for (int i = 0; i < m_numberOfQueues; i++) + { + if (s != "") s += ","; + s += m_heaps[i].Count.ToString(); + } + return s; + } + +#region MinHeapItem + private struct MinHeapItem : IComparable + { + private EntityUpdate value; + internal EntityUpdate Value { + get { + return this.value; + } + } + + private uint pqueue; + internal uint PriorityQueue { + get { + return this.pqueue; + } + } + + private Int32 entrytime; + internal Int32 EntryTime { + get { + return this.entrytime; + } + } + + private UInt64 entryorder; + internal UInt64 EntryOrder + { + get { + return this.entryorder; + } + } + + internal MinHeapItem(uint pqueue, MinHeapItem other) + { + this.entrytime = other.entrytime; + this.entryorder = other.entryorder; + this.value = other.value; + this.pqueue = pqueue; + } + + internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value) + { + this.entrytime = Util.EnvironmentTickCount(); + this.entryorder = entryorder; + this.value = value; + this.pqueue = pqueue; + } + + public override string ToString() + { + return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId); + } + + public int CompareTo(MinHeapItem other) + { + // I'm assuming that the root part of an SOG is added to the update queue + // before the component parts + return Comparer.Default.Compare(this.EntryOrder, other.EntryOrder); + } + } +#endregion + +#region LookupItem + private struct LookupItem + { + internal MinHeap Heap; + internal IHandle Handle; + } +#endregion + } +} -- cgit v1.1 From f778056c7a81443b11c0c4288979af3a8c6b5b9f Mon Sep 17 00:00:00 2001 From: Mic Bowman Date: Mon, 11 Apr 2011 08:37:43 -0700 Subject: Removed some priority queue debugging code --- .../Region/ClientStack/LindenUDP/LLClientView.cs | 80 ---------------------- 1 file changed, 80 deletions(-) (limited to 'OpenSim/Region/ClientStack/LindenUDP') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index a724099..b5bf26d 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -300,77 +300,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Used to adjust Sun Orbit values so Linden based viewers properly position sun private const float m_sunPainDaHalfOrbitalCutoff = 4.712388980384689858f; - // First log file or time has expired, start writing to a new log file -// -// ----------------------------------------------------------------- -// ----------------------------------------------------------------- -// THIS IS DEBUGGING CODE & SHOULD BE REMOVED -// ----------------------------------------------------------------- -// ----------------------------------------------------------------- - public class QueueLogger - { - public Int32 start = 0; - public StreamWriter Log = null; - private Dictionary m_idMap = new Dictionary(); - - public QueueLogger() - { - DateTime now = DateTime.Now; - String fname = String.Format("queue-{0}.log", now.ToString("yyyyMMddHHmmss")); - Log = new StreamWriter(fname); - - start = Util.EnvironmentTickCount(); - } - - public int LookupID(UUID uuid) - { - int localid; - if (! m_idMap.TryGetValue(uuid,out localid)) - { - localid = m_idMap.Count + 1; - m_idMap[uuid] = localid; - } - - return localid; - } - } - - public static QueueLogger QueueLog = null; - - // ----------------------------------------------------------------- - public void LogAvatarUpdateEvent(UUID client, UUID avatar, Int32 timeinqueue) - { - if (QueueLog == null) - QueueLog = new QueueLogger(); - - Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); - lock(QueueLog) - { - int cid = QueueLog.LookupID(client); - int aid = QueueLog.LookupID(avatar); - QueueLog.Log.WriteLine("{0},AU,AV{1:D4},AV{2:D4},{3}",ticks,cid,aid,timeinqueue); - } - } - - // ----------------------------------------------------------------- - public void LogQueueProcessEvent(UUID client, PriorityQueue queue, uint maxup) - { - if (QueueLog == null) - QueueLog = new QueueLogger(); - - Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start); - lock(QueueLog) - { - int cid = QueueLog.LookupID(client); - QueueLog.Log.WriteLine("{0},PQ,AV{1:D4},{2},{3}",ticks,cid,maxup,queue.ToString()); - } - } -// ----------------------------------------------------------------- -// ----------------------------------------------------------------- -// ----------------------------------------------------------------- -// ----------------------------------------------------------------- -// - private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); protected static Dictionary PacketHandlers = new Dictionary(); //Global/static handlers for all clients @@ -3667,10 +3596,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP int updatesThisCall = 0; -// -// DEBUGGING CODE... REMOVE -// LogQueueProcessEvent(this.m_agentId,m_entityUpdates,m_maxUpdates); -// // We must lock for both manipulating the kill record and sending the packet, in order to avoid a race // condition where a kill can be processed before an out-of-date update for the same object. lock (m_killRecord) @@ -3687,11 +3612,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP avgTimeDilation += update.TimeDilation; avgTimeDilation *= 0.5f; -// -// DEBUGGING CODE... REMOVE -// if (update.Entity is ScenePresence) -// LogAvatarUpdateEvent(this.m_agentId,update.Entity.UUID,timeinqueue); -// if (update.Entity is SceneObjectPart) { -- cgit v1.1