From 6885b7220c6f782034d7c0220762244adf00e3d3 Mon Sep 17 00:00:00 2001
From: Mic Bowman
Date: Mon, 28 Mar 2011 10:00:53 -0700
Subject: Implements adaptive queue management and fair queueing for improved
networking performance.
Reprioritization algorithms need to be ported still. One is
in place.
---
.../Region/ClientStack/LindenUDP/LLClientView.cs | 353 ++++++++++++++-------
1 file changed, 235 insertions(+), 118 deletions(-)
(limited to 'OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs')
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
index 2faffae..e9e1fa3 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
@@ -49,6 +49,8 @@ using Timer = System.Timers.Timer;
using AssetLandmark = OpenSim.Framework.AssetLandmark;
using Nini.Config;
+using System.IO;
+
namespace OpenSim.Region.ClientStack.LindenUDP
{
public delegate bool PacketMethod(IClientAPI simClient, Packet packet);
@@ -298,6 +300,77 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// Used to adjust Sun Orbit values so Linden based viewers properly position sun
private const float m_sunPainDaHalfOrbitalCutoff = 4.712388980384689858f;
+ // First log file or time has expired, start writing to a new log file
+//
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+// THIS IS DEBUGGING CODE & SHOULD BE REMOVED
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+ public class QueueLogger
+ {
+ public Int32 start = 0;
+ public StreamWriter Log = null;
+ private Dictionary m_idMap = new Dictionary();
+
+ public QueueLogger()
+ {
+ DateTime now = DateTime.Now;
+ String fname = String.Format("queue-{0}.log", now.ToString("yyyyMMddHHmmss"));
+ Log = new StreamWriter(fname);
+
+ start = Util.EnvironmentTickCount();
+ }
+
+ public int LookupID(UUID uuid)
+ {
+ int localid;
+ if (! m_idMap.TryGetValue(uuid,out localid))
+ {
+ localid = m_idMap.Count + 1;
+ m_idMap[uuid] = localid;
+ }
+
+ return localid;
+ }
+ }
+
+ public static QueueLogger QueueLog = null;
+
+ // -----------------------------------------------------------------
+ public void LogAvatarUpdateEvent(UUID client, UUID avatar, Int32 timeinqueue)
+ {
+ if (QueueLog == null)
+ QueueLog = new QueueLogger();
+
+ Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start);
+ lock(QueueLog)
+ {
+ int cid = QueueLog.LookupID(client);
+ int aid = QueueLog.LookupID(avatar);
+ QueueLog.Log.WriteLine("{0},AU,AV{1:D4},AV{2:D4},{3}",ticks,cid,aid,timeinqueue);
+ }
+ }
+
+ // -----------------------------------------------------------------
+ public void LogQueueProcessEvent(UUID client, PriorityQueue queue, uint maxup)
+ {
+ if (QueueLog == null)
+ QueueLog = new QueueLogger();
+
+ Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start);
+ lock(QueueLog)
+ {
+ int cid = QueueLog.LookupID(client);
+ QueueLog.Log.WriteLine("{0},PQ,AV{1:D4},{2},{3}",ticks,cid,maxup,queue.ToString());
+ }
+ }
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+// -----------------------------------------------------------------
+//
+
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
protected static Dictionary PacketHandlers = new Dictionary(); //Global/static handlers for all clients
@@ -3547,18 +3620,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#region Primitive Packet/Data Sending Methods
+
///
/// Generate one of the object update packets based on PrimUpdateFlags
/// and broadcast the packet to clients
///
public void SendPrimUpdate(ISceneEntity entity, PrimUpdateFlags updateFlags)
{
- double priority = m_prioritizer.GetUpdatePriority(this, entity);
+ //double priority = m_prioritizer.GetUpdatePriority(this, entity);
+ uint priority = m_prioritizer.GetUpdatePriority(this, entity);
lock (m_entityUpdates.SyncRoot)
- m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation), entity.LocalId);
+ m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation));
}
+ private Int32 m_LastQueueFill = 0;
+ private uint m_maxUpdates = 0;
+
private void ProcessEntityUpdates(int maxUpdates)
{
OpenSim.Framework.Lazy> objectUpdateBlocks = new OpenSim.Framework.Lazy>();
@@ -3566,23 +3644,55 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OpenSim.Framework.Lazy> terseUpdateBlocks = new OpenSim.Framework.Lazy>();
OpenSim.Framework.Lazy> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy>();
- if (maxUpdates <= 0) maxUpdates = Int32.MaxValue;
+ if (maxUpdates <= 0)
+ {
+ m_maxUpdates = Int32.MaxValue;
+ }
+ else
+ {
+ if (m_maxUpdates == 0 || m_LastQueueFill == 0)
+ {
+ m_maxUpdates = (uint)maxUpdates;
+ }
+ else
+ {
+ if (Util.EnvironmentTickCountSubtract(m_LastQueueFill) < 200)
+ m_maxUpdates += 5;
+ else
+ m_maxUpdates = m_maxUpdates >> 1;
+ }
+ m_maxUpdates = Util.Clamp(m_maxUpdates,10,500);
+ }
+ m_LastQueueFill = Util.EnvironmentTickCount();
+
int updatesThisCall = 0;
+//
+// DEBUGGING CODE... REMOVE
+// LogQueueProcessEvent(this.m_agentId,m_entityUpdates,m_maxUpdates);
+//
// We must lock for both manipulating the kill record and sending the packet, in order to avoid a race
// condition where a kill can be processed before an out-of-date update for the same object.
lock (m_killRecord)
{
float avgTimeDilation = 1.0f;
EntityUpdate update;
- while (updatesThisCall < maxUpdates)
+ Int32 timeinqueue; // this is just debugging code & can be dropped later
+
+ while (updatesThisCall < m_maxUpdates)
{
lock (m_entityUpdates.SyncRoot)
- if (!m_entityUpdates.TryDequeue(out update))
+ if (!m_entityUpdates.TryDequeue(out update, out timeinqueue))
break;
avgTimeDilation += update.TimeDilation;
avgTimeDilation *= 0.5f;
+//
+// DEBUGGING CODE... REMOVE
+// if (update.Entity is ScenePresence)
+// LogAvatarUpdateEvent(this.m_agentId,update.Entity.UUID,timeinqueue);
+//
+
if (update.Entity is SceneObjectPart)
{
SceneObjectPart part = (SceneObjectPart)update.Entity;
@@ -3679,36 +3789,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
else
{
- // if (update.Entity is SceneObjectPart && ((SceneObjectPart)update.Entity).IsAttachment)
- // {
- // SceneObjectPart sop = (SceneObjectPart)update.Entity;
- // string text = sop.Text;
- // if (text.IndexOf("\n") >= 0)
- // text = text.Remove(text.IndexOf("\n"));
- //
- // if (m_attachmentsSent.Contains(sop.ParentID))
- // {
- //// m_log.DebugFormat(
- //// "[CLIENT]: Sending full info about attached prim {0} text {1}",
- //// sop.LocalId, text);
- //
- // objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock(sop, this.m_agentId));
- //
- // m_attachmentsSent.Add(sop.LocalId);
- // }
- // else
- // {
- // m_log.DebugFormat(
- // "[CLIENT]: Requeueing full update of prim {0} text {1} since we haven't sent its parent {2} yet",
- // sop.LocalId, text, sop.ParentID);
- //
- // m_entityUpdates.Enqueue(double.MaxValue, update, sop.LocalId);
- // }
- // }
- // else
- // {
- objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId));
- // }
+ objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId));
}
}
else if (!canUseImproved)
@@ -3802,26 +3883,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public void ReprioritizeUpdates()
{
- //m_log.Debug("[CLIENT]: Reprioritizing prim updates for " + m_firstName + " " + m_lastName);
-
lock (m_entityUpdates.SyncRoot)
m_entityUpdates.Reprioritize(UpdatePriorityHandler);
}
- private bool UpdatePriorityHandler(ref double priority, uint localID)
+ private bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity)
{
- EntityBase entity;
- if (m_scene.Entities.TryGetValue(localID, out entity))
+ if (entity != null)
{
priority = m_prioritizer.GetUpdatePriority(this, entity);
+ return true;
}
- return priority != double.NaN;
+ return false;
}
public void FlushPrimUpdates()
{
- m_log.Debug("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName);
+ m_log.WarnFormat("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName);
while (m_entityUpdates.Count > 0)
ProcessEntityUpdates(-1);
@@ -11713,86 +11792,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#region PriorityQueue
public class PriorityQueue
{
- internal delegate bool UpdatePriorityHandler(ref double priority, uint local_id);
+ internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity);
+
+ // Heap[0] for self updates
+ // Heap[1..12] for entity updates
- private MinHeap[] m_heaps = new MinHeap[1];
+ internal const uint m_numberOfQueues = 12;
+ private MinHeap[] m_heaps = new MinHeap[m_numberOfQueues];
private Dictionary m_lookupTable;
- private Comparison m_comparison;
private object m_syncRoot = new object();
-
+ private uint m_nextQueue = 0;
+ private UInt64 m_nextRequest = 0;
+
internal PriorityQueue() :
- this(MinHeap.DEFAULT_CAPACITY, Comparer.Default) { }
- internal PriorityQueue(int capacity) :
- this(capacity, Comparer.Default) { }
- internal PriorityQueue(IComparer comparer) :
- this(new Comparison(comparer.Compare)) { }
- internal PriorityQueue(Comparison comparison) :
- this(MinHeap.DEFAULT_CAPACITY, comparison) { }
- internal PriorityQueue(int capacity, IComparer comparer) :
- this(capacity, new Comparison(comparer.Compare)) { }
- internal PriorityQueue(int capacity, Comparison comparison)
+ this(MinHeap.DEFAULT_CAPACITY) { }
+ internal PriorityQueue(int capacity)
{
m_lookupTable = new Dictionary(capacity);
for (int i = 0; i < m_heaps.Length; ++i)
m_heaps[i] = new MinHeap(capacity);
- this.m_comparison = comparison;
}
public object SyncRoot { get { return this.m_syncRoot; } }
+
internal int Count
{
get
{
int count = 0;
for (int i = 0; i < m_heaps.Length; ++i)
- count = m_heaps[i].Count;
+ count += m_heaps[i].Count;
return count;
}
}
- public bool Enqueue(double priority, EntityUpdate value, uint local_id)
+ public bool Enqueue(uint pqueue, EntityUpdate value)
{
- LookupItem item;
+ LookupItem lookup;
- if (m_lookupTable.TryGetValue(local_id, out item))
+ uint localid = value.Entity.LocalId;
+ UInt64 entry = m_nextRequest++;
+ if (m_lookupTable.TryGetValue(localid, out lookup))
{
- // Combine flags
- value.Flags |= item.Heap[item.Handle].Value.Flags;
-
- item.Heap[item.Handle] = new MinHeapItem(priority, value, local_id, this.m_comparison);
- return false;
- }
- else
- {
- item.Heap = m_heaps[0];
- item.Heap.Add(new MinHeapItem(priority, value, local_id, this.m_comparison), ref item.Handle);
- m_lookupTable.Add(local_id, item);
- return true;
+ entry = lookup.Heap[lookup.Handle].EntryOrder;
+ value.Flags |= lookup.Heap[lookup.Handle].Value.Flags;
+ lookup.Heap.Remove(lookup.Handle);
}
- }
- internal EntityUpdate Peek()
- {
- for (int i = 0; i < m_heaps.Length; ++i)
- if (m_heaps[i].Count > 0)
- return m_heaps[i].Min().Value;
- throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString()));
+ pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1);
+ lookup.Heap = m_heaps[pqueue];
+ lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle);
+ m_lookupTable[localid] = lookup;
+
+ return true;
}
- internal bool TryDequeue(out EntityUpdate value)
+ internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue)
{
- for (int i = 0; i < m_heaps.Length; ++i)
+ for (int i = 0; i < m_numberOfQueues; ++i)
{
- if (m_heaps[i].Count > 0)
+ // To get the fair queing, we cycle through each of the
+ // queues when finding an element to dequeue, this code
+ // assumes that the distribution of updates in the queues
+ // is polynomial, probably quadractic (eg distance of PI * R^2)
+ uint h = (uint)((m_nextQueue + i) % m_numberOfQueues);
+ if (m_heaps[h].Count > 0)
{
- MinHeapItem item = m_heaps[i].RemoveMin();
- m_lookupTable.Remove(item.LocalID);
+ m_nextQueue = (uint)((h + 1) % m_numberOfQueues);
+
+ MinHeapItem item = m_heaps[h].RemoveMin();
+ m_lookupTable.Remove(item.Value.Entity.LocalId);
+ timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime);
value = item.Value;
+
return true;
}
}
+ timeinqueue = 0;
value = default(EntityUpdate);
return false;
}
@@ -11800,68 +11878,107 @@ namespace OpenSim.Region.ClientStack.LindenUDP
internal void Reprioritize(UpdatePriorityHandler handler)
{
MinHeapItem item;
- double priority;
-
foreach (LookupItem lookup in new List(this.m_lookupTable.Values))
{
if (lookup.Heap.TryGetValue(lookup.Handle, out item))
{
- priority = item.Priority;
- if (handler(ref priority, item.LocalID))
+ uint pqueue = item.PriorityQueue;
+ uint localid = item.Value.Entity.LocalId;
+
+ if (handler(ref pqueue, item.Value.Entity))
{
- if (lookup.Heap.ContainsHandle(lookup.Handle))
- lookup.Heap[lookup.Handle] =
- new MinHeapItem(priority, item.Value, item.LocalID, this.m_comparison);
+ // unless the priority queue has changed, there is no need to modify
+ // the entry
+ pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1);
+ if (pqueue != item.PriorityQueue)
+ {
+ lookup.Heap.Remove(lookup.Handle);
+
+ LookupItem litem = lookup;
+ litem.Heap = m_heaps[pqueue];
+ litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle);
+ m_lookupTable[localid] = litem;
+ }
}
else
{
- m_log.Warn("[LLCLIENTVIEW]: UpdatePriorityHandler returned false, dropping update");
+ m_log.WarnFormat("[LLCLIENTVIEW]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID);
lookup.Heap.Remove(lookup.Handle);
- this.m_lookupTable.Remove(item.LocalID);
+ this.m_lookupTable.Remove(localid);
}
}
}
}
+ public override string ToString()
+ {
+ string s = "";
+ for (int i = 0; i < m_numberOfQueues; i++)
+ {
+ if (s != "") s += ",";
+ s += m_heaps[i].Count.ToString();
+ }
+ return s;
+ }
+
#region MinHeapItem
private struct MinHeapItem : IComparable
{
- private double priority;
private EntityUpdate value;
- private uint local_id;
- private Comparison comparison;
+ internal EntityUpdate Value {
+ get {
+ return this.value;
+ }
+ }
+
+ private uint pqueue;
+ internal uint PriorityQueue {
+ get {
+ return this.pqueue;
+ }
+ }
- internal MinHeapItem(double priority, EntityUpdate value, uint local_id) :
- this(priority, value, local_id, Comparer.Default) { }
- internal MinHeapItem(double priority, EntityUpdate value, uint local_id, IComparer comparer) :
- this(priority, value, local_id, new Comparison(comparer.Compare)) { }
- internal MinHeapItem(double priority, EntityUpdate value, uint local_id, Comparison comparison)
+ private Int32 entrytime;
+ internal Int32 EntryTime {
+ get {
+ return this.entrytime;
+ }
+ }
+
+ private UInt64 entryorder;
+ internal UInt64 EntryOrder
{
- this.priority = priority;
+ get {
+ return this.entryorder;
+ }
+ }
+
+ internal MinHeapItem(uint pqueue, MinHeapItem other)
+ {
+ this.entrytime = other.entrytime;
+ this.entryorder = other.entryorder;
+ this.value = other.value;
+ this.pqueue = pqueue;
+ }
+
+ internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value)
+ {
+ this.entrytime = Util.EnvironmentTickCount();
+ this.entryorder = entryorder;
this.value = value;
- this.local_id = local_id;
- this.comparison = comparison;
+ this.pqueue = pqueue;
}
- internal double Priority { get { return this.priority; } }
- internal EntityUpdate Value { get { return this.value; } }
- internal uint LocalID { get { return this.local_id; } }
-
public override string ToString()
{
- StringBuilder sb = new StringBuilder();
- sb.Append("[");
- sb.Append(this.priority.ToString());
- sb.Append(",");
- if (this.value != null)
- sb.Append(this.value.ToString());
- sb.Append("]");
- return sb.ToString();
+ return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId);
}
public int CompareTo(MinHeapItem other)
{
- return this.comparison(this.priority, other.priority);
+ // I'm assuming that the root part of an SOG is added to the update queue
+ // before the component parts
+ return Comparer.Default.Compare(this.EntryOrder, other.EntryOrder);
}
}
#endregion
--
cgit v1.1