aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llmessage/lltransfermanager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llmessage/lltransfermanager.cpp')
-rw-r--r--linden/indra/llmessage/lltransfermanager.cpp1384
1 files changed, 1384 insertions, 0 deletions
diff --git a/linden/indra/llmessage/lltransfermanager.cpp b/linden/indra/llmessage/lltransfermanager.cpp
new file mode 100644
index 0000000..bfeedd0
--- /dev/null
+++ b/linden/indra/llmessage/lltransfermanager.cpp
@@ -0,0 +1,1384 @@
1/**
2 * @file lltransfermanager.cpp
3 * @brief Improved transfer mechanism for moving data through the
4 * message system.
5 *
6 * Copyright (c) 2004-2007, Linden Research, Inc.
7 *
8 * The source code in this file ("Source Code") is provided by Linden Lab
9 * to you under the terms of the GNU General Public License, version 2.0
10 * ("GPL"), unless you have obtained a separate licensing agreement
11 * ("Other License"), formally executed by you and Linden Lab. Terms of
12 * the GPL can be found in doc/GPL-license.txt in this distribution, or
13 * online at http://secondlife.com/developers/opensource/gplv2
14 *
15 * There are special exceptions to the terms and conditions of the GPL as
16 * it is applied to this Source Code. View the full text of the exception
17 * in the file doc/FLOSS-exception.txt in this software distribution, or
18 * online at http://secondlife.com/developers/opensource/flossexception
19 *
20 * By copying, modifying or distributing this software, you acknowledge
21 * that you have read and understood your obligations described above,
22 * and agree to abide by those obligations.
23 *
24 * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
25 * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
26 * COMPLETENESS OR PERFORMANCE.
27 */
28
29#include "linden_common.h"
30
31#include "lltransfermanager.h"
32
33#include "llerror.h"
34#include "message.h"
35#include "lldatapacker.h"
36
37#include "lltransfersourcefile.h"
38#include "lltransfersourceasset.h"
39#include "lltransfertargetfile.h"
40#include "lltransfertargetvfile.h"
41
42const S32 MAX_PACKET_DATA_SIZE = 2048;
43const S32 MAX_PARAMS_SIZE = 1024;
44
45LLTransferManager gTransferManager;
46LLTransferSource::stype_scfunc_map LLTransferSource::sSourceCreateMap;
47
48//
49// LLTransferManager implementation
50//
51
52LLTransferManager::LLTransferManager() :
53 mValid(FALSE)
54{
55 S32 i;
56 for (i = 0; i < LLTTT_NUM_TYPES; i++)
57 {
58 mTransferBitsIn[i] = 0;
59 mTransferBitsOut[i] = 0;
60 }
61}
62
63
64LLTransferManager::~LLTransferManager()
65{
66 if (mValid)
67 {
68 llwarns << "LLTransferManager::~LLTransferManager - Should have been cleaned up by message system shutdown process" << llendl;
69 cleanup();
70 }
71}
72
73
74void LLTransferManager::init()
75{
76 if (mValid)
77 {
78 llerrs << "Double initializing LLTransferManager!" << llendl;
79 }
80 mValid = TRUE;
81
82 // Register message system handlers
83 gMessageSystem->setHandlerFunc("TransferRequest", processTransferRequest, NULL);
84 gMessageSystem->setHandlerFunc("TransferInfo", processTransferInfo, NULL);
85 gMessageSystem->setHandlerFunc("TransferPacket", processTransferPacket, NULL);
86 gMessageSystem->setHandlerFunc("TransferAbort", processTransferAbort, NULL);
87}
88
89
90void LLTransferManager::cleanup()
91{
92 mValid = FALSE;
93
94 host_tc_map::iterator iter;
95 for (iter = mTransferConnections.begin(); iter != mTransferConnections.end(); iter++)
96 {
97 delete iter->second;
98 }
99 mTransferConnections.clear();
100}
101
102
103void LLTransferManager::updateTransfers()
104{
105 host_tc_map::iterator iter;
106 for (iter = mTransferConnections.begin(); iter != mTransferConnections.end(); iter++)
107 {
108 iter->second->updateTransfers();
109 }
110}
111
112
113void LLTransferManager::cleanupConnection(const LLHost &host)
114{
115 host_tc_map::iterator iter;
116 iter = mTransferConnections.find(host);
117 if (iter == mTransferConnections.end())
118 {
119 // This can happen legitimately if we've never done a transfer, and we're
120 // cleaning up a circuit.
121 //llwarns << "Cleaning up nonexistent transfer connection to " << host << llendl;
122 return;
123 }
124 LLTransferConnection *connp = iter->second;
125 delete connp;
126 mTransferConnections.erase(iter);
127}
128
129
130LLTransferConnection *LLTransferManager::getTransferConnection(const LLHost &host)
131{
132 host_tc_map::iterator iter;
133 iter = mTransferConnections.find(host);
134 if (iter == mTransferConnections.end())
135 {
136 mTransferConnections[host] = new LLTransferConnection(host);
137 return mTransferConnections[host];
138 }
139
140 return iter->second;
141}
142
143
144LLTransferSourceChannel *LLTransferManager::getSourceChannel(const LLHost &host, const LLTransferChannelType type)
145{
146 LLTransferConnection *tcp = getTransferConnection(host);
147 if (!tcp)
148 {
149 return NULL;
150 }
151 return tcp->getSourceChannel(type);
152}
153
154
155
156LLTransferTargetChannel *LLTransferManager::getTargetChannel(const LLHost &host, const LLTransferChannelType type)
157{
158 LLTransferConnection *tcp = getTransferConnection(host);
159 if (!tcp)
160 {
161 return NULL;
162 }
163 return tcp->getTargetChannel(type);
164}
165
166// virtual
167LLTransferSourceParams::~LLTransferSourceParams()
168{ }
169
170
171LLTransferSource *LLTransferManager::findTransferSource(const LLUUID &transfer_id)
172{
173 // This linear traversal could screw us later if we do lots of
174 // searches for sources. However, this ONLY happens right now
175 // in asset transfer callbacks, so this should be relatively quick.
176 host_tc_map::iterator iter;
177 for (iter = mTransferConnections.begin(); iter != mTransferConnections.end(); iter++)
178 {
179 LLTransferConnection *tcp = iter->second;
180 LLTransferConnection::tsc_iter sc_iter;
181 for (sc_iter = tcp->mTransferSourceChannels.begin(); sc_iter != tcp->mTransferSourceChannels.end(); sc_iter++)
182 {
183 LLTransferSourceChannel *scp = *sc_iter;
184 LLTransferSource *sourcep = scp->findTransferSource(transfer_id);
185 if (sourcep)
186 {
187 return sourcep;
188 }
189 }
190 }
191
192 return NULL;
193}
194
195//
196// Message handlers
197//
198
199//static
200void LLTransferManager::processTransferRequest(LLMessageSystem *msgp, void **)
201{
202 //llinfos << "LLTransferManager::processTransferRequest" << llendl;
203
204 LLUUID transfer_id;
205 LLTransferSourceType source_type;
206 LLTransferChannelType channel_type;
207 F32 priority;
208
209 msgp->getUUID("TransferInfo", "TransferID", transfer_id);
210 msgp->getS32("TransferInfo", "SourceType", (S32 &)source_type);
211 msgp->getS32("TransferInfo", "ChannelType", (S32 &)channel_type);
212 msgp->getF32("TransferInfo", "Priority", priority);
213
214 LLTransferSourceChannel *tscp = gTransferManager.getSourceChannel(msgp->getSender(), channel_type);
215
216 if (!tscp)
217 {
218 llwarns << "Source channel not found" << llendl;
219 return;
220 }
221
222 if (tscp->findTransferSource(transfer_id))
223 {
224 llwarns << "Duplicate request for transfer " << transfer_id << ", aborting!" << llendl;
225 return;
226 }
227
228 S32 size = msgp->getSize("TransferInfo", "Params");
229 if(size > MAX_PARAMS_SIZE)
230 {
231 llwarns << "LLTransferManager::processTransferRequest params too big."
232 << llendl;
233 return;
234 }
235
236 //llinfos << transfer_id << ":" << source_type << ":" << channel_type << ":" << priority << llendl;
237 LLTransferSource* tsp = LLTransferSource::createSource(
238 source_type,
239 transfer_id,
240 priority);
241 if(!tsp)
242 {
243 llwarns << "LLTransferManager::processTransferRequest couldn't create"
244 << " transfer source!" << llendl;
245 return;
246 }
247 U8 tmp[MAX_PARAMS_SIZE];
248 msgp->getBinaryData("TransferInfo", "Params", tmp, size);
249
250 LLDataPackerBinaryBuffer dpb(tmp, MAX_PARAMS_SIZE);
251 BOOL unpack_ok = tsp->unpackParams(dpb);
252 if (!unpack_ok)
253 {
254 // This should only happen if the data is corrupt or
255 // incorrectly packed.
256 // *NOTE: We may want to call abortTransfer().
257 llwarns << "LLTransferManager::processTransferRequest: bad parameters."
258 << llendl;
259 delete tsp;
260 return;
261 }
262
263 tscp->addTransferSource(tsp);
264 tsp->initTransfer();
265}
266
267
268//static
269void LLTransferManager::processTransferInfo(LLMessageSystem *msgp, void **)
270{
271 //llinfos << "LLTransferManager::processTransferInfo" << llendl;
272
273 LLUUID transfer_id;
274 LLTransferTargetType target_type;
275 LLTransferChannelType channel_type;
276 LLTSCode status;
277 S32 size;
278
279 msgp->getUUID("TransferInfo", "TransferID", transfer_id);
280 msgp->getS32("TransferInfo", "TargetType", (S32 &)target_type);
281 msgp->getS32("TransferInfo", "ChannelType", (S32 &)channel_type);
282 msgp->getS32("TransferInfo", "Status", (S32 &)status);
283 msgp->getS32("TransferInfo", "Size", size);
284
285 //llinfos << transfer_id << ":" << target_type<< ":" << channel_type << llendl;
286 LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(msgp->getSender(), channel_type);
287 if (!ttcp)
288 {
289 llwarns << "Target channel not found" << llendl;
290 // Should send a message to abort the transfer.
291 return;
292 }
293
294 LLTransferTarget *ttp = ttcp->findTransferTarget(transfer_id);
295 if (!ttp)
296 {
297 llwarns << "TransferInfo for unknown transfer! Not able to handle this yet!" << llendl;
298 // This could happen if we're doing a push transfer, although to avoid confusion,
299 // maybe it should be a different message.
300 return;
301 }
302
303 if (status != LLTS_OK)
304 {
305 llwarns << transfer_id << ": Non-ok status, cleaning up" << llendl;
306 ttp->completionCallback(status);
307 // Clean up the transfer.
308 ttcp->deleteTransfer(ttp);
309 return;
310 }
311
312 // unpack the params
313 S32 params_size = msgp->getSize("TransferInfo", "Params");
314 if(params_size > MAX_PARAMS_SIZE)
315 {
316 llwarns << "LLTransferManager::processTransferInfo params too big."
317 << llendl;
318 return;
319 }
320 else if(params_size > 0)
321 {
322 U8 tmp[MAX_PARAMS_SIZE];
323 msgp->getBinaryData("TransferInfo", "Params", tmp, params_size);
324 LLDataPackerBinaryBuffer dpb(tmp, MAX_PARAMS_SIZE);
325 if (!ttp->unpackParams(dpb))
326 {
327 // This should only happen if the data is corrupt or
328 // incorrectly packed.
329 llwarns << "LLTransferManager::processTransferRequest: bad params."
330 << llendl;
331 ttp->abortTransfer();
332 ttcp->deleteTransfer(ttp);
333 return;
334 }
335 }
336
337 llinfos << "Receiving " << transfer_id << ", size " << size << " bytes" << llendl;
338 ttp->setSize(size);
339 ttp->setGotInfo(TRUE);
340
341 // OK, at this point we to handle any delayed transfer packets (which could happen
342 // if this packet was lost)
343
344 // This is a lame cut and paste of code down below. If we change the logic down there,
345 // we HAVE to change the logic up here.
346
347 while (1)
348 {
349 S32 packet_id = 0;
350 U8 tmp_data[MAX_PACKET_DATA_SIZE];
351 // See if we've got any delayed packets
352 packet_id = ttp->getNextPacketID();
353 if (ttp->mDelayedPacketMap.find(packet_id) != ttp->mDelayedPacketMap.end())
354 {
355 // Perhaps this stuff should be inside a method in LLTransferPacket?
356 // I'm too lazy to do it now, though.
357 llinfos << "Playing back delayed packet " << packet_id << llendl;
358 LLTransferPacket *packetp = ttp->mDelayedPacketMap[packet_id];
359
360 // This is somewhat inefficient, but avoids us having to duplicate
361 // code between the off-the-wire and delayed paths.
362 packet_id = packetp->mPacketID;
363 size = packetp->mSize;
364 if (size)
365 {
366 if ((packetp->mDatap != NULL) && (size<(S32)sizeof(tmp_data)))
367 {
368 memcpy(tmp_data, packetp->mDatap, size);
369 }
370 }
371 status = packetp->mStatus;
372 ttp->mDelayedPacketMap.erase(packet_id);
373 delete packetp;
374 }
375 else
376 {
377 // No matching delayed packet, we're done.
378 break;
379 }
380
381 LLTSCode ret_code = ttp->dataCallback(packet_id, tmp_data, size);
382 if (ret_code == LLTS_OK)
383 {
384 ttp->setLastPacketID(packet_id);
385 }
386
387 if (status != LLTS_OK)
388 {
389 if (status != LLTS_DONE)
390 {
391 llwarns << "LLTransferManager::processTransferInfo Error in playback!" << llendl;
392 }
393 else
394 {
395 llinfos << "LLTransferManager::processTransferInfo replay FINISHED for " << transfer_id << llendl;
396 }
397 // This transfer is done, either via error or not.
398 ttp->completionCallback(status);
399 ttcp->deleteTransfer(ttp);
400 return;
401 }
402 }
403}
404
405
406//static
407void LLTransferManager::processTransferPacket(LLMessageSystem *msgp, void **)
408{
409 //llinfos << "LLTransferManager::processTransferPacket" << llendl;
410
411 LLUUID transfer_id;
412 LLTransferChannelType channel_type;
413 S32 packet_id;
414 LLTSCode status;
415 S32 size;
416 msgp->getUUID("TransferData", "TransferID", transfer_id);
417 msgp->getS32("TransferData", "ChannelType", (S32 &)channel_type);
418 msgp->getS32("TransferData", "Packet", packet_id);
419 msgp->getS32("TransferData", "Status", (S32 &)status);
420
421 // Find the transfer associated with this packet.
422 //llinfos << transfer_id << ":" << channel_type << llendl;
423 LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(msgp->getSender(), channel_type);
424 if (!ttcp)
425 {
426 llwarns << "Target channel not found" << llendl;
427 return;
428 }
429
430 LLTransferTarget *ttp = ttcp->findTransferTarget(transfer_id);
431 if (!ttp)
432 {
433 llwarns << "Didn't find matching transfer for " << transfer_id
434 << " processing packet " << packet_id
435 << " from " << msgp->getSender() << llendl;
436 return;
437 }
438
439 size = msgp->getSize("TransferData", "Data");
440
441 S32 msg_bytes = 0;
442 if (msgp->getReceiveCompressedSize())
443 {
444 msg_bytes = msgp->getReceiveCompressedSize();
445 }
446 else
447 {
448 msg_bytes = msgp->getReceiveSize();
449 }
450 gTransferManager.addTransferBitsIn(ttcp->mChannelType, msg_bytes*8);
451
452 if ((size < 0) || (size > MAX_PACKET_DATA_SIZE))
453 {
454 llwarns << "Invalid transfer packet size " << size << llendl;
455 return;
456 }
457
458 U8 tmp_data[MAX_PACKET_DATA_SIZE];
459 if (size > 0)
460 {
461 // Only pull the data out if the size is > 0
462 msgp->getBinaryData("TransferData", "Data", tmp_data, size);
463 }
464
465 if ((!ttp->gotInfo()) || (ttp->getNextPacketID() != packet_id))
466 {
467 // Put this on a list of packets to be delivered later.
468 if(!ttp->addDelayedPacket(packet_id, status, tmp_data, size))
469 {
470 // Whoops - failed to add a delayed packet for some reason.
471 llwarns << "Too many delayed packets processing transfer "
472 << transfer_id << " from " << msgp->getSender() << llendl;
473 ttp->abortTransfer();
474 ttcp->deleteTransfer(ttp);
475 return;
476 }
477 const S32 LL_TRANSFER_WARN_GAP = 10;
478 if(!ttp->gotInfo())
479 {
480 llwarns << "Got data packet before information in transfer "
481 << transfer_id << " from " << msgp->getSender()
482 << ", got " << packet_id << llendl;
483 }
484 else if((packet_id - ttp->getNextPacketID()) > LL_TRANSFER_WARN_GAP)
485 {
486 llwarns << "Out of order packet in transfer " << transfer_id
487 << " from " << msgp->getSender() << ", got " << packet_id
488 << " expecting " << ttp->getNextPacketID() << llendl;
489 }
490 return;
491 }
492
493 // Loop through this until we're done with all delayed packets
494
495 //
496 // NOTE: THERE IS A CUT AND PASTE OF THIS CODE IN THE TRANSFERINFO HANDLER
497 // SO WE CAN PLAY BACK DELAYED PACKETS THERE!!!!!!!!!!!!!!!!!!!!!!!!!
498 //
499 BOOL done = FALSE;
500 while (!done)
501 {
502 LLTSCode ret_code = ttp->dataCallback(packet_id, tmp_data, size);
503 if (ret_code == LLTS_OK)
504 {
505 ttp->setLastPacketID(packet_id);
506 }
507
508 if (status != LLTS_OK)
509 {
510 if (status != LLTS_DONE)
511 {
512 llwarns << "LLTransferManager::processTransferPacket Error in transfer!" << llendl;
513 }
514 else
515 {
516// llinfos << "LLTransferManager::processTransferPacket done for " << transfer_id << llendl;
517 }
518 // This transfer is done, either via error or not.
519 ttp->completionCallback(status);
520 ttcp->deleteTransfer(ttp);
521 return;
522 }
523
524 // See if we've got any delayed packets
525 packet_id = ttp->getNextPacketID();
526 if (ttp->mDelayedPacketMap.find(packet_id) != ttp->mDelayedPacketMap.end())
527 {
528 // Perhaps this stuff should be inside a method in LLTransferPacket?
529 // I'm too lazy to do it now, though.
530 llinfos << "Playing back delayed packet " << packet_id << llendl;
531 LLTransferPacket *packetp = ttp->mDelayedPacketMap[packet_id];
532
533 // This is somewhat inefficient, but avoids us having to duplicate
534 // code between the off-the-wire and delayed paths.
535 packet_id = packetp->mPacketID;
536 size = packetp->mSize;
537 if (size)
538 {
539 if ((packetp->mDatap != NULL) && (size<(S32)sizeof(tmp_data)))
540 {
541 memcpy(tmp_data, packetp->mDatap, size);
542 }
543 }
544 status = packetp->mStatus;
545 ttp->mDelayedPacketMap.erase(packet_id);
546 delete packetp;
547 }
548 else
549 {
550 // No matching delayed packet, abort it.
551 done = TRUE;
552 }
553 }
554}
555
556
557//static
558void LLTransferManager::processTransferAbort(LLMessageSystem *msgp, void **)
559{
560 //llinfos << "LLTransferManager::processTransferPacket" << llendl;
561
562 LLUUID transfer_id;
563 LLTransferChannelType channel_type;
564 msgp->getUUID("TransferInfo", "TransferID", transfer_id);
565 msgp->getS32("TransferInfo", "ChannelType", (S32 &)channel_type);
566
567
568 // See if it's a target that we're trying to abort
569 // Find the transfer associated with this packet.
570 LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(msgp->getSender(), channel_type);
571 if (ttcp)
572 {
573 LLTransferTarget *ttp = ttcp->findTransferTarget(transfer_id);
574 if (ttp)
575 {
576 ttp->abortTransfer();
577 ttcp->deleteTransfer(ttp);
578 return;
579 }
580 }
581
582 // Hmm, not a target. Maybe it's a source.
583 LLTransferSourceChannel *tscp = gTransferManager.getSourceChannel(msgp->getSender(), channel_type);
584 if (tscp)
585 {
586 LLTransferSource *tsp = tscp->findTransferSource(transfer_id);
587 if (tsp)
588 {
589 tsp->abortTransfer();
590 tscp->deleteTransfer(tsp);
591 return;
592 }
593 }
594
595 llwarns << "Couldn't find transfer " << transfer_id << " to abort!" << llendl;
596}
597
598
599//static
600void LLTransferManager::processTransferPriority(LLMessageSystem *msgp, void **)
601{
602 //llinfos << "LLTransferManager::processTransferPacket" << llendl;
603
604 LLUUID transfer_id;
605 LLTransferChannelType channel_type;
606 F32 priority = 0.f;
607 msgp->getUUID("TransferInfo", "TransferID", transfer_id);
608 msgp->getS32("TransferInfo", "ChannelType", (S32 &)channel_type);
609 msgp->getF32("TransferInfo", "Priority", priority);
610
611 // Hmm, not a target. Maybe it's a source.
612 LLTransferSourceChannel *tscp = gTransferManager.getSourceChannel(msgp->getSender(), channel_type);
613 if (tscp)
614 {
615 LLTransferSource *tsp = tscp->findTransferSource(transfer_id);
616 if (tsp)
617 {
618 tscp->updatePriority(tsp, priority);
619 return;
620 }
621 }
622
623 llwarns << "Couldn't find transfer " << transfer_id << " to set priority!" << llendl;
624}
625
626
627//static
628void LLTransferManager::reliablePacketCallback(void **user_data, S32 result)
629{
630 LLUUID *transfer_idp = (LLUUID *)user_data;
631 if (result)
632 {
633 llwarns << "Aborting reliable transfer " << *transfer_idp << " due to failed reliable resends!" << llendl;
634 LLTransferSource *tsp = gTransferManager.findTransferSource(*transfer_idp);
635 if (tsp)
636 {
637 LLTransferSourceChannel *tscp = tsp->mChannelp;
638 tsp->abortTransfer();
639 tscp->deleteTransfer(tsp);
640 }
641 }
642 delete transfer_idp;
643}
644
645//
646// LLTransferConnection implementation
647//
648
649LLTransferConnection::LLTransferConnection(const LLHost &host)
650{
651 mHost = host;
652}
653
654LLTransferConnection::~LLTransferConnection()
655{
656 tsc_iter itersc;
657 for (itersc = mTransferSourceChannels.begin(); itersc != mTransferSourceChannels.end(); itersc++)
658 {
659 delete *itersc;
660 }
661 mTransferSourceChannels.clear();
662
663 ttc_iter itertc;
664 for (itertc = mTransferTargetChannels.begin(); itertc != mTransferTargetChannels.end(); itertc++)
665 {
666 delete *itertc;
667 }
668 mTransferTargetChannels.clear();
669}
670
671
672void LLTransferConnection::updateTransfers()
673{
674 // Do stuff for source transfers (basically, send data out).
675 tsc_iter iter;
676 for (iter = mTransferSourceChannels.begin(); iter != mTransferSourceChannels.end(); iter++)
677 {
678 (*iter)->updateTransfers();
679 }
680
681 // Do stuff for target transfers
682 // Primarily, we should be aborting transfers that are irredeemably broken
683 // (large packet gaps that don't appear to be getting filled in, most likely)
684 // Probably should NOT be doing timeouts for other things, as new priority scheme
685 // means that a high priority transfer COULD block a transfer for a long time.
686}
687
688
689LLTransferSourceChannel *LLTransferConnection::getSourceChannel(const LLTransferChannelType channel_type)
690{
691 tsc_iter iter;
692 for (iter = mTransferSourceChannels.begin(); iter != mTransferSourceChannels.end(); iter++)
693 {
694 if ((*iter)->getChannelType() == channel_type)
695 {
696 return *iter;
697 }
698 }
699
700 LLTransferSourceChannel *tscp = new LLTransferSourceChannel(channel_type, mHost);
701 mTransferSourceChannels.push_back(tscp);
702 return tscp;
703}
704
705
706LLTransferTargetChannel *LLTransferConnection::getTargetChannel(const LLTransferChannelType channel_type)
707{
708 ttc_iter iter;
709 for (iter = mTransferTargetChannels.begin(); iter != mTransferTargetChannels.end(); iter++)
710 {
711 if ((*iter)->getChannelType() == channel_type)
712 {
713 return *iter;
714 }
715 }
716
717 LLTransferTargetChannel *ttcp = new LLTransferTargetChannel(channel_type, mHost);
718 mTransferTargetChannels.push_back(ttcp);
719 return ttcp;
720}
721
722
723//
724// LLTransferSourceChannel implementation
725//
726
727const S32 DEFAULT_PACKET_SIZE = 1000;
728
729
730LLTransferSourceChannel::LLTransferSourceChannel(const LLTransferChannelType channel_type, const LLHost &host) :
731 mChannelType(channel_type),
732 mHost(host),
733 mTransferSources(LLTransferSource::sSetPriority, LLTransferSource::sGetPriority),
734 mThrottleID(TC_ASSET)
735{
736}
737
738
739LLTransferSourceChannel::~LLTransferSourceChannel()
740{
741 LLPriQueueMap<LLTransferSource *>::pqm_iter iter;
742 for (iter = mTransferSources.mMap.begin(); iter != mTransferSources.mMap.end(); iter++)
743 {
744 // Just kill off all of the transfers
745 (*iter).second->abortTransfer();
746 delete iter->second;
747 }
748 mTransferSources.mMap.clear();
749}
750
751void LLTransferSourceChannel::updatePriority(LLTransferSource *tsp, const F32 priority)
752{
753 mTransferSources.reprioritize(priority, tsp);
754}
755
756void LLTransferSourceChannel::updateTransfers()
757{
758 // Actually, this should do the following:
759 // Decide if we can actually send data.
760 // If so, update priorities so we know who gets to send it.
761 // Send data from the sources, while updating until we've sent our throttle allocation.
762
763 LLCircuitData *cdp = gMessageSystem->mCircuitInfo.findCircuit(getHost());
764 if (!cdp)
765 {
766 return;
767 }
768
769 if (cdp->isBlocked())
770 {
771 // *NOTE: We need to make sure that the throttle bits
772 // available gets reset.
773
774 // We DON'T want to send any packets if they're blocked, they'll just end up
775 // piling up on the other end.
776 //llwarns << "Blocking transfers due to blocked circuit for " << getHost() << llendl;
777 return;
778 }
779
780 const S32 throttle_id = mThrottleID;
781
782 LLThrottleGroup &tg = cdp->getThrottleGroup();
783
784 if (tg.checkOverflow(throttle_id, 0.f))
785 {
786 return;
787 }
788
789 LLPriQueueMap<LLTransferSource *>::pqm_iter iter;
790
791
792 BOOL done = FALSE;
793 for (iter = mTransferSources.mMap.begin(); (iter != mTransferSources.mMap.end()) && !done;)
794 {
795 //llinfos << "LLTransferSourceChannel::updateTransfers()" << llendl;
796 // Do stuff.
797 LLTransferSource *tsp = iter->second;
798 U8 *datap = NULL;
799 S32 data_size = 0;
800 BOOL delete_data = FALSE;
801 S32 packet_id = 0;
802 S32 sent_bytes = 0;
803 LLTSCode status = LLTS_OK;
804
805 // Get the packetID for the next packet that we're transferring.
806 packet_id = tsp->getNextPacketID();
807 status = tsp->dataCallback(packet_id, DEFAULT_PACKET_SIZE, &datap, data_size, delete_data);
808
809 if (status == LLTS_SKIP)
810 {
811 // We don't have any data, but we're not done, just go on.
812 // This will presumably be used for streaming or async transfers that
813 // are stalled waiting for data from another source.
814 iter++;
815 continue;
816 }
817
818 LLUUID *cb_uuid = new LLUUID(tsp->getID());
819
820 // Send the data now, even if it's an error.
821 // The status code will tell the other end what to do.
822 gMessageSystem->newMessage("TransferPacket");
823 gMessageSystem->nextBlock("TransferData");
824 gMessageSystem->addUUID("TransferID", tsp->getID());
825 gMessageSystem->addS32("ChannelType", getChannelType());
826 gMessageSystem->addS32("Packet", packet_id); // HACK! Need to put in a REAL packet id
827 gMessageSystem->addS32("Status", status);
828 gMessageSystem->addBinaryData("Data", datap, data_size);
829 sent_bytes = gMessageSystem->getCurrentSendTotal();
830 gMessageSystem->sendReliable(getHost(), LL_DEFAULT_RELIABLE_RETRIES, TRUE, 0.f,
831 LLTransferManager::reliablePacketCallback, (void**)cb_uuid);
832
833 // Do bookkeeping for the throttle
834 done = tg.throttleOverflow(throttle_id, sent_bytes*8.f);
835 gTransferManager.addTransferBitsOut(mChannelType, sent_bytes*8);
836
837 // Clean up our temporary data.
838 if (delete_data)
839 {
840 delete[] datap;
841 datap = NULL;
842 }
843
844 // Update the packet counter
845 tsp->setLastPacketID(packet_id);
846
847 switch (status)
848 {
849 case LLTS_OK:
850 // We're OK, don't need to do anything. Keep sending data.
851 break;
852 case LLTS_ERROR:
853 llwarns << "Error in transfer dataCallback!" << llendl;
854 case LLTS_DONE:
855 // We need to clean up this transfer source.
856 //llinfos << "LLTransferSourceChannel::updateTransfers() " << tsp->getID() << " done" << llendl;
857 tsp->completionCallback(status);
858 delete tsp;
859
860 mTransferSources.mMap.erase(iter++);
861 break;
862 default:
863 llerrs << "Unknown transfer error code!" << llendl;
864 }
865
866 // At this point, we should do priority adjustment (since some transfers like
867 // streaming transfers will adjust priority based on how much they've sent and time,
868 // but I'm not going to bother yet. - djs.
869 }
870}
871
872
873void LLTransferSourceChannel::addTransferSource(LLTransferSource *sourcep)
874{
875 sourcep->mChannelp = this;
876 mTransferSources.push(sourcep->getPriority(), sourcep);
877}
878
879
880LLTransferSource *LLTransferSourceChannel::findTransferSource(const LLUUID &transfer_id)
881{
882 LLPriQueueMap<LLTransferSource *>::pqm_iter iter;
883 for (iter = mTransferSources.mMap.begin(); iter != mTransferSources.mMap.end(); iter++)
884 {
885 LLTransferSource *tsp = iter->second;
886 if (tsp->getID() == transfer_id)
887 {
888 return tsp;
889 }
890 }
891 return NULL;
892}
893
894
895BOOL LLTransferSourceChannel::deleteTransfer(LLTransferSource *tsp)
896{
897 LLPriQueueMap<LLTransferSource *>::pqm_iter iter;
898 for (iter = mTransferSources.mMap.begin(); iter != mTransferSources.mMap.end(); iter++)
899 {
900 if (iter->second == tsp)
901 {
902 break;
903 }
904 }
905
906 if (iter == mTransferSources.mMap.end())
907 {
908 llerrs << "Unable to find transfer source to delete!" << llendl;
909 return FALSE;
910 }
911 mTransferSources.mMap.erase(iter);
912 delete tsp;
913 return TRUE;
914}
915
916
917//
918// LLTransferTargetChannel implementation
919//
920
921LLTransferTargetChannel::LLTransferTargetChannel(const LLTransferChannelType channel_type, const LLHost &host) :
922 mChannelType(channel_type),
923 mHost(host)
924{
925}
926
927LLTransferTargetChannel::~LLTransferTargetChannel()
928{
929 tt_iter iter;
930 for (iter = mTransferTargets.begin(); iter != mTransferTargets.end(); iter++)
931 {
932 // Abort all of the current transfers
933 (*iter)->abortTransfer();
934 delete *iter;
935 }
936 mTransferTargets.clear();
937}
938
939
940void LLTransferTargetChannel::requestTransfer(
941 const LLTransferSourceParams& source_params,
942 const LLTransferTargetParams& target_params,
943 const F32 priority)
944{
945 LLUUID id;
946 id.generate();
947 LLTransferTarget* ttp = LLTransferTarget::createTarget(
948 target_params.getType(),
949 id,
950 source_params.getType());
951 if (!ttp)
952 {
953 llwarns << "LLTransferManager::requestTransfer aborting due to target creation failure!" << llendl;
954 }
955
956 ttp->applyParams(target_params);
957 addTransferTarget(ttp);
958
959 sendTransferRequest(ttp, source_params, priority);
960}
961
962
963void LLTransferTargetChannel::sendTransferRequest(LLTransferTarget *targetp,
964 const LLTransferSourceParams &params,
965 const F32 priority)
966{
967 //
968 // Pack the message with data which explains how to get the source, and
969 // send it off to the source for this channel.
970 //
971 llassert(targetp);
972 llassert(targetp->getChannel() == this);
973
974 gMessageSystem->newMessage("TransferRequest");
975 gMessageSystem->nextBlock("TransferInfo");
976 gMessageSystem->addUUID("TransferID", targetp->getID());
977 gMessageSystem->addS32("SourceType", params.getType());
978 gMessageSystem->addS32("ChannelType", getChannelType());
979 gMessageSystem->addF32("Priority", priority);
980
981 U8 tmp[MAX_PARAMS_SIZE];
982 LLDataPackerBinaryBuffer dp(tmp, MAX_PARAMS_SIZE);
983 params.packParams(dp);
984 S32 len = dp.getCurrentSize();
985 gMessageSystem->addBinaryData("Params", tmp, len);
986
987 gMessageSystem->sendReliable(mHost);
988}
989
990
991void LLTransferTargetChannel::addTransferTarget(LLTransferTarget *targetp)
992{
993 targetp->mChannelp = this;
994 mTransferTargets.push_back(targetp);
995}
996
997
998LLTransferTarget *LLTransferTargetChannel::findTransferTarget(const LLUUID &transfer_id)
999{
1000 tt_iter iter;
1001 for (iter = mTransferTargets.begin(); iter != mTransferTargets.end(); iter++)
1002 {
1003 LLTransferTarget *ttp = *iter;
1004 if (ttp->getID() == transfer_id)
1005 {
1006 return ttp;
1007 }
1008 }
1009 return NULL;
1010}
1011
1012
1013BOOL LLTransferTargetChannel::deleteTransfer(LLTransferTarget *ttp)
1014{
1015 tt_iter iter;
1016 for (iter = mTransferTargets.begin(); iter != mTransferTargets.end(); iter++)
1017 {
1018 if (*iter == ttp)
1019 {
1020 break;
1021 }
1022 }
1023
1024 if (iter == mTransferTargets.end())
1025 {
1026 llerrs << "Unable to find transfer target to delete!" << llendl;
1027 return FALSE;
1028 }
1029 mTransferTargets.erase(iter);
1030 delete ttp;
1031 return TRUE;
1032}
1033
1034
1035//
1036// LLTransferSource implementation
1037//
1038
1039LLTransferSource::LLTransferSource(const LLTransferSourceType type,
1040 const LLUUID &transfer_id,
1041 const F32 priority) :
1042 mType(type),
1043 mID(transfer_id),
1044 mChannelp(NULL),
1045 mPriority(priority),
1046 mSize(0),
1047 mLastPacketID(-1)
1048{
1049 setPriority(priority);
1050}
1051
1052
1053LLTransferSource::~LLTransferSource()
1054{
1055 // No actual cleanup of the transfer is done here, this is purely for
1056 // memory cleanup. The completionCallback is guaranteed to get called
1057 // before this happens.
1058}
1059
1060
1061void LLTransferSource::sendTransferStatus(LLTSCode status)
1062{
1063 gMessageSystem->newMessage("TransferInfo");
1064 gMessageSystem->nextBlock("TransferInfo");
1065 gMessageSystem->addUUID("TransferID", getID());
1066 gMessageSystem->addS32("TargetType", LLTTT_UNKNOWN);
1067 gMessageSystem->addS32("ChannelType", mChannelp->getChannelType());
1068 gMessageSystem->addS32("Status", status);
1069 gMessageSystem->addS32("Size", mSize);
1070 U8 tmp[MAX_PARAMS_SIZE];
1071 LLDataPackerBinaryBuffer dp(tmp, MAX_PARAMS_SIZE);
1072 packParams(dp);
1073 S32 len = dp.getCurrentSize();
1074 gMessageSystem->addBinaryData("Params", tmp, len);
1075 gMessageSystem->sendReliable(mChannelp->getHost());
1076
1077 // Abort if there was as asset system issue.
1078 if (status != LLTS_OK)
1079 {
1080 completionCallback(status);
1081 mChannelp->deleteTransfer(this);
1082 }
1083}
1084
1085
1086// This should never be called directly, the transfer manager is responsible for
1087// aborting the transfer from the channel. I might want to rethink this in the
1088// future, though.
1089void LLTransferSource::abortTransfer()
1090{
1091 // Send a message down, call the completion callback
1092 llinfos << "Aborting transfer " << getID() << " to " << mChannelp->getHost() << llendl;
1093 gMessageSystem->newMessage("TransferAbort");
1094 gMessageSystem->nextBlock("TransferInfo");
1095 gMessageSystem->addUUID("TransferID", getID());
1096 gMessageSystem->addS32("ChannelType", mChannelp->getChannelType());
1097 gMessageSystem->sendReliable(mChannelp->getHost());
1098
1099 completionCallback(LLTS_ABORT);
1100}
1101
1102
1103//static
1104void LLTransferSource::registerSourceType(const LLTransferSourceType stype, LLTransferSourceCreateFunc func)
1105{
1106 if (sSourceCreateMap.count(stype))
1107 {
1108 // Disallow changing what class handles a source type
1109 // Unclear when you would want to do this, and whether it would work.
1110 llerrs << "Reregistering source type " << stype << llendl;
1111 }
1112 else
1113 {
1114 sSourceCreateMap[stype] = func;
1115 }
1116}
1117
1118//static
1119LLTransferSource *LLTransferSource::createSource(const LLTransferSourceType stype,
1120 const LLUUID &id,
1121 const F32 priority)
1122{
1123 switch (stype)
1124 {
1125 // *NOTE: The source file transfer mechanism is highly insecure and could
1126 // lead to easy exploitation of a server process.
1127 // I have removed all uses of it from the codebase. Phoenix.
1128 //
1129 //case LLTST_FILE:
1130 // return new LLTransferSourceFile(id, priority);
1131 case LLTST_ASSET:
1132 return new LLTransferSourceAsset(id, priority);
1133 default:
1134 {
1135 if (!sSourceCreateMap.count(stype))
1136 {
1137 // Use the callback to create the source type if it's not there.
1138 llwarns << "Unknown transfer source type: " << stype << llendl;
1139 return NULL;
1140 }
1141 return (sSourceCreateMap[stype])(id, priority);
1142 }
1143 }
1144}
1145
1146
1147// static
1148void LLTransferSource::sSetPriority(LLTransferSource *&tsp, const F32 priority)
1149{
1150 tsp->setPriority(priority);
1151}
1152
1153
1154// static
1155F32 LLTransferSource::sGetPriority(LLTransferSource *&tsp)
1156{
1157 return tsp->getPriority();
1158}
1159
1160
1161//
1162// LLTransferPacket implementation
1163//
1164
1165LLTransferPacket::LLTransferPacket(const S32 packet_id, const LLTSCode status, const U8 *datap, const S32 size) :
1166 mPacketID(packet_id),
1167 mStatus(status),
1168 mDatap(NULL),
1169 mSize(size)
1170{
1171 if (size == 0)
1172 {
1173 return;
1174 }
1175
1176 mDatap = new U8[size];
1177 if (mDatap != NULL)
1178 {
1179 memcpy(mDatap, datap, size);
1180 }
1181}
1182
1183LLTransferPacket::~LLTransferPacket()
1184{
1185 delete[] mDatap;
1186}
1187
1188//
1189// LLTransferTarget implementation
1190//
1191
1192LLTransferTarget::LLTransferTarget(
1193 LLTransferTargetType type,
1194 const LLUUID& transfer_id,
1195 LLTransferSourceType source_type) :
1196 mType(type),
1197 mSourceType(source_type),
1198 mID(transfer_id),
1199 mGotInfo(FALSE),
1200 mSize(0),
1201 mLastPacketID(-1)
1202{
1203}
1204
1205LLTransferTarget::~LLTransferTarget()
1206{
1207 // No actual cleanup of the transfer is done here, this is purely for
1208 // memory cleanup. The completionCallback is guaranteed to get called
1209 // before this happens.
1210 tpm_iter iter;
1211 for (iter = mDelayedPacketMap.begin(); iter != mDelayedPacketMap.end(); iter++)
1212 {
1213 delete iter->second;
1214 }
1215 mDelayedPacketMap.clear();
1216}
1217
1218// This should never be called directly, the transfer manager is responsible for
1219// aborting the transfer from the channel. I might want to rethink this in the
1220// future, though.
1221void LLTransferTarget::abortTransfer()
1222{
1223 // Send a message up, call the completion callback
1224 llinfos << "Aborting transfer " << getID() << " from " << mChannelp->getHost() << llendl;
1225 gMessageSystem->newMessage("TransferAbort");
1226 gMessageSystem->nextBlock("TransferInfo");
1227 gMessageSystem->addUUID("TransferID", getID());
1228 gMessageSystem->addS32("ChannelType", mChannelp->getChannelType());
1229 gMessageSystem->sendReliable(mChannelp->getHost());
1230
1231 completionCallback(LLTS_ABORT);
1232}
1233
1234bool LLTransferTarget::addDelayedPacket(
1235 const S32 packet_id,
1236 const LLTSCode status,
1237 U8* datap,
1238 const S32 size)
1239{
1240 const transfer_packet_map::size_type LL_MAX_DELAYED_PACKETS = 100;
1241 if(mDelayedPacketMap.size() > LL_MAX_DELAYED_PACKETS)
1242 {
1243 // too many delayed packets
1244 return false;
1245 }
1246
1247 LLTransferPacket* tpp = new LLTransferPacket(
1248 packet_id,
1249 status,
1250 datap,
1251 size);
1252
1253#ifdef _DEBUG
1254 if (mDelayedPacketMap.find(packet_id) != mDelayedPacketMap.end())
1255 {
1256 llerrs << "Packet ALREADY in delayed packet map!" << llendl;
1257 }
1258#endif
1259
1260 mDelayedPacketMap[packet_id] = tpp;
1261 return true;
1262}
1263
1264
1265LLTransferTarget* LLTransferTarget::createTarget(
1266 LLTransferTargetType type,
1267 const LLUUID& id,
1268 LLTransferSourceType source_type)
1269{
1270 switch (type)
1271 {
1272 case LLTTT_FILE:
1273 return new LLTransferTargetFile(id, source_type);
1274 case LLTTT_VFILE:
1275 return new LLTransferTargetVFile(id, source_type);
1276 default:
1277 llwarns << "Unknown transfer target type: " << type << llendl;
1278 return NULL;
1279 }
1280}
1281
1282
1283LLTransferSourceParamsInvItem::LLTransferSourceParamsInvItem() : LLTransferSourceParams(LLTST_SIM_INV_ITEM), mAssetType(LLAssetType::AT_NONE)
1284{
1285}
1286
1287
1288void LLTransferSourceParamsInvItem::setAgentSession(const LLUUID &agent_id, const LLUUID &session_id)
1289{
1290 mAgentID = agent_id;
1291 mSessionID = session_id;
1292}
1293
1294
1295void LLTransferSourceParamsInvItem::setInvItem(const LLUUID &owner_id, const LLUUID &task_id, const LLUUID &item_id)
1296{
1297 mOwnerID = owner_id;
1298 mTaskID = task_id;
1299 mItemID = item_id;
1300}
1301
1302
1303void LLTransferSourceParamsInvItem::setAsset(const LLUUID &asset_id, const LLAssetType::EType asset_type)
1304{
1305 mAssetID = asset_id;
1306 mAssetType = asset_type;
1307}
1308
1309
1310void LLTransferSourceParamsInvItem::packParams(LLDataPacker &dp) const
1311{
1312 lldebugs << "LLTransferSourceParamsInvItem::packParams()" << llendl;
1313 dp.packUUID(mAgentID, "AgentID");
1314 dp.packUUID(mSessionID, "SessionID");
1315 dp.packUUID(mOwnerID, "OwnerID");
1316 dp.packUUID(mTaskID, "TaskID");
1317 dp.packUUID(mItemID, "ItemID");
1318 dp.packUUID(mAssetID, "AssetID");
1319 dp.packS32(mAssetType, "AssetType");
1320}
1321
1322
1323BOOL LLTransferSourceParamsInvItem::unpackParams(LLDataPacker &dp)
1324{
1325 S32 tmp_at;
1326
1327 dp.unpackUUID(mAgentID, "AgentID");
1328 dp.unpackUUID(mSessionID, "SessionID");
1329 dp.unpackUUID(mOwnerID, "OwnerID");
1330 dp.unpackUUID(mTaskID, "TaskID");
1331 dp.unpackUUID(mItemID, "ItemID");
1332 dp.unpackUUID(mAssetID, "AssetID");
1333 dp.unpackS32(tmp_at, "AssetType");
1334
1335 mAssetType = (LLAssetType::EType)tmp_at;
1336
1337 return TRUE;
1338}
1339
1340LLTransferSourceParamsEstate::LLTransferSourceParamsEstate() :
1341 LLTransferSourceParams(LLTST_SIM_ESTATE), mEstateAssetType(ET_NONE)
1342{
1343}
1344
1345void LLTransferSourceParamsEstate::setAgentSession(const LLUUID &agent_id, const LLUUID &session_id)
1346{
1347 mAgentID = agent_id;
1348 mSessionID = session_id;
1349}
1350
1351void LLTransferSourceParamsEstate::setEstateAssetType(const EstateAssetType etype)
1352{
1353 mEstateAssetType = etype;
1354}
1355
1356void LLTransferSourceParamsEstate::setAsset(const LLUUID &asset_id, const LLAssetType::EType asset_type)
1357{
1358 mAssetID = asset_id;
1359 mAssetType = asset_type;
1360}
1361
1362void LLTransferSourceParamsEstate::packParams(LLDataPacker &dp) const
1363{
1364 dp.packUUID(mAgentID, "AgentID");
1365 // *NOTE: We do not want to pass the session id from the server to
1366 // the client, but I am not sure if anyone expects this value to
1367 // be set on the client.
1368 dp.packUUID(mSessionID, "SessionID");
1369 dp.packS32(mEstateAssetType, "EstateAssetType");
1370}
1371
1372
1373BOOL LLTransferSourceParamsEstate::unpackParams(LLDataPacker &dp)
1374{
1375 S32 tmp_et;
1376
1377 dp.unpackUUID(mAgentID, "AgentID");
1378 dp.unpackUUID(mSessionID, "SessionID");
1379 dp.unpackS32(tmp_et, "EstateAssetType");
1380
1381 mEstateAssetType = (EstateAssetType)tmp_et;
1382
1383 return TRUE;
1384}