/** * @file llqueuedthread.cpp * * Copyright (c) 2004-2007, Linden Research, Inc. * * The source code in this file ("Source Code") is provided by Linden Lab * to you under the terms of the GNU General Public License, version 2.0 * ("GPL"), unless you have obtained a separate licensing agreement * ("Other License"), formally executed by you and Linden Lab. Terms of * the GPL can be found in doc/GPL-license.txt in this distribution, or * online at http://secondlife.com/developers/opensource/gplv2 * * There are special exceptions to the terms and conditions of the GPL as * it is applied to this Source Code. View the full text of the exception * in the file doc/FLOSS-exception.txt in this software distribution, or * online at http://secondlife.com/developers/opensource/flossexception * * By copying, modifying or distributing this software, you acknowledge * that you have read and understood your obligations described above, * and agree to abide by those obligations. * * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY, * COMPLETENESS OR PERFORMANCE. */ #include "linden_common.h" #include "llqueuedthread.h" #include "llstl.h" //============================================================================ // MAIN THREAD LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool runalways) : LLThread(name), mThreaded(threaded), mRunAlways(runalways), mIdleThread(TRUE), mNextHandle(0) { if (mThreaded) { start(); } } // MAIN THREAD LLQueuedThread::~LLQueuedThread() { setQuitting(); unpause(); // MAIN THREAD if (mThreaded) { S32 timeout = 100; for ( ; timeout>0; timeout--) { if (isStopped()) { break; } ms_sleep(100); LLThread::yield(); } if (timeout == 0) { llwarns << "~LLQueuedThread (" << mName << ") timed out!" << llendl; } } else { mStatus = STOPPED; } QueuedRequest* req; while ( (req = (QueuedRequest*)mRequestHash.pop_element()) ) { req->deleteRequest(); } // ~LLThread() will be called here } //---------------------------------------------------------------------------- // MAIN THREAD void LLQueuedThread::update(U32 ms_elapsed) { updateQueue(0); } void LLQueuedThread::updateQueue(S32 inc) { // If mRunAlways == TRUE, unpause the thread whenever we put something into the queue. // If mRunAlways == FALSE, we only unpause the thread when updateQueue() is called from the main loop (i.e. between rendered frames) if (inc == 0) // Frame Update { if (mThreaded) { unpause(); wake(); // Wake the thread up if necessary. } else { while (processNextRequest() > 0) ; } } else { // Something has been added to the queue if (mRunAlways) { if (mThreaded) { wake(); // Wake the thread up if necessary. } else { while(processNextRequest() > 0) ; } } } } //virtual // May be called from any thread S32 LLQueuedThread::getPending(bool child_thread) { S32 res; lockData(); res = mRequestQueue.size(); unlockData(); return res; } // MAIN thread void LLQueuedThread::waitOnPending() { while(1) { updateQueue(0); if (mIdleThread) { break; } if (mThreaded) { yield(); } } return; } // MAIN thread void LLQueuedThread::printQueueStats() { lockData(); if (!mRequestQueue.empty()) { QueuedRequest *req = *mRequestQueue.begin(); llinfos << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << llendl; } else { llinfos << "Queued Thread Idle" << llendl; } unlockData(); } // MAIN thread LLQueuedThread::handle_t LLQueuedThread::generateHandle() { lockData(); while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle))) { mNextHandle++; } unlockData(); return mNextHandle++; } // MAIN thread bool LLQueuedThread::addRequest(QueuedRequest* req) { if (mStatus == QUITTING) { return false; } lockData(); req->setStatus(STATUS_QUEUED); mRequestQueue.insert(req); mRequestHash.insert(req); #if _DEBUG // llinfos << llformat("LLQueuedThread::Added req [%08d]",handle) << llendl; #endif unlockData(); updateQueue(1); return true; } // MAIN thread bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete) { llassert (handle != nullHandle()) bool res = false; bool waspaused = isPaused(); bool done = false; while(!done) { updateQueue(0); // unpauses lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); if (!req) { done = true; // request does not exist } else if (req->getStatus() == STATUS_COMPLETE) { res = true; if (auto_complete) { mRequestHash.erase(handle); req->deleteRequest(); // check(); } done = true; } unlockData(); if (!done && mThreaded) { yield(); } } if (waspaused) { pause(); } return res; } // MAIN thread LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle) { if (handle == nullHandle()) { return 0; } lockData(); QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle); unlockData(); return res; } LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle) { status_t res = STATUS_EXPIRED; lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); if (req) { res = req->getStatus(); } unlockData(); return res; } LLQueuedThread::status_t LLQueuedThread::abortRequest(handle_t handle, U32 flags) { status_t res = STATUS_EXPIRED; lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); if (req) { res = req->abortRequest(flags); if ((flags & AUTO_COMPLETE) && (res == STATUS_COMPLETE)) { mRequestHash.erase(handle); req->deleteRequest(); // check(); } #if _DEBUG // llinfos << llformat("LLQueuedThread::Aborted req [%08d]",handle) << llendl; #endif } unlockData(); return res; } // MAIN thread LLQueuedThread::status_t LLQueuedThread::setFlags(handle_t handle, U32 flags) { status_t res = STATUS_EXPIRED; lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); if (req) { res = req->setFlags(flags); } unlockData(); return res; } void LLQueuedThread::setPriority(handle_t handle, U32 priority) { lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); if (req && (req->getStatus() == STATUS_QUEUED)) { llverify(mRequestQueue.erase(req) == 1); req->setPriority(priority); mRequestQueue.insert(req); } unlockData(); } bool LLQueuedThread::completeRequest(handle_t handle) { bool res = false; lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); if (req) { llassert(req->getStatus() != STATUS_QUEUED && req->getStatus() != STATUS_ABORT); #if _DEBUG // llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl; #endif mRequestHash.erase(handle); req->deleteRequest(); // check(); res = true; } unlockData(); return res; } bool LLQueuedThread::check() { #if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging for (int i=0; i* entry = mRequestHash.get_element_at_index(i); while (entry) { if (entry->getHashKey() > mNextHandle) { llerrs << "Hash Error" << llendl; return false; } entry = entry->getNextEntry(); } } #endif return true; } //============================================================================ // Runs on its OWN thread int LLQueuedThread::processNextRequest() { QueuedRequest *req = 0; // Get next request from pool lockData(); while(1) { if (!mRequestQueue.empty()) { req = *mRequestQueue.begin(); mRequestQueue.erase(mRequestQueue.begin()); } if (req && req->getStatus() == STATUS_ABORT) { req->setStatus(STATUS_ABORTED); req = 0; } else { llassert (!req || req->getStatus() == STATUS_QUEUED) break; } } if (req) { req->setStatus(STATUS_INPROGRESS); } unlockData(); // This is the only place we will cal req->setStatus() after // it has initially been seet to STATUS_QUEUED, so it is // safe to access req. if (req) { // process request bool complete = processRequest(req); if (complete) { lockData(); req->setStatus(STATUS_COMPLETE); req->finishRequest(); if (req->getFlags() & AUTO_COMPLETE) { llverify(mRequestHash.erase(req)) req->deleteRequest(); // check(); } unlockData(); } else { lockData(); req->setStatus(STATUS_QUEUED); mRequestQueue.insert(req); unlockData(); } } int res; if (getPending(true) == 0) { if (isQuitting()) { res = -1; // exit thread } else { res = 0; } } else { res = 1; } return res; } bool LLQueuedThread::runCondition() { // mRunCondition must be locked here return (mRequestQueue.empty() && mIdleThread) ? FALSE : TRUE; } void LLQueuedThread::run() { llinfos << "QUEUED THREAD STARTING" << llendl; while (1) { // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state. checkPause(); if(isQuitting()) break; //llinfos << "QUEUED THREAD RUNNING, queue size = " << mRequestQueue.size() << llendl; mIdleThread = FALSE; int res = processNextRequest(); if (res == 0) { mIdleThread = TRUE; } if (res < 0) // finished working and want to exit { break; } } llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl; } //============================================================================ LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) : LLSimpleHashEntry(handle), mStatus(STATUS_UNKNOWN), mPriority(priority), mFlags(flags) { } LLQueuedThread::QueuedRequest::~QueuedRequest() { if (mStatus != STATUS_DELETE) { llerrs << "Attemt to directly delete a LLQueuedThread::QueuedRequest; use deleteRequest()" << llendl; } } //virtual void LLQueuedThread::QueuedRequest::finishRequest() { } //virtual void LLQueuedThread::QueuedRequest::deleteRequest() { setStatus(STATUS_DELETE); delete this; }