aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs')
-rw-r--r--OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs325
1 files changed, 206 insertions, 119 deletions
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
index 0394e54..36e0a0e 100644
--- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
+++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
@@ -96,9 +96,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
96 set 96 set
97 { 97 {
98 m_throttleDebugLevel = value; 98 m_throttleDebugLevel = value;
99/*
99 m_throttleClient.DebugLevel = m_throttleDebugLevel; 100 m_throttleClient.DebugLevel = m_throttleDebugLevel;
100 foreach (TokenBucket tb in m_throttleCategories) 101 foreach (TokenBucket tb in m_throttleCategories)
101 tb.DebugLevel = m_throttleDebugLevel; 102 tb.DebugLevel = m_throttleDebugLevel;
103 */
102 } 104 }
103 } 105 }
104 private int m_throttleDebugLevel; 106 private int m_throttleDebugLevel;
@@ -120,16 +122,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
120 /// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary> 122 /// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary>
121 public readonly IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(200); 123 public readonly IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(200);
122 124
123 /// <summary>
124 /// If true then we take action in response to unacked reliably sent packets such as resending the packet.
125 /// </summary>
126 public bool ProcessUnackedSends { get; set; }
127
128 /// <summary>Packets we have sent that need to be ACKed by the client</summary> 125 /// <summary>Packets we have sent that need to be ACKed by the client</summary>
129 public readonly UnackedPacketCollection NeedAcks = new UnackedPacketCollection(); 126 public readonly UnackedPacketCollection NeedAcks = new UnackedPacketCollection();
130 127
131 /// <summary>ACKs that are queued up, waiting to be sent to the client</summary> 128 /// <summary>ACKs that are queued up, waiting to be sent to the client</summary>
132 public readonly OpenSim.Framework.LocklessQueue<uint> PendingAcks = new OpenSim.Framework.LocklessQueue<uint>(); 129 public readonly DoubleLocklessQueue<uint> PendingAcks = new DoubleLocklessQueue<uint>();
133 130
134 /// <summary>Current packet sequence number</summary> 131 /// <summary>Current packet sequence number</summary>
135 public int CurrentSequence; 132 public int CurrentSequence;
@@ -181,7 +178,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
181 /// <summary>Throttle buckets for each packet category</summary> 178 /// <summary>Throttle buckets for each packet category</summary>
182 private readonly TokenBucket[] m_throttleCategories; 179 private readonly TokenBucket[] m_throttleCategories;
183 /// <summary>Outgoing queues for throttled packets</summary> 180 /// <summary>Outgoing queues for throttled packets</summary>
184 private readonly OpenSim.Framework.LocklessQueue<OutgoingPacket>[] m_packetOutboxes = new OpenSim.Framework.LocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT]; 181 private readonly DoubleLocklessQueue<OutgoingPacket>[] m_packetOutboxes = new DoubleLocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT];
185 /// <summary>A container that can hold one packet for each outbox, used to store 182 /// <summary>A container that can hold one packet for each outbox, used to store
186 /// dequeued packets that are being held for throttling</summary> 183 /// dequeued packets that are being held for throttling</summary>
187 private readonly OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT]; 184 private readonly OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT];
@@ -193,6 +190,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP
193 190
194 private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC 191 private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC
195 private int m_maxRTO = 60000; 192 private int m_maxRTO = 60000;
193 public bool m_deliverPackets = true;
194
195 private float m_burstTime;
196
197 public int m_lastStartpingTimeMS;
198 public int m_pingMS;
199
200 public int PingTimeMS
201 {
202 get
203 {
204 if (m_pingMS < 10)
205 return 10;
206 if(m_pingMS > 2000)
207 return 2000;
208 return m_pingMS;
209 }
210 }
196 211
197 /// <summary> 212 /// <summary>
198 /// This is the percentage of the udp texture queue to add to the task queue since 213 /// This is the percentage of the udp texture queue to add to the task queue since
@@ -232,31 +247,27 @@ namespace OpenSim.Region.ClientStack.LindenUDP
232 if (maxRTO != 0) 247 if (maxRTO != 0)
233 m_maxRTO = maxRTO; 248 m_maxRTO = maxRTO;
234 249
235 ProcessUnackedSends = true; 250 m_burstTime = rates.BrustTime;
251 float m_burst = rates.ClientMaxRate * m_burstTime;
236 252
237 // Create a token bucket throttle for this client that has the scene token bucket as a parent 253 // Create a token bucket throttle for this client that has the scene token bucket as a parent
238 m_throttleClient 254 m_throttleClient = new AdaptiveTokenBucket(parentThrottle, rates.ClientMaxRate, m_burst, rates.AdaptiveThrottlesEnabled);
239 = new AdaptiveTokenBucket(
240 string.Format("adaptive throttle for {0} in {1}", AgentID, server.Scene.Name),
241 parentThrottle, 0, rates.Total, rates.MinimumAdaptiveThrottleRate, rates.AdaptiveThrottlesEnabled);
242 255
243 // Create an array of token buckets for this clients different throttle categories 256 // Create an array of token buckets for this clients different throttle categories
244 m_throttleCategories = new TokenBucket[THROTTLE_CATEGORY_COUNT]; 257 m_throttleCategories = new TokenBucket[THROTTLE_CATEGORY_COUNT];
245 258
246 m_cannibalrate = rates.CannibalizeTextureRate; 259 m_cannibalrate = rates.CannibalizeTextureRate;
247 260
261 m_burst = rates.Total * rates.BrustTime;
262
248 for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++) 263 for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
249 { 264 {
250 ThrottleOutPacketType type = (ThrottleOutPacketType)i; 265 ThrottleOutPacketType type = (ThrottleOutPacketType)i;
251 266
252 // Initialize the packet outboxes, where packets sit while they are waiting for tokens 267 // Initialize the packet outboxes, where packets sit while they are waiting for tokens
253 m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue<OutgoingPacket>(); 268 m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
254
255 // Initialize the token buckets that control the throttling for each category 269 // Initialize the token buckets that control the throttling for each category
256 m_throttleCategories[i] 270 m_throttleCategories[i] = new TokenBucket(m_throttleClient, rates.GetRate(type), m_burst);
257 = new TokenBucket(
258 string.Format("{0} throttle for {1} in {2}", type, AgentID, server.Scene.Name),
259 m_throttleClient, rates.GetRate(type), 0);
260 } 271 }
261 272
262 // Default the retransmission timeout to one second 273 // Default the retransmission timeout to one second
@@ -264,6 +275,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
264 275
265 // Initialize this to a sane value to prevent early disconnects 276 // Initialize this to a sane value to prevent early disconnects
266 TickLastPacketReceived = Environment.TickCount & Int32.MaxValue; 277 TickLastPacketReceived = Environment.TickCount & Int32.MaxValue;
278 m_pingMS = (int)(3.0 * server.TickCountResolution); // so filter doesnt start at 0;
267 } 279 }
268 280
269 /// <summary> 281 /// <summary>
@@ -302,9 +314,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
302 m_info.assetThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate; 314 m_info.assetThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate;
303 m_info.textureThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate; 315 m_info.textureThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate;
304 m_info.totalThrottle = (int)m_throttleClient.DripRate; 316 m_info.totalThrottle = (int)m_throttleClient.DripRate;
305 m_info.targetThrottle = (int)m_throttleClient.TargetDripRate;
306 m_info.maxThrottle = (int)m_throttleClient.MaxDripRate;
307
308 return m_info; 317 return m_info;
309 } 318 }
310 319
@@ -341,8 +350,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
341 /// <param name="throttleType"></param> 350 /// <param name="throttleType"></param>
342 public int GetPacketsQueuedCount(ThrottleOutPacketType throttleType) 351 public int GetPacketsQueuedCount(ThrottleOutPacketType throttleType)
343 { 352 {
344 if ((int)throttleType > 0) 353 int icat = (int)throttleType;
345 return m_packetOutboxes[(int)throttleType].Count; 354 if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
355 return m_packetOutboxes[icat].Count;
346 else 356 else
347 return 0; 357 return 0;
348 } 358 }
@@ -389,6 +399,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
389 399
390 public void SetThrottles(byte[] throttleData) 400 public void SetThrottles(byte[] throttleData)
391 { 401 {
402 SetThrottles(throttleData, 1.0f);
403 }
404
405 public void SetThrottles(byte[] throttleData, float factor)
406 {
392 byte[] adjData; 407 byte[] adjData;
393 int pos = 0; 408 int pos = 0;
394 409
@@ -408,24 +423,21 @@ namespace OpenSim.Region.ClientStack.LindenUDP
408 } 423 }
409 424
410 // 0.125f converts from bits to bytes 425 // 0.125f converts from bits to bytes
411 int resend = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 426 float scale = 0.125f * factor;
412 int land = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 427 int resend = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
413 int wind = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 428 int land = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
414 int cloud = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 429 int wind = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
415 int task = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 430 int cloud = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
416 int texture = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 431 int task = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
417 int asset = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); 432 int texture = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
433 int asset = (int)(BitConverter.ToSingle(adjData, pos) * scale);
434
418 435
419 if (ThrottleDebugLevel > 0)
420 {
421 long total = resend + land + wind + cloud + task + texture + asset;
422 m_log.DebugFormat(
423 "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}",
424 AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total);
425 }
426 436
427 // Make sure none of the throttles are set below our packet MTU, 437 // Make sure none of the throttles are set below our packet MTU,
428 // otherwise a throttle could become permanently clogged 438 // otherwise a throttle could become permanently clogged
439
440/* now using floats
429 resend = Math.Max(resend, LLUDPServer.MTU); 441 resend = Math.Max(resend, LLUDPServer.MTU);
430 land = Math.Max(land, LLUDPServer.MTU); 442 land = Math.Max(land, LLUDPServer.MTU);
431 wind = Math.Max(wind, LLUDPServer.MTU); 443 wind = Math.Max(wind, LLUDPServer.MTU);
@@ -433,52 +445,54 @@ namespace OpenSim.Region.ClientStack.LindenUDP
433 task = Math.Max(task, LLUDPServer.MTU); 445 task = Math.Max(task, LLUDPServer.MTU);
434 texture = Math.Max(texture, LLUDPServer.MTU); 446 texture = Math.Max(texture, LLUDPServer.MTU);
435 asset = Math.Max(asset, LLUDPServer.MTU); 447 asset = Math.Max(asset, LLUDPServer.MTU);
448*/
436 449
437 // Since most textures are now delivered through http, make it possible 450 // Since most textures are now delivered through http, make it possible
438 // to cannibalize some of the bw from the texture throttle to use for 451 // to cannibalize some of the bw from the texture throttle to use for
439 // the task queue (e.g. object updates) 452 // the task queue (e.g. object updates)
440 task = task + (int)(m_cannibalrate * texture); 453 task = task + (int)(m_cannibalrate * texture);
441 texture = (int)((1 - m_cannibalrate) * texture); 454 texture = (int)((1 - m_cannibalrate) * texture);
442 455
443 //int total = resend + land + wind + cloud + task + texture + asset; 456 int total = resend + land + wind + cloud + task + texture + asset;
457
458 float m_burst = total * m_burstTime;
444 459
445 if (ThrottleDebugLevel > 0) 460 if (ThrottleDebugLevel > 0)
446 { 461 {
447 long total = resend + land + wind + cloud + task + texture + asset;
448 m_log.DebugFormat( 462 m_log.DebugFormat(
449 "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}", 463 "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}",
450 AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total); 464 AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total);
451 } 465 }
452 466
453 // Update the token buckets with new throttle values
454 if (m_throttleClient.AdaptiveEnabled)
455 {
456 long total = resend + land + wind + cloud + task + texture + asset;
457 m_throttleClient.TargetDripRate = total;
458 }
459
460 TokenBucket bucket; 467 TokenBucket bucket;
461 468
462 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Resend]; 469 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Resend];
463 bucket.RequestedDripRate = resend; 470 bucket.RequestedDripRate = resend;
471 bucket.RequestedBurst = m_burst;
464 472
465 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Land]; 473 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Land];
466 bucket.RequestedDripRate = land; 474 bucket.RequestedDripRate = land;
475 bucket.RequestedBurst = m_burst;
467 476
468 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Wind]; 477 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Wind];
469 bucket.RequestedDripRate = wind; 478 bucket.RequestedDripRate = wind;
479 bucket.RequestedBurst = m_burst;
470 480
471 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Cloud]; 481 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Cloud];
472 bucket.RequestedDripRate = cloud; 482 bucket.RequestedDripRate = cloud;
483 bucket.RequestedBurst = m_burst;
473 484
474 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Asset]; 485 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Asset];
475 bucket.RequestedDripRate = asset; 486 bucket.RequestedDripRate = asset;
487 bucket.RequestedBurst = m_burst;
476 488
477 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Task]; 489 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Task];
478 bucket.RequestedDripRate = task; 490 bucket.RequestedDripRate = task;
491 bucket.RequestedBurst = m_burst;
479 492
480 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Texture]; 493 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Texture];
481 bucket.RequestedDripRate = texture; 494 bucket.RequestedDripRate = texture;
495 bucket.RequestedBurst = m_burst;
482 496
483 // Reset the packed throttles cached data 497 // Reset the packed throttles cached data
484 m_packedThrottles = null; 498 m_packedThrottles = null;
@@ -496,25 +510,27 @@ namespace OpenSim.Region.ClientStack.LindenUDP
496 int i = 0; 510 int i = 0;
497 511
498 // multiply by 8 to convert bytes back to bits 512 // multiply by 8 to convert bytes back to bits
499 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Resend].RequestedDripRate * 8 * multiplier; 513 multiplier *= 8;
514
515 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Resend].RequestedDripRate * multiplier;
500 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 516 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
501 517
502 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Land].RequestedDripRate * 8 * multiplier; 518 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Land].RequestedDripRate * multiplier;
503 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 519 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
504 520
505 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Wind].RequestedDripRate * 8 * multiplier; 521 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Wind].RequestedDripRate * multiplier;
506 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 522 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
507 523
508 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].RequestedDripRate * 8 * multiplier; 524 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].RequestedDripRate * multiplier;
509 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 525 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
510 526
511 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Task].RequestedDripRate * 8 * multiplier; 527 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Task].RequestedDripRate * multiplier;
512 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 528 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
513 529
514 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Texture].RequestedDripRate * 8 * multiplier; 530 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Texture].RequestedDripRate * multiplier;
515 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 531 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
516 532
517 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Asset].RequestedDripRate * 8 * multiplier; 533 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Asset].RequestedDripRate * multiplier;
518 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 534 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
519 535
520 m_packedThrottles = data; 536 m_packedThrottles = data;
@@ -522,6 +538,18 @@ namespace OpenSim.Region.ClientStack.LindenUDP
522 538
523 return data; 539 return data;
524 } 540 }
541
542 public int GetCatBytesCanSend(ThrottleOutPacketType cat, int timeMS)
543 {
544 int icat = (int)cat;
545 if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
546 {
547 TokenBucket bucket = m_throttleCategories[icat];
548 return bucket.GetCatBytesCanSend(timeMS);
549 }
550 else
551 return 0;
552 }
525 553
526 /// <summary> 554 /// <summary>
527 /// Queue an outgoing packet if appropriate. 555 /// Queue an outgoing packet if appropriate.
@@ -534,32 +562,42 @@ namespace OpenSim.Region.ClientStack.LindenUDP
534 /// </returns> 562 /// </returns>
535 public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue) 563 public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue)
536 { 564 {
565 return EnqueueOutgoing(packet, forceQueue, false);
566 }
567
568 public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue, bool highPriority)
569 {
537 int category = (int)packet.Category; 570 int category = (int)packet.Category;
538 571
539 if (category >= 0 && category < m_packetOutboxes.Length) 572 if (category >= 0 && category < m_packetOutboxes.Length)
540 { 573 {
541 OpenSim.Framework.LocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category]; 574 DoubleLocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category];
575
576 if (m_deliverPackets == false)
577 {
578 queue.Enqueue(packet, highPriority);
579 return true;
580 }
581
542 TokenBucket bucket = m_throttleCategories[category]; 582 TokenBucket bucket = m_throttleCategories[category];
543 583
544 // Don't send this packet if there is already a packet waiting in the queue 584 // Don't send this packet if queue is not empty
545 // even if we have the tokens to send it, tokens should go to the already 585 if (queue.Count > 0 || m_nextPackets[category] != null)
546 // queued packets
547 if (queue.Count > 0)
548 { 586 {
549 queue.Enqueue(packet); 587 queue.Enqueue(packet, highPriority);
550 return true; 588 return true;
551 } 589 }
552 590
553 591 if (!forceQueue && bucket.CheckTokens(packet.Buffer.DataLength))
554 if (!forceQueue && bucket.RemoveTokens(packet.Buffer.DataLength))
555 { 592 {
556 // Enough tokens were removed from the bucket, the packet will not be queued 593 // enough tokens so it can be sent imediatly by caller
594 bucket.RemoveTokens(packet.Buffer.DataLength);
557 return false; 595 return false;
558 } 596 }
559 else 597 else
560 { 598 {
561 // Force queue specified or not enough tokens in the bucket, queue this packet 599 // Force queue specified or not enough tokens in the bucket, queue this packet
562 queue.Enqueue(packet); 600 queue.Enqueue(packet, highPriority);
563 return true; 601 return true;
564 } 602 }
565 } 603 }
@@ -568,6 +606,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
568 // We don't have a token bucket for this category, so it will not be queued 606 // We don't have a token bucket for this category, so it will not be queued
569 return false; 607 return false;
570 } 608 }
609
571 } 610 }
572 611
573 /// <summary> 612 /// <summary>
@@ -588,8 +627,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
588 /// <returns>True if any packets were sent, otherwise false</returns> 627 /// <returns>True if any packets were sent, otherwise false</returns>
589 public bool DequeueOutgoing() 628 public bool DequeueOutgoing()
590 { 629 {
591 OutgoingPacket packet; 630// if (m_deliverPackets == false) return false;
592 OpenSim.Framework.LocklessQueue<OutgoingPacket> queue; 631
632 OutgoingPacket packet = null;
633 DoubleLocklessQueue<OutgoingPacket> queue;
593 TokenBucket bucket; 634 TokenBucket bucket;
594 bool packetSent = false; 635 bool packetSent = false;
595 ThrottleOutPacketTypeFlags emptyCategories = 0; 636 ThrottleOutPacketTypeFlags emptyCategories = 0;
@@ -613,6 +654,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
613 m_udpServer.SendPacketFinal(nextPacket); 654 m_udpServer.SendPacketFinal(nextPacket);
614 m_nextPackets[i] = null; 655 m_nextPackets[i] = null;
615 packetSent = true; 656 packetSent = true;
657
658 if (m_packetOutboxes[i].Count < 5)
659 emptyCategories |= CategoryToFlag(i);
616 } 660 }
617 } 661 }
618 else 662 else
@@ -620,32 +664,47 @@ namespace OpenSim.Region.ClientStack.LindenUDP
620 // No dequeued packet waiting to be sent, try to pull one off 664 // No dequeued packet waiting to be sent, try to pull one off
621 // this queue 665 // this queue
622 queue = m_packetOutboxes[i]; 666 queue = m_packetOutboxes[i];
623 if (queue.Dequeue(out packet)) 667 if (queue != null)
624 { 668 {
625 // A packet was pulled off the queue. See if we have 669 bool success = false;
626 // enough tokens in the bucket to send it out 670 try
627 if (bucket.RemoveTokens(packet.Buffer.DataLength))
628 { 671 {
629 // Send the packet 672 success = queue.Dequeue(out packet);
630 m_udpServer.SendPacketFinal(packet);
631 packetSent = true;
632 } 673 }
633 else 674 catch
634 { 675 {
635 // Save the dequeued packet for the next iteration 676 m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
636 m_nextPackets[i] = packet;
637 } 677 }
678 if (success)
679 {
680 // A packet was pulled off the queue. See if we have
681 // enough tokens in the bucket to send it out
682 if (bucket.RemoveTokens(packet.Buffer.DataLength))
683 {
684 // Send the packet
685 m_udpServer.SendPacketFinal(packet);
686 packetSent = true;
687
688 if (queue.Count < 5)
689 emptyCategories |= CategoryToFlag(i);
690 }
691 else
692 {
693 // Save the dequeued packet for the next iteration
694 m_nextPackets[i] = packet;
695 }
638 696
639 // If the queue is empty after this dequeue, fire the queue 697 }
640 // empty callback now so it has a chance to fill before we 698 else
641 // get back here 699 {
642 if (queue.Count == 0) 700 // No packets in this queue. Fire the queue empty callback
701 // if it has not been called recently
643 emptyCategories |= CategoryToFlag(i); 702 emptyCategories |= CategoryToFlag(i);
703 }
644 } 704 }
645 else 705 else
646 { 706 {
647 // No packets in this queue. Fire the queue empty callback 707 m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
648 // if it has not been called recently
649 emptyCategories |= CategoryToFlag(i); 708 emptyCategories |= CategoryToFlag(i);
650 } 709 }
651 } 710 }
@@ -712,6 +771,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
712 RTO = Math.Min(RTO * 2, m_maxRTO); 771 RTO = Math.Min(RTO * 2, m_maxRTO);
713 } 772 }
714 773
774
775 const int MIN_CALLBACK_MS = 10;
776
715 /// <summary> 777 /// <summary>
716 /// Does an early check to see if this queue empty callback is already 778 /// Does an early check to see if this queue empty callback is already
717 /// running, then asynchronously firing the event 779 /// running, then asynchronously firing the event
@@ -719,24 +781,20 @@ namespace OpenSim.Region.ClientStack.LindenUDP
719 /// <param name="categories">Throttle categories to fire the callback for</param> 781 /// <param name="categories">Throttle categories to fire the callback for</param>
720 private void BeginFireQueueEmpty(ThrottleOutPacketTypeFlags categories) 782 private void BeginFireQueueEmpty(ThrottleOutPacketTypeFlags categories)
721 { 783 {
722// if (m_nextOnQueueEmpty != 0 && (Environment.TickCount & Int32.MaxValue) >= m_nextOnQueueEmpty) 784 if (!m_isQueueEmptyRunning)
723 if (!m_isQueueEmptyRunning && (Environment.TickCount & Int32.MaxValue) >= m_nextOnQueueEmpty)
724 { 785 {
725 m_isQueueEmptyRunning = true;
726
727 int start = Environment.TickCount & Int32.MaxValue; 786 int start = Environment.TickCount & Int32.MaxValue;
728 const int MIN_CALLBACK_MS = 30; 787
788 if (start < m_nextOnQueueEmpty)
789 return;
790
791 m_isQueueEmptyRunning = true;
729 792
730 m_nextOnQueueEmpty = start + MIN_CALLBACK_MS; 793 m_nextOnQueueEmpty = start + MIN_CALLBACK_MS;
731 if (m_nextOnQueueEmpty == 0) 794 if (m_nextOnQueueEmpty == 0)
732 m_nextOnQueueEmpty = 1; 795 m_nextOnQueueEmpty = 1;
733 796
734 // Use a value of 0 to signal that FireQueueEmpty is running 797 if (HasUpdates(categories))
735// m_nextOnQueueEmpty = 0;
736
737 m_categories = categories;
738
739 if (HasUpdates(m_categories))
740 { 798 {
741 if (!m_udpServer.OqrEngine.IsRunning) 799 if (!m_udpServer.OqrEngine.IsRunning)
742 { 800 {
@@ -756,7 +814,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
756 } 814 }
757 815
758 private bool m_isQueueEmptyRunning; 816 private bool m_isQueueEmptyRunning;
759 private ThrottleOutPacketTypeFlags m_categories = 0; 817
760 818
761 /// <summary> 819 /// <summary>
762 /// Fires the OnQueueEmpty callback and sets the minimum time that it 820 /// Fires the OnQueueEmpty callback and sets the minimum time that it
@@ -767,33 +825,33 @@ namespace OpenSim.Region.ClientStack.LindenUDP
767 /// signature</param> 825 /// signature</param>
768 public void FireQueueEmpty(object o) 826 public void FireQueueEmpty(object o)
769 { 827 {
770// m_log.DebugFormat("[LLUDPCLIENT]: FireQueueEmpty for {0} in {1}", AgentID, m_udpServer.Scene.Name); 828 ThrottleOutPacketTypeFlags categories = (ThrottleOutPacketTypeFlags)o;
771 829 QueueEmpty callback = OnQueueEmpty;
772// int start = Environment.TickCount & Int32.MaxValue;
773// const int MIN_CALLBACK_MS = 30;
774 830
775// if (m_udpServer.IsRunningOutbound) 831 if (callback != null)
776// { 832 {
777 ThrottleOutPacketTypeFlags categories = (ThrottleOutPacketTypeFlags)o; 833 // if (m_udpServer.IsRunningOutbound)
778 QueueEmpty callback = OnQueueEmpty; 834 // {
779 835 try { callback(categories); }
780 if (callback != null) 836 catch (Exception e) { m_log.Error("[LLUDPCLIENT]: OnQueueEmpty(" + categories + ") threw an exception: " + e.Message, e); }
781 { 837 // }
782// if (m_udpServer.IsRunningOutbound) 838 }
783// {
784 try { callback(categories); }
785 catch (Exception e) { m_log.Error("[LLUDPCLIENT]: OnQueueEmpty(" + categories + ") threw an exception: " + e.Message, e); }
786// }
787 }
788// }
789 839
790// m_nextOnQueueEmpty = start + MIN_CALLBACK_MS; 840 m_isQueueEmptyRunning = false;
791// if (m_nextOnQueueEmpty == 0) 841 }
792// m_nextOnQueueEmpty = 1;
793 842
794// } 843 internal void ForceThrottleSetting(int throttle, int setting)
844 {
845 if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
846 m_throttleCategories[throttle].RequestedDripRate = Math.Max(setting, LLUDPServer.MTU);
847 }
795 848
796 m_isQueueEmptyRunning = false; 849 internal int GetThrottleSetting(int throttle)
850 {
851 if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
852 return (int)m_throttleCategories[throttle].RequestedDripRate;
853 else
854 return 0;
797 } 855 }
798 856
799 /// <summary> 857 /// <summary>
@@ -839,4 +897,33 @@ namespace OpenSim.Region.ClientStack.LindenUDP
839 } 897 }
840 } 898 }
841 } 899 }
900
901 public class DoubleLocklessQueue<T> : OpenSim.Framework.LocklessQueue<T>
902 {
903 OpenSim.Framework.LocklessQueue<T> highQueue = new OpenSim.Framework.LocklessQueue<T>();
904
905 public override int Count
906 {
907 get
908 {
909 return base.Count + highQueue.Count;
910 }
911 }
912
913 public override bool Dequeue(out T item)
914 {
915 if (highQueue.Dequeue(out item))
916 return true;
917
918 return base.Dequeue(out item);
919 }
920
921 public void Enqueue(T item, bool highPriority)
922 {
923 if (highPriority)
924 highQueue.Enqueue(item);
925 else
926 Enqueue(item);
927 }
928 }
842} 929}