aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llmessage/llpumpio.h
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llmessage/llpumpio.h')
-rw-r--r--linden/indra/llmessage/llpumpio.h425
1 files changed, 425 insertions, 0 deletions
diff --git a/linden/indra/llmessage/llpumpio.h b/linden/indra/llmessage/llpumpio.h
new file mode 100644
index 0000000..d90d6b0
--- /dev/null
+++ b/linden/indra/llmessage/llpumpio.h
@@ -0,0 +1,425 @@
1/**
2 * @file llpumpio.h
3 * @author Phoenix
4 * @date 2004-11-19
5 * @brief Declaration of pump class which manages io chains.
6 *
7 * Copyright (c) 2004-2007, Linden Research, Inc.
8 *
9 * The source code in this file ("Source Code") is provided by Linden Lab
10 * to you under the terms of the GNU General Public License, version 2.0
11 * ("GPL"), unless you have obtained a separate licensing agreement
12 * ("Other License"), formally executed by you and Linden Lab. Terms of
13 * the GPL can be found in doc/GPL-license.txt in this distribution, or
14 * online at http://secondlife.com/developers/opensource/gplv2
15 *
16 * There are special exceptions to the terms and conditions of the GPL as
17 * it is applied to this Source Code. View the full text of the exception
18 * in the file doc/FLOSS-exception.txt in this software distribution, or
19 * online at http://secondlife.com/developers/opensource/flossexception
20 *
21 * By copying, modifying or distributing this software, you acknowledge
22 * that you have read and understood your obligations described above,
23 * and agree to abide by those obligations.
24 *
25 * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
26 * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
27 * COMPLETENESS OR PERFORMANCE.
28 */
29
30#ifndef LL_LLPUMPIO_H
31#define LL_LLPUMPIO_H
32
33#include <set>
34#if LL_LINUX // needed for PATH_MAX in APR.
35#include <sys/param.h>
36#endif
37
38#include "apr-1/apr_pools.h"
39#include "llbuffer.h"
40#include "llframetimer.h"
41#include "lliopipe.h"
42#include "llrun.h"
43
44// Define this to enable use with the APR thread library.
45//#define LL_THREADS_APR 1
46
47// some simple constants to help with timeouts
48extern const F32 DEFAULT_CHAIN_EXPIRY_SECS;
49extern const F32 SHORT_CHAIN_EXPIRY_SECS;
50extern const F32 NEVER_CHAIN_EXPIRY_SECS;
51
52/**
53 * @class LLPumpIO
54 * @brief Class to manage sets of io chains.
55 *
56 * The pump class provides a thread abstraction for doing IO based
57 * communication between two threads in a structured and optimized for
58 * processor time. The primary usage is to create a pump, and call
59 * <code>pump()</code> on a thread used for IO and call
60 * <code>respond()</code> on a thread that is expected to do higher
61 * level processing. You can call almost any other method from any
62 * thread - see notes for each method for details. In order for the
63 * threading abstraction to work, you need to call <code>prime()</code>
64 * with a valid apr pool.
65 * A pump instance manages much of the state for the pipe, including
66 * the list of pipes in the chain, the channel for each element in the
67 * chain, the buffer, and if any pipe has marked the stream or process
68 * as done. Pipes can also set file descriptor based conditional
69 * statements so that calls to process do not happen until data is
70 * ready to be read or written. Pipes control execution of calls to
71 * process by returning a status code such as STATUS_OK or
72 * STATUS_BREAK.
73 * One way to conceptualize the way IO will work is that a pump
74 * combines the unit processing of pipes to behave like file pipes on
75 * the unix command line.
76 */
77class LLPumpIO
78{
79public:
80 /**
81 * @brief Constructor.
82 */
83 LLPumpIO(apr_pool_t* pool);
84
85 /**
86 * @brief Destructor.
87 */
88 ~LLPumpIO();
89
90 /**
91 * @brief Prepare this pump for usage.
92 *
93 * If you fail to call this method prior to use, the pump will
94 * try to work, but will not come with any thread locking
95 * mechanisms.
96 * @param pool The apr pool to use.
97 * @return Returns true if the pump is primed.
98 */
99 bool prime(apr_pool_t* pool);
100
101 /**
102 * @brief Typedef for having a chain of pipes.
103 */
104 typedef std::vector<LLIOPipe::ptr_t> chain_t;
105
106 /**
107 * @brief Add a chain to this pump and process in the next cycle.
108 *
109 * This method will automatically generate a buffer and assign
110 * each link in the chain as if it were the consumer to the
111 * previous.
112 * @param chain The pipes for the chain
113 * @param timeout The number of seconds in the future to
114 * expire. Pass in 0.0f to never expire.
115 * @return Returns true if anything was added to the pump.
116 */
117 bool addChain(const chain_t& chain, F32 timeout);
118
119 /**
120 * @brief Struct to associate a pipe with it's buffer io indexes.
121 */
122 struct LLLinkInfo
123 {
124 LLIOPipe::ptr_t mPipe;
125 LLChannelDescriptors mChannels;
126 };
127
128 /**
129 * @brief Typedef for having a chain of <code>LLLinkInfo</code>
130 * instances.
131 */
132 typedef std::vector<LLLinkInfo> links_t;
133
134 /**
135 * @brief Add a chain to this pump and process in the next cycle.
136 *
137 * This method provides a slightly more sophisticated method for
138 * adding a chain where the caller can specify which link elements
139 * are on what channels. This method will fail if no buffer is
140 * provided since any calls to generate new channels for the
141 * buffers will cause unpredictable interleaving of data.
142 * @param links The pipes and io indexes for the chain
143 * @param data Shared pointer to data buffer
144 * @param context Potentially undefined context meta-data for chain.
145 * @param timeout The number of seconds in the future to
146 * expire. Pass in 0.0f to never expire.
147 * @return Returns true if anything was added to the pump.
148 */
149 bool addChain(
150 const links_t& links,
151 LLIOPipe::buffer_ptr_t data,
152 LLSD context,
153 F32 timeout);
154
155 /**
156 * @brief Set or clear a timeout for the running chain
157 *
158 * @param timeout The number of seconds in the future to
159 * expire. Pass in 0.0f to never expire.
160 * @return Returns true if the timer was set.
161 */
162 bool setTimeoutSeconds(F32 timeout);
163
164 /**
165 * @brief Set up file descriptors for for the running chain.
166 * @see rebuildPollset()
167 *
168 * There is currently a limit of one conditional per pipe.
169 * *NOTE: The internal mechanism for building a pollset based on
170 * pipe/pollfd/chain generates an epoll error on linux (and
171 * probably behaves similarly on other platforms) because the
172 * pollset rebuilder will add each apr_pollfd_t serially. This
173 * does not matter for pipes on the same chain, since any
174 * signalled pipe will eventually invoke a call to process(), but
175 * is a problem if the same apr_pollfd_t is on different
176 * chains. Once we have more than just network i/o on the pump,
177 * this might matter.
178 * *FIX: Given the structure of the pump and pipe relationship,
179 * this should probably go through a different mechanism than the
180 * pump. I think it would be best if the pipe had some kind of
181 * controller which was passed into <code>process()</code> rather
182 * than the pump which exposed this interface.
183 * @param pipe The pipe which is setting a conditional
184 * @param poll The entire socket and read/write condition - null to remove
185 * @return Returns true if the poll state was set.
186 */
187 bool setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll);
188
189 /**
190 * @brief Lock the current chain.
191 * @see sleepChain() since it relies on the implementation of this method.
192 *
193 * This locks the currently running chain so that no more calls to
194 * <code>process()</code> until you call <code>clearLock()</code>
195 * with the lock identifier.
196 * *FIX: Given the structure of the pump and pipe relationship,
197 * this should probably go through a different mechanism than the
198 * pump. I think it would be best if the pipe had some kind of
199 * controller which was passed into <code>process()</code> rather
200 * than the pump which exposed this interface.
201 * @return Returns the lock identifer to be used in
202 * <code>clearLock()</code> or 0 on failure.
203 */
204 S32 setLock();
205
206 /**
207 * @brief Clears the identified lock.
208 *
209 * @param links A container for the links which will be appended
210 */
211 void clearLock(S32 key);
212
213 /**
214 * @brief Stop processing a chain for a while.
215 * @see setLock()
216 *
217 * This method will <em>not</em> update the timeout for this
218 * chain, so it is possible to sleep the chain until it is
219 * collected by the pump during a timeout cleanup.
220 * @param seconds The number of seconds in the future to
221 * resume processing.
222 * @return Returns true if the
223 */
224 bool sleepChain(F64 seconds);
225
226 /**
227 * @brief Copy the currently running chain link info
228 *
229 * *FIX: Given the structure of the pump and pipe relationship,
230 * this should probably go through a different mechanism than the
231 * pump. I think it would be best if the pipe had some kind of
232 * controller which was passed into <code>process()</code> rather
233 * than the pump which exposed this interface.
234 * @param links A container for the links which will be appended
235 * @return Returns true if the currently running chain was copied.
236 */
237 bool copyCurrentLinkInfo(links_t& links) const;
238
239 /**
240 * @brief Call this method to call process on all running chains.
241 *
242 * This method iterates through the running chains, and if all
243 * pipe on a chain are unconditionally ready or if any pipe has
244 * any conditional processiong condition then process will be
245 * called on every chain which has requested processing. that
246 * chain has a file descriptor ready, <code>process()</code> will
247 * be called for all pipes which have requested it.
248 */
249 void pump();
250
251 /**
252 * @brief Add a chain to a special queue which will be called
253 * during the next call to <code>callback()</code> and then
254 * dropped from the queue.
255 *
256 * @param chain The IO chain that will get one <code>process()</code>.
257 */
258 //void respond(const chain_t& pipes);
259
260 /**
261 * @brief Add pipe to a special queue which will be called
262 * during the next call to <code>callback()</code> and then dropped
263 * from the queue.
264 *
265 * This call will add a single pipe, with no buffer, context, or
266 * channel information to the callback queue. It will be called
267 * once, and then dropped.
268 * @param pipe A single io pipe which will be called
269 * @return Returns true if anything was added to the pump.
270 */
271 bool respond(LLIOPipe* pipe);
272
273 /**
274 * @brief Add a chain to a special queue which will be called
275 * during the next call to <code>callback()</code> and then
276 * dropped from the queue.
277 *
278 * It is important to remember that you should not add a data
279 * buffer or context which may still be in another chain - that
280 * will almost certainly lead to a problems. Ensure that you are
281 * done reading and writing to those parameters, have new
282 * generated, or empty pointers.
283 * @param links The pipes and io indexes for the chain
284 * @param data Shared pointer to data buffer
285 * @param context Potentially undefined context meta-data for chain.
286 * @return Returns true if anything was added to the pump.
287 */
288 bool respond(
289 const links_t& links,
290 LLIOPipe::buffer_ptr_t data,
291 LLSD context);
292
293 /**
294 * @brief Run through the callback queue and call <code>process()</code>.
295 *
296 * This call will process all prending responses and call process
297 * on each. This method will then drop all processed callback
298 * requests which may lead to deleting the referenced objects.
299 */
300 void callback();
301
302 /**
303 * @brief Enumeration to send commands to the pump.
304 */
305 enum EControl
306 {
307 PAUSE,
308 RESUME,
309 };
310
311 /**
312 * @brief Send a command to the pump.
313 *
314 * @param op What control to send to the pump.
315 */
316 void control(EControl op);
317
318protected:
319 /**
320 * @brief State of the pump
321 */
322 enum EState
323 {
324 NORMAL,
325 PAUSING,
326 PAUSED
327 };
328
329 // instance data
330 EState mState;
331 bool mRebuildPollset;
332 apr_pollset_t* mPollset;
333 S32 mPollsetClientID;
334 S32 mNextLock;
335 std::set<S32> mClearLocks;
336
337 // This is the pump's runnable scheduler used for handling
338 // expiring locks.
339 LLRunner mRunner;
340
341 // This structure is the stuff we track while running chains.
342 struct LLChainInfo
343 {
344 // methods
345 LLChainInfo();
346 void setTimeoutSeconds(F32 timeout);
347
348 // basic member data
349 bool mInit;
350 S32 mLock;
351 LLFrameTimer mTimer;
352 links_t::iterator mHead;
353 links_t mChainLinks;
354 LLIOPipe::buffer_ptr_t mData;
355 bool mEOS;
356 LLSD mContext;
357
358 // tracking inside the pump
359 typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
360 typedef std::vector<pipe_conditional_t> conditionals_t;
361 conditionals_t mDescriptors;
362 };
363
364 // All the running chains & info
365 typedef std::vector<LLChainInfo> pending_chains_t;
366 pending_chains_t mPendingChains;
367 typedef std::list<LLChainInfo> running_chains_t;
368 running_chains_t mRunningChains;
369
370 typedef running_chains_t::iterator current_chain_t;
371 current_chain_t mCurrentChain;
372
373 // structures necessary for doing callbacks
374 // since the callbacks only get one chance to run, we do not have
375 // to maintain a list.
376 typedef std::vector<LLChainInfo> callbacks_t;
377 callbacks_t mPendingCallbacks;
378 callbacks_t mCallbacks;
379
380 // memory allocator for pollsets & mutexes.
381 apr_pool_t* mPool;
382 apr_pool_t* mCurrentPool;
383 S32 mCurrentPoolReallocCount;
384
385#if LL_THREADS_APR
386 apr_thread_mutex_t* mChainsMutex;
387 apr_thread_mutex_t* mCallbackMutex;
388#else
389 int* mChainsMutex;
390 int* mCallbackMutex;
391#endif
392
393protected:
394 void initialize(apr_pool_t* pool);
395 void cleanup();
396
397 /**
398 * @brief Given the internal state of the chains, rebuild the pollset
399 * @see setConditional()
400 */
401 void rebuildPollset();
402
403 /**
404 * @brief Process the chain passed in.
405 *
406 * This method will potentially modify the internals of the
407 * chain. On end, the chain.mHead will equal
408 * chain.mChainLinks.end().
409 * @param chain The LLChainInfo object to work on.
410 */
411 void processChain(LLChainInfo& chain);
412
413 /**
414 * @brief Rewind through the chain to try to recover from an error.
415 *
416 * This method will potentially modify the internals of the
417 * chain.
418 * @param chain The LLChainInfo object to work on.
419 * @return Retuns true if someone handled the error
420 */
421 bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error);
422};
423
424
425#endif // LL_LLPUMPIO_H