aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llcommon/llqueuedthread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llcommon/llqueuedthread.cpp')
-rw-r--r--linden/indra/llcommon/llqueuedthread.cpp195
1 files changed, 109 insertions, 86 deletions
diff --git a/linden/indra/llcommon/llqueuedthread.cpp b/linden/indra/llcommon/llqueuedthread.cpp
index 565836b..2e4324b 100644
--- a/linden/indra/llcommon/llqueuedthread.cpp
+++ b/linden/indra/llcommon/llqueuedthread.cpp
@@ -31,10 +31,9 @@
31//============================================================================ 31//============================================================================
32 32
33// MAIN THREAD 33// MAIN THREAD
34LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool runalways) : 34LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded) :
35 LLThread(name), 35 LLThread(name),
36 mThreaded(threaded), 36 mThreaded(threaded),
37 mRunAlways(runalways),
38 mIdleThread(TRUE), 37 mIdleThread(TRUE),
39 mNextHandle(0) 38 mNextHandle(0)
40{ 39{
@@ -47,6 +46,12 @@ LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool runa
47// MAIN THREAD 46// MAIN THREAD
48LLQueuedThread::~LLQueuedThread() 47LLQueuedThread::~LLQueuedThread()
49{ 48{
49 shutdown();
50 // ~LLThread() will be called here
51}
52
53void LLQueuedThread::shutdown()
54{
50 setQuitting(); 55 setQuitting();
51 56
52 unpause(); // MAIN THREAD 57 unpause(); // MAIN THREAD
@@ -73,61 +78,69 @@ LLQueuedThread::~LLQueuedThread()
73 } 78 }
74 79
75 QueuedRequest* req; 80 QueuedRequest* req;
81 S32 active_count = 0;
76 while ( (req = (QueuedRequest*)mRequestHash.pop_element()) ) 82 while ( (req = (QueuedRequest*)mRequestHash.pop_element()) )
77 { 83 {
84 if (req->getStatus() == STATUS_QUEUED || req->getStatus() == STATUS_INPROGRESS)
85 {
86 ++active_count;
87 }
78 req->deleteRequest(); 88 req->deleteRequest();
79 } 89 }
80 90 if (active_count)
81 // ~LLThread() will be called here 91 {
92 llwarns << "~LLQueuedThread() called with active requests: " << active_count << llendl;
93 }
82} 94}
83 95
84//---------------------------------------------------------------------------- 96//----------------------------------------------------------------------------
85 97
86// MAIN THREAD 98// MAIN THREAD
87void LLQueuedThread::update(U32 ms_elapsed) 99// virtual
100S32 LLQueuedThread::update(U32 max_time_ms)
88{ 101{
89 updateQueue(0); 102 return updateQueue(max_time_ms);
90} 103}
91 104
92void LLQueuedThread::updateQueue(S32 inc) 105S32 LLQueuedThread::updateQueue(U32 max_time_ms)
93{ 106{
94 // If mRunAlways == TRUE, unpause the thread whenever we put something into the queue. 107 F64 max_time = (F64)max_time_ms * .001;
95 // If mRunAlways == FALSE, we only unpause the thread when updateQueue() is called from the main loop (i.e. between rendered frames) 108 LLTimer timer;
96 109 S32 pending = 1;
97 if (inc == 0) // Frame Update 110
111 // Frame Update
112 if (mThreaded)
98 { 113 {
99 if (mThreaded) 114 pending = getPending();
100 { 115 unpause();
101 unpause(); 116 }
102 wake(); // Wake the thread up if necessary. 117 else
103 } 118 {
104 else 119 while (pending > 0)
105 { 120 {
106 while (processNextRequest() > 0) 121 pending = processNextRequest();
107 ; 122 if (max_time && timer.getElapsedTimeF64() > max_time)
123 break;
108 } 124 }
109 } 125 }
110 else 126 return pending;
127}
128
129void LLQueuedThread::incQueue()
130{
131 // Something has been added to the queue
132 if (!isPaused())
111 { 133 {
112 // Something has been added to the queue 134 if (mThreaded)
113 if (mRunAlways)
114 { 135 {
115 if (mThreaded) 136 wake(); // Wake the thread up if necessary.
116 {
117 wake(); // Wake the thread up if necessary.
118 }
119 else
120 {
121 while(processNextRequest() > 0)
122 ;
123 }
124 } 137 }
125 } 138 }
126} 139}
127 140
128//virtual 141//virtual
129// May be called from any thread 142// May be called from any thread
130S32 LLQueuedThread::getPending(bool child_thread) 143S32 LLQueuedThread::getPending()
131{ 144{
132 S32 res; 145 S32 res;
133 lockData(); 146 lockData();
@@ -141,7 +154,7 @@ void LLQueuedThread::waitOnPending()
141{ 154{
142 while(1) 155 while(1)
143 { 156 {
144 updateQueue(0); 157 update(0);
145 158
146 if (mIdleThread) 159 if (mIdleThread)
147 { 160 {
@@ -200,7 +213,7 @@ bool LLQueuedThread::addRequest(QueuedRequest* req)
200#endif 213#endif
201 unlockData(); 214 unlockData();
202 215
203 updateQueue(1); 216 incQueue();
204 217
205 return true; 218 return true;
206} 219}
@@ -214,7 +227,7 @@ bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_co
214 bool done = false; 227 bool done = false;
215 while(!done) 228 while(!done)
216 { 229 {
217 updateQueue(0); // unpauses 230 update(0); // unpauses
218 lockData(); 231 lockData();
219 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); 232 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
220 if (!req) 233 if (!req)
@@ -272,51 +285,47 @@ LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
272 return res; 285 return res;
273} 286}
274 287
275LLQueuedThread::status_t LLQueuedThread::abortRequest(handle_t handle, U32 flags) 288void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
276{ 289{
277 status_t res = STATUS_EXPIRED;
278 lockData(); 290 lockData();
279 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); 291 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
280 if (req) 292 if (req)
281 { 293 {
282 res = req->abortRequest(flags); 294 req->setFlags(FLAG_ABORT | (autocomplete ? FLAG_AUTO_COMPLETE : 0));
283 if ((flags & AUTO_COMPLETE) && (res == STATUS_COMPLETE))
284 {
285 mRequestHash.erase(handle);
286 req->deleteRequest();
287// check();
288 }
289#if _DEBUG
290// llinfos << llformat("LLQueuedThread::Aborted req [%08d]",handle) << llendl;
291#endif
292 } 295 }
293 unlockData(); 296 unlockData();
294 return res;
295} 297}
296 298
297// MAIN thread 299// MAIN thread
298LLQueuedThread::status_t LLQueuedThread::setFlags(handle_t handle, U32 flags) 300void LLQueuedThread::setFlags(handle_t handle, U32 flags)
299{ 301{
300 status_t res = STATUS_EXPIRED;
301 lockData(); 302 lockData();
302 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); 303 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
303 if (req) 304 if (req)
304 { 305 {
305 res = req->setFlags(flags); 306 req->setFlags(flags);
306 } 307 }
307 unlockData(); 308 unlockData();
308 return res;
309} 309}
310 310
311void LLQueuedThread::setPriority(handle_t handle, U32 priority) 311void LLQueuedThread::setPriority(handle_t handle, U32 priority)
312{ 312{
313 lockData(); 313 lockData();
314 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); 314 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
315 if (req && (req->getStatus() == STATUS_QUEUED)) 315 if (req)
316 { 316 {
317 llverify(mRequestQueue.erase(req) == 1); 317 if(req->getStatus() == STATUS_INPROGRESS)
318 req->setPriority(priority); 318 {
319 mRequestQueue.insert(req); 319 // not in list
320 req->setPriority(priority);
321 }
322 else if(req->getStatus() == STATUS_QUEUED)
323 {
324 // remove from list then re-insert
325 llverify(mRequestQueue.erase(req) == 1);
326 req->setPriority(priority);
327 mRequestQueue.insert(req);
328 }
320 } 329 }
321 unlockData(); 330 unlockData();
322} 331}
@@ -328,9 +337,10 @@ bool LLQueuedThread::completeRequest(handle_t handle)
328 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); 337 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
329 if (req) 338 if (req)
330 { 339 {
331 llassert(req->getStatus() != STATUS_QUEUED && req->getStatus() != STATUS_ABORT); 340 llassert_always(req->getStatus() != STATUS_QUEUED);
341 llassert_always(req->getStatus() != STATUS_INPROGRESS);
332#if _DEBUG 342#if _DEBUG
333// llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl; 343// llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl;
334#endif 344#endif
335 mRequestHash.erase(handle); 345 mRequestHash.erase(handle);
336 req->deleteRequest(); 346 req->deleteRequest();
@@ -364,28 +374,34 @@ bool LLQueuedThread::check()
364//============================================================================ 374//============================================================================
365// Runs on its OWN thread 375// Runs on its OWN thread
366 376
367int LLQueuedThread::processNextRequest() 377S32 LLQueuedThread::processNextRequest()
368{ 378{
369 QueuedRequest *req = 0; 379 QueuedRequest *req;
370 // Get next request from pool 380 // Get next request from pool
371 lockData(); 381 lockData();
372 while(1) 382 while(1)
373 { 383 {
374 if (!mRequestQueue.empty()) 384 req = NULL;
385 if (mRequestQueue.empty())
375 { 386 {
376 req = *mRequestQueue.begin(); 387 break;
377 mRequestQueue.erase(mRequestQueue.begin());
378 } 388 }
379 if (req && req->getStatus() == STATUS_ABORT) 389 req = *mRequestQueue.begin();
390 mRequestQueue.erase(mRequestQueue.begin());
391 if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
380 { 392 {
381 req->setStatus(STATUS_ABORTED); 393 req->setStatus(STATUS_ABORTED);
382 req = 0; 394 req->finishRequest(false);
383 } 395 if (req->getFlags() & FLAG_AUTO_COMPLETE)
384 else 396 {
385 { 397 mRequestHash.erase(req);
386 llassert (!req || req->getStatus() == STATUS_QUEUED) 398 req->deleteRequest();
387 break; 399// check();
400 }
401 continue;
388 } 402 }
403 llassert_always(req->getStatus() == STATUS_QUEUED);
404 break;
389 } 405 }
390 if (req) 406 if (req)
391 { 407 {
@@ -393,22 +409,22 @@ int LLQueuedThread::processNextRequest()
393 } 409 }
394 unlockData(); 410 unlockData();
395 411
396 // This is the only place we will cal req->setStatus() after 412 // This is the only place we will call req->setStatus() after
397 // it has initially been seet to STATUS_QUEUED, so it is 413 // it has initially been seet to STATUS_QUEUED, so it is
398 // safe to access req. 414 // safe to access req.
399 if (req) 415 if (req)
400 { 416 {
401 // process request 417 // process request
402 bool complete = processRequest(req); 418 bool complete = req->processRequest();
403 419
404 if (complete) 420 if (complete)
405 { 421 {
406 lockData(); 422 lockData();
407 req->setStatus(STATUS_COMPLETE); 423 req->setStatus(STATUS_COMPLETE);
408 req->finishRequest(); 424 req->finishRequest(true);
409 if (req->getFlags() & AUTO_COMPLETE) 425 if (req->getFlags() & FLAG_AUTO_COMPLETE)
410 { 426 {
411 llverify(mRequestHash.erase(req)) 427 mRequestHash.erase(req);
412 req->deleteRequest(); 428 req->deleteRequest();
413// check(); 429// check();
414 } 430 }
@@ -419,12 +435,18 @@ int LLQueuedThread::processNextRequest()
419 lockData(); 435 lockData();
420 req->setStatus(STATUS_QUEUED); 436 req->setStatus(STATUS_QUEUED);
421 mRequestQueue.insert(req); 437 mRequestQueue.insert(req);
438 U32 priority = req->getPriority();
422 unlockData(); 439 unlockData();
440 if (priority < PRIORITY_NORMAL)
441 {
442 ms_sleep(1); // sleep the thread a little
443 }
423 } 444 }
424 } 445 }
425 446
426 int res; 447 S32 res;
427 if (getPending(true) == 0) 448 S32 pending = getPending();
449 if (pending == 0)
428 { 450 {
429 if (isQuitting()) 451 if (isQuitting())
430 { 452 {
@@ -437,7 +459,7 @@ int LLQueuedThread::processNextRequest()
437 } 459 }
438 else 460 else
439 { 461 {
440 res = 1; 462 res = pending;
441 } 463 }
442 return res; 464 return res;
443} 465}
@@ -445,13 +467,14 @@ int LLQueuedThread::processNextRequest()
445bool LLQueuedThread::runCondition() 467bool LLQueuedThread::runCondition()
446{ 468{
447 // mRunCondition must be locked here 469 // mRunCondition must be locked here
448 return (mRequestQueue.empty() && mIdleThread) ? FALSE : TRUE; 470 if (mRequestQueue.empty() && mIdleThread)
471 return false;
472 else
473 return true;
449} 474}
450 475
451void LLQueuedThread::run() 476void LLQueuedThread::run()
452{ 477{
453 llinfos << "QUEUED THREAD STARTING" << llendl;
454
455 while (1) 478 while (1)
456 { 479 {
457 // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state. 480 // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
@@ -474,6 +497,8 @@ void LLQueuedThread::run()
474 { 497 {
475 break; 498 break;
476 } 499 }
500
501 //LLThread::yield(); // thread should yield after each request
477 } 502 }
478 503
479 llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl; 504 llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl;
@@ -491,20 +516,18 @@ LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U3
491 516
492LLQueuedThread::QueuedRequest::~QueuedRequest() 517LLQueuedThread::QueuedRequest::~QueuedRequest()
493{ 518{
494 if (mStatus != STATUS_DELETE) 519 llassert_always(mStatus == STATUS_DELETE);
495 {
496 llerrs << "Attemt to directly delete a LLQueuedThread::QueuedRequest; use deleteRequest()" << llendl;
497 }
498} 520}
499 521
500//virtual 522//virtual
501void LLQueuedThread::QueuedRequest::finishRequest() 523void LLQueuedThread::QueuedRequest::finishRequest(bool completed)
502{ 524{
503} 525}
504 526
505//virtual 527//virtual
506void LLQueuedThread::QueuedRequest::deleteRequest() 528void LLQueuedThread::QueuedRequest::deleteRequest()
507{ 529{
530 llassert_always(mStatus != STATUS_INPROGRESS);
508 setStatus(STATUS_DELETE); 531 setStatus(STATUS_DELETE);
509 delete this; 532 delete this;
510} 533}