aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/linden/indra/llcommon/llworkerthread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'linden/indra/llcommon/llworkerthread.cpp')
-rw-r--r--linden/indra/llcommon/llworkerthread.cpp325
1 files changed, 196 insertions, 129 deletions
diff --git a/linden/indra/llcommon/llworkerthread.cpp b/linden/indra/llcommon/llworkerthread.cpp
index 3190046..4f99393 100644
--- a/linden/indra/llcommon/llworkerthread.cpp
+++ b/linden/indra/llcommon/llworkerthread.cpp
@@ -33,98 +33,86 @@
33#endif 33#endif
34 34
35//============================================================================ 35//============================================================================
36
37/*static*/ LLWorkerThread* LLWorkerThread::sLocal = NULL;
38/*static*/ std::set<LLWorkerThread*> LLWorkerThread::sThreadList;
39
40//============================================================================
41// Run on MAIN thread 36// Run on MAIN thread
42 37
43//static 38LLWorkerThread::LLWorkerThread(const std::string& name, bool threaded) :
44void LLWorkerThread::initClass(bool local_is_threaded, bool local_run_always) 39 LLQueuedThread(name, threaded),
40 mWorkerAPRPoolp(NULL)
45{ 41{
46 if (!sLocal) 42 apr_pool_create(&mWorkerAPRPoolp, NULL);
47 { 43 mDeleteMutex = new LLMutex(getAPRPool());
48 sLocal = new LLWorkerThread(local_is_threaded, local_run_always);
49 }
50} 44}
51 45
52//static 46LLWorkerThread::~LLWorkerThread()
53void LLWorkerThread::cleanupClass()
54{ 47{
55 if (sLocal) 48 // Delete any workers in the delete queue (should be safe - had better be!)
49 if (!mDeleteList.empty())
56 { 50 {
57 while (sLocal->getPending()) 51 llwarns << "Worker Thread: " << mName << " destroyed with " << mDeleteList.size()
58 { 52 << " entries in delete list." << llendl;
59 sLocal->update(0);
60 }
61 delete sLocal;
62 sLocal = NULL;
63 llassert(sThreadList.size() == 0);
64 } 53 }
65}
66 54
67//static 55 delete mDeleteMutex;
68S32 LLWorkerThread::updateClass(U32 ms_elapsed) 56
69{ 57 // ~LLQueuedThread() will be called here
70 for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++)
71 {
72 (*iter)->update(ms_elapsed);
73 }
74 return getAllPending();
75} 58}
76 59
77//static 60// virtual
78S32 LLWorkerThread::getAllPending() 61S32 LLWorkerThread::update(U32 max_time_ms)
79{ 62{
80 S32 res = 0; 63 S32 res = LLQueuedThread::update(max_time_ms);
81 for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) 64 // Delete scheduled workers
65 std::vector<LLWorkerClass*> delete_list;
66 std::vector<LLWorkerClass*> abort_list;
67 mDeleteMutex->lock();
68 for (delete_list_t::iterator iter = mDeleteList.begin();
69 iter != mDeleteList.end(); )
82 { 70 {
83 res += (*iter)->getPending(); 71 delete_list_t::iterator curiter = iter++;
72 LLWorkerClass* worker = *curiter;
73 if (worker->deleteOK())
74 {
75 if (worker->getFlags(LLWorkerClass::WCF_WORK_FINISHED))
76 {
77 delete_list.push_back(worker);
78 mDeleteList.erase(curiter);
79 }
80 else if (!worker->getFlags(LLWorkerClass::WCF_ABORT_REQUESTED))
81 {
82 abort_list.push_back(worker);
83 }
84 }
84 } 85 }
85 return res; 86 mDeleteMutex->unlock();
86} 87 // abort and delete after releasing mutex
87 88 for (std::vector<LLWorkerClass*>::iterator iter = abort_list.begin();
88//static 89 iter != abort_list.end(); ++iter)
89void LLWorkerThread::pauseAll()
90{
91 for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++)
92 { 90 {
93 (*iter)->pause(); 91 (*iter)->abortWork(false);
94 } 92 }
95} 93 for (std::vector<LLWorkerClass*>::iterator iter = delete_list.begin();
96 94 iter != delete_list.end(); ++iter)
97//static
98void LLWorkerThread::waitOnAllPending()
99{
100 for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++)
101 { 95 {
102 (*iter)->waitOnPending(); 96 LLWorkerClass* worker = *iter;
97 if (worker->mRequestHandle)
98 {
99 // Finished but not completed
100 completeRequest(worker->mRequestHandle);
101 worker->mRequestHandle = LLWorkerThread::nullHandle();
102 worker->clearFlags(LLWorkerClass::WCF_HAVE_WORK);
103 }
104 delete *iter;
103 } 105 }
106 return res;
104} 107}
105 108
106//---------------------------------------------------------------------------- 109//----------------------------------------------------------------------------
107 110
108LLWorkerThread::LLWorkerThread(bool threaded, bool runalways) : 111LLWorkerThread::handle_t LLWorkerThread::addWorkRequest(LLWorkerClass* workerclass, S32 param, U32 priority)
109 LLQueuedThread("Worker", threaded, runalways)
110{
111 sThreadList.insert(this);
112}
113
114LLWorkerThread::~LLWorkerThread()
115{
116 llverify(sThreadList.erase(this) == 1);
117 // ~LLQueuedThread() will be called here
118}
119
120//----------------------------------------------------------------------------
121
122
123LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 param, U32 priority)
124{ 112{
125 handle_t handle = generateHandle(); 113 handle_t handle = generateHandle();
126 114
127 Request* req = new Request(handle, priority, workerclass, param); 115 WorkRequest* req = new WorkRequest(handle, priority, workerclass, param);
128 116
129 bool res = addRequest(req); 117 bool res = addRequest(req);
130 if (!res) 118 if (!res)
@@ -137,63 +125,80 @@ LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 par
137 return handle; 125 return handle;
138} 126}
139 127
140//============================================================================ 128void LLWorkerThread::deleteWorker(LLWorkerClass* workerclass)
141// Runs on its OWN thread
142
143bool LLWorkerThread::processRequest(QueuedRequest* qreq)
144{ 129{
145 Request *req = (Request*)qreq; 130 mDeleteMutex->lock();
146 131 mDeleteList.push_back(workerclass);
147 req->getWorkerClass()->setWorking(true); 132 mDeleteMutex->unlock();
148
149 bool complete = req->getWorkerClass()->doWork(req->getParam());
150
151 req->getWorkerClass()->setWorking(false);
152
153 LLThread::yield(); // worker thread should yield after each request
154
155 return complete;
156} 133}
157 134
158//============================================================================ 135//============================================================================
136// Runs on its OWN thread
159 137
160LLWorkerThread::Request::Request(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) : 138LLWorkerThread::WorkRequest::WorkRequest(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) :
161 LLQueuedThread::QueuedRequest(handle, priority), 139 LLQueuedThread::QueuedRequest(handle, priority),
162 mWorkerClass(workerclass), 140 mWorkerClass(workerclass),
163 mParam(param) 141 mParam(param)
164{ 142{
165} 143}
166 144
167void LLWorkerThread::Request::deleteRequest() 145LLWorkerThread::WorkRequest::~WorkRequest()
146{
147}
148
149// virtual (required for access by LLWorkerThread)
150void LLWorkerThread::WorkRequest::deleteRequest()
168{ 151{
169 LLQueuedThread::QueuedRequest::deleteRequest(); 152 LLQueuedThread::QueuedRequest::deleteRequest();
170} 153}
171 154
155// virtual
156bool LLWorkerThread::WorkRequest::processRequest()
157{
158 LLWorkerClass* workerclass = getWorkerClass();
159 workerclass->setWorking(true);
160 bool complete = workerclass->doWork(getParam());
161 workerclass->setWorking(false);
162 return complete;
163}
164
165// virtual
166void LLWorkerThread::WorkRequest::finishRequest(bool completed)
167{
168 LLWorkerClass* workerclass = getWorkerClass();
169 workerclass->finishWork(getParam(), completed);
170 U32 flags = LLWorkerClass::WCF_WORK_FINISHED | (completed ? 0 : LLWorkerClass::WCF_WORK_ABORTED);
171 workerclass->setFlags(flags);
172}
173
172//============================================================================ 174//============================================================================
173// LLWorkerClass:: operates in main thread 175// LLWorkerClass:: operates in main thread
174 176
175LLWorkerClass::LLWorkerClass(LLWorkerThread* workerthread, const std::string& name) 177LLWorkerClass::LLWorkerClass(LLWorkerThread* workerthread, const std::string& name)
176 : mWorkerThread(workerthread), 178 : mWorkerThread(workerthread),
177 mWorkerClassName(name), 179 mWorkerClassName(name),
178 mWorkHandle(LLWorkerThread::nullHandle()), 180 mRequestHandle(LLWorkerThread::nullHandle()),
181 mMutex(workerthread->getWorkerAPRPool()),
179 mWorkFlags(0) 182 mWorkFlags(0)
180{ 183{
181 if (!mWorkerThread) 184 if (!mWorkerThread)
182 { 185 {
183 mWorkerThread = LLWorkerThread::sLocal; 186 llerrs << "LLWorkerClass() called with NULL workerthread: " << name << llendl;
184 } 187 }
185} 188}
189
186LLWorkerClass::~LLWorkerClass() 190LLWorkerClass::~LLWorkerClass()
187{ 191{
188 if (mWorkHandle != LLWorkerThread::nullHandle()) 192 llassert_always(!(mWorkFlags & WCF_WORKING));
193 llassert_always(mWorkFlags & WCF_DELETE_REQUESTED);
194 if (mRequestHandle != LLWorkerThread::nullHandle())
189 { 195 {
190 LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle); 196 LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle);
191 if (!workreq) 197 if (!workreq)
192 { 198 {
193 llerrs << "LLWorkerClass destroyed with stale work handle" << llendl; 199 llerrs << "LLWorkerClass destroyed with stale work handle" << llendl;
194 } 200 }
195 if (workreq->getStatus() != LLWorkerThread::STATUS_ABORT && 201 if (workreq->getStatus() != LLWorkerThread::STATUS_ABORTED &&
196 workreq->getStatus() != LLWorkerThread::STATUS_ABORTED &&
197 workreq->getStatus() != LLWorkerThread::STATUS_COMPLETE) 202 workreq->getStatus() != LLWorkerThread::STATUS_COMPLETE)
198 { 203 {
199 llerrs << "LLWorkerClass destroyed with active worker! Worker Status: " << workreq->getStatus() << llendl; 204 llerrs << "LLWorkerClass destroyed with active worker! Worker Status: " << workreq->getStatus() << llendl;
@@ -203,21 +208,58 @@ LLWorkerClass::~LLWorkerClass()
203 208
204void LLWorkerClass::setWorkerThread(LLWorkerThread* workerthread) 209void LLWorkerClass::setWorkerThread(LLWorkerThread* workerthread)
205{ 210{
206 if (mWorkHandle != LLWorkerThread::nullHandle()) 211 mMutex.lock();
212 if (mRequestHandle != LLWorkerThread::nullHandle())
207 { 213 {
208 llerrs << "LLWorkerClass attempt to change WorkerThread with active worker!" << llendl; 214 llerrs << "LLWorkerClass attempt to change WorkerThread with active worker!" << llendl;
209 } 215 }
210 mWorkerThread = workerthread; 216 mWorkerThread = workerthread;
217 mMutex.unlock();
218}
219
220//----------------------------------------------------------------------------
221
222//virtual
223void LLWorkerClass::finishWork(S32 param, bool success)
224{
225}
226
227//virtual
228bool LLWorkerClass::deleteOK()
229{
230 return true; // default always OK
231}
232
233//----------------------------------------------------------------------------
234
235// Called from worker thread
236void LLWorkerClass::setWorking(bool working)
237{
238 mMutex.lock();
239 if (working)
240 {
241 llassert_always(!(mWorkFlags & WCF_WORKING));
242 setFlags(WCF_WORKING);
243 }
244 else
245 {
246 llassert_always((mWorkFlags & WCF_WORKING));
247 clearFlags(WCF_WORKING);
248 }
249 mMutex.unlock();
211} 250}
212 251
213//---------------------------------------------------------------------------- 252//----------------------------------------------------------------------------
214 253
215bool LLWorkerClass::yield() 254bool LLWorkerClass::yield()
216{ 255{
217 llassert(mWorkFlags & WCF_WORKING);
218 LLThread::yield(); 256 LLThread::yield();
219 mWorkerThread->checkPause(); 257 mWorkerThread->checkPause();
220 return (getFlags() & WCF_ABORT_REQUESTED) ? true : false; 258 bool res;
259 mMutex.lock();
260 res = (getFlags() & WCF_ABORT_REQUESTED) ? true : false;
261 mMutex.unlock();
262 return res;
221} 263}
222 264
223//---------------------------------------------------------------------------- 265//----------------------------------------------------------------------------
@@ -225,7 +267,9 @@ bool LLWorkerClass::yield()
225// calls startWork, adds doWork() to queue 267// calls startWork, adds doWork() to queue
226void LLWorkerClass::addWork(S32 param, U32 priority) 268void LLWorkerClass::addWork(S32 param, U32 priority)
227{ 269{
228 if (mWorkHandle != LLWorkerThread::nullHandle()) 270 mMutex.lock();
271 llassert_always(!(mWorkFlags & (WCF_WORKING|WCF_HAVE_WORK)));
272 if (mRequestHandle != LLWorkerThread::nullHandle())
229 { 273 {
230 llerrs << "LLWorkerClass attempt to add work with active worker!" << llendl; 274 llerrs << "LLWorkerClass attempt to add work with active worker!" << llendl;
231 } 275 }
@@ -233,70 +277,93 @@ void LLWorkerClass::addWork(S32 param, U32 priority)
233// llinfos << "addWork: " << mWorkerClassName << " Param: " << param << llendl; 277// llinfos << "addWork: " << mWorkerClassName << " Param: " << param << llendl;
234#endif 278#endif
235 startWork(param); 279 startWork(param);
236 mWorkHandle = mWorkerThread->add(this, param, priority); 280 clearFlags(WCF_WORK_FINISHED|WCF_WORK_ABORTED);
281 setFlags(WCF_HAVE_WORK);
282 mRequestHandle = mWorkerThread->addWorkRequest(this, param, priority);
283 mMutex.unlock();
237} 284}
238 285
239void LLWorkerClass::abortWork() 286void LLWorkerClass::abortWork(bool autocomplete)
240{ 287{
288 mMutex.lock();
241#if _DEBUG 289#if _DEBUG
242// LLWorkerThread::Request* workreq = mWorkerThread->getRequest(mWorkHandle); 290// LLWorkerThread::WorkRequest* workreq = mWorkerThread->getRequest(mRequestHandle);
243// if (workreq) 291// if (workreq)
244// llinfos << "abortWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl; 292// llinfos << "abortWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl;
245#endif 293#endif
246 mWorkerThread->abortRequest(mWorkHandle); 294 if (mRequestHandle != LLWorkerThread::nullHandle())
247 setFlags(WCF_ABORT_REQUESTED); 295 {
296 mWorkerThread->abortRequest(mRequestHandle, autocomplete);
297 mWorkerThread->setPriority(mRequestHandle, LLQueuedThread::PRIORITY_IMMEDIATE);
298 setFlags(WCF_ABORT_REQUESTED);
299 }
300 mMutex.unlock();
248} 301}
249 302
250// if doWork is complete or aborted, call endWork() and return true 303// if doWork is complete or aborted, call endWork() and return true
251bool LLWorkerClass::checkWork() 304bool LLWorkerClass::checkWork(bool aborting)
252{ 305{
306 LLMutexLock lock(&mMutex);
253 bool complete = false, abort = false; 307 bool complete = false, abort = false;
254 LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle); 308 if (mRequestHandle != LLWorkerThread::nullHandle())
255 llassert(workreq);
256 if (getFlags(WCF_ABORT_REQUESTED) || workreq->getStatus() == LLWorkerThread::STATUS_ABORTED)
257 { 309 {
258 complete = true; 310 LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle);
259 abort = true; 311 llassert_always(workreq);
312 LLQueuedThread::status_t status = workreq->getStatus();
313 if (status == LLWorkerThread::STATUS_ABORTED)
314 {
315 complete = true;
316 abort = true;
317 }
318 else if (status == LLWorkerThread::STATUS_COMPLETE)
319 {
320 complete = true;
321 }
322 else
323 {
324 llassert_always(!aborting || (workreq->getFlags() & LLQueuedThread::FLAG_ABORT));
325 }
326 if (complete)
327 {
328 llassert_always(!(getFlags(WCF_WORKING)));
329 endWork(workreq->getParam(), abort);
330 mWorkerThread->completeRequest(mRequestHandle);
331 mRequestHandle = LLWorkerThread::nullHandle();
332 clearFlags(WCF_HAVE_WORK);
333 }
260 } 334 }
261 else if (workreq->getStatus() == LLWorkerThread::STATUS_COMPLETE) 335 else
262 { 336 {
263 complete = true; 337 complete = true;
264 } 338 }
265 if (complete)
266 {
267#if _DEBUG
268// llinfos << "endWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl;
269#endif
270 endWork(workreq->getParam(), abort);
271 mWorkerThread->completeRequest(mWorkHandle);
272 mWorkHandle = LLWorkerThread::nullHandle();
273 }
274 return complete; 339 return complete;
275} 340}
276 341
277void LLWorkerClass::killWork() 342void LLWorkerClass::scheduleDelete()
278{ 343{
279 if (haveWork()) 344 bool do_delete = false;
345 mMutex.lock();
346 if (!(getFlags(WCF_DELETE_REQUESTED)))
280 { 347 {
281 abortWork(); 348 setFlags(WCF_DELETE_REQUESTED);
282 bool paused = mWorkerThread->isPaused(); 349 do_delete = true;
283 while (!checkWork()) 350 }
284 { 351 mMutex.unlock();
285 mWorkerThread->updateQueue(0); 352 if (do_delete)
286 } 353 {
287 if (paused) 354 mWorkerThread->deleteWorker(this);
288 {
289 mWorkerThread->pause();
290 }
291 } 355 }
292} 356}
293 357
294void LLWorkerClass::setPriority(U32 priority) 358void LLWorkerClass::setPriority(U32 priority)
295{ 359{
296 if (haveWork()) 360 mMutex.lock();
361 if (mRequestHandle != LLWorkerThread::nullHandle())
297 { 362 {
298 mWorkerThread->setPriority(mWorkHandle, priority); 363 mRequestPriority = priority;
364 mWorkerThread->setPriority(mRequestHandle, priority);
299 } 365 }
366 mMutex.unlock();
300} 367}
301 368
302//============================================================================ 369//============================================================================