From df2d5a460f060129e5c09148b9fa4df2f241d8b1 Mon Sep 17 00:00:00 2001 From: jjgreens Date: Wed, 14 Oct 2009 23:15:03 -0700 Subject: Replaced the update lists with a priority queue implementation in LLClientView Replaced the update lists with a priority queue implementation in LLClientView. The priority queues are based on the MinHeap implementation also included in this commit within the OpneSim.Framework namespace. Initially setup to exactly mimic the behavior beofre the change which was a first come first serve queue. --- .../Region/ClientStack/LindenUDP/LLClientView.cs | 224 ++++++++++++++++----- 1 file changed, 179 insertions(+), 45 deletions(-) (limited to 'OpenSim/Region') diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index 82a2cdd..93fdeef 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -321,11 +321,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP private int m_cachedTextureSerial; private Timer m_avatarTerseUpdateTimer; - private List m_avatarTerseUpdates = new List(); + private PriorityQueue m_avatarTerseUpdates_ = + new PriorityQueue(); private Timer m_primTerseUpdateTimer; - private List m_primTerseUpdates = new List(); + private PriorityQueue m_primTerseUpdates_ = + new PriorityQueue(); private Timer m_primFullUpdateTimer; - private List m_primFullUpdates = new List(); + private PriorityQueue m_primFullUpdates_ = + new PriorityQueue(); private int m_moneyBalance; private int m_animationSequenceNumber = 1; private bool m_SendLogoutPacketWhenClosing = true; @@ -3435,16 +3438,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP ImprovedTerseObjectUpdatePacket.ObjectDataBlock terseBlock = CreateAvatarImprovedBlock(localID, position, velocity,rotation); - lock (m_avatarTerseUpdates) + lock (m_avatarTerseUpdates_.SyncRoot) { - m_avatarTerseUpdates.Add(terseBlock); + m_avatarTerseUpdates_.Enqueue(DateTime.Now.ToOADate(), terseBlock, localID); // If packet is full or own movement packet, send it. - if (m_avatarTerseUpdates.Count >= m_avatarTerseUpdatesPerPacket) + if (m_avatarTerseUpdates_.Count >= m_avatarTerseUpdatesPerPacket) { ProcessAvatarTerseUpdates(this, null); } - else if (m_avatarTerseUpdates.Count == 1) + else if (m_avatarTerseUpdates_.Count == 1) { lock (m_avatarTerseUpdateTimer) m_avatarTerseUpdateTimer.Start(); @@ -3454,7 +3457,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP private void ProcessAvatarTerseUpdates(object sender, ElapsedEventArgs e) { - lock (m_avatarTerseUpdates) + lock (m_avatarTerseUpdates_.SyncRoot) { ImprovedTerseObjectUpdatePacket terse = (ImprovedTerseObjectUpdatePacket)PacketPool.Instance.GetPacket(PacketType.ImprovedTerseObjectUpdate); @@ -3465,8 +3468,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP (ushort)(Scene.TimeDilation * ushort.MaxValue); int max = m_avatarTerseUpdatesPerPacket; - if (max > m_avatarTerseUpdates.Count) - max = m_avatarTerseUpdates.Count; + if (max > m_avatarTerseUpdates_.Count) + max = m_avatarTerseUpdates_.Count; int count = 0; int size = 0; @@ -3474,30 +3477,30 @@ namespace OpenSim.Region.ClientStack.LindenUDP byte[] zerobuffer = new byte[1024]; byte[] blockbuffer = new byte[1024]; + Queue updates = new Queue(); + for (count = 0 ; count < max ; count++) { int length = 0; - m_avatarTerseUpdates[count].ToBytes(blockbuffer, ref length); + m_avatarTerseUpdates_.Peek().ToBytes(blockbuffer, ref length); length = Helpers.ZeroEncode(blockbuffer, length, zerobuffer); if (size + length > Packet.MTU) break; size += length; + updates.Enqueue(m_avatarTerseUpdates_.Dequeue()); } terse.ObjectData = new ImprovedTerseObjectUpdatePacket.ObjectDataBlock[count]; for (int i = 0 ; i < count ; i++) - { - terse.ObjectData[i] = m_avatarTerseUpdates[0]; - m_avatarTerseUpdates.RemoveAt(0); - } + terse.ObjectData[i] = updates.Dequeue(); terse.Header.Reliable = false; terse.Header.Zerocoded = true; // FIXME: Move this to ThrottleOutPacketType.State when the real prioritization code is committed OutPacket(terse, ThrottleOutPacketType.Task); - if (m_avatarTerseUpdates.Count == 0) + if (m_avatarTerseUpdates_.Count == 0) { lock (m_avatarTerseUpdateTimer) m_avatarTerseUpdateTimer.Stop(); @@ -3660,14 +3663,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP objectData.TextureAnim = textureanim; } - lock (m_primFullUpdates) + lock (m_primFullUpdates_.SyncRoot) { - if (m_primFullUpdates.Count == 0) + if (m_primFullUpdates_.Count == 0) m_primFullUpdateTimer.Start(); - m_primFullUpdates.Add(objectData); + m_primFullUpdates_.Enqueue(DateTime.Now.ToOADate(), objectData, localID); - if (m_primFullUpdates.Count >= m_primFullUpdatesPerPacket) + if (m_primFullUpdates_.Count >= m_primFullUpdatesPerPacket) ProcessPrimFullUpdates(this, null); } } @@ -3690,9 +3693,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP void ProcessPrimFullUpdates(object sender, ElapsedEventArgs e) { - lock (m_primFullUpdates) + lock (m_primFullUpdates_.SyncRoot) { - if (m_primFullUpdates.Count == 0 && m_primFullUpdateTimer.Enabled) + if (m_primFullUpdates_.Count == 0 && m_primFullUpdateTimer.Enabled) { lock (m_primFullUpdateTimer) m_primFullUpdateTimer.Stop(); @@ -3709,7 +3712,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP outPacket.RegionData.TimeDilation = (ushort)(Scene.TimeDilation * ushort.MaxValue); - int max = m_primFullUpdates.Count; + int max = m_primFullUpdates_.Count; if (max > m_primFullUpdatesPerPacket) max = m_primFullUpdatesPerPacket; @@ -3719,29 +3722,29 @@ namespace OpenSim.Region.ClientStack.LindenUDP byte[] zerobuffer = new byte[1024]; byte[] blockbuffer = new byte[1024]; + Queue updates = new Queue(); + for (count = 0 ; count < max ; count++) { int length = 0; - m_primFullUpdates[count].ToBytes(blockbuffer, ref length); + m_primFullUpdates_.Peek().ToBytes(blockbuffer, ref length); length = Helpers.ZeroEncode(blockbuffer, length, zerobuffer); if (size + length > Packet.MTU) break; size += length; + updates.Enqueue(m_primFullUpdates_.Dequeue()); } outPacket.ObjectData = new ObjectUpdatePacket.ObjectDataBlock[count]; for (int index = 0 ; index < count ; index++) - { - outPacket.ObjectData[index] = m_primFullUpdates[0]; - m_primFullUpdates.RemoveAt(0); - } + outPacket.ObjectData[index] = updates.Dequeue(); outPacket.Header.Zerocoded = true; OutPacket(outPacket, ThrottleOutPacketType.State); - if (m_primFullUpdates.Count == 0 && m_primFullUpdateTimer.Enabled) + if (m_primFullUpdates_.Count == 0 && m_primFullUpdateTimer.Enabled) lock (m_primFullUpdateTimer) m_primFullUpdateTimer.Stop(); } @@ -3763,23 +3766,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP CreatePrimImprovedBlock(localID, position, rotation, velocity, rotationalvelocity, state); - lock (m_primTerseUpdates) + lock (m_primTerseUpdates_.SyncRoot) { - if (m_primTerseUpdates.Count == 0) + if (m_primTerseUpdates_.Count == 0) m_primTerseUpdateTimer.Start(); - m_primTerseUpdates.Add(objectData); + m_primTerseUpdates_.Enqueue(DateTime.Now.ToOADate(), objectData, localID); - if (m_primTerseUpdates.Count >= m_primTerseUpdatesPerPacket) + if (m_primTerseUpdates_.Count >= m_primTerseUpdatesPerPacket) ProcessPrimTerseUpdates(this, null); } } void ProcessPrimTerseUpdates(object sender, ElapsedEventArgs e) { - lock (m_primTerseUpdates) + lock (m_primTerseUpdates_.SyncRoot) { - if (m_primTerseUpdates.Count == 0) + if (m_primTerseUpdates_.Count == 0) { lock (m_primTerseUpdateTimer) m_primTerseUpdateTimer.Stop(); @@ -3797,7 +3800,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP outPacket.RegionData.TimeDilation = (ushort)(Scene.TimeDilation * ushort.MaxValue); - int max = m_primTerseUpdates.Count; + int max = m_primTerseUpdates_.Count; if (max > m_primTerseUpdatesPerPacket) max = m_primTerseUpdatesPerPacket; @@ -3807,14 +3810,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP byte[] zerobuffer = new byte[1024]; byte[] blockbuffer = new byte[1024]; + Queue updates = new Queue(); + for (count = 0 ; count < max ; count++) { int length = 0; - m_primTerseUpdates[count].ToBytes(blockbuffer, ref length); + m_primTerseUpdates_.Peek().ToBytes(blockbuffer, ref length); length = Helpers.ZeroEncode(blockbuffer, length, zerobuffer); if (size + length > Packet.MTU) break; size += length; + updates.Enqueue(m_primTerseUpdates_.Dequeue()); } outPacket.ObjectData = @@ -3822,16 +3828,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP ObjectDataBlock[count]; for (int index = 0 ; index < count ; index++) - { - outPacket.ObjectData[index] = m_primTerseUpdates[0]; - m_primTerseUpdates.RemoveAt(0); - } + outPacket.ObjectData[index] = updates.Dequeue(); outPacket.Header.Reliable = false; outPacket.Header.Zerocoded = true; OutPacket(outPacket, ThrottleOutPacketType.State); - if (m_primTerseUpdates.Count == 0) + if (m_primTerseUpdates_.Count == 0) lock (m_primTerseUpdateTimer) m_primTerseUpdateTimer.Stop(); } @@ -3839,15 +3842,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP public void FlushPrimUpdates() { - while (m_primFullUpdates.Count > 0) + while (m_primFullUpdates_.Count > 0) { ProcessPrimFullUpdates(this, null); } - while (m_primTerseUpdates.Count > 0) + while (m_primTerseUpdates_.Count > 0) { ProcessPrimTerseUpdates(this, null); } - while (m_avatarTerseUpdates.Count > 0) + while (m_avatarTerseUpdates_.Count > 0) { ProcessAvatarTerseUpdates(this, null); } @@ -10578,5 +10581,136 @@ namespace OpenSim.Region.ClientStack.LindenUDP pack.TextureData.TextureID = textureID; OutPacket(pack, ThrottleOutPacketType.Task); } + + #region PriorityQueue + private class PriorityQueue + { + private MinHeap[] heaps = new MinHeap[1]; + private Dictionary lookup_table = new Dictionary(); + private Comparison comparison; + private object sync_root = new object(); + + internal PriorityQueue() : + this(MinHeap.DEFAULT_CAPACITY, Comparer.Default) { } + internal PriorityQueue(int capacity) : + this(capacity, Comparer.Default) { } + internal PriorityQueue(IComparer comparer) : + this(new Comparison(comparer.Compare)) { } + internal PriorityQueue(Comparison comparison) : + this(MinHeap.DEFAULT_CAPACITY, comparison) { } + internal PriorityQueue(int capacity, IComparer comparer) : + this(capacity, new Comparison(comparer.Compare)) { } + internal PriorityQueue(int capacity, Comparison comparison) + { + for (int i = 0; i < heaps.Length; ++i) + heaps[i] = new MinHeap(capacity); + this.comparison = comparison; + } + + internal object SyncRoot { get { return this.sync_root; } } + internal int Count + { + get + { + int count = 0; + for (int i = 0; i < heaps.Length; ++i) + count = heaps[i].Count; + return count; + } + } + + internal bool Enqueue(TPriority priority, TValue value, uint local_id) + { + LookupItem item; + + if (lookup_table.TryGetValue(local_id, out item)) + { + item.Heap[item.Handle] = new MinHeapItem(priority, value, local_id, this.comparison); + return false; + } + else + { + item.Heap = heaps[0]; + item.Heap.Add(new MinHeapItem(priority, value, local_id, this.comparison), ref item.Handle); + lookup_table.Add(local_id, item); + return true; + } + } + + internal TValue Peek() + { + for (int i = 0; i < heaps.Length; ++i) + if (heaps[i].Count > 0) + return heaps[i].Min().Value; + throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString())); + } + + internal TValue Dequeue() + { + for (int i = 0; i < heaps.Length; ++i) + { + if (heaps[i].Count > 0) + { + MinHeapItem item = heaps[i].RemoveMin(); + lookup_table.Remove(item.LocalID); + return item.Value; + } + } + throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString())); + } + + #region MinHeapItem + private struct MinHeapItem : IComparable + { + private TPriority priority; + private TValue value; + private uint local_id; + private Comparison comparison; + + internal MinHeapItem(TPriority priority, TValue value, uint local_id) : + this(priority, value, local_id, Comparer.Default) { } + internal MinHeapItem(TPriority priority, TValue value, uint local_id, IComparer comparer) : + this(priority, value, local_id, new Comparison(comparer.Compare)) { } + internal MinHeapItem(TPriority priority, TValue value, uint local_id, Comparison comparison) + { + this.priority = priority; + this.value = value; + this.local_id = local_id; + this.comparison = comparison; + } + + internal TPriority Priority { get { return this.priority; } } + internal TValue Value { get { return this.value; } } + internal uint LocalID { get { return this.local_id; } } + + public override string ToString() + { + StringBuilder sb = new StringBuilder(); + sb.Append("["); + if (this.priority != null) + sb.Append(this.priority.ToString()); + sb.Append(","); + if (this.value != null) + sb.Append(this.value.ToString()); + sb.Append("]"); + return sb.ToString(); + } + + public int CompareTo(MinHeapItem other) + { + return this.comparison(this.priority, other.priority); + } + } + #endregion + + #region LookupItem + private struct LookupItem { + internal MinHeap Heap; + internal IHandle Handle; + } + #endregion + } + #endregion + } } -- cgit v1.1