aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llmessage/llpumpio.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llmessage/llpumpio.cpp')
-rw-r--r--linden/indra/llmessage/llpumpio.cpp1025
1 files changed, 1025 insertions, 0 deletions
diff --git a/linden/indra/llmessage/llpumpio.cpp b/linden/indra/llmessage/llpumpio.cpp
new file mode 100644
index 0000000..853a438
--- /dev/null
+++ b/linden/indra/llmessage/llpumpio.cpp
@@ -0,0 +1,1025 @@
1/**
2 * @file llpumpio.cpp
3 * @author Phoenix
4 * @date 2004-11-21
5 * @brief Implementation of the i/o pump and related functions.
6 *
7 * Copyright (c) 2004-2007, Linden Research, Inc.
8 *
9 * The source code in this file ("Source Code") is provided by Linden Lab
10 * to you under the terms of the GNU General Public License, version 2.0
11 * ("GPL"), unless you have obtained a separate licensing agreement
12 * ("Other License"), formally executed by you and Linden Lab. Terms of
13 * the GPL can be found in doc/GPL-license.txt in this distribution, or
14 * online at http://secondlife.com/developers/opensource/gplv2
15 *
16 * There are special exceptions to the terms and conditions of the GPL as
17 * it is applied to this Source Code. View the full text of the exception
18 * in the file doc/FLOSS-exception.txt in this software distribution, or
19 * online at http://secondlife.com/developers/opensource/flossexception
20 *
21 * By copying, modifying or distributing this software, you acknowledge
22 * that you have read and understood your obligations described above,
23 * and agree to abide by those obligations.
24 *
25 * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
26 * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
27 * COMPLETENESS OR PERFORMANCE.
28 */
29
30#include "linden_common.h"
31#include "llpumpio.h"
32
33#include <set>
34#include "apr-1/apr_poll.h"
35
36#include "llapr.h"
37#include "llmemtype.h"
38#include "llstl.h"
39
40// This should not be in production, but it is intensely useful during
41// development.
42#if LL_LINUX
43#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
44#endif
45
46#if LL_DEBUG_PIPE_TYPE_IN_PUMP
47#include <typeinfo>
48#endif
49
50// constants for poll timeout. if we are threading, we want to have a
51// longer poll timeout.
52#if LL_THREADS_APR
53static const S32 DEFAULT_POLL_TIMEOUT = 1000;
54#else
55static const S32 DEFAULT_POLL_TIMEOUT = 0;
56#endif
57
58// The default (and fallback) expiration time for chains
59const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
60extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
61extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
62
63// sorta spammy debug modes.
64//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1
65//#define LL_DEBUG_PROCESS_LINK 1
66//#define LL_DEBUG_PROCESS_RETURN_VALUE 1
67
68// Super spammy debug mode.
69//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
70//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
71
72/**
73 * @class
74 */
75class LLChainSleeper : public LLRunnable
76{
77public:
78 static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
79 {
80 return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
81 }
82
83 virtual void run(LLRunner* runner, S64 handle)
84 {
85 mPump->clearLock(mKey);
86 }
87
88protected:
89 LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
90 LLPumpIO* mPump;
91 S32 mKey;
92};
93
94
95/**
96 * @struct ll_delete_apr_pollset_fd_client_data
97 * @brief This is a simple helper class to clean up our client data.
98 */
99struct ll_delete_apr_pollset_fd_client_data
100{
101 typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
102 void operator()(const pipe_conditional_t& conditional)
103 {
104 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
105 S32* client_id = (S32*)conditional.second.client_data;
106 delete client_id;
107 }
108};
109
110/**
111 * LLPumpIO
112 */
113LLPumpIO::LLPumpIO(apr_pool_t* pool) :
114 mState(LLPumpIO::NORMAL),
115 mRebuildPollset(false),
116 mPollset(NULL),
117 mPollsetClientID(0),
118 mNextLock(0),
119 mPool(NULL),
120 mCurrentPool(NULL),
121 mCurrentPoolReallocCount(0),
122 mChainsMutex(NULL),
123 mCallbackMutex(NULL)
124{
125 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
126 initialize(pool);
127}
128
129LLPumpIO::~LLPumpIO()
130{
131 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
132 cleanup();
133}
134
135bool LLPumpIO::prime(apr_pool_t* pool)
136{
137 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
138 cleanup();
139 initialize(pool);
140 return ((pool == NULL) ? false : true);
141}
142
143bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
144{
145 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
146 if(chain.empty()) return false;
147
148#if LL_THREADS_APR
149 LLScopedLock lock(mChainsMutex);
150#endif
151 LLChainInfo info;
152 info.setTimeoutSeconds(timeout);
153 info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
154 LLLinkInfo link;
155#if LL_DEBUG_PIPE_TYPE_IN_PUMP
156 lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
157 << typeid(*(chain[0])).name() << "'" << llendl;
158#else
159 lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
160#endif
161 chain_t::const_iterator it = chain.begin();
162 chain_t::const_iterator end = chain.end();
163 for(; it != end; ++it)
164 {
165 link.mPipe = (*it);
166 link.mChannels = info.mData->nextChannel();
167 info.mChainLinks.push_back(link);
168 }
169 mPendingChains.push_back(info);
170 return true;
171}
172
173bool LLPumpIO::addChain(
174 const LLPumpIO::links_t& links,
175 LLIOPipe::buffer_ptr_t data,
176 LLSD context,
177 F32 timeout)
178{
179 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
180
181 // remember that if the caller is providing a full link
182 // description, we need to have that description matched to a
183 // particular buffer.
184 if(!data) return false;
185 if(links.empty()) return false;
186
187#if LL_THREADS_APR
188 LLScopedLock lock(mChainsMutex);
189#endif
190#if LL_DEBUG_PIPE_TYPE_IN_PUMP
191 lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
192 << typeid(*(links[0].mPipe)).name() << "'" << llendl;
193#else
194 lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
195#endif
196 LLChainInfo info;
197 info.setTimeoutSeconds(timeout);
198 info.mChainLinks = links;
199 info.mData = data;
200 info.mContext = context;
201 mPendingChains.push_back(info);
202 return true;
203}
204
205bool LLPumpIO::setTimeoutSeconds(F32 timeout)
206{
207 // If no chain is running, return failure.
208 if(current_chain_t() == mCurrentChain)
209 {
210 return false;
211 }
212 (*mCurrentChain).setTimeoutSeconds(timeout);
213 return true;
214}
215
216bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
217{
218 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
219 //lldebugs << "LLPumpIO::setConditional" << llendl;
220 if(pipe)
221 {
222 // remove any matching poll file descriptors for this pipe.
223 LLIOPipe::ptr_t pipe_ptr(pipe);
224
225 LLChainInfo::conditionals_t::iterator it = (*mCurrentChain).mDescriptors.begin();
226 LLChainInfo::conditionals_t::iterator end = (*mCurrentChain).mDescriptors.end();
227 while (it != end)
228 {
229 LLChainInfo::pipe_conditional_t& value = (*it);
230 if ( pipe_ptr == value.first )
231 {
232 ll_delete_apr_pollset_fd_client_data()(value);
233 (*mCurrentChain).mDescriptors.erase(it++);
234 mRebuildPollset = true;
235 }
236 else
237 {
238 ++it;
239 }
240 }
241
242 if(poll)
243 {
244 LLChainInfo::pipe_conditional_t value;
245 value.first = pipe_ptr;
246 value.second = *poll;
247 if(!poll->p)
248 {
249 // each fd needs a pool to work with, so if one was
250 // not specified, use this pool.
251 // *FIX: Should it always be this pool?
252 value.second.p = mPool;
253 }
254 value.second.client_data = new S32(++mPollsetClientID);
255 (*mCurrentChain).mDescriptors.push_back(value);
256 mRebuildPollset = true;
257 }
258 return true;
259 }
260 return false;
261}
262
263S32 LLPumpIO::setLock()
264{
265 // *NOTE: I do not think it is necessary to acquire a mutex here
266 // since this should only be called during the pump(), and should
267 // only change the running chain. Any other use of this method is
268 // incorrect usage. If it becomes necessary to acquire a lock
269 // here, be sure to lock here and call a protected method to get
270 // the lock, and sleepChain() should probably acquire the same
271 // lock while and calling the same protected implementation to
272 // lock the runner at the same time.
273
274 // If no chain is running, return failure.
275 if(current_chain_t() == mCurrentChain)
276 {
277 return 0;
278 }
279
280 // deal with wrap.
281 if(++mNextLock <= 0)
282 {
283 mNextLock = 1;
284 }
285
286 // set the lock
287 (*mCurrentChain).mLock = mNextLock;
288 return mNextLock;
289}
290
291void LLPumpIO::clearLock(S32 key)
292{
293 // We need to lock it here since we do not want to be iterating
294 // over the chains twice. We can safely call process() while this
295 // is happening since we should not be erasing a locked pipe, and
296 // therefore won't be treading into deleted memory. I think we can
297 // also clear the lock on the chain safely since the pump only
298 // reads that value.
299#if LL_THREADS_APR
300 LLScopedLock lock(mChainsMutex);
301#endif
302 mClearLocks.insert(key);
303}
304
305bool LLPumpIO::sleepChain(F64 seconds)
306{
307 // Much like the call to setLock(), this should only be called
308 // from one chain during processing, so there is no need to
309 // acquire a mutex.
310 if(seconds <= 0.0) return false;
311 S32 key = setLock();
312 if(!key) return false;
313 LLRunner::run_handle_t handle = mRunner.addRunnable(
314 LLChainSleeper::build(this, key),
315 LLRunner::RUN_IN,
316 seconds);
317 if(0 == handle) return false;
318 return true;
319}
320
321bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
322{
323 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
324 if(current_chain_t() == mCurrentChain)
325 {
326 return false;
327 }
328 std::copy(
329 (*mCurrentChain).mChainLinks.begin(),
330 (*mCurrentChain).mChainLinks.end(),
331 std::back_insert_iterator<links_t>(links));
332 return true;
333}
334
335void LLPumpIO::pump()
336{
337 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
338 //llinfos << "LLPumpIO::pump()" << llendl;
339
340 // Run any pending runners.
341 mRunner.run();
342
343 // We need to move all of the pending heads over to the running
344 // chains.
345 PUMP_DEBUG;
346 if(true)
347 {
348#if LL_THREADS_APR
349 LLScopedLock lock(mChainsMutex);
350#endif
351 // bail if this pump is paused.
352 if(PAUSING == mState)
353 {
354 mState = PAUSED;
355 }
356 if(PAUSED == mState)
357 {
358 return;
359 }
360
361 PUMP_DEBUG;
362 // Move the pending chains over to the running chaings
363 if(!mPendingChains.empty())
364 {
365 PUMP_DEBUG;
366 //lldebugs << "Pushing " << mPendingChains.size() << "." << llendl;
367 std::copy(
368 mPendingChains.begin(),
369 mPendingChains.end(),
370 std::back_insert_iterator<running_chains_t>(mRunningChains));
371 mPendingChains.clear();
372 PUMP_DEBUG;
373 }
374
375 // Clear any locks. This needs to be done here so that we do
376 // not clash during a call to clearLock().
377 if(!mClearLocks.empty())
378 {
379 PUMP_DEBUG;
380 running_chains_t::iterator it = mRunningChains.begin();
381 running_chains_t::iterator end = mRunningChains.end();
382 std::set<S32>::iterator not_cleared = mClearLocks.end();
383 for(; it != end; ++it)
384 {
385 if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
386 {
387 (*it).mLock = 0;
388 }
389 }
390 PUMP_DEBUG;
391 mClearLocks.clear();
392 }
393 }
394
395 PUMP_DEBUG;
396 // rebuild the pollset if necessary
397 if(mRebuildPollset)
398 {
399 PUMP_DEBUG;
400 rebuildPollset();
401 mRebuildPollset = false;
402 }
403
404 // Poll based on the last known pollset
405 // *FIX: may want to pass in a poll timeout so it works correctly
406 // in single and multi threaded processes.
407 PUMP_DEBUG;
408 typedef std::set<S32> signal_client_t;
409 signal_client_t signalled_client;
410 if(mPollset)
411 {
412 PUMP_DEBUG;
413 //llinfos << "polling" << llendl;
414 S32 count = 0;
415 S32 client_id = 0;
416 const apr_pollfd_t* poll_fd = NULL;
417 apr_pollset_poll(mPollset, DEFAULT_POLL_TIMEOUT, &count, &poll_fd);
418 PUMP_DEBUG;
419 for(S32 i = 0; i < count; ++i)
420 {
421 client_id = *((S32*)poll_fd[i].client_data);
422 signalled_client.insert(client_id);
423 }
424 PUMP_DEBUG;
425 }
426
427 PUMP_DEBUG;
428 // set up for a check to see if each one was signalled
429 signal_client_t::iterator not_signalled = signalled_client.end();
430
431 // Process everything as appropriate
432 //lldebugs << "Running chain count: " << mRunningChains.size() << llendl;
433 running_chains_t::iterator run_chain = mRunningChains.begin();
434 bool process_this_chain = false;
435 for(; run_chain != mRunningChains.end(); )
436 {
437 PUMP_DEBUG;
438 if((*run_chain).mInit
439 && (*run_chain).mTimer.getStarted()
440 && (*run_chain).mTimer.hasExpired())
441 {
442 PUMP_DEBUG;
443 if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
444 {
445 // the pipe probably handled the error. If the handler
446 // forgot to reset the expiration then we need to do
447 // that here.
448 if((*run_chain).mTimer.getStarted()
449 && (*run_chain).mTimer.hasExpired())
450 {
451 PUMP_DEBUG;
452 llinfos << "Error handler forgot to reset timeout. "
453 << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
454 << " seconds." << llendl;
455 (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
456 }
457 }
458 else
459 {
460 PUMP_DEBUG;
461 // it timed out and no one handled it, so we need to
462 // retire the chain
463#if LL_DEBUG_PIPE_TYPE_IN_PUMP
464 lldebugs << "Removing chain "
465 << (*run_chain).mChainLinks[0].mPipe
466 << " '"
467 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
468 << "' because it timed out." << llendl;
469#else
470// lldebugs << "Removing chain "
471// << (*run_chain).mChainLinks[0].mPipe
472// << " because we reached the end." << llendl;
473#endif
474 mRunningChains.erase(run_chain++);
475 continue;
476 }
477 }
478 PUMP_DEBUG;
479 if((*run_chain).mLock)
480 {
481 ++run_chain;
482 continue;
483 }
484 PUMP_DEBUG;
485 mCurrentChain = run_chain;
486 if((*run_chain).mDescriptors.empty())
487 {
488 // if there are no conditionals, just process this chain.
489 process_this_chain = true;
490 //lldebugs << "no conditionals - processing" << llendl;
491 }
492 else
493 {
494 PUMP_DEBUG;
495 //lldebugs << "checking conditionals" << llendl;
496 // Check if this run chain was signalled. If any file
497 // descriptor is ready for something, then go ahead and
498 // process this chian.
499 process_this_chain = false;
500 if(!signalled_client.empty())
501 {
502 PUMP_DEBUG;
503 LLChainInfo::conditionals_t::iterator it;
504 it = (*run_chain).mDescriptors.begin();
505 LLChainInfo::conditionals_t::iterator end;
506 end = (*run_chain).mDescriptors.end();
507 S32 client_id = 0;
508 for(; it != end; ++it)
509 {
510 PUMP_DEBUG;
511 client_id = *((S32*)((*it).second.client_data));
512 if(signalled_client.find(client_id) != not_signalled)
513 {
514 process_this_chain = true;
515 break;
516 }
517 //llinfos << "no fd ready for this one." << llendl;
518 }
519 }
520 }
521 if(process_this_chain)
522 {
523 PUMP_DEBUG;
524 if(!((*run_chain).mInit))
525 {
526 (*run_chain).mHead = (*run_chain).mChainLinks.begin();
527 (*run_chain).mInit = true;
528 }
529 PUMP_DEBUG;
530 processChain(*run_chain);
531 }
532
533 PUMP_DEBUG;
534 if((*run_chain).mHead == (*run_chain).mChainLinks.end())
535 {
536#if LL_DEBUG_PIPE_TYPE_IN_PUMP
537 lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
538 << " '"
539 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
540 << "' because we reached the end." << llendl;
541#else
542// lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
543// << " because we reached the end." << llendl;
544#endif
545
546 PUMP_DEBUG;
547 // This chain is done. Clean up any allocated memory and
548 // erase the chain info.
549 std::for_each(
550 (*run_chain).mDescriptors.begin(),
551 (*run_chain).mDescriptors.end(),
552 ll_delete_apr_pollset_fd_client_data());
553 mRunningChains.erase(run_chain++);
554
555 // *NOTE: may not always need to rebuild the pollset.
556 mRebuildPollset = true;
557 }
558 else
559 {
560 PUMP_DEBUG;
561 // this chain needs more processing - just go to the next
562 // chain.
563 ++run_chain;
564 }
565 }
566
567 PUMP_DEBUG;
568 // null out the chain
569 mCurrentChain = current_chain_t();
570 END_PUMP_DEBUG;
571}
572
573//bool LLPumpIO::respond(const chain_t& pipes)
574//{
575//#if LL_THREADS_APR
576// LLScopedLock lock(mCallbackMutex);
577//#endif
578// LLChainInfo info;
579// links_t links;
580//
581// mPendingCallbacks.push_back(info);
582// return true;
583//}
584
585bool LLPumpIO::respond(LLIOPipe* pipe)
586{
587 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
588 if(NULL == pipe) return false;
589
590#if LL_THREADS_APR
591 LLScopedLock lock(mCallbackMutex);
592#endif
593 LLChainInfo info;
594 LLLinkInfo link;
595 link.mPipe = pipe;
596 info.mChainLinks.push_back(link);
597 mPendingCallbacks.push_back(info);
598 return true;
599}
600
601bool LLPumpIO::respond(
602 const links_t& links,
603 LLIOPipe::buffer_ptr_t data,
604 LLSD context)
605{
606 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
607 // if the caller is providing a full link description, we need to
608 // have that description matched to a particular buffer.
609 if(!data) return false;
610 if(links.empty()) return false;
611
612#if LL_THREADS_APR
613 LLScopedLock lock(mCallbackMutex);
614#endif
615
616 // Add the callback response
617 LLChainInfo info;
618 info.mChainLinks = links;
619 info.mData = data;
620 info.mContext = context;
621 mPendingCallbacks.push_back(info);
622 return true;
623}
624
625void LLPumpIO::callback()
626{
627 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
628 //llinfos << "LLPumpIO::callback()" << llendl;
629 if(true)
630 {
631#if LL_THREADS_APR
632 LLScopedLock lock(mCallbackMutex);
633#endif
634 std::copy(
635 mPendingCallbacks.begin(),
636 mPendingCallbacks.end(),
637 std::back_insert_iterator<callbacks_t>(mCallbacks));
638 mPendingCallbacks.clear();
639 }
640 if(!mCallbacks.empty())
641 {
642 callbacks_t::iterator it = mCallbacks.begin();
643 callbacks_t::iterator end = mCallbacks.end();
644 for(; it != end; ++it)
645 {
646 // it's always the first and last time for respone chains
647 (*it).mHead = (*it).mChainLinks.begin();
648 (*it).mInit = true;
649 (*it).mEOS = true;
650 processChain(*it);
651 }
652 mCallbacks.clear();
653 }
654}
655
656void LLPumpIO::control(LLPumpIO::EControl op)
657{
658#if LL_THREADS_APR
659 LLScopedLock lock(mChainsMutex);
660#endif
661 switch(op)
662 {
663 case PAUSE:
664 mState = PAUSING;
665 break;
666 case RESUME:
667 mState = NORMAL;
668 break;
669 default:
670 // no-op
671 break;
672 }
673}
674
675void LLPumpIO::initialize(apr_pool_t* pool)
676{
677 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
678 if(!pool) return;
679#if LL_THREADS_APR
680 apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_DEFAULT, pool);
681 apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_DEFAULT, pool);
682#endif
683 mPool = pool;
684}
685
686void LLPumpIO::cleanup()
687{
688 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
689#if LL_THREADS_APR
690 if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
691 if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
692#endif
693 mChainsMutex = NULL;
694 mCallbackMutex = NULL;
695 if(mPollset)
696 {
697// lldebugs << "cleaning up pollset" << llendl;
698 apr_pollset_destroy(mPollset);
699 mPollset = NULL;
700 }
701 if(mCurrentPool)
702 {
703 apr_pool_destroy(mCurrentPool);
704 mCurrentPool = NULL;
705 }
706 mPool = NULL;
707}
708
709void LLPumpIO::rebuildPollset()
710{
711 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
712// lldebugs << "LLPumpIO::rebuildPollset()" << llendl;
713 if(mPollset)
714 {
715 //lldebugs << "destroying pollset" << llendl;
716 apr_pollset_destroy(mPollset);
717 mPollset = NULL;
718 }
719 U32 size = 0;
720 running_chains_t::iterator run_it = mRunningChains.begin();
721 running_chains_t::iterator run_end = mRunningChains.end();
722 for(; run_it != run_end; ++run_it)
723 {
724 size += (*run_it).mDescriptors.size();
725 }
726 //lldebugs << "found " << size << " descriptors." << llendl;
727 if(size)
728 {
729 // Recycle the memory pool
730 const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
731 if(mCurrentPool
732 && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
733 {
734 apr_pool_destroy(mCurrentPool);
735 mCurrentPool = NULL;
736 mCurrentPoolReallocCount = 0;
737 }
738 if(!mCurrentPool)
739 {
740 apr_status_t status = apr_pool_create(&mCurrentPool, mPool);
741 (void)ll_apr_warn_status(status);
742 }
743
744 // add all of the file descriptors
745 run_it = mRunningChains.begin();
746 LLChainInfo::conditionals_t::iterator fd_it;
747 LLChainInfo::conditionals_t::iterator fd_end;
748 apr_pollset_create(&mPollset, size, mCurrentPool, 0);
749 for(; run_it != run_end; ++run_it)
750 {
751 fd_it = (*run_it).mDescriptors.begin();
752 fd_end = (*run_it).mDescriptors.end();
753 for(; fd_it != fd_end; ++fd_it)
754 {
755 apr_pollset_add(mPollset, &((*fd_it).second));
756 }
757 }
758 }
759}
760
761void LLPumpIO::processChain(LLChainInfo& chain)
762{
763 PUMP_DEBUG;
764 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
765 LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
766 links_t::iterator it = chain.mHead;
767 links_t::iterator end = chain.mChainLinks.end();
768 bool need_process_signaled = false;
769 bool keep_going = true;
770 do
771 {
772#if LL_DEBUG_PROCESS_LINK
773#if LL_DEBUG_PIPE_TYPE_IN_PUMP
774 llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
775 << llendl;
776#else
777 llinfos << "Processing link " << (*it).mPipe << "." << llendl;
778#endif
779#endif
780#if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
781 if(chain.mData)
782 {
783 char* buf = NULL;
784 S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
785 if(bytes)
786 {
787 buf = new char[bytes + 1];
788 chain.mData->readAfter(
789 (*it).mChannels.in(),
790 NULL,
791 (U8*)buf,
792 bytes);
793 buf[bytes] = '\0';
794 llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
795 << buf << llendl;
796 delete[] buf;
797 buf = NULL;
798 }
799 else
800 {
801 llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
802 << llendl;
803 }
804 }
805#endif
806 PUMP_DEBUG;
807 status = (*it).mPipe->process(
808 (*it).mChannels,
809 chain.mData,
810 chain.mEOS,
811 chain.mContext,
812 this);
813#if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
814 if(chain.mData)
815 {
816 char* buf = NULL;
817 S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
818 if(bytes)
819 {
820 buf = new char[bytes + 1];
821 chain.mData->readAfter(
822 (*it).mChannels.out(),
823 NULL,
824 (U8*)buf,
825 bytes);
826 buf[bytes] = '\0';
827 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
828 << buf << llendl;
829 delete[] buf;
830 buf = NULL;
831 }
832 else
833 {
834 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
835 << llendl;
836 }
837 }
838#endif
839
840#if LL_DEBUG_PROCESS_RETURN_VALUE
841 // Only bother with the success codes - error codes are logged
842 // below.
843 if(LLIOPipe::isSuccess(status))
844 {
845 llinfos << "Pipe returned: '"
846#if LL_DEBUG_PIPE_TYPE_IN_PUMP
847 << typeid(*((*it).mPipe)).name() << "':'"
848#endif
849 << LLIOPipe::lookupStatusString(status) << "'" << llendl;
850 }
851#endif
852
853 PUMP_DEBUG;
854 switch(status)
855 {
856 case LLIOPipe::STATUS_OK:
857 // no-op
858 break;
859 case LLIOPipe::STATUS_STOP:
860 PUMP_DEBUG;
861 status = LLIOPipe::STATUS_OK;
862 chain.mHead = end;
863 keep_going = false;
864 break;
865 case LLIOPipe::STATUS_DONE:
866 PUMP_DEBUG;
867 status = LLIOPipe::STATUS_OK;
868 chain.mHead = (it + 1);
869 chain.mEOS = true;
870 break;
871 case LLIOPipe::STATUS_BREAK:
872 PUMP_DEBUG;
873 status = LLIOPipe::STATUS_OK;
874 keep_going = false;
875 break;
876 case LLIOPipe::STATUS_NEED_PROCESS:
877 PUMP_DEBUG;
878 status = LLIOPipe::STATUS_OK;
879 if(!need_process_signaled)
880 {
881 need_process_signaled = true;
882 chain.mHead = it;
883 }
884 break;
885 default:
886 PUMP_DEBUG;
887 if(LLIOPipe::isError(status))
888 {
889 llinfos << "Pump generated pipe error: '"
890#if LL_DEBUG_PIPE_TYPE_IN_PUMP
891 << typeid(*((*it).mPipe)).name() << "':'"
892#endif
893 << LLIOPipe::lookupStatusString(status)
894 << "'" << llendl;
895#if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
896 if(chain.mData)
897 {
898 char* buf = NULL;
899 S32 bytes = chain.mData->countAfter(
900 (*it).mChannels.in(),
901 NULL);
902 if(bytes)
903 {
904 buf = new char[bytes + 1];
905 chain.mData->readAfter(
906 (*it).mChannels.in(),
907 NULL,
908 (U8*)buf,
909 bytes);
910 buf[bytes] = '\0';
911 llinfos << "Input After Error: " << buf << llendl;
912 delete[] buf;
913 buf = NULL;
914 }
915 else
916 {
917 llinfos << "Input After Error: (null)" << llendl;
918 }
919 }
920 else
921 {
922 llinfos << "Input After Error: (null)" << llendl;
923 }
924#endif
925 keep_going = false;
926 chain.mHead = it;
927 if(!handleChainError(chain, status))
928 {
929 chain.mHead = end;
930 }
931 }
932 else
933 {
934 llinfos << "Unhandled status code: " << status << ":"
935 << LLIOPipe::lookupStatusString(status) << llendl;
936 }
937 break;
938 }
939 PUMP_DEBUG;
940 } while(keep_going && (++it != end));
941 PUMP_DEBUG;
942}
943
944bool LLPumpIO::handleChainError(
945 LLChainInfo& chain,
946 LLIOPipe::EStatus error)
947{
948 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
949 links_t::reverse_iterator rit;
950 if(chain.mHead == chain.mChainLinks.end())
951 {
952 rit = links_t::reverse_iterator(chain.mHead);
953 }
954 else
955 {
956 rit = links_t::reverse_iterator(chain.mHead + 1);
957 }
958
959 links_t::reverse_iterator rend = chain.mChainLinks.rend();
960 bool handled = false;
961 bool keep_going = true;
962 do
963 {
964#if LL_DEBUG_PIPE_TYPE_IN_PUMP
965 lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
966 << "." << llendl;
967#endif
968 error = (*rit).mPipe->handleError(error, this);
969 switch(error)
970 {
971 case LLIOPipe::STATUS_OK:
972 handled = true;
973 chain.mHead = rit.base();
974 break;
975 case LLIOPipe::STATUS_STOP:
976 case LLIOPipe::STATUS_DONE:
977 case LLIOPipe::STATUS_BREAK:
978 case LLIOPipe::STATUS_NEED_PROCESS:
979#if LL_DEBUG_PIPE_TYPE_IN_PUMP
980 lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
981 << " returned code to stop error handler." << llendl;
982#endif
983 keep_going = false;
984 break;
985 default:
986 if(LLIOPipe::isSuccess(error))
987 {
988 llinfos << "Unhandled status code: " << error << ":"
989 << LLIOPipe::lookupStatusString(error) << llendl;
990 error = LLIOPipe::STATUS_ERROR;
991 keep_going = false;
992 }
993 break;
994 }
995 } while(keep_going && !handled && (++rit != rend));
996 return handled;
997}
998
999/**
1000 * LLPumpIO::LLChainInfo
1001 */
1002
1003LLPumpIO::LLChainInfo::LLChainInfo() :
1004 mInit(false),
1005 mLock(0),
1006 mEOS(false)
1007{
1008 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
1009 mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
1010}
1011
1012void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
1013{
1014 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
1015 if(timeout > 0.0f)
1016 {
1017 mTimer.start();
1018 mTimer.reset();
1019 mTimer.setTimerExpirySec(timeout);
1020 }
1021 else
1022 {
1023 mTimer.stop();
1024 }
1025}