aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llmessage/llxfermanager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llmessage/llxfermanager.cpp')
-rw-r--r--linden/indra/llmessage/llxfermanager.cpp1152
1 files changed, 1152 insertions, 0 deletions
diff --git a/linden/indra/llmessage/llxfermanager.cpp b/linden/indra/llmessage/llxfermanager.cpp
new file mode 100644
index 0000000..d10c879
--- /dev/null
+++ b/linden/indra/llmessage/llxfermanager.cpp
@@ -0,0 +1,1152 @@
1/**
2 * @file llxfermanager.cpp
3 * @brief implementation of LLXferManager class for a collection of xfers
4 *
5 * Copyright (c) 2001-2007, Linden Research, Inc.
6 *
7 * The source code in this file ("Source Code") is provided by Linden Lab
8 * to you under the terms of the GNU General Public License, version 2.0
9 * ("GPL"), unless you have obtained a separate licensing agreement
10 * ("Other License"), formally executed by you and Linden Lab. Terms of
11 * the GPL can be found in doc/GPL-license.txt in this distribution, or
12 * online at http://secondlife.com/developers/opensource/gplv2
13 *
14 * There are special exceptions to the terms and conditions of the GPL as
15 * it is applied to this Source Code. View the full text of the exception
16 * in the file doc/FLOSS-exception.txt in this software distribution, or
17 * online at http://secondlife.com/developers/opensource/flossexception
18 *
19 * By copying, modifying or distributing this software, you acknowledge
20 * that you have read and understood your obligations described above,
21 * and agree to abide by those obligations.
22 *
23 * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
24 * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
25 * COMPLETENESS OR PERFORMANCE.
26 */
27
28#include "linden_common.h"
29
30#include "llxfermanager.h"
31
32#include "llxfer.h"
33#include "llxfer_file.h"
34#include "llxfer_mem.h"
35#include "llxfer_vfile.h"
36
37#include "llerror.h"
38#include "lluuid.h"
39#include "u64.h"
40
41const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f; // timeout if a registered transfer hasn't been requested in 60 seconds
42const F32 LL_PACKET_TIMEOUT = 3.0f; // packet timeout at 3 s
43const S32 LL_PACKET_RETRY_LIMIT = 10; // packet retransmission limit
44
45const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;
46const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000;
47
48#define LL_XFER_PROGRESS_MESSAGES 0
49#define LL_XFER_TEST_REXMIT 0
50
51
52///////////////////////////////////////////////////////////
53
54LLXferManager::LLXferManager (LLVFS *vfs)
55{
56 init(vfs);
57}
58
59///////////////////////////////////////////////////////////
60
61LLXferManager::~LLXferManager ()
62{
63 free();
64}
65
66///////////////////////////////////////////////////////////
67
68void LLXferManager::init (LLVFS *vfs)
69{
70 mSendList = NULL;
71 mReceiveList = NULL;
72
73 setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);
74 setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
75
76 mVFS = vfs;
77
78 // Turn on or off ack throttling
79 mUseAckThrottling = FALSE;
80 setAckThrottleBPS(100000);
81}
82
83///////////////////////////////////////////////////////////
84
85void LLXferManager::free ()
86{
87 LLXfer *xferp;
88 LLXfer *delp;
89
90 mOutgoingHosts.deleteAllData();
91
92 delp = mSendList;
93 while (delp)
94 {
95 xferp = delp->mNext;
96 delete delp;
97 delp = xferp;
98 }
99 mSendList = NULL;
100
101 delp = mReceiveList;
102 while (delp)
103 {
104 xferp = delp->mNext;
105 delete delp;
106 delp = xferp;
107 }
108 mReceiveList = NULL;
109}
110
111///////////////////////////////////////////////////////////
112
113void LLXferManager::setMaxIncomingXfers(S32 max_num)
114{
115 mMaxIncomingXfers = max_num;
116}
117
118///////////////////////////////////////////////////////////
119
120void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
121{
122 mMaxOutgoingXfersPerCircuit = max_num;
123}
124
125void LLXferManager::setUseAckThrottling(const BOOL use)
126{
127 mUseAckThrottling = use;
128}
129
130void LLXferManager::setAckThrottleBPS(const F32 bps)
131{
132 // Let's figure out the min we can set based on the ack retry rate
133 // and number of simultaneous.
134
135 // Assuming we're running as slow as possible, this is the lowest ack
136 // rate we can use.
137 F32 min_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT;
138
139 // Set
140 F32 actual_rate = llmax(min_bps*1.1f, bps);
141 llinfos << "LLXferManager ack throttle min rate: " << min_bps << llendl;
142 llinfos << "LLXferManager ack throttle actual rate: " << actual_rate << llendl;
143 mAckThrottle.setRate(actual_rate);
144}
145
146
147///////////////////////////////////////////////////////////
148
149void LLXferManager::updateHostStatus()
150{
151 LLXfer *xferp;
152 LLHostStatus *host_statusp = NULL;
153
154 mOutgoingHosts.deleteAllData();
155
156 for (xferp = mSendList; xferp; xferp = xferp->mNext)
157 {
158 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
159 {
160 if (host_statusp->mHost == xferp->mRemoteHost)
161 {
162 break;
163 }
164 }
165 if (!host_statusp)
166 {
167 host_statusp = new LLHostStatus();
168 if (host_statusp)
169 {
170 host_statusp->mHost = xferp->mRemoteHost;
171 mOutgoingHosts.addData(host_statusp);
172 }
173 }
174 if (host_statusp)
175 {
176 if (xferp->mStatus == e_LL_XFER_PENDING)
177 {
178 host_statusp->mNumPending++;
179 }
180 else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
181 {
182 host_statusp->mNumActive++;
183 }
184 }
185
186 }
187}
188
189///////////////////////////////////////////////////////////
190
191void LLXferManager::printHostStatus()
192{
193 LLHostStatus *host_statusp = NULL;
194 if (mOutgoingHosts.getFirstData())
195 {
196 llinfos << "Outgoing Xfers:" << llendl;
197
198 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
199 {
200 llinfos << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << llendl;
201 }
202 }
203}
204
205///////////////////////////////////////////////////////////
206
207LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
208{
209 LLXfer *xferp;
210 for (xferp = list_head; xferp; xferp = xferp->mNext)
211 {
212 if (xferp->mID == id)
213 {
214 return(xferp);
215 }
216 }
217 return(NULL);
218}
219
220
221///////////////////////////////////////////////////////////
222
223void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
224{
225 LLXfer *xferp;
226
227 if (delp)
228 {
229 if (*list_head == delp)
230 {
231 *list_head = delp->mNext;
232 delete (delp);
233 }
234 else
235 {
236 xferp = *list_head;
237 while (xferp->mNext)
238 {
239 if (xferp->mNext == delp)
240 {
241 xferp->mNext = delp->mNext;
242 delete (delp);
243 continue;
244 }
245 xferp = xferp->mNext;
246 }
247 }
248 }
249}
250
251///////////////////////////////////////////////////////////
252
253U32 LLXferManager::numActiveListEntries(LLXfer *list_head)
254{
255 U32 num_entries = 0;
256
257 while (list_head)
258 {
259 if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS))
260 {
261 num_entries++;
262 }
263 list_head = list_head->mNext;
264 }
265 return(num_entries);
266}
267
268///////////////////////////////////////////////////////////
269
270S32 LLXferManager::numPendingXfers(const LLHost &host)
271{
272 LLHostStatus *host_statusp = NULL;
273
274 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
275 {
276 if (host_statusp->mHost == host)
277 {
278 return (host_statusp->mNumPending);
279 }
280 }
281 return 0;
282}
283
284///////////////////////////////////////////////////////////
285
286S32 LLXferManager::numActiveXfers(const LLHost &host)
287{
288 LLHostStatus *host_statusp = NULL;
289
290 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
291 {
292 if (host_statusp->mHost == host)
293 {
294 return (host_statusp->mNumActive);
295 }
296 }
297 return 0;
298}
299
300///////////////////////////////////////////////////////////
301
302void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta)
303{
304 LLHostStatus *host_statusp = NULL;
305
306 for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData())
307 {
308 if (host_statusp->mHost == host)
309 {
310 host_statusp->mNumActive += delta;
311 }
312 }
313}
314
315///////////////////////////////////////////////////////////
316
317void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem)
318{
319 msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket, process_confirm_packet, NULL);
320 msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer, process_request_xfer, NULL);
321 msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket, continue_file_receive, NULL);
322 msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer, process_abort_xfer, NULL);
323}
324
325///////////////////////////////////////////////////////////
326
327U64 LLXferManager::getNextID ()
328{
329 LLUUID a_guid;
330
331 a_guid.generate();
332
333
334 return(*((U64*)(a_guid.mData)));
335}
336
337///////////////////////////////////////////////////////////
338
339S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF)
340{
341 if (is_EOF)
342 {
343 packet_num |= 0x80000000;
344 }
345 return packet_num;
346}
347
348///////////////////////////////////////////////////////////
349
350S32 LLXferManager::decodePacketNum(S32 packet_num)
351{
352 return(packet_num & 0x0FFFFFFF);
353}
354
355///////////////////////////////////////////////////////////
356
357BOOL LLXferManager::isLastPacket(S32 packet_num)
358{
359 return(packet_num & 0x80000000);
360}
361
362///////////////////////////////////////////////////////////
363
364U64 LLXferManager::registerXfer(const void *datap, const S32 length)
365{
366 LLXfer *xferp;
367 U64 xfer_id = getNextID();
368
369 xferp = (LLXfer *) new LLXfer_Mem();
370 if (xferp)
371 {
372 xferp->mNext = mSendList;
373 mSendList = xferp;
374
375 xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length);
376
377 if (!xfer_id)
378 {
379 removeXfer(xferp,&mSendList);
380 }
381 }
382 else
383 {
384 llerrs << "Xfer allocation error" << llendl;
385 xfer_id = 0;
386 }
387
388 return(xfer_id);
389}
390
391///////////////////////////////////////////////////////////
392
393void LLXferManager::requestFile(const char* local_filename,
394 const char* remote_filename,
395 ELLPath remote_path,
396 const LLHost& remote_host,
397 BOOL delete_remote_on_completion,
398 void (*callback)(void**,S32),
399 void** user_data,
400 BOOL is_priority,
401 BOOL use_big_packets)
402{
403 LLXfer *xferp;
404
405 for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
406 {
407 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
408 && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
409 && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path))
410 && (remote_host == xferp->mRemoteHost)
411 && (callback == xferp->mCallback)
412 && (user_data == xferp->mCallbackDataHandle))
413
414 {
415 // cout << "requested a xfer already in progress" << endl;
416 return;
417 }
418 }
419
420 S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1;
421 xferp = (LLXfer *) new LLXfer_File(chunk_size);
422 if (xferp)
423 {
424 addToList(xferp, mReceiveList, is_priority);
425
426 // Remove any file by the same name that happens to be lying
427 // around.
428 // Note: according to AaronB, this is here to deal with locks on files that were
429 // in transit during a crash,
430 if(delete_remote_on_completion &&
431 (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4])) /* Flawfinder : ignore */
432 {
433 LLFile::remove(local_filename);
434 }
435 ((LLXfer_File *)xferp)->initializeRequest(
436 getNextID(),
437 local_filename,
438 remote_filename,
439 remote_path,
440 remote_host,
441 delete_remote_on_completion,
442 callback,user_data);
443 startPendingDownloads();
444 }
445 else
446 {
447 llerrs << "Xfer allocation error" << llendl;
448 }
449}
450
451void LLXferManager::requestFile(const char* remote_filename,
452 ELLPath remote_path,
453 const LLHost& remote_host,
454 BOOL delete_remote_on_completion,
455 void (*callback)(void*,S32,void**,S32),
456 void** user_data,
457 BOOL is_priority)
458{
459 LLXfer *xferp;
460
461 xferp = (LLXfer *) new LLXfer_Mem();
462 if (xferp)
463 {
464 addToList(xferp, mReceiveList, is_priority);
465 ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(),
466 remote_filename,
467 remote_path,
468 remote_host,
469 delete_remote_on_completion,
470 callback, user_data);
471 startPendingDownloads();
472 }
473 else
474 {
475 llerrs << "Xfer allocation error" << llendl;
476 }
477}
478
479void LLXferManager::requestVFile(const LLUUID& local_id,
480 const LLUUID& remote_id,
481 LLAssetType::EType type, LLVFS* vfs,
482 const LLHost& remote_host,
483 void (*callback)(void**, S32),
484 void** user_data,
485 BOOL is_priority)
486{
487 LLXfer *xferp;
488
489 for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
490 {
491 if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE
492 && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type))
493 && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type))
494 && (remote_host == xferp->mRemoteHost)
495 && (callback == xferp->mCallback)
496 && (user_data == xferp->mCallbackDataHandle))
497
498 {
499 // cout << "requested a xfer already in progress" << endl;
500 return;
501 }
502 }
503
504 xferp = (LLXfer *) new LLXfer_VFile();
505 if (xferp)
506 {
507 addToList(xferp, mReceiveList, is_priority);
508 ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(),
509 vfs,
510 local_id,
511 remote_id,
512 type,
513 remote_host,
514 callback,
515 user_data);
516 startPendingDownloads();
517 }
518 else
519 {
520 llerrs << "Xfer allocation error" << llendl;
521 }
522
523}
524
525/*
526void LLXferManager::requestXfer(
527 const char *local_filename,
528 BOOL delete_remote_on_completion,
529 U64 xfer_id,
530 const LLHost &remote_host,
531 void (*callback)(void **,S32),
532 void **user_data)
533{
534 LLXfer *xferp;
535
536 for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
537 {
538 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
539 && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
540 && (xfer_id == xferp->mID)
541 && (remote_host == xferp->mRemoteHost)
542 && (callback == xferp->mCallback)
543 && (user_data == xferp->mCallbackDataHandle))
544
545 {
546 // cout << "requested a xfer already in progress" << endl;
547 return;
548 }
549 }
550
551 xferp = (LLXfer *) new LLXfer_File();
552 if (xferp)
553 {
554 xferp->mNext = mReceiveList;
555 mReceiveList = xferp;
556
557 ((LLXfer_File *)xferp)->initializeRequest(xfer_id,local_filename,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
558 startPendingDownloads();
559 }
560 else
561 {
562 llerrs << "Xfer allcoation error" << llendl;
563 }
564}
565
566void LLXferManager::requestXfer(U64 xfer_id, const LLHost &remote_host, BOOL delete_remote_on_completion, void (*callback)(void *,S32,void **,S32),void **user_data)
567{
568 LLXfer *xferp;
569
570 xferp = (LLXfer *) new LLXfer_Mem();
571 if (xferp)
572 {
573 xferp->mNext = mReceiveList;
574 mReceiveList = xferp;
575
576 ((LLXfer_Mem *)xferp)->initializeRequest(xfer_id,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
577 startPendingDownloads();
578 }
579 else
580 {
581 llerrs << "Xfer allcoation error" << llendl;
582 }
583}
584*/
585///////////////////////////////////////////////////////////
586
587void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user_data*/)
588{
589 // there's sometimes an extra 4 bytes added to an xfer payload
590 const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4;
591 char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4]; /* Flawfinder : ignore */
592 S32 fdata_size;
593 U64 id;
594 S32 packetnum;
595 LLXfer * xferp;
596
597 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
598 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum);
599
600 fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data);
601 mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE);
602
603 xferp = findXfer(id, mReceiveList);
604
605 if (!xferp)
606 {
607 char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */
608 llwarns << "received xfer data from " << mesgsys->getSender()
609 << " for non-existent xfer id: "
610 << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl;
611 return;
612 }
613
614 S32 xfer_size;
615
616 if (decodePacketNum(packetnum) != xferp->mPacketNum) // is the packet different from what we were expecting?
617 {
618 // confirm it if it was a resend of the last one, since the confirmation might have gotten dropped
619 if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1))
620 {
621 llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
622 }
623 else
624 {
625 llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl;
626 }
627 return;
628 }
629
630 S32 result = 0;
631
632 if (xferp->mPacketNum == 0) // first packet has size encoded as additional S32 at beginning of data
633 {
634 ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32));
635
636// do any necessary things on first packet ie. allocate memory
637 xferp->setXferSize(xfer_size);
638
639 // adjust buffer start and size
640 result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32)));
641 }
642 else
643 {
644 result = xferp->receiveData(fdata_buf,fdata_size);
645 }
646
647 if (result == LL_ERR_CANNOT_OPEN_FILE)
648 {
649 xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
650 removeXfer(xferp,&mReceiveList);
651 startPendingDownloads();
652 return;
653 }
654
655 xferp->mPacketNum++; // expect next packet
656
657 if (!mUseAckThrottling)
658 {
659 // No throttling, confirm right away
660 sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
661 }
662 else
663 {
664 // Throttling, put on queue to be confirmed later.
665 LLXferAckInfo ack_info;
666 ack_info.mID = id;
667 ack_info.mPacketNum = decodePacketNum(packetnum);
668 ack_info.mRemoteHost = mesgsys->getSender();
669 mXferAckQueue.push(ack_info);
670 }
671
672 if (isLastPacket(packetnum))
673 {
674 xferp->processEOF();
675 removeXfer(xferp,&mReceiveList);
676 startPendingDownloads();
677 }
678}
679
680///////////////////////////////////////////////////////////
681
682void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host)
683{
684#if LL_XFER_PROGRESS_MESSAGES
685 if (!(packetnum % 50))
686 {
687 cout << "confirming xfer packet #" << packetnum << endl;
688 }
689#endif
690 mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket);
691 mesgsys->nextBlockFast(_PREHASH_XferID);
692 mesgsys->addU64Fast(_PREHASH_ID, id);
693 mesgsys->addU32Fast(_PREHASH_Packet, packetnum);
694
695 mesgsys->sendMessage(remote_host);
696}
697
698///////////////////////////////////////////////////////////
699
700void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user_data*/)
701{
702
703 U64 id;
704 char local_filename[MAX_STRING]; /* Flawfinder : ignore */
705 ELLPath local_path = LL_PATH_NONE;
706 S32 result = LL_ERR_NOERR;
707 LLUUID uuid;
708 LLAssetType::EType type;
709 S16 type_s16;
710 BOOL b_use_big_packets;
711
712 mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets);
713
714 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
715 char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */
716 llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
717 << " to " << mesgsys->getSender() << llendl;
718
719 mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename);
720
721 U8 local_path_u8;
722 mesgsys->getU8("XferID", "FilePath", local_path_u8);
723 if( local_path_u8 < (U8)LL_PATH_COUNT )
724 {
725 local_path = (ELLPath)local_path_u8;
726 }
727 else
728 {
729 llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl;
730 }
731
732 mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid);
733 mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16);
734 type = (LLAssetType::EType)type_s16;
735
736 LLXfer *xferp;
737
738 if (uuid != LLUUID::null)
739 {
740 if(NULL == LLAssetType::lookup(type))
741 {
742 llwarns << "Invalid type for xfer request: " << uuid << ":"
743 << type_s16 << " to " << mesgsys->getSender() << llendl;
744 return;
745 }
746
747 llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl;
748
749 if (! mVFS)
750 {
751 llwarns << "Attempt to send VFile w/o available VFS" << llendl;
752 return;
753 }
754
755 xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);
756 if (xferp)
757 {
758 xferp->mNext = mSendList;
759 mSendList = xferp;
760 result = xferp->startSend(id,mesgsys->getSender());
761 }
762 else
763 {
764 llerrs << "Xfer allcoation error" << llendl;
765 }
766 }
767 else if (strlen(local_filename)) /* Flawfinder : ignore */
768 {
769 std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename );
770 llinfos << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << llendl;
771
772 BOOL delete_local_on_completion = FALSE;
773 mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion);
774
775 // -1 chunk_size causes it to use the default
776 xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1);
777
778 if (xferp)
779 {
780 xferp->mNext = mSendList;
781 mSendList = xferp;
782 result = xferp->startSend(id,mesgsys->getSender());
783 }
784 else
785 {
786 llerrs << "Xfer allcoation error" << llendl;
787 }
788 }
789 else
790 {
791 char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */
792 llinfos << "starting memory transfer: "
793 << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to "
794 << mesgsys->getSender() << llendl;
795
796 xferp = findXfer(id, mSendList);
797
798 if (xferp)
799 {
800 result = xferp->startSend(id,mesgsys->getSender());
801 }
802 else
803 {
804 llinfos << "Warning: " << U64_BUF << " not found." << llendl;
805 result = LL_ERR_FILE_NOT_FOUND;
806 }
807 }
808
809 if (result)
810 {
811 if (xferp)
812 {
813 xferp->abort(result);
814 removeXfer(xferp,&mSendList);
815 }
816 else // can happen with a memory transfer not found
817 {
818 llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl;
819
820 mesgsys->newMessageFast(_PREHASH_AbortXfer);
821 mesgsys->nextBlockFast(_PREHASH_XferID);
822 mesgsys->addU64Fast(_PREHASH_ID, id);
823 mesgsys->addS32Fast(_PREHASH_Result, result);
824
825 mesgsys->sendMessage(mesgsys->getSender());
826 }
827 }
828 else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit))
829 {
830 xferp->sendNextPacket();
831 changeNumActiveXfers(xferp->mRemoteHost,1);
832// llinfos << "***STARTING XFER IMMEDIATELY***" << llendl;
833 }
834 else
835 {
836 if(xferp)
837 {
838 llinfos << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl;
839 }
840 else
841 {
842 llwarns << "LLXferManager::processFileRequest() - no xfer found!"
843 << llendl;
844 }
845 }
846}
847
848///////////////////////////////////////////////////////////
849
850void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*user_data*/)
851{
852 U64 id = 0;
853 S32 packetNum = 0;
854
855 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
856 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum);
857
858 LLXfer* xferp = findXfer(id, mSendList);
859 if (xferp)
860 {
861// cout << "confirmed packet #" << packetNum << " ping: "<< xferp->ACKTimer.getElapsedTimeF32() << endl;
862 xferp->mWaitingForACK = FALSE;
863 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
864 {
865 xferp->sendNextPacket();
866 }
867 else
868 {
869 removeXfer(xferp, &mSendList);
870 }
871 }
872}
873
874///////////////////////////////////////////////////////////
875
876void LLXferManager::retransmitUnackedPackets ()
877{
878 LLXfer *xferp;
879 LLXfer *delp;
880 xferp = mReceiveList;
881 while(xferp)
882 {
883 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
884 {
885 // if the circuit dies, abort
886 if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))
887 {
888 llinfos << "Xfer found in progress on dead circuit, aborting" << llendl;
889 xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;
890 xferp->processEOF();
891 delp = xferp;
892 xferp = xferp->mNext;
893 removeXfer(delp,&mReceiveList);
894 continue;
895 }
896
897 }
898 xferp = xferp->mNext;
899 }
900
901 xferp = mSendList;
902 updateHostStatus();
903 F32 et;
904 while (xferp)
905 {
906 if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))
907 {
908 if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)
909 {
910 llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl;
911 xferp->abort(LL_ERR_TCP_TIMEOUT);
912 delp = xferp;
913 xferp = xferp->mNext;
914 removeXfer(delp,&mSendList);
915 }
916 else
917 {
918 llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl;
919 xferp->resendLastPacket();
920 xferp = xferp->mNext;
921 }
922 }
923 else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT))
924 {
925 llinfos << "registered xfer never requested, xfer dropped" << llendl;
926 xferp->abort(LL_ERR_TCP_TIMEOUT);
927 delp = xferp;
928 xferp = xferp->mNext;
929 removeXfer(delp,&mSendList);
930 }
931 else if (xferp->mStatus == e_LL_XFER_ABORTED)
932 {
933 llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl;
934 delp = xferp;
935 xferp = xferp->mNext;
936 removeXfer(delp,&mSendList);
937 }
938 else if (xferp->mStatus == e_LL_XFER_PENDING)
939 {
940// llinfos << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << llendl;
941 if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)
942 {
943// llinfos << "bumping pending xfer to active" << llendl;
944 xferp->sendNextPacket();
945 changeNumActiveXfers(xferp->mRemoteHost,1);
946 }
947 xferp = xferp->mNext;
948 }
949 else
950 {
951 xferp = xferp->mNext;
952 }
953 }
954
955 //
956 // HACK - if we're using xfer confirm throttling, throttle our xfer confirms here
957 // so we don't blow through bandwidth.
958 //
959
960 while (mXferAckQueue.getLength())
961 {
962 if (mAckThrottle.checkOverflow(1000.0f*8.0f))
963 {
964 break;
965 }
966 //llinfos << "Confirm packet queue length:" << mXferAckQueue.getLength() << llendl;
967 LLXferAckInfo ack_info;
968 mXferAckQueue.pop(ack_info);
969 //llinfos << "Sending confirm packet" << llendl;
970 sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost);
971 mAckThrottle.throttleOverflow(1000.f*8.f); // Assume 1000 bytes/packet
972 }
973}
974
975
976///////////////////////////////////////////////////////////
977
978void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data*/)
979{
980 U64 id = 0;
981 S32 result_code = 0;
982 LLXfer * xferp;
983
984 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
985 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code);
986
987 xferp = findXfer(id, mReceiveList);
988 if (xferp)
989 {
990 xferp->mCallbackResult = result_code;
991 xferp->processEOF();
992 removeXfer(xferp, &mReceiveList);
993 startPendingDownloads();
994 }
995}
996
997///////////////////////////////////////////////////////////
998
999void LLXferManager::startPendingDownloads()
1000{
1001 // This method goes through the list, and starts pending
1002 // operations until active downloads == mMaxIncomingXfers. I copy
1003 // the pending xfers into a temporary data structure because the
1004 // xfers are stored as an intrusive linked list where older
1005 // requests get pushed toward the back. Thus, if we didn't do a
1006 // stateful iteration, it would be possible for old requests to
1007 // never start.
1008 LLXfer* xferp = mReceiveList;
1009 LLLinkedList<LLXfer> pending_downloads;
1010 S32 download_count = 0;
1011 S32 pending_count = 0;
1012 while(xferp)
1013 {
1014 if(xferp->mStatus == e_LL_XFER_PENDING)
1015 {
1016 ++pending_count; // getLength() is O(N), so track it here.
1017 pending_downloads.addData(xferp);
1018 }
1019 else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS)
1020 {
1021 ++download_count;
1022 }
1023 xferp = xferp->mNext;
1024 }
1025
1026 S32 start_count = mMaxIncomingXfers - download_count;
1027
1028 lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
1029 << download_count << " XFER_PENDING: " << pending_count
1030 << " startring " << llmin(start_count, pending_count) << llendl;
1031
1032 if((start_count > 0) && (pending_count > 0))
1033 {
1034 S32 result;
1035 xferp = pending_downloads.getFirstData();
1036 while(start_count-- && xferp)
1037 {
1038 result = xferp->startDownload();
1039 if(result)
1040 {
1041 xferp->abort(result);
1042 ++start_count;
1043 }
1044 xferp = pending_downloads.getNextData();
1045 }
1046 }
1047}
1048
1049///////////////////////////////////////////////////////////
1050
1051void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority)
1052{
1053 if(is_priority)
1054 {
1055 xferp->mNext = NULL;
1056 LLXfer* next = head;
1057 if(next)
1058 {
1059 while(next->mNext)
1060 {
1061 next = next->mNext;
1062 }
1063 next->mNext = xferp;
1064 }
1065 else
1066 {
1067 head = xferp;
1068 }
1069 }
1070 else
1071 {
1072 xferp->mNext = head;
1073 head = xferp;
1074 }
1075}
1076
1077///////////////////////////////////////////////////////////
1078// Globals and C routines
1079///////////////////////////////////////////////////////////
1080
1081LLXferManager *gXferManager = NULL;
1082
1083
1084void start_xfer_manager(LLVFS *vfs)
1085{
1086 gXferManager = new LLXferManager(vfs);
1087}
1088
1089void cleanup_xfer_manager()
1090{
1091 if (gXferManager)
1092 {
1093 delete(gXferManager);
1094 gXferManager = NULL;
1095 }
1096}
1097
1098void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data)
1099{
1100 gXferManager->processConfirmation(mesgsys,user_data);
1101}
1102
1103void process_request_xfer(LLMessageSystem *mesgsys, void **user_data)
1104{
1105 gXferManager->processFileRequest(mesgsys,user_data);
1106}
1107
1108void continue_file_receive(LLMessageSystem *mesgsys, void **user_data)
1109{
1110#if LL_TEST_XFER_REXMIT
1111 if (frand(1.f) > 0.05f)
1112 {
1113#endif
1114 gXferManager->processReceiveData(mesgsys,user_data);
1115#if LL_TEST_XFER_REXMIT
1116 }
1117 else
1118 {
1119 cout << "oops! dropped a xfer packet" << endl;
1120 }
1121#endif
1122}
1123
1124void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data)
1125{
1126 gXferManager->processAbort(mesgsys,user_data);
1127}
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152