diff options
Diffstat (limited to 'linden/indra/llmessage/llpumpio.cpp')
-rw-r--r-- | linden/indra/llmessage/llpumpio.cpp | 1025 |
1 files changed, 1025 insertions, 0 deletions
diff --git a/linden/indra/llmessage/llpumpio.cpp b/linden/indra/llmessage/llpumpio.cpp new file mode 100644 index 0000000..853a438 --- /dev/null +++ b/linden/indra/llmessage/llpumpio.cpp | |||
@@ -0,0 +1,1025 @@ | |||
1 | /** | ||
2 | * @file llpumpio.cpp | ||
3 | * @author Phoenix | ||
4 | * @date 2004-11-21 | ||
5 | * @brief Implementation of the i/o pump and related functions. | ||
6 | * | ||
7 | * Copyright (c) 2004-2007, Linden Research, Inc. | ||
8 | * | ||
9 | * The source code in this file ("Source Code") is provided by Linden Lab | ||
10 | * to you under the terms of the GNU General Public License, version 2.0 | ||
11 | * ("GPL"), unless you have obtained a separate licensing agreement | ||
12 | * ("Other License"), formally executed by you and Linden Lab. Terms of | ||
13 | * the GPL can be found in doc/GPL-license.txt in this distribution, or | ||
14 | * online at http://secondlife.com/developers/opensource/gplv2 | ||
15 | * | ||
16 | * There are special exceptions to the terms and conditions of the GPL as | ||
17 | * it is applied to this Source Code. View the full text of the exception | ||
18 | * in the file doc/FLOSS-exception.txt in this software distribution, or | ||
19 | * online at http://secondlife.com/developers/opensource/flossexception | ||
20 | * | ||
21 | * By copying, modifying or distributing this software, you acknowledge | ||
22 | * that you have read and understood your obligations described above, | ||
23 | * and agree to abide by those obligations. | ||
24 | * | ||
25 | * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO | ||
26 | * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY, | ||
27 | * COMPLETENESS OR PERFORMANCE. | ||
28 | */ | ||
29 | |||
30 | #include "linden_common.h" | ||
31 | #include "llpumpio.h" | ||
32 | |||
33 | #include <set> | ||
34 | #include "apr-1/apr_poll.h" | ||
35 | |||
36 | #include "llapr.h" | ||
37 | #include "llmemtype.h" | ||
38 | #include "llstl.h" | ||
39 | |||
40 | // This should not be in production, but it is intensely useful during | ||
41 | // development. | ||
42 | #if LL_LINUX | ||
43 | #define LL_DEBUG_PIPE_TYPE_IN_PUMP 0 | ||
44 | #endif | ||
45 | |||
46 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
47 | #include <typeinfo> | ||
48 | #endif | ||
49 | |||
50 | // constants for poll timeout. if we are threading, we want to have a | ||
51 | // longer poll timeout. | ||
52 | #if LL_THREADS_APR | ||
53 | static const S32 DEFAULT_POLL_TIMEOUT = 1000; | ||
54 | #else | ||
55 | static const S32 DEFAULT_POLL_TIMEOUT = 0; | ||
56 | #endif | ||
57 | |||
58 | // The default (and fallback) expiration time for chains | ||
59 | const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f; | ||
60 | extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f; | ||
61 | extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f; | ||
62 | |||
63 | // sorta spammy debug modes. | ||
64 | //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1 | ||
65 | //#define LL_DEBUG_PROCESS_LINK 1 | ||
66 | //#define LL_DEBUG_PROCESS_RETURN_VALUE 1 | ||
67 | |||
68 | // Super spammy debug mode. | ||
69 | //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1 | ||
70 | //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1 | ||
71 | |||
72 | /** | ||
73 | * @class | ||
74 | */ | ||
75 | class LLChainSleeper : public LLRunnable | ||
76 | { | ||
77 | public: | ||
78 | static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key) | ||
79 | { | ||
80 | return LLRunner::run_ptr_t(new LLChainSleeper(pump, key)); | ||
81 | } | ||
82 | |||
83 | virtual void run(LLRunner* runner, S64 handle) | ||
84 | { | ||
85 | mPump->clearLock(mKey); | ||
86 | } | ||
87 | |||
88 | protected: | ||
89 | LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {} | ||
90 | LLPumpIO* mPump; | ||
91 | S32 mKey; | ||
92 | }; | ||
93 | |||
94 | |||
95 | /** | ||
96 | * @struct ll_delete_apr_pollset_fd_client_data | ||
97 | * @brief This is a simple helper class to clean up our client data. | ||
98 | */ | ||
99 | struct ll_delete_apr_pollset_fd_client_data | ||
100 | { | ||
101 | typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t; | ||
102 | void operator()(const pipe_conditional_t& conditional) | ||
103 | { | ||
104 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
105 | S32* client_id = (S32*)conditional.second.client_data; | ||
106 | delete client_id; | ||
107 | } | ||
108 | }; | ||
109 | |||
110 | /** | ||
111 | * LLPumpIO | ||
112 | */ | ||
113 | LLPumpIO::LLPumpIO(apr_pool_t* pool) : | ||
114 | mState(LLPumpIO::NORMAL), | ||
115 | mRebuildPollset(false), | ||
116 | mPollset(NULL), | ||
117 | mPollsetClientID(0), | ||
118 | mNextLock(0), | ||
119 | mPool(NULL), | ||
120 | mCurrentPool(NULL), | ||
121 | mCurrentPoolReallocCount(0), | ||
122 | mChainsMutex(NULL), | ||
123 | mCallbackMutex(NULL) | ||
124 | { | ||
125 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
126 | initialize(pool); | ||
127 | } | ||
128 | |||
129 | LLPumpIO::~LLPumpIO() | ||
130 | { | ||
131 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
132 | cleanup(); | ||
133 | } | ||
134 | |||
135 | bool LLPumpIO::prime(apr_pool_t* pool) | ||
136 | { | ||
137 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
138 | cleanup(); | ||
139 | initialize(pool); | ||
140 | return ((pool == NULL) ? false : true); | ||
141 | } | ||
142 | |||
143 | bool LLPumpIO::addChain(const chain_t& chain, F32 timeout) | ||
144 | { | ||
145 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
146 | if(chain.empty()) return false; | ||
147 | |||
148 | #if LL_THREADS_APR | ||
149 | LLScopedLock lock(mChainsMutex); | ||
150 | #endif | ||
151 | LLChainInfo info; | ||
152 | info.setTimeoutSeconds(timeout); | ||
153 | info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray); | ||
154 | LLLinkInfo link; | ||
155 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
156 | lldebugs << "LLPumpIO::addChain() " << chain[0] << " '" | ||
157 | << typeid(*(chain[0])).name() << "'" << llendl; | ||
158 | #else | ||
159 | lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl; | ||
160 | #endif | ||
161 | chain_t::const_iterator it = chain.begin(); | ||
162 | chain_t::const_iterator end = chain.end(); | ||
163 | for(; it != end; ++it) | ||
164 | { | ||
165 | link.mPipe = (*it); | ||
166 | link.mChannels = info.mData->nextChannel(); | ||
167 | info.mChainLinks.push_back(link); | ||
168 | } | ||
169 | mPendingChains.push_back(info); | ||
170 | return true; | ||
171 | } | ||
172 | |||
173 | bool LLPumpIO::addChain( | ||
174 | const LLPumpIO::links_t& links, | ||
175 | LLIOPipe::buffer_ptr_t data, | ||
176 | LLSD context, | ||
177 | F32 timeout) | ||
178 | { | ||
179 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
180 | |||
181 | // remember that if the caller is providing a full link | ||
182 | // description, we need to have that description matched to a | ||
183 | // particular buffer. | ||
184 | if(!data) return false; | ||
185 | if(links.empty()) return false; | ||
186 | |||
187 | #if LL_THREADS_APR | ||
188 | LLScopedLock lock(mChainsMutex); | ||
189 | #endif | ||
190 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
191 | lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '" | ||
192 | << typeid(*(links[0].mPipe)).name() << "'" << llendl; | ||
193 | #else | ||
194 | lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl; | ||
195 | #endif | ||
196 | LLChainInfo info; | ||
197 | info.setTimeoutSeconds(timeout); | ||
198 | info.mChainLinks = links; | ||
199 | info.mData = data; | ||
200 | info.mContext = context; | ||
201 | mPendingChains.push_back(info); | ||
202 | return true; | ||
203 | } | ||
204 | |||
205 | bool LLPumpIO::setTimeoutSeconds(F32 timeout) | ||
206 | { | ||
207 | // If no chain is running, return failure. | ||
208 | if(current_chain_t() == mCurrentChain) | ||
209 | { | ||
210 | return false; | ||
211 | } | ||
212 | (*mCurrentChain).setTimeoutSeconds(timeout); | ||
213 | return true; | ||
214 | } | ||
215 | |||
216 | bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll) | ||
217 | { | ||
218 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
219 | //lldebugs << "LLPumpIO::setConditional" << llendl; | ||
220 | if(pipe) | ||
221 | { | ||
222 | // remove any matching poll file descriptors for this pipe. | ||
223 | LLIOPipe::ptr_t pipe_ptr(pipe); | ||
224 | |||
225 | LLChainInfo::conditionals_t::iterator it = (*mCurrentChain).mDescriptors.begin(); | ||
226 | LLChainInfo::conditionals_t::iterator end = (*mCurrentChain).mDescriptors.end(); | ||
227 | while (it != end) | ||
228 | { | ||
229 | LLChainInfo::pipe_conditional_t& value = (*it); | ||
230 | if ( pipe_ptr == value.first ) | ||
231 | { | ||
232 | ll_delete_apr_pollset_fd_client_data()(value); | ||
233 | (*mCurrentChain).mDescriptors.erase(it++); | ||
234 | mRebuildPollset = true; | ||
235 | } | ||
236 | else | ||
237 | { | ||
238 | ++it; | ||
239 | } | ||
240 | } | ||
241 | |||
242 | if(poll) | ||
243 | { | ||
244 | LLChainInfo::pipe_conditional_t value; | ||
245 | value.first = pipe_ptr; | ||
246 | value.second = *poll; | ||
247 | if(!poll->p) | ||
248 | { | ||
249 | // each fd needs a pool to work with, so if one was | ||
250 | // not specified, use this pool. | ||
251 | // *FIX: Should it always be this pool? | ||
252 | value.second.p = mPool; | ||
253 | } | ||
254 | value.second.client_data = new S32(++mPollsetClientID); | ||
255 | (*mCurrentChain).mDescriptors.push_back(value); | ||
256 | mRebuildPollset = true; | ||
257 | } | ||
258 | return true; | ||
259 | } | ||
260 | return false; | ||
261 | } | ||
262 | |||
263 | S32 LLPumpIO::setLock() | ||
264 | { | ||
265 | // *NOTE: I do not think it is necessary to acquire a mutex here | ||
266 | // since this should only be called during the pump(), and should | ||
267 | // only change the running chain. Any other use of this method is | ||
268 | // incorrect usage. If it becomes necessary to acquire a lock | ||
269 | // here, be sure to lock here and call a protected method to get | ||
270 | // the lock, and sleepChain() should probably acquire the same | ||
271 | // lock while and calling the same protected implementation to | ||
272 | // lock the runner at the same time. | ||
273 | |||
274 | // If no chain is running, return failure. | ||
275 | if(current_chain_t() == mCurrentChain) | ||
276 | { | ||
277 | return 0; | ||
278 | } | ||
279 | |||
280 | // deal with wrap. | ||
281 | if(++mNextLock <= 0) | ||
282 | { | ||
283 | mNextLock = 1; | ||
284 | } | ||
285 | |||
286 | // set the lock | ||
287 | (*mCurrentChain).mLock = mNextLock; | ||
288 | return mNextLock; | ||
289 | } | ||
290 | |||
291 | void LLPumpIO::clearLock(S32 key) | ||
292 | { | ||
293 | // We need to lock it here since we do not want to be iterating | ||
294 | // over the chains twice. We can safely call process() while this | ||
295 | // is happening since we should not be erasing a locked pipe, and | ||
296 | // therefore won't be treading into deleted memory. I think we can | ||
297 | // also clear the lock on the chain safely since the pump only | ||
298 | // reads that value. | ||
299 | #if LL_THREADS_APR | ||
300 | LLScopedLock lock(mChainsMutex); | ||
301 | #endif | ||
302 | mClearLocks.insert(key); | ||
303 | } | ||
304 | |||
305 | bool LLPumpIO::sleepChain(F64 seconds) | ||
306 | { | ||
307 | // Much like the call to setLock(), this should only be called | ||
308 | // from one chain during processing, so there is no need to | ||
309 | // acquire a mutex. | ||
310 | if(seconds <= 0.0) return false; | ||
311 | S32 key = setLock(); | ||
312 | if(!key) return false; | ||
313 | LLRunner::run_handle_t handle = mRunner.addRunnable( | ||
314 | LLChainSleeper::build(this, key), | ||
315 | LLRunner::RUN_IN, | ||
316 | seconds); | ||
317 | if(0 == handle) return false; | ||
318 | return true; | ||
319 | } | ||
320 | |||
321 | bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const | ||
322 | { | ||
323 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
324 | if(current_chain_t() == mCurrentChain) | ||
325 | { | ||
326 | return false; | ||
327 | } | ||
328 | std::copy( | ||
329 | (*mCurrentChain).mChainLinks.begin(), | ||
330 | (*mCurrentChain).mChainLinks.end(), | ||
331 | std::back_insert_iterator<links_t>(links)); | ||
332 | return true; | ||
333 | } | ||
334 | |||
335 | void LLPumpIO::pump() | ||
336 | { | ||
337 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
338 | //llinfos << "LLPumpIO::pump()" << llendl; | ||
339 | |||
340 | // Run any pending runners. | ||
341 | mRunner.run(); | ||
342 | |||
343 | // We need to move all of the pending heads over to the running | ||
344 | // chains. | ||
345 | PUMP_DEBUG; | ||
346 | if(true) | ||
347 | { | ||
348 | #if LL_THREADS_APR | ||
349 | LLScopedLock lock(mChainsMutex); | ||
350 | #endif | ||
351 | // bail if this pump is paused. | ||
352 | if(PAUSING == mState) | ||
353 | { | ||
354 | mState = PAUSED; | ||
355 | } | ||
356 | if(PAUSED == mState) | ||
357 | { | ||
358 | return; | ||
359 | } | ||
360 | |||
361 | PUMP_DEBUG; | ||
362 | // Move the pending chains over to the running chaings | ||
363 | if(!mPendingChains.empty()) | ||
364 | { | ||
365 | PUMP_DEBUG; | ||
366 | //lldebugs << "Pushing " << mPendingChains.size() << "." << llendl; | ||
367 | std::copy( | ||
368 | mPendingChains.begin(), | ||
369 | mPendingChains.end(), | ||
370 | std::back_insert_iterator<running_chains_t>(mRunningChains)); | ||
371 | mPendingChains.clear(); | ||
372 | PUMP_DEBUG; | ||
373 | } | ||
374 | |||
375 | // Clear any locks. This needs to be done here so that we do | ||
376 | // not clash during a call to clearLock(). | ||
377 | if(!mClearLocks.empty()) | ||
378 | { | ||
379 | PUMP_DEBUG; | ||
380 | running_chains_t::iterator it = mRunningChains.begin(); | ||
381 | running_chains_t::iterator end = mRunningChains.end(); | ||
382 | std::set<S32>::iterator not_cleared = mClearLocks.end(); | ||
383 | for(; it != end; ++it) | ||
384 | { | ||
385 | if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared) | ||
386 | { | ||
387 | (*it).mLock = 0; | ||
388 | } | ||
389 | } | ||
390 | PUMP_DEBUG; | ||
391 | mClearLocks.clear(); | ||
392 | } | ||
393 | } | ||
394 | |||
395 | PUMP_DEBUG; | ||
396 | // rebuild the pollset if necessary | ||
397 | if(mRebuildPollset) | ||
398 | { | ||
399 | PUMP_DEBUG; | ||
400 | rebuildPollset(); | ||
401 | mRebuildPollset = false; | ||
402 | } | ||
403 | |||
404 | // Poll based on the last known pollset | ||
405 | // *FIX: may want to pass in a poll timeout so it works correctly | ||
406 | // in single and multi threaded processes. | ||
407 | PUMP_DEBUG; | ||
408 | typedef std::set<S32> signal_client_t; | ||
409 | signal_client_t signalled_client; | ||
410 | if(mPollset) | ||
411 | { | ||
412 | PUMP_DEBUG; | ||
413 | //llinfos << "polling" << llendl; | ||
414 | S32 count = 0; | ||
415 | S32 client_id = 0; | ||
416 | const apr_pollfd_t* poll_fd = NULL; | ||
417 | apr_pollset_poll(mPollset, DEFAULT_POLL_TIMEOUT, &count, &poll_fd); | ||
418 | PUMP_DEBUG; | ||
419 | for(S32 i = 0; i < count; ++i) | ||
420 | { | ||
421 | client_id = *((S32*)poll_fd[i].client_data); | ||
422 | signalled_client.insert(client_id); | ||
423 | } | ||
424 | PUMP_DEBUG; | ||
425 | } | ||
426 | |||
427 | PUMP_DEBUG; | ||
428 | // set up for a check to see if each one was signalled | ||
429 | signal_client_t::iterator not_signalled = signalled_client.end(); | ||
430 | |||
431 | // Process everything as appropriate | ||
432 | //lldebugs << "Running chain count: " << mRunningChains.size() << llendl; | ||
433 | running_chains_t::iterator run_chain = mRunningChains.begin(); | ||
434 | bool process_this_chain = false; | ||
435 | for(; run_chain != mRunningChains.end(); ) | ||
436 | { | ||
437 | PUMP_DEBUG; | ||
438 | if((*run_chain).mInit | ||
439 | && (*run_chain).mTimer.getStarted() | ||
440 | && (*run_chain).mTimer.hasExpired()) | ||
441 | { | ||
442 | PUMP_DEBUG; | ||
443 | if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED)) | ||
444 | { | ||
445 | // the pipe probably handled the error. If the handler | ||
446 | // forgot to reset the expiration then we need to do | ||
447 | // that here. | ||
448 | if((*run_chain).mTimer.getStarted() | ||
449 | && (*run_chain).mTimer.hasExpired()) | ||
450 | { | ||
451 | PUMP_DEBUG; | ||
452 | llinfos << "Error handler forgot to reset timeout. " | ||
453 | << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS | ||
454 | << " seconds." << llendl; | ||
455 | (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS); | ||
456 | } | ||
457 | } | ||
458 | else | ||
459 | { | ||
460 | PUMP_DEBUG; | ||
461 | // it timed out and no one handled it, so we need to | ||
462 | // retire the chain | ||
463 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
464 | lldebugs << "Removing chain " | ||
465 | << (*run_chain).mChainLinks[0].mPipe | ||
466 | << " '" | ||
467 | << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() | ||
468 | << "' because it timed out." << llendl; | ||
469 | #else | ||
470 | // lldebugs << "Removing chain " | ||
471 | // << (*run_chain).mChainLinks[0].mPipe | ||
472 | // << " because we reached the end." << llendl; | ||
473 | #endif | ||
474 | mRunningChains.erase(run_chain++); | ||
475 | continue; | ||
476 | } | ||
477 | } | ||
478 | PUMP_DEBUG; | ||
479 | if((*run_chain).mLock) | ||
480 | { | ||
481 | ++run_chain; | ||
482 | continue; | ||
483 | } | ||
484 | PUMP_DEBUG; | ||
485 | mCurrentChain = run_chain; | ||
486 | if((*run_chain).mDescriptors.empty()) | ||
487 | { | ||
488 | // if there are no conditionals, just process this chain. | ||
489 | process_this_chain = true; | ||
490 | //lldebugs << "no conditionals - processing" << llendl; | ||
491 | } | ||
492 | else | ||
493 | { | ||
494 | PUMP_DEBUG; | ||
495 | //lldebugs << "checking conditionals" << llendl; | ||
496 | // Check if this run chain was signalled. If any file | ||
497 | // descriptor is ready for something, then go ahead and | ||
498 | // process this chian. | ||
499 | process_this_chain = false; | ||
500 | if(!signalled_client.empty()) | ||
501 | { | ||
502 | PUMP_DEBUG; | ||
503 | LLChainInfo::conditionals_t::iterator it; | ||
504 | it = (*run_chain).mDescriptors.begin(); | ||
505 | LLChainInfo::conditionals_t::iterator end; | ||
506 | end = (*run_chain).mDescriptors.end(); | ||
507 | S32 client_id = 0; | ||
508 | for(; it != end; ++it) | ||
509 | { | ||
510 | PUMP_DEBUG; | ||
511 | client_id = *((S32*)((*it).second.client_data)); | ||
512 | if(signalled_client.find(client_id) != not_signalled) | ||
513 | { | ||
514 | process_this_chain = true; | ||
515 | break; | ||
516 | } | ||
517 | //llinfos << "no fd ready for this one." << llendl; | ||
518 | } | ||
519 | } | ||
520 | } | ||
521 | if(process_this_chain) | ||
522 | { | ||
523 | PUMP_DEBUG; | ||
524 | if(!((*run_chain).mInit)) | ||
525 | { | ||
526 | (*run_chain).mHead = (*run_chain).mChainLinks.begin(); | ||
527 | (*run_chain).mInit = true; | ||
528 | } | ||
529 | PUMP_DEBUG; | ||
530 | processChain(*run_chain); | ||
531 | } | ||
532 | |||
533 | PUMP_DEBUG; | ||
534 | if((*run_chain).mHead == (*run_chain).mChainLinks.end()) | ||
535 | { | ||
536 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
537 | lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe | ||
538 | << " '" | ||
539 | << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() | ||
540 | << "' because we reached the end." << llendl; | ||
541 | #else | ||
542 | // lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe | ||
543 | // << " because we reached the end." << llendl; | ||
544 | #endif | ||
545 | |||
546 | PUMP_DEBUG; | ||
547 | // This chain is done. Clean up any allocated memory and | ||
548 | // erase the chain info. | ||
549 | std::for_each( | ||
550 | (*run_chain).mDescriptors.begin(), | ||
551 | (*run_chain).mDescriptors.end(), | ||
552 | ll_delete_apr_pollset_fd_client_data()); | ||
553 | mRunningChains.erase(run_chain++); | ||
554 | |||
555 | // *NOTE: may not always need to rebuild the pollset. | ||
556 | mRebuildPollset = true; | ||
557 | } | ||
558 | else | ||
559 | { | ||
560 | PUMP_DEBUG; | ||
561 | // this chain needs more processing - just go to the next | ||
562 | // chain. | ||
563 | ++run_chain; | ||
564 | } | ||
565 | } | ||
566 | |||
567 | PUMP_DEBUG; | ||
568 | // null out the chain | ||
569 | mCurrentChain = current_chain_t(); | ||
570 | END_PUMP_DEBUG; | ||
571 | } | ||
572 | |||
573 | //bool LLPumpIO::respond(const chain_t& pipes) | ||
574 | //{ | ||
575 | //#if LL_THREADS_APR | ||
576 | // LLScopedLock lock(mCallbackMutex); | ||
577 | //#endif | ||
578 | // LLChainInfo info; | ||
579 | // links_t links; | ||
580 | // | ||
581 | // mPendingCallbacks.push_back(info); | ||
582 | // return true; | ||
583 | //} | ||
584 | |||
585 | bool LLPumpIO::respond(LLIOPipe* pipe) | ||
586 | { | ||
587 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
588 | if(NULL == pipe) return false; | ||
589 | |||
590 | #if LL_THREADS_APR | ||
591 | LLScopedLock lock(mCallbackMutex); | ||
592 | #endif | ||
593 | LLChainInfo info; | ||
594 | LLLinkInfo link; | ||
595 | link.mPipe = pipe; | ||
596 | info.mChainLinks.push_back(link); | ||
597 | mPendingCallbacks.push_back(info); | ||
598 | return true; | ||
599 | } | ||
600 | |||
601 | bool LLPumpIO::respond( | ||
602 | const links_t& links, | ||
603 | LLIOPipe::buffer_ptr_t data, | ||
604 | LLSD context) | ||
605 | { | ||
606 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
607 | // if the caller is providing a full link description, we need to | ||
608 | // have that description matched to a particular buffer. | ||
609 | if(!data) return false; | ||
610 | if(links.empty()) return false; | ||
611 | |||
612 | #if LL_THREADS_APR | ||
613 | LLScopedLock lock(mCallbackMutex); | ||
614 | #endif | ||
615 | |||
616 | // Add the callback response | ||
617 | LLChainInfo info; | ||
618 | info.mChainLinks = links; | ||
619 | info.mData = data; | ||
620 | info.mContext = context; | ||
621 | mPendingCallbacks.push_back(info); | ||
622 | return true; | ||
623 | } | ||
624 | |||
625 | void LLPumpIO::callback() | ||
626 | { | ||
627 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
628 | //llinfos << "LLPumpIO::callback()" << llendl; | ||
629 | if(true) | ||
630 | { | ||
631 | #if LL_THREADS_APR | ||
632 | LLScopedLock lock(mCallbackMutex); | ||
633 | #endif | ||
634 | std::copy( | ||
635 | mPendingCallbacks.begin(), | ||
636 | mPendingCallbacks.end(), | ||
637 | std::back_insert_iterator<callbacks_t>(mCallbacks)); | ||
638 | mPendingCallbacks.clear(); | ||
639 | } | ||
640 | if(!mCallbacks.empty()) | ||
641 | { | ||
642 | callbacks_t::iterator it = mCallbacks.begin(); | ||
643 | callbacks_t::iterator end = mCallbacks.end(); | ||
644 | for(; it != end; ++it) | ||
645 | { | ||
646 | // it's always the first and last time for respone chains | ||
647 | (*it).mHead = (*it).mChainLinks.begin(); | ||
648 | (*it).mInit = true; | ||
649 | (*it).mEOS = true; | ||
650 | processChain(*it); | ||
651 | } | ||
652 | mCallbacks.clear(); | ||
653 | } | ||
654 | } | ||
655 | |||
656 | void LLPumpIO::control(LLPumpIO::EControl op) | ||
657 | { | ||
658 | #if LL_THREADS_APR | ||
659 | LLScopedLock lock(mChainsMutex); | ||
660 | #endif | ||
661 | switch(op) | ||
662 | { | ||
663 | case PAUSE: | ||
664 | mState = PAUSING; | ||
665 | break; | ||
666 | case RESUME: | ||
667 | mState = NORMAL; | ||
668 | break; | ||
669 | default: | ||
670 | // no-op | ||
671 | break; | ||
672 | } | ||
673 | } | ||
674 | |||
675 | void LLPumpIO::initialize(apr_pool_t* pool) | ||
676 | { | ||
677 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
678 | if(!pool) return; | ||
679 | #if LL_THREADS_APR | ||
680 | apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_DEFAULT, pool); | ||
681 | apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_DEFAULT, pool); | ||
682 | #endif | ||
683 | mPool = pool; | ||
684 | } | ||
685 | |||
686 | void LLPumpIO::cleanup() | ||
687 | { | ||
688 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
689 | #if LL_THREADS_APR | ||
690 | if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex); | ||
691 | if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex); | ||
692 | #endif | ||
693 | mChainsMutex = NULL; | ||
694 | mCallbackMutex = NULL; | ||
695 | if(mPollset) | ||
696 | { | ||
697 | // lldebugs << "cleaning up pollset" << llendl; | ||
698 | apr_pollset_destroy(mPollset); | ||
699 | mPollset = NULL; | ||
700 | } | ||
701 | if(mCurrentPool) | ||
702 | { | ||
703 | apr_pool_destroy(mCurrentPool); | ||
704 | mCurrentPool = NULL; | ||
705 | } | ||
706 | mPool = NULL; | ||
707 | } | ||
708 | |||
709 | void LLPumpIO::rebuildPollset() | ||
710 | { | ||
711 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
712 | // lldebugs << "LLPumpIO::rebuildPollset()" << llendl; | ||
713 | if(mPollset) | ||
714 | { | ||
715 | //lldebugs << "destroying pollset" << llendl; | ||
716 | apr_pollset_destroy(mPollset); | ||
717 | mPollset = NULL; | ||
718 | } | ||
719 | U32 size = 0; | ||
720 | running_chains_t::iterator run_it = mRunningChains.begin(); | ||
721 | running_chains_t::iterator run_end = mRunningChains.end(); | ||
722 | for(; run_it != run_end; ++run_it) | ||
723 | { | ||
724 | size += (*run_it).mDescriptors.size(); | ||
725 | } | ||
726 | //lldebugs << "found " << size << " descriptors." << llendl; | ||
727 | if(size) | ||
728 | { | ||
729 | // Recycle the memory pool | ||
730 | const S32 POLLSET_POOL_RECYCLE_COUNT = 100; | ||
731 | if(mCurrentPool | ||
732 | && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT))) | ||
733 | { | ||
734 | apr_pool_destroy(mCurrentPool); | ||
735 | mCurrentPool = NULL; | ||
736 | mCurrentPoolReallocCount = 0; | ||
737 | } | ||
738 | if(!mCurrentPool) | ||
739 | { | ||
740 | apr_status_t status = apr_pool_create(&mCurrentPool, mPool); | ||
741 | (void)ll_apr_warn_status(status); | ||
742 | } | ||
743 | |||
744 | // add all of the file descriptors | ||
745 | run_it = mRunningChains.begin(); | ||
746 | LLChainInfo::conditionals_t::iterator fd_it; | ||
747 | LLChainInfo::conditionals_t::iterator fd_end; | ||
748 | apr_pollset_create(&mPollset, size, mCurrentPool, 0); | ||
749 | for(; run_it != run_end; ++run_it) | ||
750 | { | ||
751 | fd_it = (*run_it).mDescriptors.begin(); | ||
752 | fd_end = (*run_it).mDescriptors.end(); | ||
753 | for(; fd_it != fd_end; ++fd_it) | ||
754 | { | ||
755 | apr_pollset_add(mPollset, &((*fd_it).second)); | ||
756 | } | ||
757 | } | ||
758 | } | ||
759 | } | ||
760 | |||
761 | void LLPumpIO::processChain(LLChainInfo& chain) | ||
762 | { | ||
763 | PUMP_DEBUG; | ||
764 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
765 | LLIOPipe::EStatus status = LLIOPipe::STATUS_OK; | ||
766 | links_t::iterator it = chain.mHead; | ||
767 | links_t::iterator end = chain.mChainLinks.end(); | ||
768 | bool need_process_signaled = false; | ||
769 | bool keep_going = true; | ||
770 | do | ||
771 | { | ||
772 | #if LL_DEBUG_PROCESS_LINK | ||
773 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
774 | llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "." | ||
775 | << llendl; | ||
776 | #else | ||
777 | llinfos << "Processing link " << (*it).mPipe << "." << llendl; | ||
778 | #endif | ||
779 | #endif | ||
780 | #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN | ||
781 | if(chain.mData) | ||
782 | { | ||
783 | char* buf = NULL; | ||
784 | S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL); | ||
785 | if(bytes) | ||
786 | { | ||
787 | buf = new char[bytes + 1]; | ||
788 | chain.mData->readAfter( | ||
789 | (*it).mChannels.in(), | ||
790 | NULL, | ||
791 | (U8*)buf, | ||
792 | bytes); | ||
793 | buf[bytes] = '\0'; | ||
794 | llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): " | ||
795 | << buf << llendl; | ||
796 | delete[] buf; | ||
797 | buf = NULL; | ||
798 | } | ||
799 | else | ||
800 | { | ||
801 | llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)" | ||
802 | << llendl; | ||
803 | } | ||
804 | } | ||
805 | #endif | ||
806 | PUMP_DEBUG; | ||
807 | status = (*it).mPipe->process( | ||
808 | (*it).mChannels, | ||
809 | chain.mData, | ||
810 | chain.mEOS, | ||
811 | chain.mContext, | ||
812 | this); | ||
813 | #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT | ||
814 | if(chain.mData) | ||
815 | { | ||
816 | char* buf = NULL; | ||
817 | S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL); | ||
818 | if(bytes) | ||
819 | { | ||
820 | buf = new char[bytes + 1]; | ||
821 | chain.mData->readAfter( | ||
822 | (*it).mChannels.out(), | ||
823 | NULL, | ||
824 | (U8*)buf, | ||
825 | bytes); | ||
826 | buf[bytes] = '\0'; | ||
827 | llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): " | ||
828 | << buf << llendl; | ||
829 | delete[] buf; | ||
830 | buf = NULL; | ||
831 | } | ||
832 | else | ||
833 | { | ||
834 | llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)" | ||
835 | << llendl; | ||
836 | } | ||
837 | } | ||
838 | #endif | ||
839 | |||
840 | #if LL_DEBUG_PROCESS_RETURN_VALUE | ||
841 | // Only bother with the success codes - error codes are logged | ||
842 | // below. | ||
843 | if(LLIOPipe::isSuccess(status)) | ||
844 | { | ||
845 | llinfos << "Pipe returned: '" | ||
846 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
847 | << typeid(*((*it).mPipe)).name() << "':'" | ||
848 | #endif | ||
849 | << LLIOPipe::lookupStatusString(status) << "'" << llendl; | ||
850 | } | ||
851 | #endif | ||
852 | |||
853 | PUMP_DEBUG; | ||
854 | switch(status) | ||
855 | { | ||
856 | case LLIOPipe::STATUS_OK: | ||
857 | // no-op | ||
858 | break; | ||
859 | case LLIOPipe::STATUS_STOP: | ||
860 | PUMP_DEBUG; | ||
861 | status = LLIOPipe::STATUS_OK; | ||
862 | chain.mHead = end; | ||
863 | keep_going = false; | ||
864 | break; | ||
865 | case LLIOPipe::STATUS_DONE: | ||
866 | PUMP_DEBUG; | ||
867 | status = LLIOPipe::STATUS_OK; | ||
868 | chain.mHead = (it + 1); | ||
869 | chain.mEOS = true; | ||
870 | break; | ||
871 | case LLIOPipe::STATUS_BREAK: | ||
872 | PUMP_DEBUG; | ||
873 | status = LLIOPipe::STATUS_OK; | ||
874 | keep_going = false; | ||
875 | break; | ||
876 | case LLIOPipe::STATUS_NEED_PROCESS: | ||
877 | PUMP_DEBUG; | ||
878 | status = LLIOPipe::STATUS_OK; | ||
879 | if(!need_process_signaled) | ||
880 | { | ||
881 | need_process_signaled = true; | ||
882 | chain.mHead = it; | ||
883 | } | ||
884 | break; | ||
885 | default: | ||
886 | PUMP_DEBUG; | ||
887 | if(LLIOPipe::isError(status)) | ||
888 | { | ||
889 | llinfos << "Pump generated pipe error: '" | ||
890 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
891 | << typeid(*((*it).mPipe)).name() << "':'" | ||
892 | #endif | ||
893 | << LLIOPipe::lookupStatusString(status) | ||
894 | << "'" << llendl; | ||
895 | #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR | ||
896 | if(chain.mData) | ||
897 | { | ||
898 | char* buf = NULL; | ||
899 | S32 bytes = chain.mData->countAfter( | ||
900 | (*it).mChannels.in(), | ||
901 | NULL); | ||
902 | if(bytes) | ||
903 | { | ||
904 | buf = new char[bytes + 1]; | ||
905 | chain.mData->readAfter( | ||
906 | (*it).mChannels.in(), | ||
907 | NULL, | ||
908 | (U8*)buf, | ||
909 | bytes); | ||
910 | buf[bytes] = '\0'; | ||
911 | llinfos << "Input After Error: " << buf << llendl; | ||
912 | delete[] buf; | ||
913 | buf = NULL; | ||
914 | } | ||
915 | else | ||
916 | { | ||
917 | llinfos << "Input After Error: (null)" << llendl; | ||
918 | } | ||
919 | } | ||
920 | else | ||
921 | { | ||
922 | llinfos << "Input After Error: (null)" << llendl; | ||
923 | } | ||
924 | #endif | ||
925 | keep_going = false; | ||
926 | chain.mHead = it; | ||
927 | if(!handleChainError(chain, status)) | ||
928 | { | ||
929 | chain.mHead = end; | ||
930 | } | ||
931 | } | ||
932 | else | ||
933 | { | ||
934 | llinfos << "Unhandled status code: " << status << ":" | ||
935 | << LLIOPipe::lookupStatusString(status) << llendl; | ||
936 | } | ||
937 | break; | ||
938 | } | ||
939 | PUMP_DEBUG; | ||
940 | } while(keep_going && (++it != end)); | ||
941 | PUMP_DEBUG; | ||
942 | } | ||
943 | |||
944 | bool LLPumpIO::handleChainError( | ||
945 | LLChainInfo& chain, | ||
946 | LLIOPipe::EStatus error) | ||
947 | { | ||
948 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
949 | links_t::reverse_iterator rit; | ||
950 | if(chain.mHead == chain.mChainLinks.end()) | ||
951 | { | ||
952 | rit = links_t::reverse_iterator(chain.mHead); | ||
953 | } | ||
954 | else | ||
955 | { | ||
956 | rit = links_t::reverse_iterator(chain.mHead + 1); | ||
957 | } | ||
958 | |||
959 | links_t::reverse_iterator rend = chain.mChainLinks.rend(); | ||
960 | bool handled = false; | ||
961 | bool keep_going = true; | ||
962 | do | ||
963 | { | ||
964 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
965 | lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name() | ||
966 | << "." << llendl; | ||
967 | #endif | ||
968 | error = (*rit).mPipe->handleError(error, this); | ||
969 | switch(error) | ||
970 | { | ||
971 | case LLIOPipe::STATUS_OK: | ||
972 | handled = true; | ||
973 | chain.mHead = rit.base(); | ||
974 | break; | ||
975 | case LLIOPipe::STATUS_STOP: | ||
976 | case LLIOPipe::STATUS_DONE: | ||
977 | case LLIOPipe::STATUS_BREAK: | ||
978 | case LLIOPipe::STATUS_NEED_PROCESS: | ||
979 | #if LL_DEBUG_PIPE_TYPE_IN_PUMP | ||
980 | lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name() | ||
981 | << " returned code to stop error handler." << llendl; | ||
982 | #endif | ||
983 | keep_going = false; | ||
984 | break; | ||
985 | default: | ||
986 | if(LLIOPipe::isSuccess(error)) | ||
987 | { | ||
988 | llinfos << "Unhandled status code: " << error << ":" | ||
989 | << LLIOPipe::lookupStatusString(error) << llendl; | ||
990 | error = LLIOPipe::STATUS_ERROR; | ||
991 | keep_going = false; | ||
992 | } | ||
993 | break; | ||
994 | } | ||
995 | } while(keep_going && !handled && (++rit != rend)); | ||
996 | return handled; | ||
997 | } | ||
998 | |||
999 | /** | ||
1000 | * LLPumpIO::LLChainInfo | ||
1001 | */ | ||
1002 | |||
1003 | LLPumpIO::LLChainInfo::LLChainInfo() : | ||
1004 | mInit(false), | ||
1005 | mLock(0), | ||
1006 | mEOS(false) | ||
1007 | { | ||
1008 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
1009 | mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS); | ||
1010 | } | ||
1011 | |||
1012 | void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout) | ||
1013 | { | ||
1014 | LLMemType m1(LLMemType::MTYPE_IO_PUMP); | ||
1015 | if(timeout > 0.0f) | ||
1016 | { | ||
1017 | mTimer.start(); | ||
1018 | mTimer.reset(); | ||
1019 | mTimer.setTimerExpirySec(timeout); | ||
1020 | } | ||
1021 | else | ||
1022 | { | ||
1023 | mTimer.stop(); | ||
1024 | } | ||
1025 | } | ||