aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs')
-rw-r--r--OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs388
1 files changed, 232 insertions, 156 deletions
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
index 0394e54..439621a 100644
--- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
+++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs
@@ -52,7 +52,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
52 /// are waiting on ACKs for</param> 52 /// are waiting on ACKs for</param>
53 public delegate void PacketStats(int inPackets, int outPackets, int unAckedBytes); 53 public delegate void PacketStats(int inPackets, int outPackets, int unAckedBytes);
54 /// <summary> 54 /// <summary>
55 /// Fired when the queue for one or more packet categories is empty. This 55 /// Fired when the queue for one or more packet categories is empty. This
56 /// event can be hooked to put more data on the empty queues 56 /// event can be hooked to put more data on the empty queues
57 /// </summary> 57 /// </summary>
58 /// <param name="category">Categories of the packet queues that are empty</param> 58 /// <param name="category">Categories of the packet queues that are empty</param>
@@ -86,8 +86,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
86 /// Controls whether information is logged about each outbound packet immediately before it is sent. For debug purposes. 86 /// Controls whether information is logged about each outbound packet immediately before it is sent. For debug purposes.
87 /// </summary> 87 /// </summary>
88 /// <remarks>Any level above 0 will turn on logging.</remarks> 88 /// <remarks>Any level above 0 will turn on logging.</remarks>
89 public int ThrottleDebugLevel 89 public int ThrottleDebugLevel
90 { 90 {
91 get 91 get
92 { 92 {
93 return m_throttleDebugLevel; 93 return m_throttleDebugLevel;
@@ -96,9 +96,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
96 set 96 set
97 { 97 {
98 m_throttleDebugLevel = value; 98 m_throttleDebugLevel = value;
99/*
99 m_throttleClient.DebugLevel = m_throttleDebugLevel; 100 m_throttleClient.DebugLevel = m_throttleDebugLevel;
100 foreach (TokenBucket tb in m_throttleCategories) 101 foreach (TokenBucket tb in m_throttleCategories)
101 tb.DebugLevel = m_throttleDebugLevel; 102 tb.DebugLevel = m_throttleDebugLevel;
103 */
102 } 104 }
103 } 105 }
104 private int m_throttleDebugLevel; 106 private int m_throttleDebugLevel;
@@ -118,18 +120,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
118 /// <summary>Circuit code that this client is connected on</summary> 120 /// <summary>Circuit code that this client is connected on</summary>
119 public readonly uint CircuitCode; 121 public readonly uint CircuitCode;
120 /// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary> 122 /// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary>
121 public readonly IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(200); 123 public IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(200);
122
123 /// <summary>
124 /// If true then we take action in response to unacked reliably sent packets such as resending the packet.
125 /// </summary>
126 public bool ProcessUnackedSends { get; set; }
127 124
128 /// <summary>Packets we have sent that need to be ACKed by the client</summary> 125 /// <summary>Packets we have sent that need to be ACKed by the client</summary>
129 public readonly UnackedPacketCollection NeedAcks = new UnackedPacketCollection(); 126 public UnackedPacketCollection NeedAcks = new UnackedPacketCollection();
130 127
131 /// <summary>ACKs that are queued up, waiting to be sent to the client</summary> 128 /// <summary>ACKs that are queued up, waiting to be sent to the client</summary>
132 public readonly OpenSim.Framework.LocklessQueue<uint> PendingAcks = new OpenSim.Framework.LocklessQueue<uint>(); 129 public DoubleLocklessQueue<uint> PendingAcks = new DoubleLocklessQueue<uint>();
133 130
134 /// <summary>Current packet sequence number</summary> 131 /// <summary>Current packet sequence number</summary>
135 public int CurrentSequence; 132 public int CurrentSequence;
@@ -160,19 +157,20 @@ namespace OpenSim.Region.ClientStack.LindenUDP
160 /// <summary>Number of packets sent to this client</summary> 157 /// <summary>Number of packets sent to this client</summary>
161 public int PacketsSent; 158 public int PacketsSent;
162 /// <summary>Number of packets resent to this client</summary> 159 /// <summary>Number of packets resent to this client</summary>
163 public int PacketsResent; 160 public int PacketsResent;
164 /// <summary>Total byte count of unacked packets sent to this client</summary> 161 /// <summary>Total byte count of unacked packets sent to this client</summary>
165 public int UnackedBytes; 162 public int UnackedBytes;
166 163
164 private int m_packetsUnAckReported;
167 /// <summary>Total number of received packets that we have reported to the OnPacketStats event(s)</summary> 165 /// <summary>Total number of received packets that we have reported to the OnPacketStats event(s)</summary>
168 private int m_packetsReceivedReported; 166 private int m_packetsReceivedReported;
169 /// <summary>Total number of sent packets that we have reported to the OnPacketStats event(s)</summary> 167 /// <summary>Total number of sent packets that we have reported to the OnPacketStats event(s)</summary>
170 private int m_packetsSentReported; 168 private int m_packetsSentReported;
171 /// <summary>Holds the Environment.TickCount value of when the next OnQueueEmpty can be fired</summary> 169 /// <summary>Holds the Environment.TickCount value of when the next OnQueueEmpty can be fired</summary>
172 private int m_nextOnQueueEmpty = 1; 170 private double m_nextOnQueueEmpty = 0;
173 171
174 /// <summary>Throttle bucket for this agent's connection</summary> 172 /// <summary>Throttle bucket for this agent's connection</summary>
175 private readonly AdaptiveTokenBucket m_throttleClient; 173 private AdaptiveTokenBucket m_throttleClient;
176 public AdaptiveTokenBucket FlowThrottle 174 public AdaptiveTokenBucket FlowThrottle
177 { 175 {
178 get { return m_throttleClient; } 176 get { return m_throttleClient; }
@@ -181,10 +179,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
181 /// <summary>Throttle buckets for each packet category</summary> 179 /// <summary>Throttle buckets for each packet category</summary>
182 private readonly TokenBucket[] m_throttleCategories; 180 private readonly TokenBucket[] m_throttleCategories;
183 /// <summary>Outgoing queues for throttled packets</summary> 181 /// <summary>Outgoing queues for throttled packets</summary>
184 private readonly OpenSim.Framework.LocklessQueue<OutgoingPacket>[] m_packetOutboxes = new OpenSim.Framework.LocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT]; 182 private DoubleLocklessQueue<OutgoingPacket>[] m_packetOutboxes = new DoubleLocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT];
185 /// <summary>A container that can hold one packet for each outbox, used to store 183 /// <summary>A container that can hold one packet for each outbox, used to store
186 /// dequeued packets that are being held for throttling</summary> 184 /// dequeued packets that are being held for throttling</summary>
187 private readonly OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT]; 185 private OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT];
188 /// <summary>A reference to the LLUDPServer that is managing this client</summary> 186 /// <summary>A reference to the LLUDPServer that is managing this client</summary>
189 private readonly LLUDPServer m_udpServer; 187 private readonly LLUDPServer m_udpServer;
190 188
@@ -193,13 +191,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP
193 191
194 private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC 192 private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC
195 private int m_maxRTO = 60000; 193 private int m_maxRTO = 60000;
194 public bool m_deliverPackets = true;
195
196 private float m_burstTime;
197
198 public int m_lastStartpingTimeMS;
199 public int m_pingMS;
200
201 public int PingTimeMS
202 {
203 get
204 {
205 if (m_pingMS < 10)
206 return 10;
207 if(m_pingMS > 2000)
208 return 2000;
209 return m_pingMS;
210 }
211 }
196 212
197 /// <summary> 213 /// <summary>
198 /// This is the percentage of the udp texture queue to add to the task queue since 214 /// This is the percentage of the udp texture queue to add to the task queue since
199 /// textures are now generally handled through http. 215 /// textures are now generally handled through http.
200 /// </summary> 216 /// </summary>
201 private double m_cannibalrate = 0.0; 217 private double m_cannibalrate = 0.0;
202 218
203 private ClientInfo m_info = new ClientInfo(); 219 private ClientInfo m_info = new ClientInfo();
204 220
205 /// <summary> 221 /// <summary>
@@ -232,31 +248,27 @@ namespace OpenSim.Region.ClientStack.LindenUDP
232 if (maxRTO != 0) 248 if (maxRTO != 0)
233 m_maxRTO = maxRTO; 249 m_maxRTO = maxRTO;
234 250
235 ProcessUnackedSends = true; 251 m_burstTime = rates.BrustTime;
252 float m_burst = rates.ClientMaxRate * m_burstTime;
236 253
237 // Create a token bucket throttle for this client that has the scene token bucket as a parent 254 // Create a token bucket throttle for this client that has the scene token bucket as a parent
238 m_throttleClient 255 m_throttleClient = new AdaptiveTokenBucket(parentThrottle, rates.ClientMaxRate, m_burst, rates.AdaptiveThrottlesEnabled);
239 = new AdaptiveTokenBucket(
240 string.Format("adaptive throttle for {0} in {1}", AgentID, server.Scene.Name),
241 parentThrottle, 0, rates.Total, rates.MinimumAdaptiveThrottleRate, rates.AdaptiveThrottlesEnabled);
242 256
243 // Create an array of token buckets for this clients different throttle categories 257 // Create an array of token buckets for this clients different throttle categories
244 m_throttleCategories = new TokenBucket[THROTTLE_CATEGORY_COUNT]; 258 m_throttleCategories = new TokenBucket[THROTTLE_CATEGORY_COUNT];
245 259
246 m_cannibalrate = rates.CannibalizeTextureRate; 260 m_cannibalrate = rates.CannibalizeTextureRate;
247 261
262 m_burst = rates.Total * rates.BrustTime;
263
248 for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++) 264 for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
249 { 265 {
250 ThrottleOutPacketType type = (ThrottleOutPacketType)i; 266 ThrottleOutPacketType type = (ThrottleOutPacketType)i;
251 267
252 // Initialize the packet outboxes, where packets sit while they are waiting for tokens 268 // Initialize the packet outboxes, where packets sit while they are waiting for tokens
253 m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue<OutgoingPacket>(); 269 m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
254
255 // Initialize the token buckets that control the throttling for each category 270 // Initialize the token buckets that control the throttling for each category
256 m_throttleCategories[i] 271 m_throttleCategories[i] = new TokenBucket(m_throttleClient, rates.GetRate(type), m_burst);
257 = new TokenBucket(
258 string.Format("{0} throttle for {1} in {2}", type, AgentID, server.Scene.Name),
259 m_throttleClient, rates.GetRate(type), 0);
260 } 272 }
261 273
262 // Default the retransmission timeout to one second 274 // Default the retransmission timeout to one second
@@ -264,6 +276,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
264 276
265 // Initialize this to a sane value to prevent early disconnects 277 // Initialize this to a sane value to prevent early disconnects
266 TickLastPacketReceived = Environment.TickCount & Int32.MaxValue; 278 TickLastPacketReceived = Environment.TickCount & Int32.MaxValue;
279 m_pingMS = (int)(3.0 * server.TickCountResolution); // so filter doesnt start at 0;
267 } 280 }
268 281
269 /// <summary> 282 /// <summary>
@@ -280,9 +293,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
280 293
281 // pull the throttle out of the scene throttle 294 // pull the throttle out of the scene throttle
282 m_throttleClient.Parent.UnregisterRequest(m_throttleClient); 295 m_throttleClient.Parent.UnregisterRequest(m_throttleClient);
283 OnPacketStats = null; 296 PendingAcks.Clear();
284 OnQueueEmpty = null; 297 NeedAcks.Clear();
285 } 298 }
286 299
287 /// <summary> 300 /// <summary>
288 /// Gets information about this client connection 301 /// Gets information about this client connection
@@ -302,9 +315,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
302 m_info.assetThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate; 315 m_info.assetThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Asset].DripRate;
303 m_info.textureThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate; 316 m_info.textureThrottle = (int)m_throttleCategories[(int)ThrottleOutPacketType.Texture].DripRate;
304 m_info.totalThrottle = (int)m_throttleClient.DripRate; 317 m_info.totalThrottle = (int)m_throttleClient.DripRate;
305 m_info.targetThrottle = (int)m_throttleClient.TargetDripRate;
306 m_info.maxThrottle = (int)m_throttleClient.MaxDripRate;
307
308 return m_info; 318 return m_info;
309 } 319 }
310 320
@@ -341,8 +351,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
341 /// <param name="throttleType"></param> 351 /// <param name="throttleType"></param>
342 public int GetPacketsQueuedCount(ThrottleOutPacketType throttleType) 352 public int GetPacketsQueuedCount(ThrottleOutPacketType throttleType)
343 { 353 {
344 if ((int)throttleType > 0) 354 int icat = (int)throttleType;
345 return m_packetOutboxes[(int)throttleType].Count; 355 if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
356 return m_packetOutboxes[icat].Count;
346 else 357 else
347 return 0; 358 return 0;
348 } 359 }
@@ -359,7 +370,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
359 return string.Format( 370 return string.Format(
360 "{0,7} {1,7} {2,7} {3,9} {4,7} {5,7} {6,7} {7,7} {8,7} {9,8} {10,7} {11,7}", 371 "{0,7} {1,7} {2,7} {3,9} {4,7} {5,7} {6,7} {7,7} {8,7} {9,8} {10,7} {11,7}",
361 Util.EnvironmentTickCountSubtract(TickLastPacketReceived), 372 Util.EnvironmentTickCountSubtract(TickLastPacketReceived),
362 PacketsReceived, 373 PacketsReceived,
363 PacketsSent, 374 PacketsSent,
364 PacketsResent, 375 PacketsResent,
365 UnackedBytes, 376 UnackedBytes,
@@ -379,16 +390,22 @@ namespace OpenSim.Region.ClientStack.LindenUDP
379 { 390 {
380 int newPacketsReceived = PacketsReceived - m_packetsReceivedReported; 391 int newPacketsReceived = PacketsReceived - m_packetsReceivedReported;
381 int newPacketsSent = PacketsSent - m_packetsSentReported; 392 int newPacketsSent = PacketsSent - m_packetsSentReported;
382 393 int newPacketUnAck = UnackedBytes - m_packetsUnAckReported;
383 callback(newPacketsReceived, newPacketsSent, UnackedBytes); 394 callback(newPacketsReceived, newPacketsSent, UnackedBytes);
384 395
385 m_packetsReceivedReported += newPacketsReceived; 396 m_packetsReceivedReported += newPacketsReceived;
386 m_packetsSentReported += newPacketsSent; 397 m_packetsSentReported += newPacketsSent;
398 m_packetsUnAckReported += newPacketUnAck;
387 } 399 }
388 } 400 }
389 401
390 public void SetThrottles(byte[] throttleData) 402 public void SetThrottles(byte[] throttleData)
391 { 403 {
404 SetThrottles(throttleData, 1.0f);
405 }
406
407 public void SetThrottles(byte[] throttleData, float factor)
408 {
392 byte[] adjData; 409 byte[] adjData;
393 int pos = 0; 410 int pos = 0;
394 411
@@ -408,24 +425,21 @@ namespace OpenSim.Region.ClientStack.LindenUDP
408 } 425 }
409 426
410 // 0.125f converts from bits to bytes 427 // 0.125f converts from bits to bytes
411 int resend = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 428 float scale = 0.125f * factor;
412 int land = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 429 int resend = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
413 int wind = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 430 int land = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
414 int cloud = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 431 int wind = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
415 int task = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 432 int cloud = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
416 int texture = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); pos += 4; 433 int task = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
417 int asset = (int)(BitConverter.ToSingle(adjData, pos) * 0.125f); 434 int texture = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
435 int asset = (int)(BitConverter.ToSingle(adjData, pos) * scale);
436
418 437
419 if (ThrottleDebugLevel > 0)
420 {
421 long total = resend + land + wind + cloud + task + texture + asset;
422 m_log.DebugFormat(
423 "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}",
424 AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total);
425 }
426 438
427 // Make sure none of the throttles are set below our packet MTU, 439 // Make sure none of the throttles are set below our packet MTU,
428 // otherwise a throttle could become permanently clogged 440 // otherwise a throttle could become permanently clogged
441
442/* now using floats
429 resend = Math.Max(resend, LLUDPServer.MTU); 443 resend = Math.Max(resend, LLUDPServer.MTU);
430 land = Math.Max(land, LLUDPServer.MTU); 444 land = Math.Max(land, LLUDPServer.MTU);
431 wind = Math.Max(wind, LLUDPServer.MTU); 445 wind = Math.Max(wind, LLUDPServer.MTU);
@@ -433,52 +447,54 @@ namespace OpenSim.Region.ClientStack.LindenUDP
433 task = Math.Max(task, LLUDPServer.MTU); 447 task = Math.Max(task, LLUDPServer.MTU);
434 texture = Math.Max(texture, LLUDPServer.MTU); 448 texture = Math.Max(texture, LLUDPServer.MTU);
435 asset = Math.Max(asset, LLUDPServer.MTU); 449 asset = Math.Max(asset, LLUDPServer.MTU);
450*/
436 451
437 // Since most textures are now delivered through http, make it possible 452 // Since most textures are now delivered through http, make it possible
438 // to cannibalize some of the bw from the texture throttle to use for 453 // to cannibalize some of the bw from the texture throttle to use for
439 // the task queue (e.g. object updates) 454 // the task queue (e.g. object updates)
440 task = task + (int)(m_cannibalrate * texture); 455 task = task + (int)(m_cannibalrate * texture);
441 texture = (int)((1 - m_cannibalrate) * texture); 456 texture = (int)((1 - m_cannibalrate) * texture);
442 457
443 //int total = resend + land + wind + cloud + task + texture + asset; 458 int total = resend + land + wind + cloud + task + texture + asset;
459
460 float m_burst = total * m_burstTime;
444 461
445 if (ThrottleDebugLevel > 0) 462 if (ThrottleDebugLevel > 0)
446 { 463 {
447 long total = resend + land + wind + cloud + task + texture + asset;
448 m_log.DebugFormat( 464 m_log.DebugFormat(
449 "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}", 465 "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}",
450 AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total); 466 AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total);
451 } 467 }
452 468
453 // Update the token buckets with new throttle values
454 if (m_throttleClient.AdaptiveEnabled)
455 {
456 long total = resend + land + wind + cloud + task + texture + asset;
457 m_throttleClient.TargetDripRate = total;
458 }
459
460 TokenBucket bucket; 469 TokenBucket bucket;
461 470
462 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Resend]; 471 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Resend];
463 bucket.RequestedDripRate = resend; 472 bucket.RequestedDripRate = resend;
473 bucket.RequestedBurst = m_burst;
464 474
465 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Land]; 475 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Land];
466 bucket.RequestedDripRate = land; 476 bucket.RequestedDripRate = land;
477 bucket.RequestedBurst = m_burst;
467 478
468 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Wind]; 479 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Wind];
469 bucket.RequestedDripRate = wind; 480 bucket.RequestedDripRate = wind;
481 bucket.RequestedBurst = m_burst;
470 482
471 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Cloud]; 483 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Cloud];
472 bucket.RequestedDripRate = cloud; 484 bucket.RequestedDripRate = cloud;
485 bucket.RequestedBurst = m_burst;
473 486
474 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Asset]; 487 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Asset];
475 bucket.RequestedDripRate = asset; 488 bucket.RequestedDripRate = asset;
489 bucket.RequestedBurst = m_burst;
476 490
477 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Task]; 491 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Task];
478 bucket.RequestedDripRate = task; 492 bucket.RequestedDripRate = task;
493 bucket.RequestedBurst = m_burst;
479 494
480 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Texture]; 495 bucket = m_throttleCategories[(int)ThrottleOutPacketType.Texture];
481 bucket.RequestedDripRate = texture; 496 bucket.RequestedDripRate = texture;
497 bucket.RequestedBurst = m_burst;
482 498
483 // Reset the packed throttles cached data 499 // Reset the packed throttles cached data
484 m_packedThrottles = null; 500 m_packedThrottles = null;
@@ -496,25 +512,27 @@ namespace OpenSim.Region.ClientStack.LindenUDP
496 int i = 0; 512 int i = 0;
497 513
498 // multiply by 8 to convert bytes back to bits 514 // multiply by 8 to convert bytes back to bits
499 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Resend].RequestedDripRate * 8 * multiplier; 515 multiplier *= 8;
516
517 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Resend].RequestedDripRate * multiplier;
500 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 518 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
501 519
502 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Land].RequestedDripRate * 8 * multiplier; 520 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Land].RequestedDripRate * multiplier;
503 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 521 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
504 522
505 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Wind].RequestedDripRate * 8 * multiplier; 523 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Wind].RequestedDripRate * multiplier;
506 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 524 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
507 525
508 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].RequestedDripRate * 8 * multiplier; 526 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Cloud].RequestedDripRate * multiplier;
509 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 527 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
510 528
511 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Task].RequestedDripRate * 8 * multiplier; 529 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Task].RequestedDripRate * multiplier;
512 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 530 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
513 531
514 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Texture].RequestedDripRate * 8 * multiplier; 532 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Texture].RequestedDripRate * multiplier;
515 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 533 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
516 534
517 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Asset].RequestedDripRate * 8 * multiplier; 535 rate = (float)m_throttleCategories[(int)ThrottleOutPacketType.Asset].RequestedDripRate * multiplier;
518 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4; 536 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
519 537
520 m_packedThrottles = data; 538 m_packedThrottles = data;
@@ -523,43 +541,65 @@ namespace OpenSim.Region.ClientStack.LindenUDP
523 return data; 541 return data;
524 } 542 }
525 543
544 public int GetCatBytesCanSend(ThrottleOutPacketType cat, int timeMS)
545 {
546 int icat = (int)cat;
547 if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
548 {
549 TokenBucket bucket = m_throttleCategories[icat];
550 return bucket.GetCatBytesCanSend(timeMS);
551 }
552 else
553 return 0;
554 }
555
526 /// <summary> 556 /// <summary>
527 /// Queue an outgoing packet if appropriate. 557 /// Queue an outgoing packet if appropriate.
528 /// </summary> 558 /// </summary>
529 /// <param name="packet"></param> 559 /// <param name="packet"></param>
530 /// <param name="forceQueue">Always queue the packet if at all possible.</param> 560 /// <param name="forceQueue">Always queue the packet if at all possible.</param>
531 /// <returns> 561 /// <returns>
532 /// true if the packet has been queued, 562 /// true if the packet has been queued,
533 /// false if the packet has not been queued and should be sent immediately. 563 /// false if the packet has not been queued and should be sent immediately.
534 /// </returns> 564 /// </returns>
535 public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue) 565 public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue)
536 { 566 {
567 return EnqueueOutgoing(packet, forceQueue, false);
568 }
569
570 public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue, bool highPriority)
571 {
537 int category = (int)packet.Category; 572 int category = (int)packet.Category;
538 573
539 if (category >= 0 && category < m_packetOutboxes.Length) 574 if (category >= 0 && category < m_packetOutboxes.Length)
540 { 575 {
541 OpenSim.Framework.LocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category]; 576 DoubleLocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category];
577
578 if (m_deliverPackets == false)
579 {
580 queue.Enqueue(packet, highPriority);
581 return true;
582 }
583
542 TokenBucket bucket = m_throttleCategories[category]; 584 TokenBucket bucket = m_throttleCategories[category];
543 585
544 // Don't send this packet if there is already a packet waiting in the queue 586 // Don't send this packet if queue is not empty
545 // even if we have the tokens to send it, tokens should go to the already 587 if (queue.Count > 0 || m_nextPackets[category] != null)
546 // queued packets
547 if (queue.Count > 0)
548 { 588 {
549 queue.Enqueue(packet); 589 queue.Enqueue(packet, highPriority);
550 return true; 590 return true;
551 } 591 }
552 592
553 593 if (!forceQueue && bucket.CheckTokens(packet.Buffer.DataLength))
554 if (!forceQueue && bucket.RemoveTokens(packet.Buffer.DataLength))
555 { 594 {
556 // Enough tokens were removed from the bucket, the packet will not be queued 595 // enough tokens so it can be sent imediatly by caller
596 bucket.RemoveTokens(packet.Buffer.DataLength);
557 return false; 597 return false;
558 } 598 }
559 else 599 else
560 { 600 {
561 // Force queue specified or not enough tokens in the bucket, queue this packet 601 // Force queue specified or not enough tokens in the bucket, queue this packet
562 queue.Enqueue(packet); 602 queue.Enqueue(packet, highPriority);
563 return true; 603 return true;
564 } 604 }
565 } 605 }
@@ -568,28 +608,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP
568 // We don't have a token bucket for this category, so it will not be queued 608 // We don't have a token bucket for this category, so it will not be queued
569 return false; 609 return false;
570 } 610 }
611
571 } 612 }
572 613
573 /// <summary> 614 /// <summary>
574 /// Loops through all of the packet queues for this client and tries to send 615 /// Loops through all of the packet queues for this client and tries to send
575 /// an outgoing packet from each, obeying the throttling bucket limits 616 /// an outgoing packet from each, obeying the throttling bucket limits
576 /// </summary> 617 /// </summary>
577 /// 618 ///
578 /// <remarks> 619 /// <remarks>
579 /// Packet queues are inspected in ascending numerical order starting from 0. Therefore, queues with a lower 620 /// Packet queues are inspected in ascending numerical order starting from 0. Therefore, queues with a lower
580 /// ThrottleOutPacketType number will see their packet get sent first (e.g. if both Land and Wind queues have 621 /// ThrottleOutPacketType number will see their packet get sent first (e.g. if both Land and Wind queues have
581 /// packets, then the packet at the front of the Land queue will be sent before the packet at the front of the 622 /// packets, then the packet at the front of the Land queue will be sent before the packet at the front of the
582 /// wind queue). 623 /// wind queue).
583 /// 624 ///
584 /// This function is only called from a synchronous loop in the 625 /// This function is only called from a synchronous loop in the
585 /// UDPServer so we don't need to bother making this thread safe 626 /// UDPServer so we don't need to bother making this thread safe
586 /// </remarks> 627 /// </remarks>
587 /// 628 ///
588 /// <returns>True if any packets were sent, otherwise false</returns> 629 /// <returns>True if any packets were sent, otherwise false</returns>
589 public bool DequeueOutgoing() 630 public bool DequeueOutgoing()
590 { 631 {
591 OutgoingPacket packet; 632// if (m_deliverPackets == false) return false;
592 OpenSim.Framework.LocklessQueue<OutgoingPacket> queue; 633
634 OutgoingPacket packet = null;
635 DoubleLocklessQueue<OutgoingPacket> queue;
593 TokenBucket bucket; 636 TokenBucket bucket;
594 bool packetSent = false; 637 bool packetSent = false;
595 ThrottleOutPacketTypeFlags emptyCategories = 0; 638 ThrottleOutPacketTypeFlags emptyCategories = 0;
@@ -613,6 +656,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
613 m_udpServer.SendPacketFinal(nextPacket); 656 m_udpServer.SendPacketFinal(nextPacket);
614 m_nextPackets[i] = null; 657 m_nextPackets[i] = null;
615 packetSent = true; 658 packetSent = true;
659
660 if (m_packetOutboxes[i].Count < 5)
661 emptyCategories |= CategoryToFlag(i);
616 } 662 }
617 } 663 }
618 else 664 else
@@ -620,32 +666,47 @@ namespace OpenSim.Region.ClientStack.LindenUDP
620 // No dequeued packet waiting to be sent, try to pull one off 666 // No dequeued packet waiting to be sent, try to pull one off
621 // this queue 667 // this queue
622 queue = m_packetOutboxes[i]; 668 queue = m_packetOutboxes[i];
623 if (queue.Dequeue(out packet)) 669 if (queue != null)
624 { 670 {
625 // A packet was pulled off the queue. See if we have 671 bool success = false;
626 // enough tokens in the bucket to send it out 672 try
627 if (bucket.RemoveTokens(packet.Buffer.DataLength))
628 { 673 {
629 // Send the packet 674 success = queue.Dequeue(out packet);
630 m_udpServer.SendPacketFinal(packet);
631 packetSent = true;
632 } 675 }
633 else 676 catch
634 { 677 {
635 // Save the dequeued packet for the next iteration 678 m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
636 m_nextPackets[i] = packet;
637 } 679 }
680 if (success)
681 {
682 // A packet was pulled off the queue. See if we have
683 // enough tokens in the bucket to send it out
684 if (bucket.RemoveTokens(packet.Buffer.DataLength))
685 {
686 // Send the packet
687 m_udpServer.SendPacketFinal(packet);
688 packetSent = true;
689
690 if (queue.Count < 5)
691 emptyCategories |= CategoryToFlag(i);
692 }
693 else
694 {
695 // Save the dequeued packet for the next iteration
696 m_nextPackets[i] = packet;
697 }
638 698
639 // If the queue is empty after this dequeue, fire the queue 699 }
640 // empty callback now so it has a chance to fill before we 700 else
641 // get back here 701 {
642 if (queue.Count == 0) 702 // No packets in this queue. Fire the queue empty callback
703 // if it has not been called recently
643 emptyCategories |= CategoryToFlag(i); 704 emptyCategories |= CategoryToFlag(i);
705 }
644 } 706 }
645 else 707 else
646 { 708 {
647 // No packets in this queue. Fire the queue empty callback 709 m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
648 // if it has not been called recently
649 emptyCategories |= CategoryToFlag(i); 710 emptyCategories |= CategoryToFlag(i);
650 } 711 }
651 } 712 }
@@ -712,6 +773,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
712 RTO = Math.Min(RTO * 2, m_maxRTO); 773 RTO = Math.Min(RTO * 2, m_maxRTO);
713 } 774 }
714 775
776 const double MIN_CALLBACK_MS = 20.0;
777 private bool m_isQueueEmptyRunning;
778
715 /// <summary> 779 /// <summary>
716 /// Does an early check to see if this queue empty callback is already 780 /// Does an early check to see if this queue empty callback is already
717 /// running, then asynchronously firing the event 781 /// running, then asynchronously firing the event
@@ -719,44 +783,27 @@ namespace OpenSim.Region.ClientStack.LindenUDP
719 /// <param name="categories">Throttle categories to fire the callback for</param> 783 /// <param name="categories">Throttle categories to fire the callback for</param>
720 private void BeginFireQueueEmpty(ThrottleOutPacketTypeFlags categories) 784 private void BeginFireQueueEmpty(ThrottleOutPacketTypeFlags categories)
721 { 785 {
722// if (m_nextOnQueueEmpty != 0 && (Environment.TickCount & Int32.MaxValue) >= m_nextOnQueueEmpty) 786 if (!m_isQueueEmptyRunning)
723 if (!m_isQueueEmptyRunning && (Environment.TickCount & Int32.MaxValue) >= m_nextOnQueueEmpty)
724 { 787 {
725 m_isQueueEmptyRunning = true; 788 if (!HasUpdates(categories))
789 return;
726 790
727 int start = Environment.TickCount & Int32.MaxValue; 791 double start = Util.GetTimeStampMS();
728 const int MIN_CALLBACK_MS = 30; 792 if (start < m_nextOnQueueEmpty)
793 return;
729 794
795 m_isQueueEmptyRunning = true;
730 m_nextOnQueueEmpty = start + MIN_CALLBACK_MS; 796 m_nextOnQueueEmpty = start + MIN_CALLBACK_MS;
731 if (m_nextOnQueueEmpty == 0)
732 m_nextOnQueueEmpty = 1;
733 797
734 // Use a value of 0 to signal that FireQueueEmpty is running 798 // Asynchronously run the callback
735// m_nextOnQueueEmpty = 0; 799 if (m_udpServer.OqrEngine.IsRunning)
736 800 m_udpServer.OqrEngine.QueueJob(AgentID.ToString(), () => FireQueueEmpty(categories));
737 m_categories = categories;
738
739 if (HasUpdates(m_categories))
740 {
741 if (!m_udpServer.OqrEngine.IsRunning)
742 {
743 // Asynchronously run the callback
744 Util.FireAndForget(FireQueueEmpty, categories, "LLUDPClient.BeginFireQueueEmpty");
745 }
746 else
747 {
748 m_udpServer.OqrEngine.QueueJob(AgentID.ToString(), () => FireQueueEmpty(categories));
749 }
750 }
751 else 801 else
752 { 802 Util.FireAndForget(FireQueueEmpty, categories, "LLUDPClient.BeginFireQueueEmpty");
753 m_isQueueEmptyRunning = false;
754 }
755 } 803 }
756 } 804 }
757 805
758 private bool m_isQueueEmptyRunning; 806
759 private ThrottleOutPacketTypeFlags m_categories = 0;
760 807
761 /// <summary> 808 /// <summary>
762 /// Fires the OnQueueEmpty callback and sets the minimum time that it 809 /// Fires the OnQueueEmpty callback and sets the minimum time that it
@@ -767,33 +814,33 @@ namespace OpenSim.Region.ClientStack.LindenUDP
767 /// signature</param> 814 /// signature</param>
768 public void FireQueueEmpty(object o) 815 public void FireQueueEmpty(object o)
769 { 816 {
770// m_log.DebugFormat("[LLUDPCLIENT]: FireQueueEmpty for {0} in {1}", AgentID, m_udpServer.Scene.Name); 817 ThrottleOutPacketTypeFlags categories = (ThrottleOutPacketTypeFlags)o;
771 818 QueueEmpty callback = OnQueueEmpty;
772// int start = Environment.TickCount & Int32.MaxValue;
773// const int MIN_CALLBACK_MS = 30;
774 819
775// if (m_udpServer.IsRunningOutbound) 820 if (callback != null)
776// { 821 {
777 ThrottleOutPacketTypeFlags categories = (ThrottleOutPacketTypeFlags)o; 822 // if (m_udpServer.IsRunningOutbound)
778 QueueEmpty callback = OnQueueEmpty; 823 // {
779 824 try { callback(categories); }
780 if (callback != null) 825 catch (Exception e) { m_log.Error("[LLUDPCLIENT]: OnQueueEmpty(" + categories + ") threw an exception: " + e.Message, e); }
781 { 826 // }
782// if (m_udpServer.IsRunningOutbound) 827 }
783// {
784 try { callback(categories); }
785 catch (Exception e) { m_log.Error("[LLUDPCLIENT]: OnQueueEmpty(" + categories + ") threw an exception: " + e.Message, e); }
786// }
787 }
788// }
789 828
790// m_nextOnQueueEmpty = start + MIN_CALLBACK_MS; 829 m_isQueueEmptyRunning = false;
791// if (m_nextOnQueueEmpty == 0) 830 }
792// m_nextOnQueueEmpty = 1;
793 831
794// } 832 internal void ForceThrottleSetting(int throttle, int setting)
833 {
834 if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
835 m_throttleCategories[throttle].RequestedDripRate = Math.Max(setting, LLUDPServer.MTU);
836 }
795 837
796 m_isQueueEmptyRunning = false; 838 internal int GetThrottleSetting(int throttle)
839 {
840 if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
841 return (int)m_throttleCategories[throttle].RequestedDripRate;
842 else
843 return 0;
797 } 844 }
798 845
799 /// <summary> 846 /// <summary>
@@ -839,4 +886,33 @@ namespace OpenSim.Region.ClientStack.LindenUDP
839 } 886 }
840 } 887 }
841 } 888 }
889
890 public class DoubleLocklessQueue<T> : OpenSim.Framework.LocklessQueue<T>
891 {
892 OpenSim.Framework.LocklessQueue<T> highQueue = new OpenSim.Framework.LocklessQueue<T>();
893
894 public override int Count
895 {
896 get
897 {
898 return base.Count + highQueue.Count;
899 }
900 }
901
902 public override bool Dequeue(out T item)
903 {
904 if (highQueue.Dequeue(out item))
905 return true;
906
907 return base.Dequeue(out item);
908 }
909
910 public void Enqueue(T item, bool highPriority)
911 {
912 if (highPriority)
913 highQueue.Enqueue(item);
914 else
915 Enqueue(item);
916 }
917 }
842} 918}