aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim
diff options
context:
space:
mode:
authorJohn Hurliman2009-10-21 11:59:48 -0700
committerJohn Hurliman2009-10-21 11:59:48 -0700
commit9178537e9414478f0a9bd84bb5e106b2f15640c3 (patch)
treec21bca46c08cdc9868ca7608856f51d029207aab /OpenSim
parentFixed the way OnQueueEmpty is called to prevent simultaneous calls for the sa... (diff)
downloadopensim-SC_OLD-9178537e9414478f0a9bd84bb5e106b2f15640c3.zip
opensim-SC_OLD-9178537e9414478f0a9bd84bb5e106b2f15640c3.tar.gz
opensim-SC_OLD-9178537e9414478f0a9bd84bb5e106b2f15640c3.tar.bz2
opensim-SC_OLD-9178537e9414478f0a9bd84bb5e106b2f15640c3.tar.xz
* Replaced the UnackedPacketCollection with a lockless implementation. The tiny amount of time spent in the locks turned into a lot of time when the rest of the LLUDP implementation went lockless
* Changed the timer tracking numbers for each client to not have "memory". It will no longer queue up calls to functions like ResendUnacked * Reverted Jim's WaitHandle code. Although it was technically more correct, it exhibited the exact same behavior as the old code but spent more cycles. The 20ms has been replaced with the minimum amount of time before a token bucket could receive a drip, and an else { sleep(0); } was added to make sure the outgoing packet handler always yields at least a minimum amount
Diffstat (limited to 'OpenSim')
-rw-r--r--OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs11
-rw-r--r--OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs33
-rw-r--r--OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs143
-rw-r--r--OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs147
4 files changed, 126 insertions, 208 deletions
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
index 38bbce0..997f38c 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs
@@ -1228,10 +1228,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
1228 StartPingCheckPacket pc = (StartPingCheckPacket)PacketPool.Instance.GetPacket(PacketType.StartPingCheck); 1228 StartPingCheckPacket pc = (StartPingCheckPacket)PacketPool.Instance.GetPacket(PacketType.StartPingCheck);
1229 pc.Header.Reliable = false; 1229 pc.Header.Reliable = false;
1230 1230
1231 OutgoingPacket oldestPacket = m_udpClient.NeedAcks.GetOldest();
1232
1233 pc.PingID.PingID = seq; 1231 pc.PingID.PingID = seq;
1234 pc.PingID.OldestUnacked = (oldestPacket != null) ? oldestPacket.SequenceNumber : 0; 1232 // We *could* get OldestUnacked, but it would hurt performance and not provide any benefit
1233 pc.PingID.OldestUnacked = 0;
1235 1234
1236 OutPacket(pc, ThrottleOutPacketType.Unknown); 1235 OutPacket(pc, ThrottleOutPacketType.Unknown);
1237 } 1236 }
@@ -3320,8 +3319,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
3320 // If we received an update about our own avatar, process the avatar update priority queue immediately 3319 // If we received an update about our own avatar, process the avatar update priority queue immediately
3321 if (data.AgentID == m_agentId) 3320 if (data.AgentID == m_agentId)
3322 ProcessAvatarTerseUpdates(); 3321 ProcessAvatarTerseUpdates();
3323 else
3324 m_udpServer.SignalOutgoingPacketHandler();
3325 } 3322 }
3326 3323
3327 private void ProcessAvatarTerseUpdates() 3324 private void ProcessAvatarTerseUpdates()
@@ -3409,8 +3406,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
3409 3406
3410 lock (m_primFullUpdates.SyncRoot) 3407 lock (m_primFullUpdates.SyncRoot)
3411 m_primFullUpdates.Enqueue(data.priority, objectData, data.localID); 3408 m_primFullUpdates.Enqueue(data.priority, objectData, data.localID);
3412
3413 m_udpServer.SignalOutgoingPacketHandler();
3414 } 3409 }
3415 3410
3416 void ProcessPrimFullUpdates() 3411 void ProcessPrimFullUpdates()
@@ -3454,8 +3449,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
3454 3449
3455 lock (m_primTerseUpdates.SyncRoot) 3450 lock (m_primTerseUpdates.SyncRoot)
3456 m_primTerseUpdates.Enqueue(data.Priority, objectData, data.LocalID); 3451 m_primTerseUpdates.Enqueue(data.Priority, objectData, data.LocalID);
3457
3458 m_udpServer.SignalOutgoingPacketHandler();
3459 } 3452 }
3460 3453
3461 void ProcessPrimTerseUpdates() 3454 void ProcessPrimTerseUpdates()
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs
index ca9925c..458e78d 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs
@@ -202,7 +202,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
202 public void Shutdown() 202 public void Shutdown()
203 { 203 {
204 IsConnected = false; 204 IsConnected = false;
205 NeedAcks.Clear();
206 for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++) 205 for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
207 { 206 {
208 m_packetOutboxes[i].Clear(); 207 m_packetOutboxes[i].Clear();
@@ -394,7 +393,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
394 { 393 {
395 // Not enough tokens in the bucket, queue this packet 394 // Not enough tokens in the bucket, queue this packet
396 queue.Enqueue(packet); 395 queue.Enqueue(packet);
397 m_udpServer.SignalOutgoingPacketHandler();
398 return true; 396 return true;
399 } 397 }
400 } 398 }
@@ -411,15 +409,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
411 /// </summary> 409 /// </summary>
412 /// <remarks>This function is only called from a synchronous loop in the 410 /// <remarks>This function is only called from a synchronous loop in the
413 /// UDPServer so we don't need to bother making this thread safe</remarks> 411 /// UDPServer so we don't need to bother making this thread safe</remarks>
414 /// <returns>The minimum amount of time before the next packet 412 /// <returns>True if any packets were sent, otherwise false</returns>
415 /// can be sent to this client</returns> 413 public bool DequeueOutgoing()
416 public int DequeueOutgoing()
417 { 414 {
418 OutgoingPacket packet; 415 OutgoingPacket packet;
419 OpenSim.Framework.LocklessQueue<OutgoingPacket> queue; 416 OpenSim.Framework.LocklessQueue<OutgoingPacket> queue;
420 TokenBucket bucket; 417 TokenBucket bucket;
421 int dataLength; 418 bool packetSent = false;
422 int minTimeout = Int32.MaxValue;
423 419
424 //string queueDebugOutput = String.Empty; // Serious debug business 420 //string queueDebugOutput = String.Empty; // Serious debug business
425 421
@@ -434,18 +430,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
434 // leaving a dequeued packet still waiting to be sent out. Try to 430 // leaving a dequeued packet still waiting to be sent out. Try to
435 // send it again 431 // send it again
436 OutgoingPacket nextPacket = m_nextPackets[i]; 432 OutgoingPacket nextPacket = m_nextPackets[i];
437 dataLength = nextPacket.Buffer.DataLength; 433 if (bucket.RemoveTokens(nextPacket.Buffer.DataLength))
438 if (bucket.RemoveTokens(dataLength))
439 { 434 {
440 // Send the packet 435 // Send the packet
441 m_udpServer.SendPacketFinal(nextPacket); 436 m_udpServer.SendPacketFinal(nextPacket);
442 m_nextPackets[i] = null; 437 m_nextPackets[i] = null;
443 minTimeout = 0; 438 packetSent = true;
444 }
445 else if (minTimeout != 0)
446 {
447 // Check the minimum amount of time we would have to wait before this packet can be sent out
448 minTimeout = Math.Min(minTimeout, ((dataLength - bucket.Content) / bucket.DripPerMS) + 1);
449 } 439 }
450 } 440 }
451 else 441 else
@@ -457,23 +447,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP
457 { 447 {
458 // A packet was pulled off the queue. See if we have 448 // A packet was pulled off the queue. See if we have
459 // enough tokens in the bucket to send it out 449 // enough tokens in the bucket to send it out
460 dataLength = packet.Buffer.DataLength; 450 if (bucket.RemoveTokens(packet.Buffer.DataLength))
461 if (bucket.RemoveTokens(dataLength))
462 { 451 {
463 // Send the packet 452 // Send the packet
464 m_udpServer.SendPacketFinal(packet); 453 m_udpServer.SendPacketFinal(packet);
465 minTimeout = 0; 454 packetSent = true;
466 } 455 }
467 else 456 else
468 { 457 {
469 // Save the dequeued packet for the next iteration 458 // Save the dequeued packet for the next iteration
470 m_nextPackets[i] = packet; 459 m_nextPackets[i] = packet;
471
472 if (minTimeout != 0)
473 {
474 // Check the minimum amount of time we would have to wait before this packet can be sent out
475 minTimeout = Math.Min(minTimeout, ((dataLength - bucket.Content) / bucket.DripPerMS) + 1);
476 }
477 } 460 }
478 461
479 // If the queue is empty after this dequeue, fire the queue 462 // If the queue is empty after this dequeue, fire the queue
@@ -492,7 +475,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
492 } 475 }
493 476
494 //m_log.Info("[LLUDPCLIENT]: Queues: " + queueDebugOutput); // Serious debug business 477 //m_log.Info("[LLUDPCLIENT]: Queues: " + queueDebugOutput); // Serious debug business
495 return minTimeout; 478 return packetSent;
496 } 479 }
497 480
498 /// <summary> 481 /// <summary>
diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
index 7d5c11e..a8ce102 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs
@@ -121,12 +121,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
121 private int m_recvBufferSize; 121 private int m_recvBufferSize;
122 /// <summary>Flag to process packets asynchronously or synchronously</summary> 122 /// <summary>Flag to process packets asynchronously or synchronously</summary>
123 private bool m_asyncPacketHandling; 123 private bool m_asyncPacketHandling;
124 /// <summary>Track the minimum amount of time needed to send the next packet in the 124 /// <summary>Tracks whether or not a packet was sent each round so we know
125 /// OutgoingPacketHandler loop so we know when to sleep</summary> 125 /// whether or not to sleep</summary>
126 private int m_minTimeout = Int32.MaxValue; 126 private bool m_packetSent;
127 /// <summary>EventWaitHandle to signify the outgoing packet handler thread that
128 /// there is more work to do</summary>
129 private EventWaitHandle m_outgoingWaitHandle;
130 127
131 public Socket Server { get { return null; } } 128 public Socket Server { get { return null; } }
132 129
@@ -174,8 +171,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
174 171
175 base.Start(m_recvBufferSize, m_asyncPacketHandling); 172 base.Start(m_recvBufferSize, m_asyncPacketHandling);
176 173
177 m_outgoingWaitHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
178
179 // Start the incoming packet processing thread 174 // Start the incoming packet processing thread
180 Thread incomingThread = new Thread(IncomingPacketHandler); 175 Thread incomingThread = new Thread(IncomingPacketHandler);
181 incomingThread.Name = "Incoming Packets (" + m_scene.RegionInfo.RegionName + ")"; 176 incomingThread.Name = "Incoming Packets (" + m_scene.RegionInfo.RegionName + ")";
@@ -190,8 +185,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
190 { 185 {
191 m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + m_scene.RegionInfo.RegionName); 186 m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + m_scene.RegionInfo.RegionName);
192 base.Stop(); 187 base.Stop();
193
194 m_outgoingWaitHandle.Close();
195 } 188 }
196 189
197 public void AddScene(IScene scene) 190 public void AddScene(IScene scene)
@@ -374,10 +367,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
374 StartPingCheckPacket pc = (StartPingCheckPacket)PacketPool.Instance.GetPacket(PacketType.StartPingCheck); 367 StartPingCheckPacket pc = (StartPingCheckPacket)PacketPool.Instance.GetPacket(PacketType.StartPingCheck);
375 pc.Header.Reliable = false; 368 pc.Header.Reliable = false;
376 369
377 OutgoingPacket oldestPacket = udpClient.NeedAcks.GetOldest();
378
379 pc.PingID.PingID = (byte)udpClient.CurrentPingSequence++; 370 pc.PingID.PingID = (byte)udpClient.CurrentPingSequence++;
380 pc.PingID.OldestUnacked = (oldestPacket != null) ? oldestPacket.SequenceNumber : 0; 371 // We *could* get OldestUnacked, but it would hurt performance and not provide any benefit
372 pc.PingID.OldestUnacked = 0;
381 373
382 SendPacket(udpClient, pc, ThrottleOutPacketType.Unknown, false); 374 SendPacket(udpClient, pc, ThrottleOutPacketType.Unknown, false);
383 } 375 }
@@ -397,39 +389,36 @@ namespace OpenSim.Region.ClientStack.LindenUDP
397 return; 389 return;
398 } 390 }
399 391
400 if (udpClient.NeedAcks.Count > 0) 392 // Get a list of all of the packets that have been sitting unacked longer than udpClient.RTO
393 List<OutgoingPacket> expiredPackets = udpClient.NeedAcks.GetExpiredPackets(udpClient.RTO);
394
395 if (expiredPackets != null)
401 { 396 {
402 // Get a list of all of the packets that have been sitting unacked longer than udpClient.RTO 397 m_log.Debug("[LLUDPSERVER]: Resending " + expiredPackets.Count + " packets to " + udpClient.AgentID + ", RTO=" + udpClient.RTO);
403 List<OutgoingPacket> expiredPackets = udpClient.NeedAcks.GetExpiredPackets(udpClient.RTO);
404 398
405 if (expiredPackets != null) 399 // Resend packets
400 for (int i = 0; i < expiredPackets.Count; i++)
406 { 401 {
407 m_log.Debug("[LLUDPSERVER]: Resending " + expiredPackets.Count + " packets to " + udpClient.AgentID); 402 OutgoingPacket outgoingPacket = expiredPackets[i];
408 403
409 // Resend packets 404 //m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed",
410 for (int i = 0; i < expiredPackets.Count; i++) 405 // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount);
411 {
412 OutgoingPacket outgoingPacket = expiredPackets[i];
413
414 //m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed",
415 // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount);
416 406
417 // Set the resent flag 407 // Set the resent flag
418 outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT); 408 outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT);
419 outgoingPacket.Category = ThrottleOutPacketType.Resend; 409 outgoingPacket.Category = ThrottleOutPacketType.Resend;
420 410
421 // The TickCount will be set to the current time when the packet 411 // The TickCount will be set to the current time when the packet
422 // is actually sent out again 412 // is actually sent out again
423 outgoingPacket.TickCount = 0; 413 outgoingPacket.TickCount = 0;
424 414
425 // Bump up the resend count on this packet 415 // Bump up the resend count on this packet
426 Interlocked.Increment(ref outgoingPacket.ResendCount); 416 Interlocked.Increment(ref outgoingPacket.ResendCount);
427 //Interlocked.Increment(ref Stats.ResentPackets); 417 //Interlocked.Increment(ref Stats.ResentPackets);
428 418
429 // Requeue or resend the packet 419 // Requeue or resend the packet
430 if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket)) 420 if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket))
431 SendPacketFinal(outgoingPacket); 421 SendPacketFinal(outgoingPacket);
432 }
433 } 422 }
434 } 423 }
435 } 424 }
@@ -577,11 +566,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
577 // Handle appended ACKs 566 // Handle appended ACKs
578 if (packet.Header.AppendedAcks && packet.Header.AckList != null) 567 if (packet.Header.AppendedAcks && packet.Header.AckList != null)
579 { 568 {
580 lock (udpClient.NeedAcks.SyncRoot) 569 for (int i = 0; i < packet.Header.AckList.Length; i++)
581 { 570 udpClient.NeedAcks.Remove(packet.Header.AckList[i], now, packet.Header.Resent);
582 for (int i = 0; i < packet.Header.AckList.Length; i++)
583 AcknowledgePacket(udpClient, packet.Header.AckList[i], now, packet.Header.Resent);
584 }
585 } 571 }
586 572
587 // Handle PacketAck packets 573 // Handle PacketAck packets
@@ -589,11 +575,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
589 { 575 {
590 PacketAckPacket ackPacket = (PacketAckPacket)packet; 576 PacketAckPacket ackPacket = (PacketAckPacket)packet;
591 577
592 lock (udpClient.NeedAcks.SyncRoot) 578 for (int i = 0; i < ackPacket.Packets.Length; i++)
593 { 579 udpClient.NeedAcks.Remove(ackPacket.Packets[i].ID, now, packet.Header.Resent);
594 for (int i = 0; i < ackPacket.Packets.Length; i++)
595 AcknowledgePacket(udpClient, ackPacket.Packets[i].ID, now, packet.Header.Resent);
596 }
597 580
598 // We don't need to do anything else with PacketAck packets 581 // We don't need to do anything else with PacketAck packets
599 return; 582 return;
@@ -734,21 +717,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
734 client.Close(); 717 client.Close();
735 } 718 }
736 719
737 private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend)
738 {
739 OutgoingPacket ackedPacket;
740 if (client.NeedAcks.RemoveUnsafe(ack, out ackedPacket) && !fromResend)
741 {
742 // Update stats
743 Interlocked.Add(ref client.UnackedBytes, -ackedPacket.Buffer.DataLength);
744
745 // Calculate the round-trip time for this packet and its ACK
746 int rtt = currentTime - ackedPacket.TickCount;
747 if (rtt > 0)
748 client.UpdateRoundTrip(rtt);
749 }
750 }
751
752 private void IncomingPacketHandler() 720 private void IncomingPacketHandler()
753 { 721 {
754 // Set this culture for the thread that incoming packets are received 722 // Set this culture for the thread that incoming packets are received
@@ -775,11 +743,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
775 packetInbox.Clear(); 743 packetInbox.Clear();
776 } 744 }
777 745
778 public bool SignalOutgoingPacketHandler()
779 {
780 return m_outgoingWaitHandle.Set();
781 }
782
783 private void OutgoingPacketHandler() 746 private void OutgoingPacketHandler()
784 { 747 {
785 // Set this culture for the thread that outgoing packets are sent 748 // Set this culture for the thread that outgoing packets are sent
@@ -790,28 +753,19 @@ namespace OpenSim.Region.ClientStack.LindenUDP
790 { 753 {
791 try 754 try
792 { 755 {
793 m_minTimeout = Int32.MaxValue; 756 m_packetSent = false;
794 757
795 // Handle outgoing packets, resends, acknowledgements, and pings for each 758 // Handle outgoing packets, resends, acknowledgements, and pings for each
796 // client. m_minTimeout will be set to 0 if more packets are waiting in the 759 // client. m_packetSent will be set to true if a packet is sent
797 // queues with bandwidth to spare, or the number of milliseconds we need to
798 // wait before at least one packet can be sent to a client
799 m_scene.ClientManager.ForEachSync(ClientOutgoingPacketHandler); 760 m_scene.ClientManager.ForEachSync(ClientOutgoingPacketHandler);
800 761
801 // Can't wait for a negative amount of time, and put a 100ms ceiling on our 762 // If a packet was sent, only do a minimum length context switch to allow
802 // maximum wait time 763 // other parts of the code to do work. If nothing was sent, sleep for the
803 m_minTimeout = Utils.Clamp(m_minTimeout, 0, 100); 764 // minimum amount of time before a token bucket could get more tokens
804 765 if (m_packetSent)
805 if (m_minTimeout > 0) 766 Thread.Sleep(0);
806 { 767 else
807 // Don't bother waiting for a shorter interval than our TickCountResolution 768 Thread.Sleep((int)TickCountResolution);
808 // since the token buckets wouldn't update anyways
809 m_minTimeout = Math.Max(m_minTimeout, (int)TickCountResolution);
810
811 // Wait for someone to signal that packets are ready to be sent, or for our
812 // sleep interval to expire
813 m_outgoingWaitHandle.WaitOne(m_minTimeout);
814 }
815 } 769 }
816 catch (Exception ex) 770 catch (Exception ex)
817 { 771 {
@@ -841,7 +795,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
841 if (udpClient.ElapsedMSOutgoingPacketHandler >= 100) 795 if (udpClient.ElapsedMSOutgoingPacketHandler >= 100)
842 { 796 {
843 ResendUnacked(udpClient); 797 ResendUnacked(udpClient);
844 udpClient.ElapsedMSOutgoingPacketHandler -= 100; 798 udpClient.ElapsedMSOutgoingPacketHandler = 0;
845 udpClient.Elapsed100MSOutgoingPacketHandler += 1; 799 udpClient.Elapsed100MSOutgoingPacketHandler += 1;
846 } 800 }
847 801
@@ -849,7 +803,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
849 if (udpClient.Elapsed100MSOutgoingPacketHandler >= 5) 803 if (udpClient.Elapsed100MSOutgoingPacketHandler >= 5)
850 { 804 {
851 SendAcks(udpClient); 805 SendAcks(udpClient);
852 udpClient.Elapsed100MSOutgoingPacketHandler -= 5; 806 udpClient.Elapsed100MSOutgoingPacketHandler = 0;
853 udpClient.Elapsed500MSOutgoingPacketHandler += 1; 807 udpClient.Elapsed500MSOutgoingPacketHandler += 1;
854 } 808 }
855 809
@@ -857,19 +811,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
857 if (udpClient.Elapsed500MSOutgoingPacketHandler >= 10) 811 if (udpClient.Elapsed500MSOutgoingPacketHandler >= 10)
858 { 812 {
859 SendPing(udpClient); 813 SendPing(udpClient);
860 udpClient.Elapsed500MSOutgoingPacketHandler -= 10; 814 udpClient.Elapsed500MSOutgoingPacketHandler = 0;
861 } 815 }
862 816
863 // Dequeue any outgoing packets that are within the throttle limits 817 // Dequeue any outgoing packets that are within the throttle limits
864 // and get the minimum time we would have to sleep before this client 818 if (udpClient.DequeueOutgoing())
865 // could send a packet out 819 m_packetSent = true;
866 int minTimeoutThisLoop = udpClient.DequeueOutgoing();
867
868 // Although this is not thread safe, it is cheaper than locking and the
869 // worst that will happen is we sleep for slightly longer than the
870 // minimum necessary interval
871 if (minTimeoutThisLoop < m_minTimeout)
872 m_minTimeout = minTimeoutThisLoop;
873 } 820 }
874 821
875 udpClient.TickLastOutgoingPacketHandler = thisTick; 822 udpClient.TickLastOutgoingPacketHandler = thisTick;
diff --git a/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs b/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs
index 87c7df4..12f0c0a 100644
--- a/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs
+++ b/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs
@@ -37,97 +37,57 @@ namespace OpenSim.Region.ClientStack.LindenUDP
37 /// </summary> 37 /// </summary>
38 public sealed class UnackedPacketCollection 38 public sealed class UnackedPacketCollection
39 { 39 {
40 /// <summary>Synchronization primitive. A lock must be acquired on this
41 /// object before calling any of the unsafe methods</summary>
42 public object SyncRoot = new object();
43
44 /// <summary>Holds the actual unacked packet data, sorted by sequence number</summary>
45 private SortedDictionary<uint, OutgoingPacket> packets = new SortedDictionary<uint, OutgoingPacket>();
46
47 /// <summary>Gets the total number of unacked packets</summary>
48 public int Count { get { return packets.Count; } }
49
50 /// <summary> 40 /// <summary>
51 /// Default constructor 41 /// Holds information about a pending acknowledgement
52 /// </summary> 42 /// </summary>
53 public UnackedPacketCollection() 43 private struct PendingAck
54 { 44 {
55 } 45 /// <summary>Sequence number of the packet to remove</summary>
46 public uint SequenceNumber;
47 /// <summary>Environment.TickCount value when the remove was queued.
48 /// This is used to update round-trip times for packets</summary>
49 public int RemoveTime;
50 /// <summary>Whether or not this acknowledgement was attached to a
51 /// resent packet. If so, round-trip time will not be calculated</summary>
52 public bool FromResend;
56 53
57 /// <summary> 54 public PendingAck(uint sequenceNumber, int currentTime, bool fromResend)
58 /// Add an unacked packet to the collection
59 /// </summary>
60 /// <param name="packet">Packet that is awaiting acknowledgement</param>
61 /// <returns>True if the packet was successfully added, false if the
62 /// packet already existed in the collection</returns>
63 public bool Add(OutgoingPacket packet)
64 {
65 lock (SyncRoot)
66 { 55 {
67 if (!packets.ContainsKey(packet.SequenceNumber)) 56 SequenceNumber = sequenceNumber;
68 { 57 RemoveTime = currentTime;
69 packets.Add(packet.SequenceNumber, packet); 58 FromResend = fromResend;
70 return true;
71 }
72 return false;
73 } 59 }
74 } 60 }
75 61
76 /// <summary> 62 /// <summary>Holds the actual unacked packet data, sorted by sequence number</summary>
77 /// Removes a packet from the collection without attempting to obtain a 63 private SortedDictionary<uint, OutgoingPacket> m_packets = new SortedDictionary<uint, OutgoingPacket>();
78 /// lock first 64 /// <summary>Holds packets that need to be added to the unacknowledged list</summary>
79 /// </summary> 65 private LocklessQueue<OutgoingPacket> m_pendingAdds = new LocklessQueue<OutgoingPacket>();
80 /// <param name="sequenceNumber">Sequence number of the packet to remove</param> 66 /// <summary>Holds information about pending acknowledgements</summary>
81 /// <returns>True if the packet was found and removed, otherwise false</returns> 67 private LocklessQueue<PendingAck> m_pendingRemoves = new LocklessQueue<PendingAck>();
82 public bool RemoveUnsafe(uint sequenceNumber)
83 {
84 return packets.Remove(sequenceNumber);
85 }
86
87 /// <summary>
88 /// Removes a packet from the collection without attempting to obtain a
89 /// lock first
90 /// </summary>
91 /// <param name="sequenceNumber">Sequence number of the packet to remove</param>
92 /// <param name="packet">Returns the removed packet</param>
93 /// <returns>True if the packet was found and removed, otherwise false</returns>
94 public bool RemoveUnsafe(uint sequenceNumber, out OutgoingPacket packet)
95 {
96 if (packets.TryGetValue(sequenceNumber, out packet))
97 {
98 packets.Remove(sequenceNumber);
99 return true;
100 }
101
102 return false;
103 }
104 68
105 /// <summary> 69 /// <summary>
106 /// Removes all elements from the collection 70 /// Add an unacked packet to the collection
107 /// </summary> 71 /// </summary>
108 public void Clear() 72 /// <param name="packet">Packet that is awaiting acknowledgement</param>
73 /// <returns>True if the packet was successfully added, false if the
74 /// packet already existed in the collection</returns>
75 /// <remarks>This does not immediately add the ACK to the collection,
76 /// it only queues it so it can be added in a thread-safe way later</remarks>
77 public void Add(OutgoingPacket packet)
109 { 78 {
110 lock (SyncRoot) 79 m_pendingAdds.Enqueue(packet);
111 packets.Clear();
112 } 80 }
113 81
114 /// <summary> 82 /// <summary>
115 /// Gets the packet with the lowest sequence number 83 /// Marks a packet as acknowledged
116 /// </summary> 84 /// </summary>
117 /// <returns>The packet with the lowest sequence number, or null if the 85 /// <param name="sequenceNumber">Sequence number of the packet to
118 /// collection is empty</returns> 86 /// acknowledge</param>
119 public OutgoingPacket GetOldest() 87 /// <param name="currentTime">Current value of Environment.TickCount</param>
88 public void Remove(uint sequenceNumber, int currentTime, bool fromResend)
120 { 89 {
121 lock (SyncRoot) 90 m_pendingRemoves.Enqueue(new PendingAck(sequenceNumber, currentTime, fromResend));
122 {
123 using (SortedDictionary<uint, OutgoingPacket>.ValueCollection.Enumerator e = packets.Values.GetEnumerator())
124 {
125 if (e.MoveNext())
126 return e.Current;
127 else
128 return null;
129 }
130 }
131 } 91 }
132 92
133 /// <summary> 93 /// <summary>
@@ -138,14 +98,19 @@ namespace OpenSim.Region.ClientStack.LindenUDP
138 /// packet is considered expired</param> 98 /// packet is considered expired</param>
139 /// <returns>A list of all expired packets according to the given 99 /// <returns>A list of all expired packets according to the given
140 /// expiration timeout</returns> 100 /// expiration timeout</returns>
101 /// <remarks>This function is not thread safe, and cannot be called
102 /// multiple times concurrently</remarks>
141 public List<OutgoingPacket> GetExpiredPackets(int timeoutMS) 103 public List<OutgoingPacket> GetExpiredPackets(int timeoutMS)
142 { 104 {
105 ProcessQueues();
106
143 List<OutgoingPacket> expiredPackets = null; 107 List<OutgoingPacket> expiredPackets = null;
144 108
145 lock (SyncRoot) 109 if (m_packets.Count > 0)
146 { 110 {
147 int now = Environment.TickCount; 111 int now = Environment.TickCount;
148 foreach (OutgoingPacket packet in packets.Values) 112
113 foreach (OutgoingPacket packet in m_packets.Values)
149 { 114 {
150 // TickCount of zero means a packet is in the resend queue 115 // TickCount of zero means a packet is in the resend queue
151 // but hasn't actually been sent over the wire yet 116 // but hasn't actually been sent over the wire yet
@@ -167,5 +132,35 @@ namespace OpenSim.Region.ClientStack.LindenUDP
167 132
168 return expiredPackets; 133 return expiredPackets;
169 } 134 }
135
136 private void ProcessQueues()
137 {
138 // Process all the pending adds
139 OutgoingPacket pendingAdd;
140 while (m_pendingAdds.Dequeue(out pendingAdd))
141 m_packets[pendingAdd.SequenceNumber] = pendingAdd;
142
143 // Process all the pending removes, including updating statistics and round-trip times
144 PendingAck pendingRemove;
145 OutgoingPacket ackedPacket;
146 while (m_pendingRemoves.Dequeue(out pendingRemove))
147 {
148 if (m_packets.TryGetValue(pendingRemove.SequenceNumber, out ackedPacket))
149 {
150 m_packets.Remove(pendingRemove.SequenceNumber);
151
152 // Update stats
153 System.Threading.Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength);
154
155 if (!pendingRemove.FromResend)
156 {
157 // Calculate the round-trip time for this packet and its ACK
158 int rtt = pendingRemove.RemoveTime - ackedPacket.TickCount;
159 if (rtt > 0)
160 ackedPacket.Client.UpdateRoundTrip(rtt);
161 }
162 }
163 }
164 }
170 } 165 }
171} 166}