diff options
Diffstat (limited to 'linden/indra/llcommon/llqueuedthread.cpp')
-rw-r--r-- | linden/indra/llcommon/llqueuedthread.cpp | 195 |
1 files changed, 109 insertions, 86 deletions
diff --git a/linden/indra/llcommon/llqueuedthread.cpp b/linden/indra/llcommon/llqueuedthread.cpp index 565836b..2e4324b 100644 --- a/linden/indra/llcommon/llqueuedthread.cpp +++ b/linden/indra/llcommon/llqueuedthread.cpp | |||
@@ -31,10 +31,9 @@ | |||
31 | //============================================================================ | 31 | //============================================================================ |
32 | 32 | ||
33 | // MAIN THREAD | 33 | // MAIN THREAD |
34 | LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool runalways) : | 34 | LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded) : |
35 | LLThread(name), | 35 | LLThread(name), |
36 | mThreaded(threaded), | 36 | mThreaded(threaded), |
37 | mRunAlways(runalways), | ||
38 | mIdleThread(TRUE), | 37 | mIdleThread(TRUE), |
39 | mNextHandle(0) | 38 | mNextHandle(0) |
40 | { | 39 | { |
@@ -47,6 +46,12 @@ LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool runa | |||
47 | // MAIN THREAD | 46 | // MAIN THREAD |
48 | LLQueuedThread::~LLQueuedThread() | 47 | LLQueuedThread::~LLQueuedThread() |
49 | { | 48 | { |
49 | shutdown(); | ||
50 | // ~LLThread() will be called here | ||
51 | } | ||
52 | |||
53 | void LLQueuedThread::shutdown() | ||
54 | { | ||
50 | setQuitting(); | 55 | setQuitting(); |
51 | 56 | ||
52 | unpause(); // MAIN THREAD | 57 | unpause(); // MAIN THREAD |
@@ -73,61 +78,69 @@ LLQueuedThread::~LLQueuedThread() | |||
73 | } | 78 | } |
74 | 79 | ||
75 | QueuedRequest* req; | 80 | QueuedRequest* req; |
81 | S32 active_count = 0; | ||
76 | while ( (req = (QueuedRequest*)mRequestHash.pop_element()) ) | 82 | while ( (req = (QueuedRequest*)mRequestHash.pop_element()) ) |
77 | { | 83 | { |
84 | if (req->getStatus() == STATUS_QUEUED || req->getStatus() == STATUS_INPROGRESS) | ||
85 | { | ||
86 | ++active_count; | ||
87 | } | ||
78 | req->deleteRequest(); | 88 | req->deleteRequest(); |
79 | } | 89 | } |
80 | 90 | if (active_count) | |
81 | // ~LLThread() will be called here | 91 | { |
92 | llwarns << "~LLQueuedThread() called with active requests: " << active_count << llendl; | ||
93 | } | ||
82 | } | 94 | } |
83 | 95 | ||
84 | //---------------------------------------------------------------------------- | 96 | //---------------------------------------------------------------------------- |
85 | 97 | ||
86 | // MAIN THREAD | 98 | // MAIN THREAD |
87 | void LLQueuedThread::update(U32 ms_elapsed) | 99 | // virtual |
100 | S32 LLQueuedThread::update(U32 max_time_ms) | ||
88 | { | 101 | { |
89 | updateQueue(0); | 102 | return updateQueue(max_time_ms); |
90 | } | 103 | } |
91 | 104 | ||
92 | void LLQueuedThread::updateQueue(S32 inc) | 105 | S32 LLQueuedThread::updateQueue(U32 max_time_ms) |
93 | { | 106 | { |
94 | // If mRunAlways == TRUE, unpause the thread whenever we put something into the queue. | 107 | F64 max_time = (F64)max_time_ms * .001; |
95 | // If mRunAlways == FALSE, we only unpause the thread when updateQueue() is called from the main loop (i.e. between rendered frames) | 108 | LLTimer timer; |
96 | 109 | S32 pending = 1; | |
97 | if (inc == 0) // Frame Update | 110 | |
111 | // Frame Update | ||
112 | if (mThreaded) | ||
98 | { | 113 | { |
99 | if (mThreaded) | 114 | pending = getPending(); |
100 | { | 115 | unpause(); |
101 | unpause(); | 116 | } |
102 | wake(); // Wake the thread up if necessary. | 117 | else |
103 | } | 118 | { |
104 | else | 119 | while (pending > 0) |
105 | { | 120 | { |
106 | while (processNextRequest() > 0) | 121 | pending = processNextRequest(); |
107 | ; | 122 | if (max_time && timer.getElapsedTimeF64() > max_time) |
123 | break; | ||
108 | } | 124 | } |
109 | } | 125 | } |
110 | else | 126 | return pending; |
127 | } | ||
128 | |||
129 | void LLQueuedThread::incQueue() | ||
130 | { | ||
131 | // Something has been added to the queue | ||
132 | if (!isPaused()) | ||
111 | { | 133 | { |
112 | // Something has been added to the queue | 134 | if (mThreaded) |
113 | if (mRunAlways) | ||
114 | { | 135 | { |
115 | if (mThreaded) | 136 | wake(); // Wake the thread up if necessary. |
116 | { | ||
117 | wake(); // Wake the thread up if necessary. | ||
118 | } | ||
119 | else | ||
120 | { | ||
121 | while(processNextRequest() > 0) | ||
122 | ; | ||
123 | } | ||
124 | } | 137 | } |
125 | } | 138 | } |
126 | } | 139 | } |
127 | 140 | ||
128 | //virtual | 141 | //virtual |
129 | // May be called from any thread | 142 | // May be called from any thread |
130 | S32 LLQueuedThread::getPending(bool child_thread) | 143 | S32 LLQueuedThread::getPending() |
131 | { | 144 | { |
132 | S32 res; | 145 | S32 res; |
133 | lockData(); | 146 | lockData(); |
@@ -141,7 +154,7 @@ void LLQueuedThread::waitOnPending() | |||
141 | { | 154 | { |
142 | while(1) | 155 | while(1) |
143 | { | 156 | { |
144 | updateQueue(0); | 157 | update(0); |
145 | 158 | ||
146 | if (mIdleThread) | 159 | if (mIdleThread) |
147 | { | 160 | { |
@@ -200,7 +213,7 @@ bool LLQueuedThread::addRequest(QueuedRequest* req) | |||
200 | #endif | 213 | #endif |
201 | unlockData(); | 214 | unlockData(); |
202 | 215 | ||
203 | updateQueue(1); | 216 | incQueue(); |
204 | 217 | ||
205 | return true; | 218 | return true; |
206 | } | 219 | } |
@@ -214,7 +227,7 @@ bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_co | |||
214 | bool done = false; | 227 | bool done = false; |
215 | while(!done) | 228 | while(!done) |
216 | { | 229 | { |
217 | updateQueue(0); // unpauses | 230 | update(0); // unpauses |
218 | lockData(); | 231 | lockData(); |
219 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); | 232 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); |
220 | if (!req) | 233 | if (!req) |
@@ -272,51 +285,47 @@ LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle) | |||
272 | return res; | 285 | return res; |
273 | } | 286 | } |
274 | 287 | ||
275 | LLQueuedThread::status_t LLQueuedThread::abortRequest(handle_t handle, U32 flags) | 288 | void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete) |
276 | { | 289 | { |
277 | status_t res = STATUS_EXPIRED; | ||
278 | lockData(); | 290 | lockData(); |
279 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); | 291 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); |
280 | if (req) | 292 | if (req) |
281 | { | 293 | { |
282 | res = req->abortRequest(flags); | 294 | req->setFlags(FLAG_ABORT | (autocomplete ? FLAG_AUTO_COMPLETE : 0)); |
283 | if ((flags & AUTO_COMPLETE) && (res == STATUS_COMPLETE)) | ||
284 | { | ||
285 | mRequestHash.erase(handle); | ||
286 | req->deleteRequest(); | ||
287 | // check(); | ||
288 | } | ||
289 | #if _DEBUG | ||
290 | // llinfos << llformat("LLQueuedThread::Aborted req [%08d]",handle) << llendl; | ||
291 | #endif | ||
292 | } | 295 | } |
293 | unlockData(); | 296 | unlockData(); |
294 | return res; | ||
295 | } | 297 | } |
296 | 298 | ||
297 | // MAIN thread | 299 | // MAIN thread |
298 | LLQueuedThread::status_t LLQueuedThread::setFlags(handle_t handle, U32 flags) | 300 | void LLQueuedThread::setFlags(handle_t handle, U32 flags) |
299 | { | 301 | { |
300 | status_t res = STATUS_EXPIRED; | ||
301 | lockData(); | 302 | lockData(); |
302 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); | 303 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); |
303 | if (req) | 304 | if (req) |
304 | { | 305 | { |
305 | res = req->setFlags(flags); | 306 | req->setFlags(flags); |
306 | } | 307 | } |
307 | unlockData(); | 308 | unlockData(); |
308 | return res; | ||
309 | } | 309 | } |
310 | 310 | ||
311 | void LLQueuedThread::setPriority(handle_t handle, U32 priority) | 311 | void LLQueuedThread::setPriority(handle_t handle, U32 priority) |
312 | { | 312 | { |
313 | lockData(); | 313 | lockData(); |
314 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); | 314 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); |
315 | if (req && (req->getStatus() == STATUS_QUEUED)) | 315 | if (req) |
316 | { | 316 | { |
317 | llverify(mRequestQueue.erase(req) == 1); | 317 | if(req->getStatus() == STATUS_INPROGRESS) |
318 | req->setPriority(priority); | 318 | { |
319 | mRequestQueue.insert(req); | 319 | // not in list |
320 | req->setPriority(priority); | ||
321 | } | ||
322 | else if(req->getStatus() == STATUS_QUEUED) | ||
323 | { | ||
324 | // remove from list then re-insert | ||
325 | llverify(mRequestQueue.erase(req) == 1); | ||
326 | req->setPriority(priority); | ||
327 | mRequestQueue.insert(req); | ||
328 | } | ||
320 | } | 329 | } |
321 | unlockData(); | 330 | unlockData(); |
322 | } | 331 | } |
@@ -328,9 +337,10 @@ bool LLQueuedThread::completeRequest(handle_t handle) | |||
328 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); | 337 | QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); |
329 | if (req) | 338 | if (req) |
330 | { | 339 | { |
331 | llassert(req->getStatus() != STATUS_QUEUED && req->getStatus() != STATUS_ABORT); | 340 | llassert_always(req->getStatus() != STATUS_QUEUED); |
341 | llassert_always(req->getStatus() != STATUS_INPROGRESS); | ||
332 | #if _DEBUG | 342 | #if _DEBUG |
333 | // llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl; | 343 | // llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl; |
334 | #endif | 344 | #endif |
335 | mRequestHash.erase(handle); | 345 | mRequestHash.erase(handle); |
336 | req->deleteRequest(); | 346 | req->deleteRequest(); |
@@ -364,28 +374,34 @@ bool LLQueuedThread::check() | |||
364 | //============================================================================ | 374 | //============================================================================ |
365 | // Runs on its OWN thread | 375 | // Runs on its OWN thread |
366 | 376 | ||
367 | int LLQueuedThread::processNextRequest() | 377 | S32 LLQueuedThread::processNextRequest() |
368 | { | 378 | { |
369 | QueuedRequest *req = 0; | 379 | QueuedRequest *req; |
370 | // Get next request from pool | 380 | // Get next request from pool |
371 | lockData(); | 381 | lockData(); |
372 | while(1) | 382 | while(1) |
373 | { | 383 | { |
374 | if (!mRequestQueue.empty()) | 384 | req = NULL; |
385 | if (mRequestQueue.empty()) | ||
375 | { | 386 | { |
376 | req = *mRequestQueue.begin(); | 387 | break; |
377 | mRequestQueue.erase(mRequestQueue.begin()); | ||
378 | } | 388 | } |
379 | if (req && req->getStatus() == STATUS_ABORT) | 389 | req = *mRequestQueue.begin(); |
390 | mRequestQueue.erase(mRequestQueue.begin()); | ||
391 | if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING)) | ||
380 | { | 392 | { |
381 | req->setStatus(STATUS_ABORTED); | 393 | req->setStatus(STATUS_ABORTED); |
382 | req = 0; | 394 | req->finishRequest(false); |
383 | } | 395 | if (req->getFlags() & FLAG_AUTO_COMPLETE) |
384 | else | 396 | { |
385 | { | 397 | mRequestHash.erase(req); |
386 | llassert (!req || req->getStatus() == STATUS_QUEUED) | 398 | req->deleteRequest(); |
387 | break; | 399 | // check(); |
400 | } | ||
401 | continue; | ||
388 | } | 402 | } |
403 | llassert_always(req->getStatus() == STATUS_QUEUED); | ||
404 | break; | ||
389 | } | 405 | } |
390 | if (req) | 406 | if (req) |
391 | { | 407 | { |
@@ -393,22 +409,22 @@ int LLQueuedThread::processNextRequest() | |||
393 | } | 409 | } |
394 | unlockData(); | 410 | unlockData(); |
395 | 411 | ||
396 | // This is the only place we will cal req->setStatus() after | 412 | // This is the only place we will call req->setStatus() after |
397 | // it has initially been seet to STATUS_QUEUED, so it is | 413 | // it has initially been seet to STATUS_QUEUED, so it is |
398 | // safe to access req. | 414 | // safe to access req. |
399 | if (req) | 415 | if (req) |
400 | { | 416 | { |
401 | // process request | 417 | // process request |
402 | bool complete = processRequest(req); | 418 | bool complete = req->processRequest(); |
403 | 419 | ||
404 | if (complete) | 420 | if (complete) |
405 | { | 421 | { |
406 | lockData(); | 422 | lockData(); |
407 | req->setStatus(STATUS_COMPLETE); | 423 | req->setStatus(STATUS_COMPLETE); |
408 | req->finishRequest(); | 424 | req->finishRequest(true); |
409 | if (req->getFlags() & AUTO_COMPLETE) | 425 | if (req->getFlags() & FLAG_AUTO_COMPLETE) |
410 | { | 426 | { |
411 | llverify(mRequestHash.erase(req)) | 427 | mRequestHash.erase(req); |
412 | req->deleteRequest(); | 428 | req->deleteRequest(); |
413 | // check(); | 429 | // check(); |
414 | } | 430 | } |
@@ -419,12 +435,18 @@ int LLQueuedThread::processNextRequest() | |||
419 | lockData(); | 435 | lockData(); |
420 | req->setStatus(STATUS_QUEUED); | 436 | req->setStatus(STATUS_QUEUED); |
421 | mRequestQueue.insert(req); | 437 | mRequestQueue.insert(req); |
438 | U32 priority = req->getPriority(); | ||
422 | unlockData(); | 439 | unlockData(); |
440 | if (priority < PRIORITY_NORMAL) | ||
441 | { | ||
442 | ms_sleep(1); // sleep the thread a little | ||
443 | } | ||
423 | } | 444 | } |
424 | } | 445 | } |
425 | 446 | ||
426 | int res; | 447 | S32 res; |
427 | if (getPending(true) == 0) | 448 | S32 pending = getPending(); |
449 | if (pending == 0) | ||
428 | { | 450 | { |
429 | if (isQuitting()) | 451 | if (isQuitting()) |
430 | { | 452 | { |
@@ -437,7 +459,7 @@ int LLQueuedThread::processNextRequest() | |||
437 | } | 459 | } |
438 | else | 460 | else |
439 | { | 461 | { |
440 | res = 1; | 462 | res = pending; |
441 | } | 463 | } |
442 | return res; | 464 | return res; |
443 | } | 465 | } |
@@ -445,13 +467,14 @@ int LLQueuedThread::processNextRequest() | |||
445 | bool LLQueuedThread::runCondition() | 467 | bool LLQueuedThread::runCondition() |
446 | { | 468 | { |
447 | // mRunCondition must be locked here | 469 | // mRunCondition must be locked here |
448 | return (mRequestQueue.empty() && mIdleThread) ? FALSE : TRUE; | 470 | if (mRequestQueue.empty() && mIdleThread) |
471 | return false; | ||
472 | else | ||
473 | return true; | ||
449 | } | 474 | } |
450 | 475 | ||
451 | void LLQueuedThread::run() | 476 | void LLQueuedThread::run() |
452 | { | 477 | { |
453 | llinfos << "QUEUED THREAD STARTING" << llendl; | ||
454 | |||
455 | while (1) | 478 | while (1) |
456 | { | 479 | { |
457 | // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state. | 480 | // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state. |
@@ -474,6 +497,8 @@ void LLQueuedThread::run() | |||
474 | { | 497 | { |
475 | break; | 498 | break; |
476 | } | 499 | } |
500 | |||
501 | //LLThread::yield(); // thread should yield after each request | ||
477 | } | 502 | } |
478 | 503 | ||
479 | llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl; | 504 | llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl; |
@@ -491,20 +516,18 @@ LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U3 | |||
491 | 516 | ||
492 | LLQueuedThread::QueuedRequest::~QueuedRequest() | 517 | LLQueuedThread::QueuedRequest::~QueuedRequest() |
493 | { | 518 | { |
494 | if (mStatus != STATUS_DELETE) | 519 | llassert_always(mStatus == STATUS_DELETE); |
495 | { | ||
496 | llerrs << "Attemt to directly delete a LLQueuedThread::QueuedRequest; use deleteRequest()" << llendl; | ||
497 | } | ||
498 | } | 520 | } |
499 | 521 | ||
500 | //virtual | 522 | //virtual |
501 | void LLQueuedThread::QueuedRequest::finishRequest() | 523 | void LLQueuedThread::QueuedRequest::finishRequest(bool completed) |
502 | { | 524 | { |
503 | } | 525 | } |
504 | 526 | ||
505 | //virtual | 527 | //virtual |
506 | void LLQueuedThread::QueuedRequest::deleteRequest() | 528 | void LLQueuedThread::QueuedRequest::deleteRequest() |
507 | { | 529 | { |
530 | llassert_always(mStatus != STATUS_INPROGRESS); | ||
508 | setStatus(STATUS_DELETE); | 531 | setStatus(STATUS_DELETE); |
509 | delete this; | 532 | delete this; |
510 | } | 533 | } |