about summary refs log tree commit diff stats homepage
path: root/libraries/ecore/src/lib/ecore/ecore_thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'libraries/ecore/src/lib/ecore/ecore_thread.c')
-rw-r--r--  libraries/ecore/src/lib/ecore/ecore_thread.c  756
1 files changed, 360 insertions, 396 deletions
diff --git a/libraries/ecore/src/lib/ecore/ecore_thread.c b/libraries/ecore/src/lib/ecore/ecore_thread.c
index 4444ad4..85fbe64 100644
--- a/libraries/ecore/src/lib/ecore/ecore_thread.c
+++ b/libraries/ecore/src/lib/ecore/ecore_thread.c
@@ -17,6 +17,25 @@
17 17
18#ifdef EFL_HAVE_THREADS 18#ifdef EFL_HAVE_THREADS
19 19
20# define LK(x) Eina_Lock x
21# define LKI(x) eina_lock_new(&(x))
22# define LKD(x) eina_lock_free(&(x))
23# define LKL(x) eina_lock_take(&(x))
24# define LKU(x) eina_lock_release(&(x))
25
26# define CD(x) Eina_Condition x
27# define CDI(x, m) eina_condition_new(&(x), &(m))
28# define CDD(x) eina_condition_free(&(x))
29# define CDB(x) eina_condition_broadcast(&(x))
30# define CDW(x, t) eina_condition_timedwait(&(x), t)
31
32# define LRWK(x) Eina_RWLock x
33# define LRWKI(x) eina_rwlock_new(&(x));
34# define LRWKD(x) eina_rwlock_free(&(x));
35# define LRWKWL(x) eina_rwlock_take_write(&(x));
36# define LRWKRL(x) eina_rwlock_take_read(&(x));
37# define LRWKU(x) eina_rwlock_release(&(x));
38
20# ifdef EFL_HAVE_POSIX_THREADS 39# ifdef EFL_HAVE_POSIX_THREADS
21# include <pthread.h> 40# include <pthread.h>
22# ifdef __linux__ 41# ifdef __linux__
@@ -31,28 +50,9 @@
31# define PHE(x, y) pthread_equal(x, y) 50# define PHE(x, y) pthread_equal(x, y)
32# define PHS() pthread_self() 51# define PHS() pthread_self()
33# define PHC(x, f, d) pthread_create(&(x), NULL, (void *)f, d) 52# define PHC(x, f, d) pthread_create(&(x), NULL, (void *)f, d)
34# define PHJ(x, p) pthread_join(x, (void **)(&(p))) 53# define PHJ(x) pthread_join(x, NULL)
35# define PHA(x) pthread_cancel(x) 54# define PHA(x) pthread_cancel(x)
36 55
37# define CD(x) pthread_cond_t x
38# define CDI(x) pthread_cond_init(&(x), NULL);
39# define CDD(x) pthread_cond_destroy(&(x));
40# define CDB(x) pthread_cond_broadcast(&(x));
41# define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);
42
43# define LK(x) pthread_mutex_t x
44# define LKI(x) pthread_mutex_init(&(x), NULL);
45# define LKD(x) pthread_mutex_destroy(&(x));
46# define LKL(x) pthread_mutex_lock(&(x));
47# define LKU(x) pthread_mutex_unlock(&(x));
48
49# define LRWK(x) pthread_rwlock_t x
50# define LRWKI(x) pthread_rwlock_init(&(x), NULL);
51# define LRWKD(x) pthread_rwlock_destroy(&(x));
52# define LRWKWL(x) pthread_rwlock_wrlock(&(x));
53# define LRWKRL(x) pthread_rwlock_rdlock(&(x));
54# define LRWKU(x) pthread_rwlock_unlock(&(x));
55
56# else /* EFL_HAVE_WIN32_THREADS */ 56# else /* EFL_HAVE_WIN32_THREADS */
57 57
58# define WIN32_LEAN_AND_MEAN 58# define WIN32_LEAN_AND_MEAN
@@ -108,209 +108,9 @@ _ecore_thread_win32_join(win32_thread *x,
108 return 0; 108 return 0;
109} 109}
110 110
111# define PHJ(x, p) _ecore_thread_win32_join(x, (void **)(&(p))) 111# define PHJ(x) _ecore_thread_win32_join(x, NULL)
112# define PHA(x) TerminateThread(x->thread, 0) 112# define PHA(x) TerminateThread(x->thread, 0)
113 113
114# define LK(x) HANDLE x
115# define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
116# define LKD(x) CloseHandle(x)
117# define LKL(x) WaitForSingleObject(x, INFINITE)
118# define LKU(x) ReleaseMutex(x)
119
120typedef struct
121{
122 HANDLE semaphore;
123 LONG threads_count;
124 CRITICAL_SECTION threads_count_lock;
125} win32_cond;
126
127# define CD(x) win32_cond * x
128
129# define CDI(x) \
130 do { \
131 x = (win32_cond *)calloc(1, sizeof(win32_cond)); \
132 if (x) \
133 { \
134 x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
135 if (x->semaphore) \
136 InitializeCriticalSection(&x->threads_count_lock); \
137 else \
138 { \
139 free(x); \
140 x = NULL; \
141 } \
142 } \
143 } while (0)
144
145# define CDD(x) \
146 do { \
147 CloseHandle(x->semaphore); \
148 free(x); \
149 x = NULL; \
150 } while (0)
151
152# define CDB(x) \
153 do { \
154 EnterCriticalSection(&x->threads_count_lock); \
155 if (x->threads_count > 0) \
156 ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
157 LeaveCriticalSection (&x->threads_count_lock); \
158 } while (0)
159
160int
161_ecore_thread_win32_cond_timedwait(win32_cond *c,
162 HANDLE *external_mutex,
163 struct timeval *t)
164{
165 DWORD res;
166 DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);
167 LKL(external_mutex);
168 EnterCriticalSection (&c->threads_count_lock);
169 c->threads_count++;
170 LeaveCriticalSection (&c->threads_count_lock);
171 LKU(external_mutex);
172 res = WaitForSingleObject(c->semaphore, val);
173 if (res == WAIT_OBJECT_0)
174 return 0;
175 else
176 return -1;
177}
178
179# define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)
180
181typedef struct
182{
183 LONG readers_count;
184 LONG writers_count;
185 int readers;
186 int writers;
187 LK(mutex);
188 CD(cond_read);
189 CD(cond_write);
190} win32_rwl;
191
192# define LRWK(x) win32_rwl * x
193# define LRWKI(x) \
194 do { \
195 x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
196 if (x) \
197 { \
198 LKI(x->mutex); \
199 if (x->mutex) \
200 { \
201 CDI(x->cond_read); \
202 if (x->cond_read) \
203 { \
204 CDI(x->cond_write); \
205 if (!x->cond_write) \
206 { \
207 CDD(x->cond_read); \
208 LKD(x->mutex); \
209 free(x); \
210 x = NULL; \
211 } \
212 } \
213 else \
214 { \
215 LKD(x->mutex); \
216 free(x); \
217 x = NULL; \
218 } \
219 } \
220 else \
221 { \
222 free(x); \
223 x = NULL; \
224 } \
225 } \
226 } while (0)
227
228# define LRWKD(x) \
229 do { \
230 LKU(x->mutex); \
231 LKD(x->mutex); \
232 CDD(x->cond_write); \
233 CDD(x->cond_read); \
234 free(x); \
235 } while (0)
236# define LRWKWL(x) \
237 do { \
238 DWORD res; \
239 LKU(x->mutex); \
240 if (x->writers || x->readers > 0) \
241 { \
242 x->writers_count++; \
243 while (x->writers || x->readers > 0) \
244 { \
245 EnterCriticalSection(&x->cond_write->threads_count_lock); \
246 x->cond_read->threads_count++; \
247 LeaveCriticalSection(&x->cond_write->threads_count_lock); \
248 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
249 if (res != WAIT_OBJECT_0) break; \
250 } \
251 x->writers_count--; \
252 } \
253 if (res == 0) x->writers_count = 1; \
254 LKU(x->mutex); \
255 } while (0)
256# define LRWKRL(x) \
257 do { \
258 DWORD res; \
259 LKL(x->mutex); \
260 if (x->writers) \
261 { \
262 x->readers_count++; \
263 while (x->writers) \
264 { \
265 EnterCriticalSection(&x->cond_write->threads_count_lock); \
266 x->cond_read->threads_count++; \
267 LeaveCriticalSection(&x->cond_write->threads_count_lock); \
268 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
269 if (res != WAIT_OBJECT_0) break; \
270 } \
271 x->readers_count--; \
272 } \
273 if (res == 0) \
274 x->readers++; \
275 LKU(x->mutex); \
276 } while (0)
277# define LRWKU(x) \
278 do { \
279 LKL(x->mutex); \
280 if (x->writers) \
281 { \
282 x->writers = 0; \
283 if (x->readers_count == 1) \
284 { \
285 EnterCriticalSection(&x->cond_read->threads_count_lock); \
286 if (x->cond_read->threads_count > 0) \
287 ReleaseSemaphore(x->cond_read->semaphore, 1, 0); \
288 LeaveCriticalSection(&x->cond_read->threads_count_lock); \
289 } \
290 else if (x->readers_count > 0) \
291 CDB(x->cond_read); \
292 else if (x->writers_count > 0) \
293 { \
294 EnterCriticalSection (&x->cond_write->threads_count_lock); \
295 if (x->cond_write->threads_count > 0) \
296 ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
297 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
298 } \
299 } \
300 else if (x->readers > 0) \
301 { \
302 x->readers--; \
303 if (x->readers == 0 && x->writers_count > 0) \
304 { \
305 EnterCriticalSection (&x->cond_write->threads_count_lock); \
306 if (x->cond_write->threads_count > 0) \
307 ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
308 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
309 } \
310 } \
311 LKU(x->mutex); \
312 } while (0)
313
314# endif 114# endif
315 115
316#endif 116#endif
@@ -336,14 +136,24 @@ struct _Ecore_Pthread_Worker
336 { 136 {
337 Ecore_Thread_Cb func_heavy; 137 Ecore_Thread_Cb func_heavy;
338 Ecore_Thread_Notify_Cb func_notify; 138 Ecore_Thread_Notify_Cb func_notify;
339 Ecore_Pipe *notify;
340 139
341 Ecore_Pipe *direct_pipe;
342 Ecore_Pthread_Worker *direct_worker; 140 Ecore_Pthread_Worker *direct_worker;
343 141
344 int send; 142 int send;
345 int received; 143 int received;
346 } feedback_run; 144 } feedback_run;
145 struct {
146 Ecore_Thread_Cb func_main;
147 Ecore_Thread_Notify_Cb func_notify;
148
149 Ecore_Pipe *send;
150 Ecore_Pthread_Worker *direct_worker;
151
152 struct {
153 int send;
154 int received;
155 } from, to;
156 } message_run;
347 } u; 157 } u;
348 158
349 Ecore_Thread_Cb func_cancel; 159 Ecore_Thread_Cb func_cancel;
@@ -357,47 +167,63 @@ struct _Ecore_Pthread_Worker
357 167
358 const void *data; 168 const void *data;
359 169
360 Eina_Bool cancel : 1; 170 volatile int cancel;
361 Eina_Bool feedback_run : 1; 171
362 Eina_Bool kill : 1; 172#ifdef EFL_HAVE_THREADS
363 Eina_Bool reschedule : 1; 173 LK(cancel_mutex);
364 Eina_Bool no_queue : 1; 174#endif
175
176 Eina_Bool message_run : 1;
177 Eina_Bool feedback_run : 1;
178 Eina_Bool kill : 1;
179 Eina_Bool reschedule : 1;
180 Eina_Bool no_queue : 1;
365}; 181};
366 182
367#ifdef EFL_HAVE_THREADS 183#ifdef EFL_HAVE_THREADS
368typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data; 184typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
369
370struct _Ecore_Pthread_Data 185struct _Ecore_Pthread_Data
371{ 186{
372 Ecore_Pthread_Worker *death_job; 187 Ecore_Pthread_Worker *death_job;
373 Ecore_Pipe *p;
374 void *data; 188 void *data;
375 PH(thread); 189 PH(thread);
376}; 190};
191
192typedef struct _Ecore_Pthread_Notify Ecore_Pthread_Notify;
193struct _Ecore_Pthread_Notify
194{
195 Ecore_Pthread_Worker *work;
196 const void *user_data;
197};
198
199typedef void *(*Ecore_Thread_Sync_Cb)(void* data, Ecore_Thread *thread);
200
201typedef struct _Ecore_Pthread_Message Ecore_Pthread_Message;
202struct _Ecore_Pthread_Message
203{
204 union {
205 Ecore_Thread_Cb async;
206 Ecore_Thread_Sync_Cb sync;
207 } u;
208
209 const void *data;
210
211 int code;
212
213 Eina_Bool callback : 1;
214 Eina_Bool sync : 1;
215};
216
377#endif 217#endif
378 218
379static int _ecore_thread_count_max = 0; 219static int _ecore_thread_count_max = 0;
380static int ECORE_THREAD_PIPE_DEL = 0;
381static Eina_Array *_ecore_thread_pipe = NULL;
382 220
383#ifdef EFL_HAVE_THREADS 221#ifdef EFL_HAVE_THREADS
384 222
385static void _ecore_thread_handler(void *data __UNUSED__, 223static void _ecore_thread_handler(void *data);
386 void *buffer,
387 unsigned int nbyte);
388
389static Ecore_Pipe *
390_ecore_thread_pipe_get(void)
391{
392 if (eina_array_count(_ecore_thread_pipe) > 0)
393 return eina_array_pop(_ecore_thread_pipe);
394
395 return ecore_pipe_add(_ecore_thread_handler, NULL);
396}
397 224
398static int _ecore_thread_count = 0; 225static int _ecore_thread_count = 0;
399 226
400static Ecore_Event_Handler *del_handler = NULL;
401static Eina_List *_ecore_active_job_threads = NULL; 227static Eina_List *_ecore_active_job_threads = NULL;
402static Eina_List *_ecore_pending_job_threads = NULL; 228static Eina_List *_ecore_pending_job_threads = NULL;
403static Eina_List *_ecore_pending_job_threads_feedback = NULL; 229static Eina_List *_ecore_pending_job_threads_feedback = NULL;
@@ -435,6 +261,10 @@ static PH(get_main_loop_thread) (void)
435static void 261static void
436_ecore_thread_worker_free(Ecore_Pthread_Worker *worker) 262_ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
437{ 263{
264 LKD(worker->cancel_mutex);
265 CDD(worker->cond);
266 LKD(worker->mutex);
267
438 if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16) 268 if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)
439 { 269 {
440 free(worker); 270 free(worker);
@@ -454,38 +284,15 @@ _ecore_thread_data_free(void *data)
454} 284}
455 285
456static void 286static void
457_ecore_thread_pipe_free(void *data __UNUSED__,
458 void *event)
459{
460 Ecore_Pipe *p = event;
461
462 if (eina_array_count(_ecore_thread_pipe) < 50)
463 eina_array_push(_ecore_thread_pipe, p);
464 else
465 ecore_pipe_del(p);
466 eina_threads_shutdown();
467}
468
469static Eina_Bool
470_ecore_thread_pipe_del(void *data __UNUSED__,
471 int type __UNUSED__,
472 void *event __UNUSED__)
473{
474 /* This is a hack to delay pipe destruction until we are out of its internal loop. */
475 return ECORE_CALLBACK_CANCEL;
476}
477
478static void
479_ecore_thread_end(Ecore_Pthread_Data *pth, 287_ecore_thread_end(Ecore_Pthread_Data *pth,
480 Ecore_Thread *work) 288 Ecore_Thread *work)
481{ 289{
482 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)work; 290 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)work;
483 Ecore_Pipe *p;
484 291
485 if (!worker->feedback_run || (worker->feedback_run && !worker->no_queue)) 292 if (!worker->message_run || !worker->feedback_run || (worker->feedback_run && !worker->no_queue))
486 _ecore_thread_count--; 293 _ecore_thread_count--;
487 294
488 if (PHJ(pth->thread, p) != 0) 295 if (PHJ(pth->thread) != 0)
489 return; 296 return;
490 297
491 if (eina_list_count(_ecore_pending_job_threads) > 0 298 if (eina_list_count(_ecore_pending_job_threads) > 0
@@ -496,7 +303,7 @@ _ecore_thread_end(Ecore_Pthread_Data *pth,
496 INF("spawning threads because of still pending jobs."); 303 INF("spawning threads because of still pending jobs.");
497 304
498 pth->death_job = _ecore_thread_worker_new(); 305 pth->death_job = _ecore_thread_worker_new();
499 if (!pth->p || !pth->death_job) goto end; 306 if (!pth->death_job) goto end;
500 307
501 eina_threads_init(); 308 eina_threads_init();
502 309
@@ -514,7 +321,6 @@ end:
514 321
515 _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth); 322 _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
516 323
517 ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
518 free(pth); 324 free(pth);
519} 325}
520 326
@@ -534,30 +340,18 @@ _ecore_thread_kill(Ecore_Pthread_Worker *work)
534 340
535 if (work->feedback_run) 341 if (work->feedback_run)
536 { 342 {
537 ecore_pipe_del(work->u.feedback_run.notify);
538
539 if (work->u.feedback_run.direct_pipe)
540 eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
541 if (work->u.feedback_run.direct_worker) 343 if (work->u.feedback_run.direct_worker)
542 _ecore_thread_worker_free(work->u.feedback_run.direct_worker); 344 _ecore_thread_worker_free(work->u.feedback_run.direct_worker);
543 } 345 }
544 CDD(work->cond);
545 LKD(work->mutex);
546 if (work->hash) 346 if (work->hash)
547 eina_hash_free(work->hash); 347 eina_hash_free(work->hash);
548 _ecore_thread_worker_free(work); 348 _ecore_thread_worker_free(work);
549} 349}
550 350
551static void 351static void
552_ecore_thread_handler(void *data __UNUSED__, 352_ecore_thread_handler(void *data)
553 void *buffer,
554 unsigned int nbyte)
555{ 353{
556 Ecore_Pthread_Worker *work; 354 Ecore_Pthread_Worker *work = data;
557
558 if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;
559
560 work = *(Ecore_Pthread_Worker **)buffer;
561 355
562 if (work->feedback_run) 356 if (work->feedback_run)
563 { 357 {
@@ -571,17 +365,20 @@ _ecore_thread_handler(void *data __UNUSED__,
571 _ecore_thread_kill(work); 365 _ecore_thread_kill(work);
572} 366}
573 367
368#if 0
574static void 369static void
575_ecore_notify_handler(void *data, 370_ecore_nothing_handler(void *data __UNUSED__, void *buffer __UNUSED__, unsigned int nbyte __UNUSED__)
576 void *buffer,
577 unsigned int nbyte)
578{ 371{
579 Ecore_Pthread_Worker *work = data; 372}
580 void *user_data; 373#endif
581 374
582 if (nbyte != sizeof (Ecore_Pthread_Worker *)) return; 375static void
376_ecore_notify_handler(void *data)
377{
378 Ecore_Pthread_Notify *notify = data;
379 Ecore_Pthread_Worker *work = notify->work;
380 void *user_data = (void*) notify->user_data;
583 381
584 user_data = *(void **)buffer;
585 work->u.feedback_run.received++; 382 work->u.feedback_run.received++;
586 383
587 if (work->u.feedback_run.func_notify) 384 if (work->u.feedback_run.func_notify)
@@ -592,16 +389,64 @@ _ecore_notify_handler(void *data,
592 { 389 {
593 _ecore_thread_kill(work); 390 _ecore_thread_kill(work);
594 } 391 }
392
393 free(notify);
394}
395
396static void
397_ecore_message_notify_handler(void *data)
398{
399 Ecore_Pthread_Notify *notify = data;
400 Ecore_Pthread_Worker *work = notify->work;
401 Ecore_Pthread_Message *user_data = (void *) notify->user_data;
402 Eina_Bool delete = EINA_TRUE;
403
404 work->u.message_run.from.received++;
405
406 if (!user_data->callback)
407 {
408 if (work->u.message_run.func_notify)
409 work->u.message_run.func_notify((void *) work->data, (Ecore_Thread *) work, (void *) user_data->data);
410 }
411 else
412 {
413 if (user_data->sync)
414 {
415 user_data->data = user_data->u.sync((void*) user_data->data, (Ecore_Thread *) work);
416 user_data->callback = EINA_FALSE;
417 user_data->code = INT_MAX;
418 ecore_pipe_write(work->u.message_run.send, &user_data, sizeof (Ecore_Pthread_Message *));
419
420 delete = EINA_FALSE;
421 }
422 else
423 {
424 user_data->u.async((void*) user_data->data, (Ecore_Thread *) work);
425 }
426 }
427
428 if (delete)
429 {
430 free(user_data);
431 }
432
433 /* Force reading all notify event before killing the thread */
434 if (work->kill && work->u.message_run.from.send == work->u.message_run.from.received)
435 {
436 _ecore_thread_kill(work);
437 }
438 free(notify);
595} 439}
596 440
597static void 441static void
598_ecore_short_job(Ecore_Pipe *end_pipe, 442_ecore_short_job(PH(thread))
599 PH(thread))
600{ 443{
601 Ecore_Pthread_Worker *work; 444 Ecore_Pthread_Worker *work;
602 445
603 while (_ecore_pending_job_threads) 446 while (_ecore_pending_job_threads)
604 { 447 {
448 int cancel;
449
605 LKL(_ecore_pending_job_threads_mutex); 450 LKL(_ecore_pending_job_threads_mutex);
606 451
607 if (!_ecore_pending_job_threads) 452 if (!_ecore_pending_job_threads)
@@ -616,9 +461,12 @@ _ecore_short_job(Ecore_Pipe *end_pipe,
616 461
617 LKU(_ecore_pending_job_threads_mutex); 462 LKU(_ecore_pending_job_threads_mutex);
618 463
464 LKL(work->cancel_mutex);
465 cancel = work->cancel;
466 LKU(work->cancel_mutex);
619 work->self = thread; 467 work->self = thread;
620 if (!work->cancel) 468 if (!cancel)
621 work->u.short_run.func_blocking((void *)work->data, (Ecore_Thread *)work); 469 work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
622 470
623 if (work->reschedule) 471 if (work->reschedule)
624 { 472 {
@@ -630,19 +478,20 @@ _ecore_short_job(Ecore_Pipe *end_pipe,
630 } 478 }
631 else 479 else
632 { 480 {
633 ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *)); 481 ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
634 } 482 }
635 } 483 }
636} 484}
637 485
638static void 486static void
639_ecore_feedback_job(Ecore_Pipe *end_pipe, 487_ecore_feedback_job(PH(thread))
640 PH(thread))
641{ 488{
642 Ecore_Pthread_Worker *work; 489 Ecore_Pthread_Worker *work;
643 490
644 while (_ecore_pending_job_threads_feedback) 491 while (_ecore_pending_job_threads_feedback)
645 { 492 {
493 int cancel;
494
646 LKL(_ecore_pending_job_threads_mutex); 495 LKL(_ecore_pending_job_threads_mutex);
647 496
648 if (!_ecore_pending_job_threads_feedback) 497 if (!_ecore_pending_job_threads_feedback)
@@ -657,9 +506,12 @@ _ecore_feedback_job(Ecore_Pipe *end_pipe,
657 506
658 LKU(_ecore_pending_job_threads_mutex); 507 LKU(_ecore_pending_job_threads_mutex);
659 508
509 LKL(work->cancel_mutex);
510 cancel = work->cancel;
511 LKU(work->cancel_mutex);
660 work->self = thread; 512 work->self = thread;
661 if (!work->cancel) 513 if (!cancel)
662 work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work); 514 work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
663 515
664 if (work->reschedule) 516 if (work->reschedule)
665 { 517 {
@@ -671,7 +523,7 @@ _ecore_feedback_job(Ecore_Pipe *end_pipe,
671 } 523 }
672 else 524 else
673 { 525 {
674 ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *)); 526 ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
675 } 527 }
676 } 528 }
677} 529}
@@ -679,6 +531,7 @@ _ecore_feedback_job(Ecore_Pipe *end_pipe,
679static void * 531static void *
680_ecore_direct_worker(Ecore_Pthread_Worker *work) 532_ecore_direct_worker(Ecore_Pthread_Worker *work)
681{ 533{
534 Ecore_Pthread_Worker *end;
682 Ecore_Pthread_Data *pth; 535 Ecore_Pthread_Data *pth;
683 536
684#ifdef EFL_POSIX_THREADS 537#ifdef EFL_POSIX_THREADS
@@ -691,40 +544,49 @@ _ecore_direct_worker(Ecore_Pthread_Worker *work)
691 pth = malloc(sizeof (Ecore_Pthread_Data)); 544 pth = malloc(sizeof (Ecore_Pthread_Data));
692 if (!pth) return NULL; 545 if (!pth) return NULL;
693 546
694 pth->p = work->u.feedback_run.direct_pipe;
695 if (!pth->p)
696 {
697 free(pth);
698 return NULL;
699 }
700 pth->thread = PHS(); 547 pth->thread = PHS();
701 548
702 work->self = pth->thread; 549 work->self = pth->thread;
703 work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work); 550 if (work->message_run)
551 work->u.message_run.func_main((void *) work->data, (Ecore_Thread *) work);
552 else
553 work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
704 554
705 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *)); 555 if (work->message_run)
556 {
557 end = work->u.message_run.direct_worker;
558 work->u.message_run.direct_worker = NULL;
559 }
560 else
561 {
562 end = work->u.feedback_run.direct_worker;
563 work->u.feedback_run.direct_worker = NULL;
564 }
706 565
707 work = work->u.feedback_run.direct_worker; 566 ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
708 if (!work) 567
568 if (!end)
709 { 569 {
710 free(pth); 570 free(pth);
711 return NULL; 571 return NULL;
712 } 572 }
713 573
714 work->data = pth; 574 end->data = pth;
715 work->u.short_run.func_blocking = NULL; 575 end->u.short_run.func_blocking = NULL;
716 work->func_end = (void *)_ecore_thread_end; 576 end->func_end = (void *)_ecore_thread_end;
717 work->func_cancel = NULL; 577 end->func_cancel = NULL;
718 work->cancel = EINA_FALSE; 578 end->cancel = EINA_FALSE;
719 work->feedback_run = EINA_FALSE; 579 end->feedback_run = EINA_FALSE;
720 work->kill = EINA_FALSE; 580 end->message_run = EINA_FALSE;
721 work->hash = NULL; 581 end->no_queue = EINA_FALSE;
722 CDI(work->cond); 582 end->kill = EINA_FALSE;
723 LKI(work->mutex); 583 end->hash = NULL;
724 584 LKI(end->mutex);
725 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *)); 585 CDI(end->cond, end->mutex);
586
587 ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, end);
726 588
727 return pth->p; 589 return NULL;
728} 590}
729 591
730static void * 592static void *
@@ -740,8 +602,8 @@ _ecore_thread_worker(Ecore_Pthread_Data *pth)
740 eina_sched_prio_drop(); 602 eina_sched_prio_drop();
741 603
742restart: 604restart:
743 if (_ecore_pending_job_threads) _ecore_short_job(pth->p, pth->thread); 605 if (_ecore_pending_job_threads) _ecore_short_job(pth->thread);
744 if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread); 606 if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->thread);
745 607
746 /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */ 608 /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
747 609
@@ -777,14 +639,14 @@ restart:
777 work->func_cancel = NULL; 639 work->func_cancel = NULL;
778 work->cancel = EINA_FALSE; 640 work->cancel = EINA_FALSE;
779 work->feedback_run = EINA_FALSE; 641 work->feedback_run = EINA_FALSE;
642 work->message_run = EINA_FALSE;
780 work->kill = EINA_FALSE; 643 work->kill = EINA_FALSE;
644 work->no_queue = EINA_FALSE;
781 work->hash = NULL; 645 work->hash = NULL;
782 CDI(work->cond);
783 LKI(work->mutex);
784 646
785 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *)); 647 ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
786 648
787 return pth->p; 649 return NULL;
788} 650}
789 651
790#endif 652#endif
@@ -800,6 +662,10 @@ _ecore_thread_worker_new(void)
800 if (!result) result = malloc(sizeof (Ecore_Pthread_Worker)); 662 if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
801 else _ecore_thread_worker_count--; 663 else _ecore_thread_worker_count--;
802 664
665 LKI(result->cancel_mutex);
666 LKI(result->mutex);
667 CDI(result->cond, result->mutex);
668
803 return result; 669 return result;
804#else 670#else
805 return malloc(sizeof (Ecore_Pthread_Worker)); 671 return malloc(sizeof (Ecore_Pthread_Worker));
@@ -813,16 +679,11 @@ _ecore_thread_init(void)
813 if (_ecore_thread_count_max <= 0) 679 if (_ecore_thread_count_max <= 0)
814 _ecore_thread_count_max = 1; 680 _ecore_thread_count_max = 1;
815 681
816 ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
817 _ecore_thread_pipe = eina_array_new(8);
818
819#ifdef EFL_HAVE_THREADS 682#ifdef EFL_HAVE_THREADS
820 del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);
821
822 LKI(_ecore_pending_job_threads_mutex); 683 LKI(_ecore_pending_job_threads_mutex);
823 LRWKI(_ecore_thread_global_hash_lock); 684 LRWKI(_ecore_thread_global_hash_lock);
824 LKI(_ecore_thread_global_hash_mutex); 685 LKI(_ecore_thread_global_hash_mutex);
825 CDI(_ecore_thread_global_hash_cond); 686 CDI(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex);
826#endif 687#endif
827} 688}
828 689
@@ -830,10 +691,6 @@ void
830_ecore_thread_shutdown(void) 691_ecore_thread_shutdown(void)
831{ 692{
832 /* FIXME: If function are still running in the background, should we kill them ? */ 693 /* FIXME: If function are still running in the background, should we kill them ? */
833 Ecore_Pipe *p;
834 Eina_Array_Iterator it;
835 unsigned int i;
836
837#ifdef EFL_HAVE_THREADS 694#ifdef EFL_HAVE_THREADS
838 Ecore_Pthread_Worker *work; 695 Ecore_Pthread_Worker *work;
839 Ecore_Pthread_Data *pth; 696 Ecore_Pthread_Data *pth;
@@ -843,46 +700,39 @@ _ecore_thread_shutdown(void)
843 EINA_LIST_FREE(_ecore_pending_job_threads, work) 700 EINA_LIST_FREE(_ecore_pending_job_threads, work)
844 { 701 {
845 if (work->func_cancel) 702 if (work->func_cancel)
846 work->func_cancel((void *)work->data, (Ecore_Thread *)work); 703 work->func_cancel((void *)work->data, (Ecore_Thread *) work);
847 free(work); 704 free(work);
848 } 705 }
849 706
850 EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work) 707 EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
851 { 708 {
852 if (work->func_cancel) 709 if (work->func_cancel)
853 work->func_cancel((void *)work->data, (Ecore_Thread *)work); 710 work->func_cancel((void *)work->data, (Ecore_Thread *) work);
854 free(work); 711 free(work);
855 } 712 }
856 713
857 LKU(_ecore_pending_job_threads_mutex); 714 LKU(_ecore_pending_job_threads_mutex);
858 715
859 /* Improve emergency shutdown */ 716 /* FIXME: Improve emergency shutdown, now that we use async call, we can do something */
860 EINA_LIST_FREE(_ecore_active_job_threads, pth) 717 EINA_LIST_FREE(_ecore_active_job_threads, pth)
861 { 718 {
862 Ecore_Pipe *ep;
863
864 PHA(pth->thread); 719 PHA(pth->thread);
865 PHJ(pth->thread, ep); 720 PHJ(pth->thread);
866
867 ecore_pipe_del(pth->p);
868 } 721 }
869 if (_ecore_thread_global_hash) 722 if (_ecore_thread_global_hash)
870 eina_hash_free(_ecore_thread_global_hash); 723 eina_hash_free(_ecore_thread_global_hash);
871 _ecore_event_handler_del(del_handler);
872 have_main_loop_thread = 0; 724 have_main_loop_thread = 0;
873 del_handler = NULL; 725
726 while ((work = eina_trash_pop(&_ecore_thread_worker_trash)))
727 {
728 free(work);
729 }
874 730
875 LKD(_ecore_pending_job_threads_mutex); 731 LKD(_ecore_pending_job_threads_mutex);
876 LRWKD(_ecore_thread_global_hash_lock); 732 LRWKD(_ecore_thread_global_hash_lock);
877 LKD(_ecore_thread_global_hash_mutex); 733 LKD(_ecore_thread_global_hash_mutex);
878 CDD(_ecore_thread_global_hash_cond); 734 CDD(_ecore_thread_global_hash_cond);
879#endif 735#endif
880
881 EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
882 ecore_pipe_del(p);
883
884 eina_array_free(_ecore_thread_pipe);
885 _ecore_thread_pipe = NULL;
886} 736}
887 737
888void 738void
@@ -927,15 +777,15 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking,
927 work->func_cancel = func_cancel; 777 work->func_cancel = func_cancel;
928 work->cancel = EINA_FALSE; 778 work->cancel = EINA_FALSE;
929 work->feedback_run = EINA_FALSE; 779 work->feedback_run = EINA_FALSE;
780 work->message_run = EINA_FALSE;
930 work->kill = EINA_FALSE; 781 work->kill = EINA_FALSE;
931 work->reschedule = EINA_FALSE; 782 work->reschedule = EINA_FALSE;
783 work->no_queue = EINA_FALSE;
932 work->data = data; 784 work->data = data;
933 785
934#ifdef EFL_HAVE_THREADS 786#ifdef EFL_HAVE_THREADS
935 work->self = 0; 787 work->self = 0;
936 work->hash = NULL; 788 work->hash = NULL;
937 CDI(work->cond);
938 LKI(work->mutex);
939 789
940 LKL(_ecore_pending_job_threads_mutex); 790 LKL(_ecore_pending_job_threads_mutex);
941 _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work); 791 _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
@@ -952,9 +802,8 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking,
952 pth = malloc(sizeof (Ecore_Pthread_Data)); 802 pth = malloc(sizeof (Ecore_Pthread_Data));
953 if (!pth) goto on_error; 803 if (!pth) goto on_error;
954 804
955 pth->p = _ecore_thread_pipe_get();
956 pth->death_job = _ecore_thread_worker_new(); 805 pth->death_job = _ecore_thread_worker_new();
957 if (!pth->p || !pth->death_job) goto on_error; 806 if (!pth->death_job) goto on_error;
958 807
959 eina_threads_init(); 808 eina_threads_init();
960 809
@@ -969,7 +818,6 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking,
969on_error: 818on_error:
970 if (pth) 819 if (pth)
971 { 820 {
972 if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
973 if (pth->death_job) _ecore_thread_worker_free(pth->death_job); 821 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
974 free(pth); 822 free(pth);
975 } 823 }
@@ -981,7 +829,11 @@ on_error:
981 LKU(_ecore_pending_job_threads_mutex); 829 LKU(_ecore_pending_job_threads_mutex);
982 830
983 if (work->func_cancel) 831 if (work->func_cancel)
984 work->func_cancel((void *)work->data, (Ecore_Thread *)work); 832 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
833
834 CDD(work->cond);
835 LKD(work->mutex);
836 LKD(work->cancel_mutex);
985 free(work); 837 free(work);
986 work = NULL; 838 work = NULL;
987 } 839 }
@@ -1013,12 +865,16 @@ EAPI Eina_Bool
1013ecore_thread_cancel(Ecore_Thread *thread) 865ecore_thread_cancel(Ecore_Thread *thread)
1014{ 866{
1015#ifdef EFL_HAVE_THREADS 867#ifdef EFL_HAVE_THREADS
1016 Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread; 868 Ecore_Pthread_Worker *volatile work = (Ecore_Pthread_Worker *)thread;
1017 Eina_List *l; 869 Eina_List *l;
870 int cancel;
1018 871
1019 if (!work) 872 if (!work)
1020 return EINA_TRUE; 873 return EINA_TRUE;
1021 if (work->cancel) 874 LKL(work->cancel_mutex);
875 cancel = work->cancel;
876 LKU(work->cancel_mutex);
877 if (cancel)
1022 return EINA_FALSE; 878 return EINA_FALSE;
1023 879
1024 if (work->feedback_run) 880 if (work->feedback_run)
@@ -1070,9 +926,14 @@ ecore_thread_cancel(Ecore_Thread *thread)
1070 926
1071 LKU(_ecore_pending_job_threads_mutex); 927 LKU(_ecore_pending_job_threads_mutex);
1072 928
929 work = (Ecore_Pthread_Worker *)thread;
930
1073 /* Delay the destruction */ 931 /* Delay the destruction */
1074on_exit: 932 on_exit:
1075 ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE; 933 LKL(work->cancel_mutex);
934 work->cancel = EINA_TRUE;
935 LKU(work->cancel_mutex);
936
1076 return EINA_FALSE; 937 return EINA_FALSE;
1077#else 938#else
1078 (void) thread; 939 (void) thread;
@@ -1083,10 +944,23 @@ on_exit:
1083EAPI Eina_Bool 944EAPI Eina_Bool
1084ecore_thread_check(Ecore_Thread *thread) 945ecore_thread_check(Ecore_Thread *thread)
1085{ 946{
1086 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; 947 Ecore_Pthread_Worker *volatile worker = (Ecore_Pthread_Worker *) thread;
948 int cancel;
1087 949
1088 if (!worker) return EINA_TRUE; 950 if (!worker) return EINA_TRUE;
1089 return worker->cancel; 951#ifdef EFL_HAVE_THREADS
952 LKL(worker->cancel_mutex);
953#endif
954 cancel = worker->cancel;
955 /* FIXME: there is an insane bug driving me nuts here. I don't know if
956 it's a race condition, some cache issue or some alien attack on our software.
957 But ecore_thread_check will only work correctly with a printf, all the volatile,
958 lock and even usleep don't help here... */
959 /* fprintf(stderr, "wc: %i\n", cancel); */
960#ifdef EFL_HAVE_THREADS
961 LKU(worker->cancel_mutex);
962#endif
963 return cancel;
1090} 964}
1091 965
1092EAPI Ecore_Thread * 966EAPI Ecore_Thread *
@@ -1109,12 +983,11 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
1109 worker->u.feedback_run.func_heavy = func_heavy; 983 worker->u.feedback_run.func_heavy = func_heavy;
1110 worker->u.feedback_run.func_notify = func_notify; 984 worker->u.feedback_run.func_notify = func_notify;
1111 worker->hash = NULL; 985 worker->hash = NULL;
1112 CDI(worker->cond);
1113 LKI(worker->mutex);
1114 worker->func_cancel = func_cancel; 986 worker->func_cancel = func_cancel;
1115 worker->func_end = func_end; 987 worker->func_end = func_end;
1116 worker->data = data; 988 worker->data = data;
1117 worker->cancel = EINA_FALSE; 989 worker->cancel = EINA_FALSE;
990 worker->message_run = EINA_FALSE;
1118 worker->feedback_run = EINA_TRUE; 991 worker->feedback_run = EINA_TRUE;
1119 worker->kill = EINA_FALSE; 992 worker->kill = EINA_FALSE;
1120 worker->reschedule = EINA_FALSE; 993 worker->reschedule = EINA_FALSE;
@@ -1123,15 +996,12 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
1123 worker->u.feedback_run.send = 0; 996 worker->u.feedback_run.send = 0;
1124 worker->u.feedback_run.received = 0; 997 worker->u.feedback_run.received = 0;
1125 998
1126 worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
1127 worker->u.feedback_run.direct_pipe = NULL;
1128 worker->u.feedback_run.direct_worker = NULL; 999 worker->u.feedback_run.direct_worker = NULL;
1129 1000
1130 if (!try_no_queue) 1001 if (try_no_queue)
1131 { 1002 {
1132 PH(t); 1003 PH(t);
1133 1004
1134 worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
1135 worker->u.feedback_run.direct_worker = _ecore_thread_worker_new(); 1005 worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
1136 worker->no_queue = EINA_TRUE; 1006 worker->no_queue = EINA_TRUE;
1137 1007
@@ -1140,6 +1010,12 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
1140 if (PHC(t, _ecore_direct_worker, worker) == 0) 1010 if (PHC(t, _ecore_direct_worker, worker) == 0)
1141 return (Ecore_Thread *)worker; 1011 return (Ecore_Thread *)worker;
1142 1012
1013 if (worker->u.feedback_run.direct_worker)
1014 {
1015 _ecore_thread_worker_free(worker->u.feedback_run.direct_worker);
1016 worker->u.feedback_run.direct_worker = NULL;
1017 }
1018
1143 eina_threads_shutdown(); 1019 eina_threads_shutdown();
1144 } 1020 }
1145 1021
@@ -1160,9 +1036,8 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
1160 pth = malloc(sizeof (Ecore_Pthread_Data)); 1036 pth = malloc(sizeof (Ecore_Pthread_Data));
1161 if (!pth) goto on_error; 1037 if (!pth) goto on_error;
1162 1038
1163 pth->p = _ecore_thread_pipe_get();
1164 pth->death_job = _ecore_thread_worker_new(); 1039 pth->death_job = _ecore_thread_worker_new();
1165 if (!pth->p || !pth->death_job) goto on_error; 1040 if (!pth->death_job) goto on_error;
1166 1041
1167 eina_threads_init(); 1042 eina_threads_init();
1168 1043
@@ -1177,7 +1052,6 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
1177on_error: 1052on_error:
1178 if (pth) 1053 if (pth)
1179 { 1054 {
1180 if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
1181 if (pth->death_job) _ecore_thread_worker_free(pth->death_job); 1055 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
1182 free(pth); 1056 free(pth);
1183 } 1057 }
@@ -1193,7 +1067,8 @@ on_error:
1193 1067
1194 if (worker) 1068 if (worker)
1195 { 1069 {
1196 ecore_pipe_del(worker->u.feedback_run.notify); 1070 CDD(worker->cond);
1071 LKD(worker->mutex);
1197 free(worker); 1072 free(worker);
1198 worker = NULL; 1073 worker = NULL;
1199 } 1074 }
@@ -1211,7 +1086,6 @@ on_error:
1211 */ 1086 */
1212 worker.u.feedback_run.func_heavy = func_heavy; 1087 worker.u.feedback_run.func_heavy = func_heavy;
1213 worker.u.feedback_run.func_notify = func_notify; 1088 worker.u.feedback_run.func_notify = func_notify;
1214 worker.u.feedback_run.notify = NULL;
1215 worker.u.feedback_run.send = 0; 1089 worker.u.feedback_run.send = 0;
1216 worker.u.feedback_run.received = 0; 1090 worker.u.feedback_run.received = 0;
1217 worker.func_cancel = func_cancel; 1091 worker.func_cancel = func_cancel;
@@ -1219,6 +1093,7 @@ on_error:
1219 worker.data = data; 1093 worker.data = data;
1220 worker.cancel = EINA_FALSE; 1094 worker.cancel = EINA_FALSE;
1221 worker.feedback_run = EINA_TRUE; 1095 worker.feedback_run = EINA_TRUE;
1096 worker.message_run = EINA_FALSE;
1222 worker.kill = EINA_FALSE; 1097 worker.kill = EINA_FALSE;
1223 1098
1224 do { 1099 do {
@@ -1241,13 +1116,48 @@ ecore_thread_feedback(Ecore_Thread *thread,
1241 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; 1116 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1242 1117
1243 if (!worker) return EINA_FALSE; 1118 if (!worker) return EINA_FALSE;
1244 if (!worker->feedback_run) return EINA_FALSE;
1245 1119
1246#ifdef EFL_HAVE_THREADS 1120#ifdef EFL_HAVE_THREADS
1247 if (!PHE(worker->self, PHS())) return EINA_FALSE; 1121 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1248 1122
1249 worker->u.feedback_run.send++; 1123 if (worker->feedback_run)
1250 ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *)); 1124 {
1125 Ecore_Pthread_Notify *notify;
1126
1127 notify = malloc(sizeof (Ecore_Pthread_Notify));
1128 if (!notify) return EINA_FALSE;
1129
1130 notify->user_data = data;
1131 notify->work = worker;
1132 worker->u.feedback_run.send++;
1133
1134 ecore_main_loop_thread_safe_call_async(_ecore_notify_handler, notify);
1135 }
1136 else if (worker->message_run)
1137 {
1138 Ecore_Pthread_Message *msg;
1139 Ecore_Pthread_Notify *notify;
1140
1141 msg = malloc(sizeof (Ecore_Pthread_Message*));
1142 if (!msg) return EINA_FALSE;
1143 msg->data = data;
1144 msg->callback = EINA_FALSE;
1145 msg->sync = EINA_FALSE;
1146
1147 notify = malloc(sizeof (Ecore_Pthread_Notify));
1148 if (!notify)
1149 {
1150 free(msg);
1151 return EINA_FALSE;
1152 }
1153 notify->work = worker;
1154 notify->user_data = msg;
1155
1156 worker->u.message_run.from.send++;
1157 ecore_main_loop_thread_safe_call_async(_ecore_message_notify_handler, notify);
1158 }
1159 else
1160 return EINA_FALSE;
1251 1161
1252 return EINA_TRUE; 1162 return EINA_TRUE;
1253#else 1163#else
@@ -1257,6 +1167,71 @@ ecore_thread_feedback(Ecore_Thread *thread,
1257#endif 1167#endif
1258} 1168}
1259 1169
1170#if 0
1171EAPI Ecore_Thread *
1172ecore_thread_message_run(Ecore_Thread_Cb func_main,
1173 Ecore_Thread_Notify_Cb func_notify,
1174 Ecore_Thread_Cb func_end,
1175 Ecore_Thread_Cb func_cancel,
1176 const void *data)
1177{
1178#ifdef EFL_HAVE_THREADS
1179 Ecore_Pthread_Worker *worker;
1180 PH(t);
1181
1182 if (!func_main) return NULL;
1183
1184 worker = _ecore_thread_worker_new();
1185 if (!worker) return NULL;
1186
1187 worker->u.message_run.func_main = func_main;
1188 worker->u.message_run.func_notify = func_notify;
1189 worker->u.message_run.direct_worker = _ecore_thread_worker_new();
1190 worker->u.message_run.send = ecore_pipe_add(_ecore_nothing_handler, worker);
1191 worker->u.message_run.from.send = 0;
1192 worker->u.message_run.from.received = 0;
1193 worker->u.message_run.to.send = 0;
1194 worker->u.message_run.to.received = 0;
1195
1196 ecore_pipe_freeze(worker->u.message_run.send);
1197
1198 worker->func_cancel = func_cancel;
1199 worker->func_end = func_end;
1200 worker->hash = NULL;
1201 worker->data = data;
1202
1203 worker->cancel = EINA_FALSE;
1204 worker->message_run = EINA_TRUE;
1205 worker->feedback_run = EINA_FALSE;
1206 worker->kill = EINA_FALSE;
1207 worker->reschedule = EINA_FALSE;
1208 worker->no_queue = EINA_FALSE;
1209 worker->self = 0;
1210
1211 eina_threads_init();
1212
1213 if (PHC(t, _ecore_direct_worker, worker) == 0)
1214 return (Ecore_Thread*) worker;
1215
1216 eina_threads_shutdown();
1217
1218 if (worker->u.message_run.direct_worker) _ecore_thread_worker_free(worker->u.message_run.direct_worker);
1219 if (worker->u.message_run.send) ecore_pipe_del(worker->u.message_run.send);
1220
1221 CDD(worker->cond);
1222 LKD(worker->mutex);
1223#else
1224 /* Note: This type of thread can't and never will work without thread support */
1225 WRN("ecore_thread_message_run called, but threads disable in Ecore, things will go wrong. Starting now !");
1226# warning "You disabled threads support in ecore, I hope you know what you are doing !"
1227#endif
1228
1229 func_cancel((void *) data, NULL);
1230
1231 return NULL;
1232}
1233#endif
1234
1260EAPI Eina_Bool 1235EAPI Eina_Bool
1261ecore_thread_reschedule(Ecore_Thread *thread) 1236ecore_thread_reschedule(Ecore_Thread *thread)
1262{ 1237{
@@ -1641,24 +1616,13 @@ ecore_thread_global_data_wait(const char *key,
1641 1616
1642 while (1) 1617 while (1)
1643 { 1618 {
1644#ifndef _WIN32
1645 struct timespec t = { 0, 0 };
1646
1647 t.tv_sec = (long int)tm;
1648 t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
1649#else
1650 struct timeval t = { 0, 0 };
1651
1652 t.tv_sec = (long int)tm;
1653 t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
1654#endif
1655 LRWKRL(_ecore_thread_global_hash_lock); 1619 LRWKRL(_ecore_thread_global_hash_lock);
1656 ret = eina_hash_find(_ecore_thread_global_hash, key); 1620 ret = eina_hash_find(_ecore_thread_global_hash, key);
1657 LRWKU(_ecore_thread_global_hash_lock); 1621 LRWKU(_ecore_thread_global_hash_lock);
1658 if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get()))) 1622 if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
1659 break; 1623 break;
1660 LKL(_ecore_thread_global_hash_mutex); 1624 LKL(_ecore_thread_global_hash_mutex);
1661 CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t); 1625 CDW(_ecore_thread_global_hash_cond, tm);
1662 LKU(_ecore_thread_global_hash_mutex); 1626 LKU(_ecore_thread_global_hash_mutex);
1663 } 1627 }
1664 if (ret) return ret->data; 1628 if (ret) return ret->data;