/** * @file llpumpio.cpp * @author Phoenix * @date 2004-11-21 * @brief Implementation of the i/o pump and related functions. * * $LicenseInfo:firstyear=2004&license=viewergpl$ * * Copyright (c) 2004-2008, Linden Research, Inc. * * Second Life Viewer Source Code * The source code in this file ("Source Code") is provided by Linden Lab * to you under the terms of the GNU General Public License, version 2.0 * ("GPL"), unless you have obtained a separate licensing agreement * ("Other License"), formally executed by you and Linden Lab. Terms of * the GPL can be found in doc/GPL-license.txt in this distribution, or * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2 * * There are special exceptions to the terms and conditions of the GPL as * it is applied to this Source Code. View the full text of the exception * in the file doc/FLOSS-exception.txt in this software distribution, or * online at http://secondlifegrid.net/programs/open_source/licensing/flossexception * * By copying, modifying or distributing this software, you acknowledge * that you have read and understood your obligations described above, * and agree to abide by those obligations. * * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY, * COMPLETENESS OR PERFORMANCE. * $/LicenseInfo$ */ #include "linden_common.h" #include "llpumpio.h" #include #include #include "apr_poll.h" #include "llapr.h" #include "llmemtype.h" #include "llstl.h" #include "llstat.h" // These should not be enabled in production, but they can be // intensely useful during development for finding certain kinds of // bugs. #if LL_LINUX //#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1 //#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1 #if LL_DEBUG_POLL_FILE_DESCRIPTORS #include "apr_portable.h" #endif #endif #if LL_DEBUG_PIPE_TYPE_IN_PUMP #include #endif // constants for poll timeout. if we are threading, we want to have a // longer poll timeout. #if LL_THREADS_APR static const S32 DEFAULT_POLL_TIMEOUT = 1000; #else static const S32 DEFAULT_POLL_TIMEOUT = 0; #endif // The default (and fallback) expiration time for chains const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f; extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f; extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f; // sorta spammy debug modes. //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1 //#define LL_DEBUG_PROCESS_LINK 1 //#define LL_DEBUG_PROCESS_RETURN_VALUE 1 // Super spammy debug mode. //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1 // // local functions // void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll) { #if LL_DEBUG_POLL_FILE_DESCRIPTORS if(!poll) { lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl; return; } if(poll->desc.s) { apr_os_sock_t os_sock; if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s)) { lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock << " at " << poll->desc.s << llendl; } else { lldebugs << "Poll -- " << (msg?msg:"") << " no fd " << " at " << poll->desc.s << llendl; } } else if(poll->desc.f) { apr_os_file_t os_file; if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f)) { lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file << " at " << poll->desc.f << llendl; } else { lldebugs << "Poll -- " << (msg?msg:"") << " no fd " << " at " << poll->desc.f << llendl; } } else { lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl; } #endif } /** * @class */ class LLChainSleeper : public LLRunnable { public: static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key) { return LLRunner::run_ptr_t(new LLChainSleeper(pump, key)); } virtual void run(LLRunner* runner, S64 handle) { mPump->clearLock(mKey); } protected: LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {} LLPumpIO* mPump; S32 mKey; }; /** * @struct ll_delete_apr_pollset_fd_client_data * @brief This is a simple helper class to clean up our client data. */ struct ll_delete_apr_pollset_fd_client_data { typedef std::pair pipe_conditional_t; void operator()(const pipe_conditional_t& conditional) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); S32* client_id = (S32*)conditional.second.client_data; delete client_id; } }; /** * LLPumpIO */ LLPumpIO::LLPumpIO(apr_pool_t* pool) : mState(LLPumpIO::NORMAL), mRebuildPollset(false), mPollset(NULL), mPollsetClientID(0), mNextLock(0), mPool(NULL), mCurrentPool(NULL), mCurrentPoolReallocCount(0), mChainsMutex(NULL), mCallbackMutex(NULL), mCurrentChain(mRunningChains.end()) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); initialize(pool); } LLPumpIO::~LLPumpIO() { LLMemType m1(LLMemType::MTYPE_IO_PUMP); cleanup(); } bool LLPumpIO::prime(apr_pool_t* pool) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); cleanup(); initialize(pool); return ((pool == NULL) ? false : true); } bool LLPumpIO::addChain(const chain_t& chain, F32 timeout) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); if(chain.empty()) return false; #if LL_THREADS_APR LLScopedLock lock(mChainsMutex); #endif LLChainInfo info; info.setTimeoutSeconds(timeout); info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray); LLLinkInfo link; #if LL_DEBUG_PIPE_TYPE_IN_PUMP lldebugs << "LLPumpIO::addChain() " << chain[0] << " '" << typeid(*(chain[0])).name() << "'" << llendl; #else lldebugs << "LLPumpIO::addChain() " << chain[0] <nextChannel(); info.mChainLinks.push_back(link); } mPendingChains.push_back(info); return true; } bool LLPumpIO::addChain( const LLPumpIO::links_t& links, LLIOPipe::buffer_ptr_t data, LLSD context, F32 timeout) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); // remember that if the caller is providing a full link // description, we need to have that description matched to a // particular buffer. if(!data) return false; if(links.empty()) return false; #if LL_THREADS_APR LLScopedLock lock(mChainsMutex); #endif #if LL_DEBUG_PIPE_TYPE_IN_PUMP lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '" << typeid(*(links[0].mPipe)).name() << "'" << llendl; #else lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl; #endif LLChainInfo info; info.setTimeoutSeconds(timeout); info.mChainLinks = links; info.mData = data; info.mContext = context; mPendingChains.push_back(info); return true; } bool LLPumpIO::setTimeoutSeconds(F32 timeout) { // If no chain is running, return failure. if(mRunningChains.end() == mCurrentChain) { return false; } (*mCurrentChain).setTimeoutSeconds(timeout); return true; } void LLPumpIO::adjustTimeoutSeconds(F32 delta) { // If no chain is running, bail if(mRunningChains.end() == mCurrentChain) { return; } (*mCurrentChain).adjustTimeoutSeconds(delta); } static std::string events_2_string(apr_int16_t events) { std::ostringstream ostr; if(events & APR_POLLIN) { ostr << "read,"; } if(events & APR_POLLPRI) { ostr << "priority,"; } if(events & APR_POLLOUT) { ostr << "write,"; } if(events & APR_POLLERR) { ostr << "error,"; } if(events & APR_POLLHUP) { ostr << "hangup,"; } if(events & APR_POLLNVAL) { ostr << "invalid,"; } return chop_tail_copy(ostr.str(), 1); } bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); if(!pipe) return false; ll_debug_poll_fd("Set conditional", poll); lldebugs << "Setting conditionals (" << (poll ? events_2_string(poll->reqevents) :"null") << ") " #if LL_DEBUG_PIPE_TYPE_IN_PUMP << "on pipe " << typeid(*pipe).name() #endif << " at " << pipe << llendl; // remove any matching poll file descriptors for this pipe. LLIOPipe::ptr_t pipe_ptr(pipe); LLChainInfo::conditionals_t::iterator it; it = (*mCurrentChain).mDescriptors.begin(); while(it != (*mCurrentChain).mDescriptors.end()) { LLChainInfo::pipe_conditional_t& value = (*it); if(pipe_ptr == value.first) { ll_delete_apr_pollset_fd_client_data()(value); it = (*mCurrentChain).mDescriptors.erase(it); mRebuildPollset = true; } else { ++it; } } if(!poll) { mRebuildPollset = true; return true; } LLChainInfo::pipe_conditional_t value; value.first = pipe_ptr; value.second = *poll; value.second.rtnevents = 0; if(!poll->p) { // each fd needs a pool to work with, so if one was // not specified, use this pool. // *FIX: Should it always be this pool? value.second.p = mPool; } value.second.client_data = new S32(++mPollsetClientID); (*mCurrentChain).mDescriptors.push_back(value); mRebuildPollset = true; return true; } S32 LLPumpIO::setLock() { // *NOTE: I do not think it is necessary to acquire a mutex here // since this should only be called during the pump(), and should // only change the running chain. Any other use of this method is // incorrect usage. If it becomes necessary to acquire a lock // here, be sure to lock here and call a protected method to get // the lock, and sleepChain() should probably acquire the same // lock while and calling the same protected implementation to // lock the runner at the same time. // If no chain is running, return failure. if(mRunningChains.end() == mCurrentChain) { return 0; } // deal with wrap. if(++mNextLock <= 0) { mNextLock = 1; } // set the lock (*mCurrentChain).mLock = mNextLock; return mNextLock; } void LLPumpIO::clearLock(S32 key) { // We need to lock it here since we do not want to be iterating // over the chains twice. We can safely call process() while this // is happening since we should not be erasing a locked pipe, and // therefore won't be treading into deleted memory. I think we can // also clear the lock on the chain safely since the pump only // reads that value. #if LL_THREADS_APR LLScopedLock lock(mChainsMutex); #endif mClearLocks.insert(key); } bool LLPumpIO::sleepChain(F64 seconds) { // Much like the call to setLock(), this should only be called // from one chain during processing, so there is no need to // acquire a mutex. if(seconds <= 0.0) return false; S32 key = setLock(); if(!key) return false; LLRunner::run_handle_t handle = mRunner.addRunnable( LLChainSleeper::build(this, key), LLRunner::RUN_IN, seconds); if(0 == handle) return false; return true; } bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const { LLMemType m1(LLMemType::MTYPE_IO_PUMP); if(mRunningChains.end() == mCurrentChain) { return false; } std::copy( (*mCurrentChain).mChainLinks.begin(), (*mCurrentChain).mChainLinks.end(), std::back_insert_iterator(links)); return true; } void LLPumpIO::pump() { pump(DEFAULT_POLL_TIMEOUT); } //timeout is in microseconds void LLPumpIO::pump(const S32& poll_timeout) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); LLFastTimer t1(LLFastTimer::FTM_PUMP); //llinfos << "LLPumpIO::pump()" << llendl; // Run any pending runners. mRunner.run(); // We need to move all of the pending heads over to the running // chains. PUMP_DEBUG; if(true) { #if LL_THREADS_APR LLScopedLock lock(mChainsMutex); #endif // bail if this pump is paused. if(PAUSING == mState) { mState = PAUSED; } if(PAUSED == mState) { return; } PUMP_DEBUG; // Move the pending chains over to the running chaings if(!mPendingChains.empty()) { PUMP_DEBUG; //lldebugs << "Pushing " << mPendingChains.size() << "." << llendl; std::copy( mPendingChains.begin(), mPendingChains.end(), std::back_insert_iterator(mRunningChains)); mPendingChains.clear(); PUMP_DEBUG; } // Clear any locks. This needs to be done here so that we do // not clash during a call to clearLock(). if(!mClearLocks.empty()) { PUMP_DEBUG; running_chains_t::iterator it = mRunningChains.begin(); running_chains_t::iterator end = mRunningChains.end(); std::set::iterator not_cleared = mClearLocks.end(); for(; it != end; ++it) { if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared) { (*it).mLock = 0; } } PUMP_DEBUG; mClearLocks.clear(); } } PUMP_DEBUG; // rebuild the pollset if necessary if(mRebuildPollset) { PUMP_DEBUG; rebuildPollset(); mRebuildPollset = false; } // Poll based on the last known pollset // *TODO: may want to pass in a poll timeout so it works correctly // in single and multi threaded processes. PUMP_DEBUG; typedef std::map signal_client_t; signal_client_t signalled_client; const apr_pollfd_t* poll_fd = NULL; if(mPollset) { PUMP_DEBUG; //llinfos << "polling" << llendl; S32 count = 0; S32 client_id = 0; { LLPerfBlock polltime("pump_poll"); apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd); } PUMP_DEBUG; for(S32 ii = 0; ii < count; ++ii) { ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]); client_id = *((S32*)poll_fd[ii].client_data); signalled_client[client_id] = ii; } PUMP_DEBUG; } PUMP_DEBUG; // set up for a check to see if each one was signalled signal_client_t::iterator not_signalled = signalled_client.end(); // Process everything as appropriate //lldebugs << "Running chain count: " << mRunningChains.size() << llendl; running_chains_t::iterator run_chain = mRunningChains.begin(); bool process_this_chain = false; for(; run_chain != mRunningChains.end(); ) { PUMP_DEBUG; if((*run_chain).mInit && (*run_chain).mTimer.getStarted() && (*run_chain).mTimer.hasExpired()) { PUMP_DEBUG; if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED)) { // the pipe probably handled the error. If the handler // forgot to reset the expiration then we need to do // that here. if((*run_chain).mTimer.getStarted() && (*run_chain).mTimer.hasExpired()) { PUMP_DEBUG; llinfos << "Error handler forgot to reset timeout. " << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS << " seconds." << llendl; (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS); } } else { PUMP_DEBUG; // it timed out and no one handled it, so we need to // retire the chain #if LL_DEBUG_PIPE_TYPE_IN_PUMP lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe << " '" << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() << "' because it timed out." << llendl; #else // lldebugs << "Removing chain " // << (*run_chain).mChainLinks[0].mPipe // << " because we reached the end." << llendl; #endif run_chain = mRunningChains.erase(run_chain); continue; } } PUMP_DEBUG; if((*run_chain).mLock) { ++run_chain; continue; } PUMP_DEBUG; mCurrentChain = run_chain; if((*run_chain).mDescriptors.empty()) { // if there are no conditionals, just process this chain. process_this_chain = true; //lldebugs << "no conditionals - processing" << llendl; } else { PUMP_DEBUG; //lldebugs << "checking conditionals" << llendl; // Check if this run chain was signalled. If any file // descriptor is ready for something, then go ahead and // process this chian. process_this_chain = false; if(!signalled_client.empty()) { PUMP_DEBUG; LLChainInfo::conditionals_t::iterator it; it = (*run_chain).mDescriptors.begin(); LLChainInfo::conditionals_t::iterator end; end = (*run_chain).mDescriptors.end(); S32 client_id = 0; signal_client_t::iterator signal; for(; it != end; ++it) { PUMP_DEBUG; client_id = *((S32*)((*it).second.client_data)); signal = signalled_client.find(client_id); if (signal == not_signalled) continue; static const apr_int16_t POLL_CHAIN_ERROR = APR_POLLHUP | APR_POLLNVAL | APR_POLLERR; const apr_pollfd_t* poll = &(poll_fd[(*signal).second]); if(poll->rtnevents & POLL_CHAIN_ERROR) { // Potential eror condition has been // returned. If HUP was one of them, we pass // that as the error even though there may be // more. If there are in fact more errors, // we'll just wait for that detection until // the next pump() cycle to catch it so that // the logic here gets no more strained than // it already is. LLIOPipe::EStatus error_status; if(poll->rtnevents & APR_POLLHUP) error_status = LLIOPipe::STATUS_LOST_CONNECTION; else error_status = LLIOPipe::STATUS_ERROR; if(handleChainError(*run_chain, error_status)) break; ll_debug_poll_fd("Removing pipe", poll); llwarns << "Removing pipe " << (*run_chain).mChainLinks[0].mPipe << " '" #if LL_DEBUG_PIPE_TYPE_IN_PUMP << typeid( *((*run_chain).mChainLinks[0].mPipe)).name() #endif << "' because: " << events_2_string(poll->rtnevents) << llendl; (*run_chain).mHead = (*run_chain).mChainLinks.end(); break; } // at least 1 fd got signalled, and there were no // errors. That means we process this chain. process_this_chain = true; break; } } } if(process_this_chain) { PUMP_DEBUG; if(!((*run_chain).mInit)) { (*run_chain).mHead = (*run_chain).mChainLinks.begin(); (*run_chain).mInit = true; } PUMP_DEBUG; processChain(*run_chain); } PUMP_DEBUG; if((*run_chain).mHead == (*run_chain).mChainLinks.end()) { #if LL_DEBUG_PIPE_TYPE_IN_PUMP lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe << " '" << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() << "' because we reached the end." << llendl; #else // lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe // << " because we reached the end." << llendl; #endif PUMP_DEBUG; // This chain is done. Clean up any allocated memory and // erase the chain info. std::for_each( (*run_chain).mDescriptors.begin(), (*run_chain).mDescriptors.end(), ll_delete_apr_pollset_fd_client_data()); run_chain = mRunningChains.erase(run_chain); // *NOTE: may not always need to rebuild the pollset. mRebuildPollset = true; } else { PUMP_DEBUG; // this chain needs more processing - just go to the next // chain. ++run_chain; } } PUMP_DEBUG; // null out the chain mCurrentChain = mRunningChains.end(); END_PUMP_DEBUG; } //bool LLPumpIO::respond(const chain_t& pipes) //{ //#if LL_THREADS_APR // LLScopedLock lock(mCallbackMutex); //#endif // LLChainInfo info; // links_t links; // // mPendingCallbacks.push_back(info); // return true; //} bool LLPumpIO::respond(LLIOPipe* pipe) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); if(NULL == pipe) return false; #if LL_THREADS_APR LLScopedLock lock(mCallbackMutex); #endif LLChainInfo info; LLLinkInfo link; link.mPipe = pipe; info.mChainLinks.push_back(link); mPendingCallbacks.push_back(info); return true; } bool LLPumpIO::respond( const links_t& links, LLIOPipe::buffer_ptr_t data, LLSD context) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); // if the caller is providing a full link description, we need to // have that description matched to a particular buffer. if(!data) return false; if(links.empty()) return false; #if LL_THREADS_APR LLScopedLock lock(mCallbackMutex); #endif // Add the callback response LLChainInfo info; info.mChainLinks = links; info.mData = data; info.mContext = context; mPendingCallbacks.push_back(info); return true; } void LLPumpIO::callback() { LLMemType m1(LLMemType::MTYPE_IO_PUMP); //llinfos << "LLPumpIO::callback()" << llendl; if(true) { #if LL_THREADS_APR LLScopedLock lock(mCallbackMutex); #endif std::copy( mPendingCallbacks.begin(), mPendingCallbacks.end(), std::back_insert_iterator(mCallbacks)); mPendingCallbacks.clear(); } if(!mCallbacks.empty()) { callbacks_t::iterator it = mCallbacks.begin(); callbacks_t::iterator end = mCallbacks.end(); for(; it != end; ++it) { // it's always the first and last time for respone chains (*it).mHead = (*it).mChainLinks.begin(); (*it).mInit = true; (*it).mEOS = true; processChain(*it); } mCallbacks.clear(); } } void LLPumpIO::control(LLPumpIO::EControl op) { #if LL_THREADS_APR LLScopedLock lock(mChainsMutex); #endif switch(op) { case PAUSE: mState = PAUSING; break; case RESUME: mState = NORMAL; break; default: // no-op break; } } void LLPumpIO::initialize(apr_pool_t* pool) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); if(!pool) return; #if LL_THREADS_APR // SJB: Windows defaults to NESTED and OSX defaults to UNNESTED, so use UNNESTED explicitly. apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_UNNESTED, pool); apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_UNNESTED, pool); #endif mPool = pool; } void LLPumpIO::cleanup() { LLMemType m1(LLMemType::MTYPE_IO_PUMP); #if LL_THREADS_APR if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex); if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex); #endif mChainsMutex = NULL; mCallbackMutex = NULL; if(mPollset) { // lldebugs << "cleaning up pollset" << llendl; apr_pollset_destroy(mPollset); mPollset = NULL; } if(mCurrentPool) { apr_pool_destroy(mCurrentPool); mCurrentPool = NULL; } mPool = NULL; } void LLPumpIO::rebuildPollset() { LLMemType m1(LLMemType::MTYPE_IO_PUMP); // lldebugs << "LLPumpIO::rebuildPollset()" << llendl; if(mPollset) { //lldebugs << "destroying pollset" << llendl; apr_pollset_destroy(mPollset); mPollset = NULL; } U32 size = 0; running_chains_t::iterator run_it = mRunningChains.begin(); running_chains_t::iterator run_end = mRunningChains.end(); for(; run_it != run_end; ++run_it) { size += (*run_it).mDescriptors.size(); } //lldebugs << "found " << size << " descriptors." << llendl; if(size) { // Recycle the memory pool const S32 POLLSET_POOL_RECYCLE_COUNT = 100; if(mCurrentPool && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT))) { apr_pool_destroy(mCurrentPool); mCurrentPool = NULL; mCurrentPoolReallocCount = 0; } if(!mCurrentPool) { apr_status_t status = apr_pool_create(&mCurrentPool, mPool); (void)ll_apr_warn_status(status); } // add all of the file descriptors run_it = mRunningChains.begin(); LLChainInfo::conditionals_t::iterator fd_it; LLChainInfo::conditionals_t::iterator fd_end; apr_pollset_create(&mPollset, size, mCurrentPool, 0); for(; run_it != run_end; ++run_it) { fd_it = (*run_it).mDescriptors.begin(); fd_end = (*run_it).mDescriptors.end(); for(; fd_it != fd_end; ++fd_it) { apr_pollset_add(mPollset, &((*fd_it).second)); } } } } void LLPumpIO::processChain(LLChainInfo& chain) { PUMP_DEBUG; LLMemType m1(LLMemType::MTYPE_IO_PUMP); LLIOPipe::EStatus status = LLIOPipe::STATUS_OK; links_t::iterator it = chain.mHead; links_t::iterator end = chain.mChainLinks.end(); bool need_process_signaled = false; bool keep_going = true; do { #if LL_DEBUG_PROCESS_LINK #if LL_DEBUG_PIPE_TYPE_IN_PUMP llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "." << llendl; #else llinfos << "Processing link " << (*it).mPipe << "." << llendl; #endif #endif #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN if(chain.mData) { char* buf = NULL; S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL); if(bytes) { buf = new char[bytes + 1]; chain.mData->readAfter( (*it).mChannels.in(), NULL, (U8*)buf, bytes); buf[bytes] = '\0'; llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): " << buf << llendl; delete[] buf; buf = NULL; } else { llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)" << llendl; } } #endif PUMP_DEBUG; status = (*it).mPipe->process( (*it).mChannels, chain.mData, chain.mEOS, chain.mContext, this); #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT if(chain.mData) { char* buf = NULL; S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL); if(bytes) { buf = new char[bytes + 1]; chain.mData->readAfter( (*it).mChannels.out(), NULL, (U8*)buf, bytes); buf[bytes] = '\0'; llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): " << buf << llendl; delete[] buf; buf = NULL; } else { llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)" << llendl; } } #endif #if LL_DEBUG_PROCESS_RETURN_VALUE // Only bother with the success codes - error codes are logged // below. if(LLIOPipe::isSuccess(status)) { llinfos << "Pipe returned: '" #if LL_DEBUG_PIPE_TYPE_IN_PUMP << typeid(*((*it).mPipe)).name() << "':'" #endif << LLIOPipe::lookupStatusString(status) << "'" << llendl; } #endif PUMP_DEBUG; switch(status) { case LLIOPipe::STATUS_OK: // no-op break; case LLIOPipe::STATUS_STOP: PUMP_DEBUG; status = LLIOPipe::STATUS_OK; chain.mHead = end; keep_going = false; break; case LLIOPipe::STATUS_DONE: PUMP_DEBUG; status = LLIOPipe::STATUS_OK; chain.mHead = (it + 1); chain.mEOS = true; break; case LLIOPipe::STATUS_BREAK: PUMP_DEBUG; status = LLIOPipe::STATUS_OK; keep_going = false; break; case LLIOPipe::STATUS_NEED_PROCESS: PUMP_DEBUG; status = LLIOPipe::STATUS_OK; if(!need_process_signaled) { need_process_signaled = true; chain.mHead = it; } break; default: PUMP_DEBUG; if(LLIOPipe::isError(status)) { llinfos << "Pump generated pipe err: '" #if LL_DEBUG_PIPE_TYPE_IN_PUMP << typeid(*((*it).mPipe)).name() << "':'" #endif << LLIOPipe::lookupStatusString(status) << "'" << llendl; #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR if(chain.mData) { char* buf = NULL; S32 bytes = chain.mData->countAfter( (*it).mChannels.in(), NULL); if(bytes) { buf = new char[bytes + 1]; chain.mData->readAfter( (*it).mChannels.in(), NULL, (U8*)buf, bytes); buf[bytes] = '\0'; llinfos << "Input After Error: " << buf << llendl; delete[] buf; buf = NULL; } else { llinfos << "Input After Error: (null)" << llendl; } } else { llinfos << "Input After Error: (null)" << llendl; } #endif keep_going = false; chain.mHead = it; if(!handleChainError(chain, status)) { chain.mHead = end; } } else { llinfos << "Unhandled status code: " << status << ":" << LLIOPipe::lookupStatusString(status) << llendl; } break; } PUMP_DEBUG; } while(keep_going && (++it != end)); PUMP_DEBUG; } bool LLPumpIO::handleChainError( LLChainInfo& chain, LLIOPipe::EStatus error) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); links_t::reverse_iterator rit; if(chain.mHead == chain.mChainLinks.end()) { rit = links_t::reverse_iterator(chain.mHead); } else { rit = links_t::reverse_iterator(chain.mHead + 1); } links_t::reverse_iterator rend = chain.mChainLinks.rend(); bool handled = false; bool keep_going = true; do { #if LL_DEBUG_PIPE_TYPE_IN_PUMP lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name() << "." << llendl; #endif error = (*rit).mPipe->handleError(error, this); switch(error) { case LLIOPipe::STATUS_OK: handled = true; chain.mHead = rit.base(); break; case LLIOPipe::STATUS_STOP: case LLIOPipe::STATUS_DONE: case LLIOPipe::STATUS_BREAK: case LLIOPipe::STATUS_NEED_PROCESS: #if LL_DEBUG_PIPE_TYPE_IN_PUMP lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name() << " returned code to stop error handler." << llendl; #endif keep_going = false; break; default: if(LLIOPipe::isSuccess(error)) { llinfos << "Unhandled status code: " << error << ":" << LLIOPipe::lookupStatusString(error) << llendl; error = LLIOPipe::STATUS_ERROR; keep_going = false; } break; } } while(keep_going && !handled && (++rit != rend)); return handled; } /** * LLPumpIO::LLChainInfo */ LLPumpIO::LLChainInfo::LLChainInfo() : mInit(false), mLock(0), mEOS(false) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS); } void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); if(timeout > 0.0f) { mTimer.start(); mTimer.reset(); mTimer.setTimerExpirySec(timeout); } else { mTimer.stop(); } } void LLPumpIO::LLChainInfo::adjustTimeoutSeconds(F32 delta) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); if(mTimer.getStarted()) { F64 expiry = mTimer.expiresAt(); expiry += delta; mTimer.setExpiryAt(expiry); } }