aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llmessage/llcircuit.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llmessage/llcircuit.cpp')
-rw-r--r--linden/indra/llmessage/llcircuit.cpp1401
1 files changed, 1401 insertions, 0 deletions
diff --git a/linden/indra/llmessage/llcircuit.cpp b/linden/indra/llmessage/llcircuit.cpp
new file mode 100644
index 0000000..7d3136b
--- /dev/null
+++ b/linden/indra/llmessage/llcircuit.cpp
@@ -0,0 +1,1401 @@
1/**
2 * @file llcircuit.cpp
3 * @brief Class to track UDP endpoints for the message system.
4 *
5 * Copyright (c) 2002-2007, Linden Research, Inc.
6 *
7 * The source code in this file ("Source Code") is provided by Linden Lab
8 * to you under the terms of the GNU General Public License, version 2.0
9 * ("GPL"), unless you have obtained a separate licensing agreement
10 * ("Other License"), formally executed by you and Linden Lab. Terms of
11 * the GPL can be found in doc/GPL-license.txt in this distribution, or
12 * online at http://secondlife.com/developers/opensource/gplv2
13 *
14 * There are special exceptions to the terms and conditions of the GPL as
15 * it is applied to this Source Code. View the full text of the exception
16 * in the file doc/FLOSS-exception.txt in this software distribution, or
17 * online at http://secondlife.com/developers/opensource/flossexception
18 *
19 * By copying, modifying or distributing this software, you acknowledge
20 * that you have read and understood your obligations described above,
21 * and agree to abide by those obligations.
22 *
23 * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
24 * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
25 * COMPLETENESS OR PERFORMANCE.
26 */
27
28#include "linden_common.h"
29
30#if LL_WINDOWS
31
32#include <process.h>
33
34#else
35
36#if LL_LINUX
37#include <dlfcn.h> // RTLD_LAZY
38#endif
39#include <sys/types.h>
40#include <sys/socket.h>
41#include <netinet/in.h>
42
43#endif
44
45
46#if !defined(USE_CIRCUIT_LIST)
47#include <algorithm>
48#endif
49#include <sstream>
50#include <iterator>
51#include <stack>
52
53#include "llcircuit.h"
54
55#include "message.h"
56#include "llrand.h"
57#include "llstl.h"
58#include "lltransfermanager.h"
59
60const F32 PING_INTERVAL = 5.f; // seconds
61const S32 PING_START_BLOCK = 3; // How many pings behind we have to be to consider ourself blocked.
62const S32 PING_RELEASE_BLOCK = 2; // How many pings behind we have to be to consider ourself unblocked.
63
64const F32 TARGET_PERIOD_LENGTH = 5.f; // seconds
65const F32 LL_DUPLICATE_SUPPRESSION_TIMEOUT = 60.f; //seconds - this can be long, as time-based cleanup is
66 // only done when wrapping packetids, now...
67
68LLCircuitData::LLCircuitData(const LLHost &host, TPACKETID in_id)
69: mHost (host),
70 mWrapID(0),
71 mPacketsOutID(0),
72 mPacketsInID(in_id),
73 mHighestPacketID(in_id),
74 mTrusted(FALSE),
75 mbAllowTimeout(TRUE),
76 mbAlive(TRUE),
77 mBlocked(FALSE),
78 mPingTime(0.0),
79 mLastPingSendTime(0.0),
80 mLastPingReceivedTime(0.0),
81 mNextPingSendTime(0.0),
82 mPingsInTransit(0),
83 mLastPingID(0),
84 mPingDelay(INITIAL_PING_VALUE_MSEC),
85 mPingDelayAveraged((F32)INITIAL_PING_VALUE_MSEC),
86 mUnackedPacketCount(0),
87 mUnackedPacketBytes(0),
88 mLocalEndPointID(),
89 mPacketsOut(0),
90 mPacketsIn(0),
91 mPacketsLost(0),
92 mBytesIn(0),
93 mBytesOut(0),
94 mLastPeriodLength(-1.f),
95 mBytesInLastPeriod(0),
96 mBytesOutLastPeriod(0),
97 mBytesInThisPeriod(0),
98 mBytesOutThisPeriod(0),
99 mPeakBPSIn(0),
100 mPeakBPSOut(0),
101 mPeriodTime(0.0),
102 mExistenceTimer(),
103 mCurrentResendCount(0)
104{
105 // Need to guarantee that this time is up to date, we may be creating a circuit even though we haven't been
106 // running a message system loop.
107 F64 mt_sec = LLMessageSystem::getMessageTimeSeconds(TRUE);
108 F32 distribution_offset = frand(1.0f);
109
110 mPingTime = mt_sec;
111 mLastPingSendTime = mt_sec + PING_INTERVAL * distribution_offset;
112 mLastPingReceivedTime = mt_sec;
113 mNextPingSendTime = mLastPingSendTime + 0.95*PING_INTERVAL + frand(0.1f*PING_INTERVAL);
114 mPeriodTime = mt_sec;
115
116 mTimeoutCallback = NULL;
117 mTimeoutUserData = NULL;
118
119 mLocalEndPointID.generate();
120}
121
122
123LLCircuitData::~LLCircuitData()
124{
125 LLReliablePacket *packetp = NULL;
126
127 // Clean up all pending transfers.
128 gTransferManager.cleanupConnection(mHost);
129
130 // remove all pending reliable messages on this circuit
131 std::vector<TPACKETID> doomed;
132 reliable_iter iter;
133 reliable_iter end = mUnackedPackets.end();
134 for(iter = mUnackedPackets.begin(); iter != end; ++iter)
135 {
136 packetp = iter->second;
137 gMessageSystem->mFailedResendPackets++;
138 if(gMessageSystem->mVerboseLog)
139 {
140 doomed.push_back(packetp->mPacketID);
141 }
142 if (packetp->mCallback)
143 {
144 packetp->mCallback(packetp->mCallbackData,LL_ERR_CIRCUIT_GONE);
145 }
146
147 // Update stats
148 mUnackedPacketCount--;
149 mUnackedPacketBytes -= packetp->mBufferLength;
150
151 delete packetp;
152 }
153
154 // remove all pending final retry reliable messages on this circuit
155 end = mFinalRetryPackets.end();
156 for(iter = mFinalRetryPackets.begin(); iter != end; ++iter)
157 {
158 packetp = iter->second;
159 gMessageSystem->mFailedResendPackets++;
160 if(gMessageSystem->mVerboseLog)
161 {
162 doomed.push_back(packetp->mPacketID);
163 }
164 if (packetp->mCallback)
165 {
166 packetp->mCallback(packetp->mCallbackData,LL_ERR_CIRCUIT_GONE);
167 }
168
169 // Update stats
170 mUnackedPacketCount--;
171 mUnackedPacketBytes -= packetp->mBufferLength;
172
173 delete packetp;
174 }
175
176 // log aborted reliable packets for this circuit.
177 if(gMessageSystem->mVerboseLog && !doomed.empty())
178 {
179 std::ostringstream str;
180 std::ostream_iterator<TPACKETID> append(str, " ");
181 str << "MSG: -> " << mHost << "\tABORTING RELIABLE:\t";
182 std::copy(doomed.begin(), doomed.end(), append);
183 llinfos << str.str().c_str() << llendl;
184 }
185}
186
187
188void LLCircuitData::ackReliablePacket(TPACKETID packet_num)
189{
190 reliable_iter iter;
191 LLReliablePacket *packetp;
192
193 iter = mUnackedPackets.find(packet_num);
194 if (iter != mUnackedPackets.end())
195 {
196 packetp = iter->second;
197
198 if(gMessageSystem->mVerboseLog)
199 {
200 std::ostringstream str;
201 str << "MSG: <- " << packetp->mHost << "\tRELIABLE ACKED:\t"
202 << packetp->mPacketID;
203 llinfos << str.str().c_str() << llendl;
204 }
205 if (packetp->mCallback)
206 {
207 if (packetp->mTimeout < 0.f) // negative timeout will always return timeout even for successful ack, for debugging
208 {
209 packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT);
210 }
211 else
212 {
213 packetp->mCallback(packetp->mCallbackData,LL_ERR_NOERR);
214 }
215 }
216
217 // Update stats
218 mUnackedPacketCount--;
219 mUnackedPacketBytes -= packetp->mBufferLength;
220
221 // Cleanup
222 delete packetp;
223 mUnackedPackets.erase(iter);
224 return;
225 }
226
227 iter = mFinalRetryPackets.find(packet_num);
228 if (iter != mFinalRetryPackets.end())
229 {
230 packetp = iter->second;
231 // llinfos << "Packet " << packet_num << " removed from the pending list" << llendl;
232 if(gMessageSystem->mVerboseLog)
233 {
234 std::ostringstream str;
235 str << "MSG: <- " << packetp->mHost << "\tRELIABLE ACKED:\t"
236 << packetp->mPacketID;
237 llinfos << str.str().c_str() << llendl;
238 }
239 if (packetp->mCallback)
240 {
241 if (packetp->mTimeout < 0.f) // negative timeout will always return timeout even for successful ack, for debugging
242 {
243 packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT);
244 }
245 else
246 {
247 packetp->mCallback(packetp->mCallbackData,LL_ERR_NOERR);
248 }
249 }
250
251 // Update stats
252 mUnackedPacketCount--;
253 mUnackedPacketBytes -= packetp->mBufferLength;
254
255 // Cleanup
256 delete packetp;
257 mFinalRetryPackets.erase(iter);
258 }
259 else
260 {
261 // Couldn't find this packet on either of the unacked lists.
262 // maybe it's a duplicate ack?
263 }
264}
265
266
267
268S32 LLCircuitData::resendUnackedPackets(const F64 now)
269{
270 S32 resent_packets = 0;
271 LLReliablePacket *packetp;
272
273
274 //
275 // Theoretically we should search through the list for the packet with the oldest
276 // packet ID, as otherwise when we WRAP we will resend reliable packets out of order.
277 // Since resends are ALREADY out of order, and wrapping is highly rare (16+million packets),
278 // I'm not going to worry about this for now - djs
279 //
280
281 reliable_iter iter;
282 BOOL have_resend_overflow = FALSE;
283 for (iter = mUnackedPackets.begin(); iter != mUnackedPackets.end();)
284 {
285 packetp = iter->second;
286
287 // Only check overflow if we haven't had one yet.
288 if (!have_resend_overflow)
289 {
290 have_resend_overflow = mThrottles.checkOverflow(TC_RESEND, 0);
291 }
292
293 if (have_resend_overflow)
294 {
295 // We've exceeded our bandwidth for resends.
296 // Time to stop trying to send them.
297
298 // If we have too many unacked packets, we need to start dropping expired ones.
299 if (mUnackedPacketBytes > 512000)
300 {
301 if (now > packetp->mExpirationTime)
302 {
303 // This circuit has overflowed. Do not retry. Do not pass go.
304 packetp->mRetries = 0;
305 // Remove it from this list and add it to the final list.
306 mUnackedPackets.erase(iter++);
307 mFinalRetryPackets[packetp->mPacketID] = packetp;
308 }
309 else
310 {
311 ++iter;
312 }
313 // Move on to the next unacked packet.
314 continue;
315 }
316
317 if (mUnackedPacketBytes > 256000 && !(getPacketsOut() % 1024))
318 {
319 // Warn if we've got a lot of resends waiting.
320 llwarns << mHost << " has " << mUnackedPacketBytes
321 << " bytes of reliable messages waiting" << llendl;
322 }
323 // Stop resending. There are less than 512000 unacked packets.
324 break;
325 }
326
327 if (now > packetp->mExpirationTime)
328 {
329 packetp->mRetries--;
330
331 // retry
332 mCurrentResendCount++;
333
334 gMessageSystem->mResentPackets++;
335
336 if(gMessageSystem->mVerboseLog)
337 {
338 std::ostringstream str;
339 str << "MSG: -> " << packetp->mHost
340 << "\tRESENDING RELIABLE:\t" << packetp->mPacketID;
341 llinfos << str.str().c_str() << llendl;
342 }
343
344 packetp->mBuffer[0] |= LL_RESENT_FLAG; // tag packet id as being a resend
345
346 gMessageSystem->mPacketRing.sendPacket(packetp->mSocket,
347 (char *)packetp->mBuffer, packetp->mBufferLength,
348 packetp->mHost);
349
350 mThrottles.throttleOverflow(TC_RESEND, packetp->mBufferLength * 8.f);
351
352 // The new method, retry time based on ping
353 if (packetp->mPingBasedRetry)
354 {
355 packetp->mExpirationTime = now + llmax(LL_MINIMUM_RELIABLE_TIMEOUT_SECONDS, (LL_RELIABLE_TIMEOUT_FACTOR * getPingDelayAveraged()));
356 }
357 else
358 {
359 // custom, constant retry time
360 packetp->mExpirationTime = now + packetp->mTimeout;
361 }
362
363 if (!packetp->mRetries)
364 {
365 // Last resend, remove it from this list and add it to the final list.
366 mUnackedPackets.erase(iter++);
367 mFinalRetryPackets[packetp->mPacketID] = packetp;
368 }
369 else
370 {
371 // Don't remove it yet, it still gets to try to resend at least once.
372 ++iter;
373 }
374 resent_packets++;
375 }
376 else
377 {
378 // Don't need to do anything with this packet, keep iterating.
379 ++iter;
380 }
381 }
382
383
384 for (iter = mFinalRetryPackets.begin(); iter != mFinalRetryPackets.end();)
385 {
386 packetp = iter->second;
387 if (now > packetp->mExpirationTime)
388 {
389 // fail (too many retries)
390 //llinfos << "Packet " << packetp->mPacketID << " removed from the pending list: exceeded retry limit" << llendl;
391 //if (packetp->mMessageName)
392 //{
393 // llinfos << "Packet name " << packetp->mMessageName << llendl;
394 //}
395 gMessageSystem->mFailedResendPackets++;
396
397 if(gMessageSystem->mVerboseLog)
398 {
399 std::ostringstream str;
400 str << "MSG: -> " << packetp->mHost << "\tABORTING RELIABLE:\t"
401 << packetp->mPacketID;
402 llinfos << str.str().c_str() << llendl;
403 }
404
405 if (packetp->mCallback)
406 {
407 packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT);
408 }
409
410 // Update stats
411 mUnackedPacketCount--;
412 mUnackedPacketBytes -= packetp->mBufferLength;
413
414 mFinalRetryPackets.erase(iter++);
415 delete packetp;
416 }
417 else
418 {
419 ++iter;
420 }
421 }
422
423 return mUnackedPacketCount;
424}
425
426
427LLCircuit::LLCircuit() : mLastCircuit(NULL)
428{
429}
430
431LLCircuit::~LLCircuit()
432{
433 // delete pointers in the map.
434 std::for_each(mCircuitData.begin(),
435 mCircuitData.end(),
436 llcompose1(
437 DeletePointerFunctor<LLCircuitData>(),
438 llselect2nd<circuit_data_map::value_type>()));
439}
440
441LLCircuitData *LLCircuit::addCircuitData(const LLHost &host, TPACKETID in_id)
442{
443 // This should really validate if one already exists
444 llinfos << "LLCircuit::addCircuitData for " << host << llendl;
445 LLCircuitData *tempp = new LLCircuitData(host, in_id);
446 mCircuitData.insert(circuit_data_map::value_type(host, tempp));
447 mPingSet.insert(tempp);
448
449 mLastCircuit = tempp;
450 return tempp;
451}
452
453void LLCircuit::removeCircuitData(const LLHost &host)
454{
455 llinfos << "LLCircuit::removeCircuitData for " << host << llendl;
456 mLastCircuit = NULL;
457 circuit_data_map::iterator it = mCircuitData.find(host);
458 if(it != mCircuitData.end())
459 {
460 LLCircuitData *cdp = it->second;
461 mCircuitData.erase(it);
462
463 LLCircuit::ping_set_t::iterator psit = mPingSet.find(cdp);
464 if (psit != mPingSet.end())
465 {
466 mPingSet.erase(psit);
467 }
468 else
469 {
470 llwarns << "Couldn't find entry for next ping in ping set!" << llendl;
471 }
472
473 // Clean up from optimization maps
474 mUnackedCircuitMap.erase(host);
475 mSendAckMap.erase(host);
476 delete cdp;
477 }
478
479 // This also has to happen AFTER we nuke the circuit, because various
480 // callbacks for the circuit may result in messages being sent to
481 // this circuit, and the setting of mLastCircuit. We don't check
482 // if the host matches, but we don't really care because mLastCircuit
483 // is an optimization, and this happens VERY rarely.
484 mLastCircuit = NULL;
485}
486
487void LLCircuitData::setAlive(BOOL b_alive)
488{
489 if (mbAlive != b_alive)
490 {
491 mPacketsOutID = 0;
492 mPacketsInID = 0;
493 mbAlive = b_alive;
494 }
495 if (b_alive)
496 {
497 mLastPingReceivedTime = LLMessageSystem::getMessageTimeSeconds();
498 mPingsInTransit = 0;
499 mBlocked = FALSE;
500 }
501}
502
503
504void LLCircuitData::setAllowTimeout(BOOL allow)
505{
506 mbAllowTimeout = allow;
507
508 if (allow)
509 {
510 // resuming circuit
511 // make sure it's alive
512 setAlive(TRUE);
513 }
514}
515
516
517// Reset per-period counters if necessary.
518void LLCircuitData::checkPeriodTime()
519{
520 F64 mt_sec = LLMessageSystem::getMessageTimeSeconds();
521 F64 period_length = mt_sec - mPeriodTime;
522 if ( period_length > TARGET_PERIOD_LENGTH)
523 {
524 F32 bps_in = (F32)(mBytesInThisPeriod * 8.f / period_length);
525 if (bps_in > mPeakBPSIn)
526 {
527 mPeakBPSIn = bps_in;
528 }
529
530 F32 bps_out = (F32)(mBytesOutThisPeriod * 8.f / period_length);
531 if (bps_out > mPeakBPSOut)
532 {
533 mPeakBPSOut = bps_out;
534 }
535
536 mBytesInLastPeriod = mBytesInThisPeriod;
537 mBytesOutLastPeriod = mBytesOutThisPeriod;
538 mBytesInThisPeriod = 0;
539 mBytesOutThisPeriod = 0;
540 mLastPeriodLength = (F32)period_length;
541
542 mPeriodTime = mt_sec;
543 }
544}
545
546
547void LLCircuitData::addBytesIn(S32 bytes)
548{
549 mBytesIn += bytes;
550 mBytesInThisPeriod += bytes;
551}
552
553
554void LLCircuitData::addBytesOut(S32 bytes)
555{
556 mBytesOut += bytes;
557 mBytesOutThisPeriod += bytes;
558}
559
560
561void LLCircuitData::addReliablePacket(S32 mSocket, U8 *buf_ptr, S32 buf_len, LLReliablePacketParams *params)
562{
563 LLReliablePacket *packet_info;
564
565 packet_info = new LLReliablePacket(mSocket, buf_ptr, buf_len, params);
566
567 mUnackedPacketCount++;
568 mUnackedPacketBytes += packet_info->mBufferLength;
569
570 if (params && params->mRetries)
571 {
572 mUnackedPackets[packet_info->mPacketID] = packet_info;
573 }
574 else
575 {
576 mFinalRetryPackets[packet_info->mPacketID] = packet_info;
577 }
578}
579
580
581void LLCircuit::resendUnackedPackets(S32& unacked_list_length, S32& unacked_list_size)
582{
583 F64 now = LLMessageSystem::getMessageTimeSeconds();
584 unacked_list_length = 0;
585 unacked_list_size = 0;
586
587 LLCircuitData* circ;
588 circuit_data_map::iterator end = mUnackedCircuitMap.end();
589 for(circuit_data_map::iterator it = mUnackedCircuitMap.begin(); it != end; ++it)
590 {
591 circ = (*it).second;
592 unacked_list_length += circ->resendUnackedPackets(now);
593 unacked_list_size += circ->getUnackedPacketBytes();
594 }
595}
596
597
598BOOL LLCircuitData::isDuplicateResend(TPACKETID packetnum)
599{
600 return (mRecentlyReceivedReliablePackets.find(packetnum) != mRecentlyReceivedReliablePackets.end());
601}
602
603
604void LLCircuit::dumpResends()
605{
606 circuit_data_map::iterator end = mCircuitData.end();
607 for(circuit_data_map::iterator it = mCircuitData.begin(); it != end; ++it)
608 {
609 (*it).second->dumpResendCountAndReset();
610 }
611}
612
613LLCircuitData* LLCircuit::findCircuit(const LLHost& host) const
614{
615 // An optimization on finding the previously found circuit.
616 if (mLastCircuit && (mLastCircuit->mHost == host))
617 {
618 return mLastCircuit;
619 }
620
621 circuit_data_map::const_iterator it = mCircuitData.find(host);
622 if(it == mCircuitData.end())
623 {
624 return NULL;
625 }
626 mLastCircuit = it->second;
627 return mLastCircuit;
628}
629
630
631BOOL LLCircuit::isCircuitAlive(const LLHost& host) const
632{
633 LLCircuitData *cdp = findCircuit(host);
634 if(cdp)
635 {
636 return cdp->mbAlive;
637 }
638
639 return FALSE;
640}
641
642void LLCircuitData::setTimeoutCallback(void (*callback_func)(const LLHost &host, void *user_data), void *user_data)
643{
644 mTimeoutCallback = callback_func;
645 mTimeoutUserData = user_data;
646}
647
648void LLCircuitData::checkPacketInID(TPACKETID id, BOOL receive_resent)
649{
650 // Done as floats so we don't have to worry about running out of room
651 // with U32 getting poked into an S32.
652 F32 delta = (F32)mHighestPacketID - (F32)id;
653 if (delta > (0.5f*LL_MAX_OUT_PACKET_ID))
654 {
655 // We've almost definitely wrapped, reset the mLastPacketID to be low again.
656 mHighestPacketID = id;
657 }
658 else if (delta < (-0.5f*LL_MAX_OUT_PACKET_ID))
659 {
660 // This is almost definitely an old packet coming in after a wrap, ignore it.
661 }
662 else
663 {
664 mHighestPacketID = llmax(mHighestPacketID, id);
665 }
666
667
668 // Have we received anything on this circuit yet?
669 if (0 == mPacketsIn)
670 {
671 // Must be first packet from unclosed circuit.
672 mPacketsIn++;
673 setPacketInID((id + 1) % LL_MAX_OUT_PACKET_ID);
674
675 return;
676 }
677
678 mPacketsIn++;
679
680
681 // now, check to see if we've got a gap
682 if ((mPacketsInID == id))
683 {
684 // nope! bump and wrap the counter, then return
685 mPacketsInID++;
686 mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID;
687 }
688 else if (id < mWrapID)
689 {
690 // id < mWrapID will happen if the first few packets are out of order. . .
691 // at that point we haven't marked anything "potentially lost" and
692 // the out-of-order packet will cause a full wrap marking all the IDs "potentially lost"
693
694 // do nothing
695 }
696 else
697 {
698 // we have a gap! if that id is in the map, remove it from the map, leave mCurrentCircuit->mPacketsInID
699 // alone
700 // otherwise, walk from mCurrentCircuit->mPacketsInID to id with wrapping, adding the values to the map
701 // and setting mPacketsInID to id + 1 % LL_MAX_OUT_PACKET_ID
702
703 if (mPotentialLostPackets.find(id) != mPotentialLostPackets.end())
704 {
705 if(gMessageSystem->mVerboseLog)
706 {
707 std::ostringstream str;
708 str << "MSG: <- " << mHost << "\tRECOVERING LOST:\t" << id;
709 llinfos << str.str().c_str() << llendl;
710 }
711 // llinfos << "removing potential lost: " << id << llendl;
712 mPotentialLostPackets.erase(id);
713 }
714 else if (!receive_resent) // don't freak out over out-of-order reliable resends
715 {
716 U64 time = LLMessageSystem::getMessageTimeUsecs();
717 TPACKETID index = mPacketsInID;
718 S32 gap_count = 0;
719 if ((index < id) && ((id - index) < 16))
720 {
721 while (index != id)
722 {
723 if(gMessageSystem->mVerboseLog)
724 {
725 std::ostringstream str;
726 str << "MSG: <- " << mHost << "\tPACKET GAP:\t"
727 << index;
728 llinfos << str.str().c_str() << llendl;
729 }
730
731// llinfos << "adding potential lost: " << index << llendl;
732 mPotentialLostPackets[index] = time;
733 index++;
734 index = index % LL_MAX_OUT_PACKET_ID;
735 gap_count++;
736 }
737 }
738 else
739 {
740 llinfos << "packet_out_of_order - got packet " << id << " expecting " << index << " from " << mHost << llendl;
741 if(gMessageSystem->mVerboseLog)
742 {
743 std::ostringstream str;
744 str << "MSG: <- " << mHost << "\tPACKET GAP:\t"
745 << id << " expected " << index;
746 llinfos << str.str().c_str() << llendl;
747 }
748 }
749
750 mPacketsInID = id + 1;
751 mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID;
752
753 if (gap_count > 128)
754 {
755 llwarns << "Packet loss gap filler running amok!" << llendl;
756 }
757 else if (gap_count > 16)
758 {
759 llwarns << "Sustaining large amounts of packet loss!" << llendl;
760 }
761
762 }
763 }
764}
765
766
767void LLCircuit::updateWatchDogTimers(LLMessageSystem *msgsys)
768{
769 F64 cur_time = LLMessageSystem::getMessageTimeSeconds();
770 S32 count = mPingSet.size();
771 S32 cur = 0;
772
773 // Only process each circuit once at most, stop processing if no circuits
774 while((cur < count) && !mPingSet.empty())
775 {
776 cur++;
777
778 LLCircuit::ping_set_t::iterator psit = mPingSet.begin();
779 LLCircuitData *cdp = *psit;
780
781 if (!cdp->mbAlive)
782 {
783 // We suspect that this case should never happen, given how
784 // the alive status is set.
785 // Skip over dead circuits, just add the ping interval and push it to the back
786 // Always remember to remove it from the set before changing the sorting
787 // key (mNextPingSendTime)
788 mPingSet.erase(psit);
789 cdp->mNextPingSendTime = cur_time + PING_INTERVAL;
790 mPingSet.insert(cdp);
791 continue;
792 }
793 else
794 {
795 // Check to see if this needs a ping
796 if (cur_time < cdp->mNextPingSendTime)
797 {
798 // This circuit doesn't need a ping, break out because
799 // we have a sorted list, thus no more circuits need pings
800 break;
801 }
802
803 // Update watchdog timers
804 if (cdp->updateWatchDogTimers(msgsys))
805 {
806 // Randomize our pings a bit by doing some up to 5% early or late
807 F64 dt = 0.95f*PING_INTERVAL + frand(0.1f*PING_INTERVAL);
808
809 // Remove it, and reinsert it with the new next ping time.
810 // Always remove before changing the sorting key.
811 mPingSet.erase(psit);
812 cdp->mNextPingSendTime = cur_time + dt;
813 mPingSet.insert(cdp);
814
815 // Update our throttles
816 cdp->mThrottles.dynamicAdjust();
817
818 // Update some stats, this is not terribly important
819 cdp->checkPeriodTime();
820 }
821 else
822 {
823 // This mPingSet.erase isn't necessary, because removing the circuit will
824 // remove the ping set.
825 //mPingSet.erase(psit);
826 removeCircuitData(cdp->mHost);
827 }
828 }
829 }
830}
831
832
833BOOL LLCircuitData::updateWatchDogTimers(LLMessageSystem *msgsys)
834{
835 F64 cur_time = LLMessageSystem::getMessageTimeSeconds();
836 mLastPingSendTime = cur_time;
837
838 if (!checkCircuitTimeout())
839 {
840 // Pass this back to the calling LLCircuit, this circuit needs to be cleaned up.
841 return FALSE;
842 }
843
844 // WARNING!
845 // Duplicate suppression can FAIL if packets are delivered out of
846 // order, although it's EXTREMELY unlikely. It would require
847 // that the ping get delivered out of order enough that the ACK
848 // for the packet that it was out of order with was received BEFORE
849 // the ping was sent.
850
851 // Find the current oldest reliable packetID
852 // This is to handle the case if we actually manage to wrap our
853 // packet IDs - the oldest will actually have a higher packet ID
854 // than the current.
855 BOOL wrapped = FALSE;
856 reliable_iter iter;
857 iter = mUnackedPackets.upper_bound(getPacketOutID());
858 if (iter == mUnackedPackets.end())
859 {
860 // Nothing AFTER this one, so we want the lowest packet ID
861 // then.
862 iter = mUnackedPackets.begin();
863 wrapped = TRUE;
864 }
865
866 TPACKETID packet_id = 0;
867
868 // Check against the "final" packets
869 BOOL wrapped_final = FALSE;
870 reliable_iter iter_final;
871 iter_final = mFinalRetryPackets.upper_bound(getPacketOutID());
872 if (iter_final == mFinalRetryPackets.end())
873 {
874 iter_final = mFinalRetryPackets.begin();
875 wrapped_final = TRUE;
876 }
877
878 //llinfos << mHost << " - unacked count " << mUnackedPackets.size() << llendl;
879 //llinfos << mHost << " - final count " << mFinalRetryPackets.size() << llendl;
880 if (wrapped != wrapped_final)
881 {
882 // One of the "unacked" or "final" lists hasn't wrapped. Whichever one
883 // hasn't has the oldest packet.
884 if (!wrapped)
885 {
886 // Hasn't wrapped, so the one on the
887 // unacked packet list is older
888 packet_id = iter->first;
889 //llinfos << mHost << ": nowrapped unacked" << llendl;
890 }
891 else
892 {
893 packet_id = iter_final->first;
894 //llinfos << mHost << ": nowrapped final" << llendl;
895 }
896 }
897 else
898 {
899 // They both wrapped, we can just use the minimum of the two.
900 if ((iter == mUnackedPackets.end()) && (iter_final == mFinalRetryPackets.end()))
901 {
902 // Wow! No unacked packets at all!
903 // Send the ID of the last packet we sent out.
904 // This will flush all of the destination's
905 // unacked packets, theoretically.
906 //llinfos << mHost << ": No unacked!" << llendl;
907 packet_id = getPacketOutID();
908 }
909 else
910 {
911 BOOL had_unacked = FALSE;
912 if (iter != mUnackedPackets.end())
913 {
914 // Unacked list has the lowest so far
915 packet_id = iter->first;
916 had_unacked = TRUE;
917 //llinfos << mHost << ": Unacked" << llendl;
918 }
919
920 if (iter_final != mFinalRetryPackets.end())
921 {
922 // Use the lowest of the unacked list and the final list
923 if (had_unacked)
924 {
925 // Both had a packet, use the lowest.
926 packet_id = llmin(packet_id, iter_final->first);
927 //llinfos << mHost << ": Min of unacked/final" << llendl;
928 }
929 else
930 {
931 // Only the final had a packet, use it.
932 packet_id = iter_final->first;
933 //llinfos << mHost << ": Final!" << llendl;
934 }
935 }
936 }
937 }
938
939 // Send off the another ping.
940 pingTimerStart();
941 msgsys->newMessageFast(_PREHASH_StartPingCheck);
942 msgsys->nextBlock(_PREHASH_PingID);
943 msgsys->addU8Fast(_PREHASH_PingID, nextPingID());
944 msgsys->addU32Fast(_PREHASH_OldestUnacked, packet_id);
945 msgsys->sendMessage(mHost);
946
947 // Also do lost packet accounting.
948 // Check to see if anything on our lost list is old enough to
949 // be considered lost
950
951 LLCircuitData::packet_time_map::iterator it;
952 U64 timeout = (U64)(1000000.0*llmin(LL_MAX_LOST_TIMEOUT, getPingDelayAveraged() * LL_LOST_TIMEOUT_FACTOR));
953
954 U64 mt_usec = LLMessageSystem::getMessageTimeUsecs();
955 for (it = mPotentialLostPackets.begin(); it != mPotentialLostPackets.end(); )
956 {
957 U64 delta_t_usec = mt_usec - (*it).second;
958 if (delta_t_usec > timeout)
959 {
960 // let's call this one a loss!
961 mPacketsLost++;
962 gMessageSystem->mDroppedPackets++;
963 if(gMessageSystem->mVerboseLog)
964 {
965 std::ostringstream str;
966 str << "MSG: <- " << mHost << "\tLOST PACKET:\t"
967 << (*it).first;
968 llinfos << str.str().c_str() << llendl;
969 }
970 mPotentialLostPackets.erase((*(it++)).first);
971 }
972 else
973 {
974 ++it;
975 }
976 }
977
978 return TRUE;
979}
980
981
982void LLCircuitData::clearDuplicateList(TPACKETID oldest_id)
983{
984 // purge old data from the duplicate suppression queue
985
986 // we want to KEEP all x where oldest_id <= x <= last incoming packet, and delete everything else.
987
988 //llinfos << mHost << ": clearing before oldest " << oldest_id << llendl;
989 //llinfos << "Recent list before: " << mRecentlyReceivedReliablePackets.size() << llendl;
990 if (oldest_id < mHighestPacketID)
991 {
992 // Clean up everything with a packet ID less than oldest_id.
993 packet_time_map::iterator pit_start;
994 packet_time_map::iterator pit_end;
995 pit_start = mRecentlyReceivedReliablePackets.begin();
996 pit_end = mRecentlyReceivedReliablePackets.lower_bound(oldest_id);
997 mRecentlyReceivedReliablePackets.erase(pit_start, pit_end);
998 }
999
1000 // Do timeout checks on everything with an ID > mHighestPacketID.
1001 // This should be empty except for wrapping IDs. Thus, this should be
1002 // highly rare.
1003 U64 mt_usec = LLMessageSystem::getMessageTimeUsecs();
1004
1005 packet_time_map::iterator pit;
1006 for(pit = mRecentlyReceivedReliablePackets.upper_bound(mHighestPacketID);
1007 pit != mRecentlyReceivedReliablePackets.end(); )
1008 {
1009 // Validate that the packet ID seems far enough away
1010 if ((pit->first - mHighestPacketID) < 100)
1011 {
1012 llwarns << "Probably incorrectly timing out non-wrapped packets!" << llendl;
1013 }
1014 U64 delta_t_usec = mt_usec - (*pit).second;
1015 F64 delta_t_sec = delta_t_usec * SEC_PER_USEC;
1016 if (delta_t_sec > LL_DUPLICATE_SUPPRESSION_TIMEOUT)
1017 {
1018 // enough time has elapsed we're not likely to get a duplicate on this one
1019 llinfos << "Clearing " << pit->first << " from recent list" << llendl;
1020 mRecentlyReceivedReliablePackets.erase(pit++);
1021 }
1022 else
1023 {
1024 ++pit;
1025 }
1026 }
1027 //llinfos << "Recent list after: " << mRecentlyReceivedReliablePackets.size() << llendl;
1028}
1029
1030BOOL LLCircuitData::checkCircuitTimeout()
1031{
1032 F64 time_since_last_ping = LLMessageSystem::getMessageTimeSeconds() - mLastPingReceivedTime;
1033
1034 // Nota Bene: This needs to be turned off if you are debugging multiple simulators
1035 if (time_since_last_ping > PING_INTERVAL_MAX)
1036 {
1037 llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " last ping " << time_since_last_ping << " seconds ago." <<llendl;
1038 setAlive(FALSE);
1039 if (mTimeoutCallback)
1040 {
1041 llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " calling callback." << llendl;
1042 mTimeoutCallback(mHost, mTimeoutUserData);
1043 }
1044 if (!isAlive())
1045 {
1046 // The callback didn't try and resurrect the circuit. We should kill it.
1047 llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " still dead, dropping." << llendl;
1048 return FALSE;
1049 }
1050 }
1051 else if (time_since_last_ping > PING_INTERVAL_ALARM)
1052 {
1053 //llwarns << "Unresponsive circuit: " << mHost << ": " << time_since_last_ping << " seconds since last ping."<< llendl;
1054 }
1055 return TRUE;
1056}
1057
1058// Call this method when a reliable message comes in - this will
1059// correctly place the packet in the correct list to be acked later.
1060BOOL LLCircuitData::collectRAck(TPACKETID packet_num)
1061{
1062 if (mAcks.empty())
1063 {
1064 // First extra ack, we need to add ourselves to the list of circuits that need to send acks
1065 gMessageSystem->mCircuitInfo.mSendAckMap[mHost] = this;
1066 }
1067
1068 mAcks.push_back(packet_num);
1069 return TRUE;
1070}
1071
1072// this method is called during the message system processAcks() to
1073// send out any acks that did not get sent already.
1074void LLCircuit::sendAcks()
1075{
1076 LLCircuitData* cd;
1077 circuit_data_map::iterator end = mSendAckMap.end();
1078 for(circuit_data_map::iterator it = mSendAckMap.begin(); it != end; ++it)
1079 {
1080 cd = (*it).second;
1081
1082 S32 count = (S32)cd->mAcks.size();
1083 if(count > 0)
1084 {
1085 // send the packet acks
1086 S32 acks_this_packet = 0;
1087 for(S32 i = 0; i < count; ++i)
1088 {
1089 if(acks_this_packet == 0)
1090 {
1091 gMessageSystem->newMessageFast(_PREHASH_PacketAck);
1092 }
1093 gMessageSystem->nextBlockFast(_PREHASH_Packets);
1094 gMessageSystem->addU32Fast(_PREHASH_ID, cd->mAcks[i]);
1095 ++acks_this_packet;
1096 if(acks_this_packet > 250)
1097 {
1098 gMessageSystem->sendMessage(cd->mHost);
1099 acks_this_packet = 0;
1100 }
1101 }
1102 if(acks_this_packet > 0)
1103 {
1104 gMessageSystem->sendMessage(cd->mHost);
1105 }
1106
1107 if(gMessageSystem->mVerboseLog)
1108 {
1109 std::ostringstream str;
1110 str << "MSG: -> " << cd->mHost << "\tPACKET ACKS:\t";
1111 std::ostream_iterator<TPACKETID> append(str, " ");
1112 std::copy(cd->mAcks.begin(), cd->mAcks.end(), append);
1113 llinfos << str.str().c_str() << llendl;
1114 }
1115
1116 // empty out the acks list
1117 cd->mAcks.clear();
1118 }
1119 }
1120
1121 // All acks have been sent, clear the map
1122 mSendAckMap.clear();
1123}
1124
1125
1126std::ostream& operator<<(std::ostream& s, LLCircuitData& circuit)
1127{
1128 F32 age = circuit.mExistenceTimer.getElapsedTimeF32();
1129
1130 using namespace std;
1131 s << "Circuit " << circuit.mHost << " ";
1132 s << circuit.mRemoteID << " ";
1133 s << (circuit.mbAlive ? "Alive" : "Not Alive") << " ";
1134 s << (circuit.mbAllowTimeout ? "Timeout Allowed" : "Timeout Not Allowed");
1135 s << endl;
1136
1137 s << " Packets Lost: " << circuit.mPacketsLost;
1138 s << " Measured Ping: " << circuit.mPingDelay;
1139 s << " Averaged Ping: " << circuit.mPingDelayAveraged;
1140 s << endl;
1141
1142 s << "Global In/Out " << S32(age) << " sec";
1143 s << " KBytes: " << circuit.mBytesIn / 1024 << "/" << circuit.mBytesOut / 1024;
1144 s << " Kbps: ";
1145 s << S32(circuit.mBytesIn * 8.f / circuit.mExistenceTimer.getElapsedTimeF32() / 1024.f);
1146 s << "/";
1147 s << S32(circuit.mBytesOut * 8.f / circuit.mExistenceTimer.getElapsedTimeF32() / 1024.f);
1148 s << " Packets: " << circuit.mPacketsIn << "/" << circuit.mPacketsOut;
1149 s << endl;
1150
1151 s << "Recent In/Out " << S32(circuit.mLastPeriodLength) << " sec";
1152 s << " KBytes: ";
1153 s << circuit.mBytesInLastPeriod / 1024;
1154 s << "/";
1155 s << circuit.mBytesOutLastPeriod / 1024;
1156 s << " Kbps: ";
1157 s << S32(circuit.mBytesInLastPeriod * 8.f / circuit.mLastPeriodLength / 1024.f);
1158 s << "/";
1159 s << S32(circuit.mBytesOutLastPeriod * 8.f / circuit.mLastPeriodLength / 1024.f);
1160 s << " Peak Kbps: ";
1161 s << S32(circuit.mPeakBPSIn / 1024.f);
1162 s << "/";
1163 s << S32(circuit.mPeakBPSOut / 1024.f);
1164 s << endl;
1165
1166 return s;
1167}
1168
1169const char* LLCircuitData::getInfoString() const
1170{
1171 std::ostringstream info;
1172 info << "Circuit: " << mHost << std::endl
1173 << (mbAlive ? "Alive" : "Not Alive") << std::endl
1174 << "Age: " << mExistenceTimer.getElapsedTimeF32() << std::endl;
1175 return info.str().c_str();
1176}
1177
1178void LLCircuitData::dumpResendCountAndReset()
1179{
1180 if (mCurrentResendCount)
1181 {
1182 llinfos << "Circuit: " << mHost << " resent " << mCurrentResendCount << " packets" << llendl;
1183 mCurrentResendCount = 0;
1184 }
1185}
1186
1187std::ostream& operator<<(std::ostream& s, LLCircuit &circuit)
1188{
1189 s << "Circuit Info:" << std::endl;
1190 LLCircuit::circuit_data_map::iterator end = circuit.mCircuitData.end();
1191 LLCircuit::circuit_data_map::iterator it;
1192 for(it = circuit.mCircuitData.begin(); it != end; ++it)
1193 {
1194 s << *((*it).second) << std::endl;
1195 }
1196 return s;
1197}
1198
1199const char* LLCircuit::getInfoString() const
1200{
1201 std::ostringstream info;
1202 info << "Circuit Info:" << std::endl;
1203 LLCircuit::circuit_data_map::const_iterator end = mCircuitData.end();
1204 LLCircuit::circuit_data_map::const_iterator it;
1205 for(it = mCircuitData.begin(); it != end; ++it)
1206 {
1207 info << (*it).second->getInfoString() << std::endl;
1208 }
1209 return info.str().c_str();
1210}
1211
1212void LLCircuit::getCircuitRange(
1213 const LLHost& key,
1214 LLCircuit::circuit_data_map::iterator& first,
1215 LLCircuit::circuit_data_map::iterator& end)
1216{
1217 end = mCircuitData.end();
1218 first = mCircuitData.upper_bound(key);
1219}
1220
1221TPACKETID LLCircuitData::nextPacketOutID()
1222{
1223 mPacketsOut++;
1224
1225 TPACKETID id;
1226
1227 id = (mPacketsOutID + 1) % LL_MAX_OUT_PACKET_ID;
1228
1229 if (id < mPacketsOutID)
1230 {
1231 // we just wrapped on a circuit, reset the wrap ID to zero
1232 mWrapID = 0;
1233 }
1234 mPacketsOutID = id;
1235 return id;
1236}
1237
1238
1239void LLCircuitData::setPacketInID(TPACKETID id)
1240{
1241 id = id % LL_MAX_OUT_PACKET_ID;
1242 mPacketsInID = id;
1243 mRecentlyReceivedReliablePackets.clear();
1244
1245 mWrapID = id;
1246}
1247
1248
1249void LLCircuitData::pingTimerStop(const U8 ping_id)
1250{
1251 F64 mt_secs = LLMessageSystem::getMessageTimeSeconds();
1252
1253 // Nota Bene: no averaging of ping times until we get a feel for how this works
1254 F64 time = mt_secs - mPingTime;
1255 if (time == 0.0)
1256 {
1257 // Ack, we got our ping response on the same frame! Sigh, let's get a real time otherwise
1258 // all of our ping calculations will be skewed.
1259 mt_secs = LLMessageSystem::getMessageTimeSeconds(TRUE);
1260 }
1261 mLastPingReceivedTime = mt_secs;
1262
1263 // If ping is longer than 1 second, we'll get sequence deltas in the ping.
1264 // Approximate by assuming each ping counts for 1 second (slightly low, probably)
1265 S32 delta_ping = (S32)mLastPingID - (S32) ping_id;
1266 if (delta_ping < 0)
1267 {
1268 delta_ping += 256;
1269 }
1270
1271 U32 msec = (U32) ((delta_ping*PING_INTERVAL + time) * 1000.f);
1272 setPingDelay(msec);
1273
1274 mPingsInTransit = delta_ping;
1275 if (mBlocked && (mPingsInTransit <= PING_RELEASE_BLOCK))
1276 {
1277 mBlocked = FALSE;
1278 }
1279}
1280
1281
1282void LLCircuitData::pingTimerStart()
1283{
1284 mPingTime = LLMessageSystem::getMessageTimeSeconds();
1285 mPingsInTransit++;
1286
1287 if (!mBlocked && (mPingsInTransit > PING_START_BLOCK))
1288 {
1289 mBlocked = TRUE;
1290 }
1291}
1292
1293
1294U32 LLCircuitData::getPacketsIn() const
1295{
1296 return mPacketsIn;
1297}
1298
1299
1300S32 LLCircuitData::getBytesIn() const
1301{
1302 return mBytesIn;
1303}
1304
1305
1306S32 LLCircuitData::getBytesOut() const
1307{
1308 return mBytesOut;
1309}
1310
1311
1312U32 LLCircuitData::getPacketsOut() const
1313{
1314 return mPacketsOut;
1315}
1316
1317
1318TPACKETID LLCircuitData::getPacketOutID() const
1319{
1320 return mPacketsOutID;
1321}
1322
1323
1324U32 LLCircuitData::getPacketsLost() const
1325{
1326 return mPacketsLost;
1327}
1328
1329
1330BOOL LLCircuitData::isAlive() const
1331{
1332 return mbAlive;
1333}
1334
1335
1336BOOL LLCircuitData::isBlocked() const
1337{
1338 return mBlocked;
1339}
1340
1341
1342BOOL LLCircuitData::getAllowTimeout() const
1343{
1344 return mbAllowTimeout;
1345}
1346
1347
1348U32 LLCircuitData::getPingDelay() const
1349{
1350 return mPingDelay;
1351}
1352
1353
1354F32 LLCircuitData::getPingInTransitTime()
1355{
1356 // This may be inaccurate in the case of a circuit that was "dead" and then revived,
1357 // but only until the first round trip ping is sent - djs
1358 F32 time_since_ping_was_sent = 0;
1359
1360 if (mPingsInTransit)
1361 {
1362 time_since_ping_was_sent = (F32)((mPingsInTransit*PING_INTERVAL - 1) + (LLMessageSystem::getMessageTimeSeconds() - mPingTime))*1000.f;
1363 }
1364
1365 return time_since_ping_was_sent;
1366}
1367
1368
1369void LLCircuitData::setPingDelay(U32 ping)
1370{
1371 mPingDelay = ping;
1372 mPingDelayAveraged = llmax((F32)ping, getPingDelayAveraged());
1373 mPingDelayAveraged = ((1.f - LL_AVERAGED_PING_ALPHA) * mPingDelayAveraged)
1374 + (LL_AVERAGED_PING_ALPHA * (F32) ping);
1375 mPingDelayAveraged = llclamp(mPingDelayAveraged,
1376 LL_AVERAGED_PING_MIN,
1377 LL_AVERAGED_PING_MAX);
1378}
1379
1380
1381F32 LLCircuitData::getPingDelayAveraged()
1382{
1383 return llmin(llmax(getPingInTransitTime(), mPingDelayAveraged), LL_AVERAGED_PING_MAX);
1384}
1385
1386
1387BOOL LLCircuitData::getTrusted() const
1388{
1389 return mTrusted;
1390}
1391
1392
1393void LLCircuitData::setTrusted(BOOL t)
1394{
1395 mTrusted = t;
1396}
1397
1398F32 LLCircuitData::getAgeInSeconds() const
1399{
1400 return mExistenceTimer.getElapsedTimeF32();
1401}