diff options
Diffstat (limited to 'linden/indra/llcommon/llworkerthread.cpp')
-rw-r--r-- | linden/indra/llcommon/llworkerthread.cpp | 325 |
1 files changed, 196 insertions, 129 deletions
diff --git a/linden/indra/llcommon/llworkerthread.cpp b/linden/indra/llcommon/llworkerthread.cpp index 3190046..4f99393 100644 --- a/linden/indra/llcommon/llworkerthread.cpp +++ b/linden/indra/llcommon/llworkerthread.cpp | |||
@@ -33,98 +33,86 @@ | |||
33 | #endif | 33 | #endif |
34 | 34 | ||
35 | //============================================================================ | 35 | //============================================================================ |
36 | |||
37 | /*static*/ LLWorkerThread* LLWorkerThread::sLocal = NULL; | ||
38 | /*static*/ std::set<LLWorkerThread*> LLWorkerThread::sThreadList; | ||
39 | |||
40 | //============================================================================ | ||
41 | // Run on MAIN thread | 36 | // Run on MAIN thread |
42 | 37 | ||
43 | //static | 38 | LLWorkerThread::LLWorkerThread(const std::string& name, bool threaded) : |
44 | void LLWorkerThread::initClass(bool local_is_threaded, bool local_run_always) | 39 | LLQueuedThread(name, threaded), |
40 | mWorkerAPRPoolp(NULL) | ||
45 | { | 41 | { |
46 | if (!sLocal) | 42 | apr_pool_create(&mWorkerAPRPoolp, NULL); |
47 | { | 43 | mDeleteMutex = new LLMutex(getAPRPool()); |
48 | sLocal = new LLWorkerThread(local_is_threaded, local_run_always); | ||
49 | } | ||
50 | } | 44 | } |
51 | 45 | ||
52 | //static | 46 | LLWorkerThread::~LLWorkerThread() |
53 | void LLWorkerThread::cleanupClass() | ||
54 | { | 47 | { |
55 | if (sLocal) | 48 | // Delete any workers in the delete queue (should be safe - had better be!) |
49 | if (!mDeleteList.empty()) | ||
56 | { | 50 | { |
57 | while (sLocal->getPending()) | 51 | llwarns << "Worker Thread: " << mName << " destroyed with " << mDeleteList.size() |
58 | { | 52 | << " entries in delete list." << llendl; |
59 | sLocal->update(0); | ||
60 | } | ||
61 | delete sLocal; | ||
62 | sLocal = NULL; | ||
63 | llassert(sThreadList.size() == 0); | ||
64 | } | 53 | } |
65 | } | ||
66 | 54 | ||
67 | //static | 55 | delete mDeleteMutex; |
68 | S32 LLWorkerThread::updateClass(U32 ms_elapsed) | 56 | |
69 | { | 57 | // ~LLQueuedThread() will be called here |
70 | for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) | ||
71 | { | ||
72 | (*iter)->update(ms_elapsed); | ||
73 | } | ||
74 | return getAllPending(); | ||
75 | } | 58 | } |
76 | 59 | ||
77 | //static | 60 | // virtual |
78 | S32 LLWorkerThread::getAllPending() | 61 | S32 LLWorkerThread::update(U32 max_time_ms) |
79 | { | 62 | { |
80 | S32 res = 0; | 63 | S32 res = LLQueuedThread::update(max_time_ms); |
81 | for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) | 64 | // Delete scheduled workers |
65 | std::vector<LLWorkerClass*> delete_list; | ||
66 | std::vector<LLWorkerClass*> abort_list; | ||
67 | mDeleteMutex->lock(); | ||
68 | for (delete_list_t::iterator iter = mDeleteList.begin(); | ||
69 | iter != mDeleteList.end(); ) | ||
82 | { | 70 | { |
83 | res += (*iter)->getPending(); | 71 | delete_list_t::iterator curiter = iter++; |
72 | LLWorkerClass* worker = *curiter; | ||
73 | if (worker->deleteOK()) | ||
74 | { | ||
75 | if (worker->getFlags(LLWorkerClass::WCF_WORK_FINISHED)) | ||
76 | { | ||
77 | delete_list.push_back(worker); | ||
78 | mDeleteList.erase(curiter); | ||
79 | } | ||
80 | else if (!worker->getFlags(LLWorkerClass::WCF_ABORT_REQUESTED)) | ||
81 | { | ||
82 | abort_list.push_back(worker); | ||
83 | } | ||
84 | } | ||
84 | } | 85 | } |
85 | return res; | 86 | mDeleteMutex->unlock(); |
86 | } | 87 | // abort and delete after releasing mutex |
87 | 88 | for (std::vector<LLWorkerClass*>::iterator iter = abort_list.begin(); | |
88 | //static | 89 | iter != abort_list.end(); ++iter) |
89 | void LLWorkerThread::pauseAll() | ||
90 | { | ||
91 | for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) | ||
92 | { | 90 | { |
93 | (*iter)->pause(); | 91 | (*iter)->abortWork(false); |
94 | } | 92 | } |
95 | } | 93 | for (std::vector<LLWorkerClass*>::iterator iter = delete_list.begin(); |
96 | 94 | iter != delete_list.end(); ++iter) | |
97 | //static | ||
98 | void LLWorkerThread::waitOnAllPending() | ||
99 | { | ||
100 | for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) | ||
101 | { | 95 | { |
102 | (*iter)->waitOnPending(); | 96 | LLWorkerClass* worker = *iter; |
97 | if (worker->mRequestHandle) | ||
98 | { | ||
99 | // Finished but not completed | ||
100 | completeRequest(worker->mRequestHandle); | ||
101 | worker->mRequestHandle = LLWorkerThread::nullHandle(); | ||
102 | worker->clearFlags(LLWorkerClass::WCF_HAVE_WORK); | ||
103 | } | ||
104 | delete *iter; | ||
103 | } | 105 | } |
106 | return res; | ||
104 | } | 107 | } |
105 | 108 | ||
106 | //---------------------------------------------------------------------------- | 109 | //---------------------------------------------------------------------------- |
107 | 110 | ||
108 | LLWorkerThread::LLWorkerThread(bool threaded, bool runalways) : | 111 | LLWorkerThread::handle_t LLWorkerThread::addWorkRequest(LLWorkerClass* workerclass, S32 param, U32 priority) |
109 | LLQueuedThread("Worker", threaded, runalways) | ||
110 | { | ||
111 | sThreadList.insert(this); | ||
112 | } | ||
113 | |||
114 | LLWorkerThread::~LLWorkerThread() | ||
115 | { | ||
116 | llverify(sThreadList.erase(this) == 1); | ||
117 | // ~LLQueuedThread() will be called here | ||
118 | } | ||
119 | |||
120 | //---------------------------------------------------------------------------- | ||
121 | |||
122 | |||
123 | LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 param, U32 priority) | ||
124 | { | 112 | { |
125 | handle_t handle = generateHandle(); | 113 | handle_t handle = generateHandle(); |
126 | 114 | ||
127 | Request* req = new Request(handle, priority, workerclass, param); | 115 | WorkRequest* req = new WorkRequest(handle, priority, workerclass, param); |
128 | 116 | ||
129 | bool res = addRequest(req); | 117 | bool res = addRequest(req); |
130 | if (!res) | 118 | if (!res) |
@@ -137,63 +125,80 @@ LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 par | |||
137 | return handle; | 125 | return handle; |
138 | } | 126 | } |
139 | 127 | ||
140 | //============================================================================ | 128 | void LLWorkerThread::deleteWorker(LLWorkerClass* workerclass) |
141 | // Runs on its OWN thread | ||
142 | |||
143 | bool LLWorkerThread::processRequest(QueuedRequest* qreq) | ||
144 | { | 129 | { |
145 | Request *req = (Request*)qreq; | 130 | mDeleteMutex->lock(); |
146 | 131 | mDeleteList.push_back(workerclass); | |
147 | req->getWorkerClass()->setWorking(true); | 132 | mDeleteMutex->unlock(); |
148 | |||
149 | bool complete = req->getWorkerClass()->doWork(req->getParam()); | ||
150 | |||
151 | req->getWorkerClass()->setWorking(false); | ||
152 | |||
153 | LLThread::yield(); // worker thread should yield after each request | ||
154 | |||
155 | return complete; | ||
156 | } | 133 | } |
157 | 134 | ||
158 | //============================================================================ | 135 | //============================================================================ |
136 | // Runs on its OWN thread | ||
159 | 137 | ||
160 | LLWorkerThread::Request::Request(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) : | 138 | LLWorkerThread::WorkRequest::WorkRequest(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) : |
161 | LLQueuedThread::QueuedRequest(handle, priority), | 139 | LLQueuedThread::QueuedRequest(handle, priority), |
162 | mWorkerClass(workerclass), | 140 | mWorkerClass(workerclass), |
163 | mParam(param) | 141 | mParam(param) |
164 | { | 142 | { |
165 | } | 143 | } |
166 | 144 | ||
167 | void LLWorkerThread::Request::deleteRequest() | 145 | LLWorkerThread::WorkRequest::~WorkRequest() |
146 | { | ||
147 | } | ||
148 | |||
149 | // virtual (required for access by LLWorkerThread) | ||
150 | void LLWorkerThread::WorkRequest::deleteRequest() | ||
168 | { | 151 | { |
169 | LLQueuedThread::QueuedRequest::deleteRequest(); | 152 | LLQueuedThread::QueuedRequest::deleteRequest(); |
170 | } | 153 | } |
171 | 154 | ||
155 | // virtual | ||
156 | bool LLWorkerThread::WorkRequest::processRequest() | ||
157 | { | ||
158 | LLWorkerClass* workerclass = getWorkerClass(); | ||
159 | workerclass->setWorking(true); | ||
160 | bool complete = workerclass->doWork(getParam()); | ||
161 | workerclass->setWorking(false); | ||
162 | return complete; | ||
163 | } | ||
164 | |||
165 | // virtual | ||
166 | void LLWorkerThread::WorkRequest::finishRequest(bool completed) | ||
167 | { | ||
168 | LLWorkerClass* workerclass = getWorkerClass(); | ||
169 | workerclass->finishWork(getParam(), completed); | ||
170 | U32 flags = LLWorkerClass::WCF_WORK_FINISHED | (completed ? 0 : LLWorkerClass::WCF_WORK_ABORTED); | ||
171 | workerclass->setFlags(flags); | ||
172 | } | ||
173 | |||
172 | //============================================================================ | 174 | //============================================================================ |
173 | // LLWorkerClass:: operates in main thread | 175 | // LLWorkerClass:: operates in main thread |
174 | 176 | ||
175 | LLWorkerClass::LLWorkerClass(LLWorkerThread* workerthread, const std::string& name) | 177 | LLWorkerClass::LLWorkerClass(LLWorkerThread* workerthread, const std::string& name) |
176 | : mWorkerThread(workerthread), | 178 | : mWorkerThread(workerthread), |
177 | mWorkerClassName(name), | 179 | mWorkerClassName(name), |
178 | mWorkHandle(LLWorkerThread::nullHandle()), | 180 | mRequestHandle(LLWorkerThread::nullHandle()), |
181 | mMutex(workerthread->getWorkerAPRPool()), | ||
179 | mWorkFlags(0) | 182 | mWorkFlags(0) |
180 | { | 183 | { |
181 | if (!mWorkerThread) | 184 | if (!mWorkerThread) |
182 | { | 185 | { |
183 | mWorkerThread = LLWorkerThread::sLocal; | 186 | llerrs << "LLWorkerClass() called with NULL workerthread: " << name << llendl; |
184 | } | 187 | } |
185 | } | 188 | } |
189 | |||
186 | LLWorkerClass::~LLWorkerClass() | 190 | LLWorkerClass::~LLWorkerClass() |
187 | { | 191 | { |
188 | if (mWorkHandle != LLWorkerThread::nullHandle()) | 192 | llassert_always(!(mWorkFlags & WCF_WORKING)); |
193 | llassert_always(mWorkFlags & WCF_DELETE_REQUESTED); | ||
194 | if (mRequestHandle != LLWorkerThread::nullHandle()) | ||
189 | { | 195 | { |
190 | LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle); | 196 | LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle); |
191 | if (!workreq) | 197 | if (!workreq) |
192 | { | 198 | { |
193 | llerrs << "LLWorkerClass destroyed with stale work handle" << llendl; | 199 | llerrs << "LLWorkerClass destroyed with stale work handle" << llendl; |
194 | } | 200 | } |
195 | if (workreq->getStatus() != LLWorkerThread::STATUS_ABORT && | 201 | if (workreq->getStatus() != LLWorkerThread::STATUS_ABORTED && |
196 | workreq->getStatus() != LLWorkerThread::STATUS_ABORTED && | ||
197 | workreq->getStatus() != LLWorkerThread::STATUS_COMPLETE) | 202 | workreq->getStatus() != LLWorkerThread::STATUS_COMPLETE) |
198 | { | 203 | { |
199 | llerrs << "LLWorkerClass destroyed with active worker! Worker Status: " << workreq->getStatus() << llendl; | 204 | llerrs << "LLWorkerClass destroyed with active worker! Worker Status: " << workreq->getStatus() << llendl; |
@@ -203,21 +208,58 @@ LLWorkerClass::~LLWorkerClass() | |||
203 | 208 | ||
204 | void LLWorkerClass::setWorkerThread(LLWorkerThread* workerthread) | 209 | void LLWorkerClass::setWorkerThread(LLWorkerThread* workerthread) |
205 | { | 210 | { |
206 | if (mWorkHandle != LLWorkerThread::nullHandle()) | 211 | mMutex.lock(); |
212 | if (mRequestHandle != LLWorkerThread::nullHandle()) | ||
207 | { | 213 | { |
208 | llerrs << "LLWorkerClass attempt to change WorkerThread with active worker!" << llendl; | 214 | llerrs << "LLWorkerClass attempt to change WorkerThread with active worker!" << llendl; |
209 | } | 215 | } |
210 | mWorkerThread = workerthread; | 216 | mWorkerThread = workerthread; |
217 | mMutex.unlock(); | ||
218 | } | ||
219 | |||
220 | //---------------------------------------------------------------------------- | ||
221 | |||
222 | //virtual | ||
223 | void LLWorkerClass::finishWork(S32 param, bool success) | ||
224 | { | ||
225 | } | ||
226 | |||
227 | //virtual | ||
228 | bool LLWorkerClass::deleteOK() | ||
229 | { | ||
230 | return true; // default always OK | ||
231 | } | ||
232 | |||
233 | //---------------------------------------------------------------------------- | ||
234 | |||
235 | // Called from worker thread | ||
236 | void LLWorkerClass::setWorking(bool working) | ||
237 | { | ||
238 | mMutex.lock(); | ||
239 | if (working) | ||
240 | { | ||
241 | llassert_always(!(mWorkFlags & WCF_WORKING)); | ||
242 | setFlags(WCF_WORKING); | ||
243 | } | ||
244 | else | ||
245 | { | ||
246 | llassert_always((mWorkFlags & WCF_WORKING)); | ||
247 | clearFlags(WCF_WORKING); | ||
248 | } | ||
249 | mMutex.unlock(); | ||
211 | } | 250 | } |
212 | 251 | ||
213 | //---------------------------------------------------------------------------- | 252 | //---------------------------------------------------------------------------- |
214 | 253 | ||
215 | bool LLWorkerClass::yield() | 254 | bool LLWorkerClass::yield() |
216 | { | 255 | { |
217 | llassert(mWorkFlags & WCF_WORKING); | ||
218 | LLThread::yield(); | 256 | LLThread::yield(); |
219 | mWorkerThread->checkPause(); | 257 | mWorkerThread->checkPause(); |
220 | return (getFlags() & WCF_ABORT_REQUESTED) ? true : false; | 258 | bool res; |
259 | mMutex.lock(); | ||
260 | res = (getFlags() & WCF_ABORT_REQUESTED) ? true : false; | ||
261 | mMutex.unlock(); | ||
262 | return res; | ||
221 | } | 263 | } |
222 | 264 | ||
223 | //---------------------------------------------------------------------------- | 265 | //---------------------------------------------------------------------------- |
@@ -225,7 +267,9 @@ bool LLWorkerClass::yield() | |||
225 | // calls startWork, adds doWork() to queue | 267 | // calls startWork, adds doWork() to queue |
226 | void LLWorkerClass::addWork(S32 param, U32 priority) | 268 | void LLWorkerClass::addWork(S32 param, U32 priority) |
227 | { | 269 | { |
228 | if (mWorkHandle != LLWorkerThread::nullHandle()) | 270 | mMutex.lock(); |
271 | llassert_always(!(mWorkFlags & (WCF_WORKING|WCF_HAVE_WORK))); | ||
272 | if (mRequestHandle != LLWorkerThread::nullHandle()) | ||
229 | { | 273 | { |
230 | llerrs << "LLWorkerClass attempt to add work with active worker!" << llendl; | 274 | llerrs << "LLWorkerClass attempt to add work with active worker!" << llendl; |
231 | } | 275 | } |
@@ -233,70 +277,93 @@ void LLWorkerClass::addWork(S32 param, U32 priority) | |||
233 | // llinfos << "addWork: " << mWorkerClassName << " Param: " << param << llendl; | 277 | // llinfos << "addWork: " << mWorkerClassName << " Param: " << param << llendl; |
234 | #endif | 278 | #endif |
235 | startWork(param); | 279 | startWork(param); |
236 | mWorkHandle = mWorkerThread->add(this, param, priority); | 280 | clearFlags(WCF_WORK_FINISHED|WCF_WORK_ABORTED); |
281 | setFlags(WCF_HAVE_WORK); | ||
282 | mRequestHandle = mWorkerThread->addWorkRequest(this, param, priority); | ||
283 | mMutex.unlock(); | ||
237 | } | 284 | } |
238 | 285 | ||
239 | void LLWorkerClass::abortWork() | 286 | void LLWorkerClass::abortWork(bool autocomplete) |
240 | { | 287 | { |
288 | mMutex.lock(); | ||
241 | #if _DEBUG | 289 | #if _DEBUG |
242 | // LLWorkerThread::Request* workreq = mWorkerThread->getRequest(mWorkHandle); | 290 | // LLWorkerThread::WorkRequest* workreq = mWorkerThread->getRequest(mRequestHandle); |
243 | // if (workreq) | 291 | // if (workreq) |
244 | // llinfos << "abortWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl; | 292 | // llinfos << "abortWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl; |
245 | #endif | 293 | #endif |
246 | mWorkerThread->abortRequest(mWorkHandle); | 294 | if (mRequestHandle != LLWorkerThread::nullHandle()) |
247 | setFlags(WCF_ABORT_REQUESTED); | 295 | { |
296 | mWorkerThread->abortRequest(mRequestHandle, autocomplete); | ||
297 | mWorkerThread->setPriority(mRequestHandle, LLQueuedThread::PRIORITY_IMMEDIATE); | ||
298 | setFlags(WCF_ABORT_REQUESTED); | ||
299 | } | ||
300 | mMutex.unlock(); | ||
248 | } | 301 | } |
249 | 302 | ||
250 | // if doWork is complete or aborted, call endWork() and return true | 303 | // if doWork is complete or aborted, call endWork() and return true |
251 | bool LLWorkerClass::checkWork() | 304 | bool LLWorkerClass::checkWork(bool aborting) |
252 | { | 305 | { |
306 | LLMutexLock lock(&mMutex); | ||
253 | bool complete = false, abort = false; | 307 | bool complete = false, abort = false; |
254 | LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle); | 308 | if (mRequestHandle != LLWorkerThread::nullHandle()) |
255 | llassert(workreq); | ||
256 | if (getFlags(WCF_ABORT_REQUESTED) || workreq->getStatus() == LLWorkerThread::STATUS_ABORTED) | ||
257 | { | 309 | { |
258 | complete = true; | 310 | LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle); |
259 | abort = true; | 311 | llassert_always(workreq); |
312 | LLQueuedThread::status_t status = workreq->getStatus(); | ||
313 | if (status == LLWorkerThread::STATUS_ABORTED) | ||
314 | { | ||
315 | complete = true; | ||
316 | abort = true; | ||
317 | } | ||
318 | else if (status == LLWorkerThread::STATUS_COMPLETE) | ||
319 | { | ||
320 | complete = true; | ||
321 | } | ||
322 | else | ||
323 | { | ||
324 | llassert_always(!aborting || (workreq->getFlags() & LLQueuedThread::FLAG_ABORT)); | ||
325 | } | ||
326 | if (complete) | ||
327 | { | ||
328 | llassert_always(!(getFlags(WCF_WORKING))); | ||
329 | endWork(workreq->getParam(), abort); | ||
330 | mWorkerThread->completeRequest(mRequestHandle); | ||
331 | mRequestHandle = LLWorkerThread::nullHandle(); | ||
332 | clearFlags(WCF_HAVE_WORK); | ||
333 | } | ||
260 | } | 334 | } |
261 | else if (workreq->getStatus() == LLWorkerThread::STATUS_COMPLETE) | 335 | else |
262 | { | 336 | { |
263 | complete = true; | 337 | complete = true; |
264 | } | 338 | } |
265 | if (complete) | ||
266 | { | ||
267 | #if _DEBUG | ||
268 | // llinfos << "endWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl; | ||
269 | #endif | ||
270 | endWork(workreq->getParam(), abort); | ||
271 | mWorkerThread->completeRequest(mWorkHandle); | ||
272 | mWorkHandle = LLWorkerThread::nullHandle(); | ||
273 | } | ||
274 | return complete; | 339 | return complete; |
275 | } | 340 | } |
276 | 341 | ||
277 | void LLWorkerClass::killWork() | 342 | void LLWorkerClass::scheduleDelete() |
278 | { | 343 | { |
279 | if (haveWork()) | 344 | bool do_delete = false; |
345 | mMutex.lock(); | ||
346 | if (!(getFlags(WCF_DELETE_REQUESTED))) | ||
280 | { | 347 | { |
281 | abortWork(); | 348 | setFlags(WCF_DELETE_REQUESTED); |
282 | bool paused = mWorkerThread->isPaused(); | 349 | do_delete = true; |
283 | while (!checkWork()) | 350 | } |
284 | { | 351 | mMutex.unlock(); |
285 | mWorkerThread->updateQueue(0); | 352 | if (do_delete) |
286 | } | 353 | { |
287 | if (paused) | 354 | mWorkerThread->deleteWorker(this); |
288 | { | ||
289 | mWorkerThread->pause(); | ||
290 | } | ||
291 | } | 355 | } |
292 | } | 356 | } |
293 | 357 | ||
294 | void LLWorkerClass::setPriority(U32 priority) | 358 | void LLWorkerClass::setPriority(U32 priority) |
295 | { | 359 | { |
296 | if (haveWork()) | 360 | mMutex.lock(); |
361 | if (mRequestHandle != LLWorkerThread::nullHandle()) | ||
297 | { | 362 | { |
298 | mWorkerThread->setPriority(mWorkHandle, priority); | 363 | mRequestPriority = priority; |
364 | mWorkerThread->setPriority(mRequestHandle, priority); | ||
299 | } | 365 | } |
366 | mMutex.unlock(); | ||
300 | } | 367 | } |
301 | 368 | ||
302 | //============================================================================ | 369 | //============================================================================ |