aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/LuaSL/src/LuaSL_threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'LuaSL/src/LuaSL_threads.c')
-rw-r--r--LuaSL/src/LuaSL_threads.c1229
1 files changed, 1224 insertions, 5 deletions
diff --git a/LuaSL/src/LuaSL_threads.c b/LuaSL/src/LuaSL_threads.c
index eb809b3..a15b506 100644
--- a/LuaSL/src/LuaSL_threads.c
+++ b/LuaSL/src/LuaSL_threads.c
@@ -32,19 +32,1238 @@ THE SOFTWARE.
32 32
33/* This is a redesign of luaproc. The design goals and notes - 33/* This is a redesign of luaproc. The design goals and notes -
34 * 34 *
35 * Use eina lists instead of the rolled your own lists.
36 * Looks like a FIFO double linked list.
37 * Use ecore threads instead of raw pthreads. 35 * Use ecore threads instead of raw pthreads.
38 * Ecore threads pretty much wraps pthreads on posix, but has Windows support to. 36 * Ecore threads pretty much wraps pthreads on posix, but has Windows support to.
39 * In general use EFL where it is useful. 37 * In general use EFL where it is useful.
40 * One fixed unique message channel per script. 38 * One fixed unique message channel per script.
41 * Probably one fixed unique message channel per object, which each script in the object shares. 39 * Probably one fixed unique message channel per object, which each script in the object shares.
42 * But might be better to handle that C side anyway. 40 * But might be better to handle that C side anyway.
43 * FIFO queue on message channels, seems the C socket queue is not enough. 41 * No need for channel.c / .h, we are not using that sort of arbitrary channels.
44 * On the other hand, could just peel messages the socket queue, then shove them on the scripts queue. 42 * FIFO queue on message channels, seems the C socket queue is not enough.
43 * On the other hand, could just peel messages of the socket queue, then shove them on the scripts queue.
45 * Better integration with LuaSL. 44 * Better integration with LuaSL.
46 * Merge the luaproc structure with the script structure. 45 * Merge the luaproc structure with the script structure.
47 * Merge in the edje Lua code, and keep an eye on that, coz we might want to actually add this to edje Lua in the future. 46 * Merge in the edje Lua code, and keep an eye on that, coz we might want to actually add this to edje Lua in the future.
47 * Get rid of luaproc.lua, should not need it.
48 * Use my coding standards, or EFL ones. Pffft. 48 * Use my coding standards, or EFL ones. Pffft.
49 * 49 *
50 */ 50 */
51
52#include "LuaSL.h"
53
54#include <netdb.h>
55#include <pthread.h>
56#include <string.h>
57#include <arpa/inet.h>
58#include <sys/select.h>
59#include <sys/socket.h>
60
61
62/*********
63* globals
64*********/
65
66/* global channel lua_State mutex */
67pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER;
68
69/* global lua_State where channel hash table will be stored */
70lua_State *chanls = NULL;
71
72/* message channel */
73struct stchannel {
74 Eina_Clist send;
75 Eina_Clist recv;
76 pthread_mutex_t *mutex;
77 pthread_cond_t *in_use;
78};
79
80
81
82/* ready process list */
83Eina_Clist lpready;
84
85/* ready process queue access mutex */
86pthread_mutex_t mutex_queue_access = PTHREAD_MUTEX_INITIALIZER;
87
88/* wake worker up conditional variable */
89pthread_cond_t cond_wakeup_worker = PTHREAD_COND_INITIALIZER;
90
91/* active luaproc count access mutex */
92pthread_mutex_t mutex_lp_count = PTHREAD_MUTEX_INITIALIZER;
93
94/* no active luaproc conditional variable */
95pthread_cond_t cond_no_active_lp = PTHREAD_COND_INITIALIZER;
96
97/* number of active luaprocs */
98int lpcount = 0;
99
100/* no more lua processes flag */
101int no_more_processes = FALSE;
102
103
104
105/* channel operations mutex */
106pthread_mutex_t mutex_channel = PTHREAD_MUTEX_INITIALIZER;
107
108/* recycle list mutex */
109pthread_mutex_t mutex_recycle_list = PTHREAD_MUTEX_INITIALIZER;
110
111/* recycled lua process list */
112Eina_Clist recyclelp;
113
114/* maximum lua processes to recycle */
115int recyclemax = 0;
116
117/* lua process */
118struct stluaproc {
119 Eina_Clist node;
120 lua_State *lstate;
121 int stat;
122 int args;
123 channel chan;
124 int destroyworker;
125 void *data;
126 Ecore_Cb callback;
127};
128
129
130
131/******************************
132* library functions prototypes
133******************************/
134/* create a new lua process */
135static int luaproc_create_newproc( lua_State *L );
136/* send a message to a lua process */
137static int luaproc_send( lua_State *L );
138/* receive a message from a lua process */
139static int luaproc_receive( lua_State *L );
140/* create a new channel */
141static int luaproc_create_channel( lua_State *L );
142/* destroy a channel */
143static int luaproc_destroy_channel( lua_State *L );
144/* wait until all luaprocs have finished and exit */
145static int luaproc_exit( lua_State *L );
146/* create a new worker */
147static int luaproc_create_worker( lua_State *L );
148/* destroy a worker */
149static int luaproc_destroy_worker( lua_State *L );
150/* set amount of lua processes that should be recycled (ie, reused) */
151static int luaproc_recycle_set( lua_State *L );
152/* send a message back to the main loop */
153static int luaproc_send_back( lua_State *L );
154
155/* luaproc function registration array - main (parent) functions */
156static const struct luaL_reg luaproc_funcs_parent[] = {
157 { "newproc", luaproc_create_newproc },
158 { "exit", luaproc_exit },
159 { "createworker", luaproc_create_worker },
160 { "destroyworker", luaproc_destroy_worker },
161 { "recycle", luaproc_recycle_set },
162 { "sendback", luaproc_send_back },
163 { NULL, NULL }
164};
165
166/* luaproc function registration array - newproc (child) functions */
167static const struct luaL_reg luaproc_funcs_child[] = {
168 { "newproc", luaproc_create_newproc },
169 { "send", luaproc_send },
170 { "receive", luaproc_receive },
171 { "newchannel", luaproc_create_channel },
172 { "delchannel", luaproc_destroy_channel },
173 { "createworker", luaproc_create_worker },
174 { "destroyworker", luaproc_destroy_worker },
175 { "recycle", luaproc_recycle_set },
176 { "sendback", luaproc_send_back },
177 { NULL, NULL }
178};
179
180
181
182
183/* initialize channel table */
184void channel_init( void ) {
185 chanls = luaL_newstate();
186 lua_newtable( chanls );
187 lua_setglobal( chanls, "channeltb" );
188}
189
190/* create new channel */
191channel channel_create( const char *cname ) {
192
193 channel chan;
194
195 /* get exclusive access to the channel table */
196 pthread_mutex_lock( &mutex_channel_lstate );
197
198 /* create a new channel */
199 lua_getglobal( chanls, "channeltb");
200 lua_pushstring( chanls, cname );
201 chan = (channel )lua_newuserdata( chanls, sizeof( struct stchannel ));
202 eina_clist_init(&(chan->send));
203 eina_clist_init(&(chan->recv));
204 chan->mutex = (pthread_mutex_t *)malloc( sizeof( pthread_mutex_t ));
205 pthread_mutex_init( chan->mutex, NULL );
206 chan->in_use = (pthread_cond_t *)malloc( sizeof( pthread_cond_t ));
207 pthread_cond_init( chan->in_use, NULL );
208 lua_settable( chanls, -3 );
209 lua_pop( chanls, 1 );
210
211 /* let others access the channel table */
212 pthread_mutex_unlock( &mutex_channel_lstate );
213
214 return chan;
215}
216
217/* destroy a channel */
218int channel_destroy( channel chan, const char *chname ) {
219
220 /* get exclusive access to the channel table */
221 pthread_mutex_lock( &mutex_channel_lstate );
222
223 lua_getglobal( chanls, "channeltb");
224 lua_pushstring( chanls, chname );
225 lua_pushnil( chanls );
226 lua_settable( chanls, -3 );
227 lua_pop( chanls, 1 );
228
229 /* let others access the channel table */
230 pthread_mutex_unlock( &mutex_channel_lstate );
231
232 return CHANNEL_DESTROYED;
233}
234
235/* search for and return a channel with a given name */
236channel channel_search( const char *cname ) {
237
238 channel chan;
239
240 /* get exclusive access to the channel table */
241 pthread_mutex_lock( &mutex_channel_lstate );
242
243 /* search for channel */
244 lua_getglobal( chanls, "channeltb");
245 lua_getfield( chanls, -1, cname );
246 if (( lua_type( chanls, -1 )) == LUA_TUSERDATA ) {
247 chan = (channel )lua_touserdata( chanls, -1 );
248 } else {
249 chan = NULL;
250 }
251 lua_pop( chanls, 2 );
252
253 /* let others access channel table */
254 pthread_mutex_unlock( &mutex_channel_lstate );
255
256 return chan;
257}
258
259/* return a channel's send queue */
260Eina_Clist *channel_get_sendq( channel chan ) {
261 return &chan->send;
262}
263
264/* return a channel's receive queue */
265Eina_Clist *channel_get_recvq( channel chan ) {
266 return &chan->recv;
267}
268
269/* return a channel's mutex */
270pthread_mutex_t *channel_get_mutex( channel chan ) {
271 return chan->mutex;
272}
273
274/* return a channel's conditional variable */
275pthread_cond_t *channel_get_cond( channel chan ) {
276 return chan->in_use;
277}
278
279
280
281
282/* worker thread main function */
283void *workermain( void *args ) {
284
285 luaproc lp;
286 int procstat;
287 int destroyworker;
288
289 /* detach thread so resources are freed as soon as thread exits (no further joining) */
290 pthread_detach( pthread_self( ));
291
292 /* main worker loop */
293 while ( 1 ) {
294
295 /* get exclusive access to the ready process queue */
296 pthread_mutex_lock( &mutex_queue_access );
297
298 /* wait until instructed to wake up (because there's work to do or because its time to finish) */
299 while (( eina_clist_count( &lpready ) == 0 ) && ( no_more_processes == FALSE )) {
300 pthread_cond_wait( &cond_wakeup_worker, &mutex_queue_access );
301 }
302
303 /* pop the first node from the ready process queue */
304 if ((lp = (luaproc) eina_clist_head(&lpready)))
305 eina_clist_remove(&(lp->node));
306 else {
307 /* free access to the process ready queue */
308 pthread_mutex_unlock( &mutex_queue_access );
309 /* finished thread */
310 pthread_exit( NULL );
311 }
312
313 /* free access to the process ready queue */
314 pthread_mutex_unlock( &mutex_queue_access );
315
316 /* execute the lua code specified in the lua process struct */
317 procstat = lua_resume( luaproc_get_state( lp ), luaproc_get_args( lp ));
318
319 /* reset the process argument count */
320 luaproc_set_args( lp, 0 );
321
322 /* check if process finished its whole execution */
323 if ( procstat == 0 ) {
324
325 /* check if worker thread should be destroyed */
326 destroyworker = luaproc_get_destroyworker( lp );
327
328 /* set process status to finished */
329 luaproc_set_status( lp, LUAPROC_STAT_FINISHED );
330
331 /* check if lua process should be recycled and, if not, destroy it */
332 if ( luaproc_recycle_push( lp ) == FALSE ) {
333 lua_close( luaproc_get_state( lp ));
334 }
335
336 /* decrease active lua process count */
337 sched_lpcount_dec();
338
339 /* check if thread should be finished after lua process conclusion */
340 if ( destroyworker ) {
341 /* if so, finish thread */
342 pthread_exit( NULL );
343 }
344 }
345
346 /* check if process yielded */
347 else if ( procstat == LUA_YIELD ) {
348
349 /* if so, further check if yield originated from an unmatched send/recv operation */
350 if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_SEND ) {
351 /* queue blocked lua process on corresponding channel */
352 luaproc_queue_sender( lp );
353 /* unlock channel access */
354 luaproc_unlock_channel( luaproc_get_channel( lp ));
355 }
356
357 else if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_RECV ) {
358 /* queue blocked lua process on corresponding channel */
359 luaproc_queue_receiver( lp );
360 /* unlock channel access */
361 luaproc_unlock_channel( luaproc_get_channel( lp ));
362 }
363
364 /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */
365 else {
366 /* get exclusive access to the ready process queue */
367 pthread_mutex_lock( &mutex_queue_access );
368 /* re-insert the job at the end of the ready process queue */
369 eina_clist_add_tail(&lpready, &(lp->node));
370 /* free access to the process ready queue */
371 pthread_mutex_unlock( &mutex_queue_access );
372 }
373 }
374
375 /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */
376 else {
377 /* print error message */
378 fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring( luaproc_get_state( lp ), -1 ));
379 /* close lua state */
380 lua_close( luaproc_get_state( lp ));
381 /* decrease active lua process count */
382 sched_lpcount_dec();
383 }
384 }
385}
386
387/* local scheduler initialization */
388int sched_init_local( int numworkers ) {
389
390 int tid;
391 int workercount = 0;
392 pthread_t worker;
393
394 /* initialize ready process list */
395// lpready = list_new();
396 eina_clist_init(&lpready);
397
398 /* initialize channels */
399 channel_init();
400
401 /* create initial worker threads */
402 for ( tid = 0; tid < numworkers; tid++ ) {
403 if ( pthread_create( &worker, NULL, workermain, NULL ) == 0 ) {
404 workercount++;
405 }
406 }
407
408 if ( workercount != numworkers ) {
409 return LUAPROC_SCHED_INIT_ERROR;
410 }
411
412 return LUAPROC_SCHED_OK;
413}
414
415/* exit scheduler */
416void sched_exit( void ) {
417
418 /* get exclusive access to the ready process queue */
419 pthread_mutex_lock( &mutex_queue_access );
420 /* free access to the process ready queue */
421 pthread_mutex_unlock( &mutex_queue_access );
422}
423
424/* move process to ready queue (ie, schedule process) */
425int sched_queue_proc( luaproc lp ) {
426
427 /* get exclusive access to the ready process queue */
428 pthread_mutex_lock( &mutex_queue_access );
429
430 /* add process to ready queue */
431 eina_clist_add_tail(&lpready, &(lp->node));
432
433 /* set process status to ready */
434 luaproc_set_status( lp, LUAPROC_STAT_READY );
435
436 /* wake worker up */
437 pthread_cond_signal( &cond_wakeup_worker );
438 /* free access to the process ready queue */
439 pthread_mutex_unlock( &mutex_queue_access );
440
441 return LUAPROC_SCHED_QUEUE_PROC_OK;
442}
443
444/* synchronize worker threads */
445void sched_join_workerthreads( void ) {
446
447 pthread_mutex_lock( &mutex_lp_count );
448
449 /* wait until there is no more active lua processes */
450 while( lpcount != 0 ) {
451 pthread_cond_wait( &cond_no_active_lp, &mutex_lp_count );
452 }
453 /* get exclusive access to the ready process queue */
454 pthread_mutex_lock( &mutex_queue_access );
455 /* set the no more active lua processes flag to true */
456 no_more_processes = TRUE;
457 /* wake ALL workers up */
458 pthread_cond_broadcast( &cond_wakeup_worker );
459 /* free access to the process ready queue */
460 pthread_mutex_unlock( &mutex_queue_access );
461
462// We don't need this, as we only get here during shutdown. Linking this to EFL results in a hang otherwise anyway.
463 /* wait for (join) worker threads */
464// pthread_exit( NULL );
465
466 pthread_mutex_unlock( &mutex_lp_count );
467
468}
469
470/* increase active lua process count */
471void sched_lpcount_inc( void ) {
472 pthread_mutex_lock( &mutex_lp_count );
473 lpcount++;
474 pthread_mutex_unlock( &mutex_lp_count );
475}
476
477/* decrease active lua process count */
478void sched_lpcount_dec( void ) {
479 pthread_mutex_lock( &mutex_lp_count );
480 lpcount--;
481 /* if count reaches zero, signal there are no more active processes */
482 if ( lpcount == 0 ) {
483 pthread_cond_signal( &cond_no_active_lp );
484 }
485 pthread_mutex_unlock( &mutex_lp_count );
486}
487
488/* create a new worker pthread */
489int sched_create_worker( void ) {
490
491 pthread_t worker;
492
493 /* create a new pthread */
494 if ( pthread_create( &worker, NULL, workermain, NULL ) != 0 ) {
495 return LUAPROC_SCHED_PTHREAD_ERROR;
496 }
497
498 return LUAPROC_SCHED_OK;
499}
500
501
502
503
504/*
505static void registerlib( lua_State *L, const char *name, lua_CFunction f ) {
506 lua_getglobal( L, "package" );
507 lua_getfield( L, -1, "preload" );
508 lua_pushcfunction( L, f );
509 lua_setfield( L, -2, name );
510 lua_pop( L, 2 );
511}
512*/
513static void openlibs( lua_State *L ) {
514/*
515 lua_cpcall( L, luaopen_base, NULL );
516 lua_cpcall( L, luaopen_package, NULL );
517 registerlib( L, "io", luaopen_io );
518 registerlib( L, "os", luaopen_os );
519 registerlib( L, "table", luaopen_table );
520 registerlib( L, "string", luaopen_string );
521 registerlib( L, "math", luaopen_math );
522 registerlib( L, "debug", luaopen_debug );
523*/
524 luaL_openlibs(L);
525}
526
527/* return status (boolean) indicating if lua process should be recycled */
528luaproc luaproc_recycle_pop( void ) {
529
530 luaproc lp;
531
532 /* get exclusive access to operate on recycle list */
533 pthread_mutex_lock( &mutex_recycle_list );
534
535 /* check if there are any lua processes on recycle list */
536 if ( eina_clist_count( &recyclelp ) > 0 ) {
537 /* pop list head */
538 if ((lp = (luaproc) eina_clist_head(&recyclelp)))
539 eina_clist_remove(&(lp->node));
540 /* free access to operate on recycle list */
541 pthread_mutex_unlock( &mutex_recycle_list );
542 /* return associated luaproc */
543 return lp;
544 }
545
546 /* free access to operate on recycle list */
547 pthread_mutex_unlock( &mutex_recycle_list );
548
549 /* if no lua processes are available simply return null */
550 return NULL;
551}
552
553/* check if lua process should be recycled and, in case so, add it to the recycle list */
554int luaproc_recycle_push( luaproc lp ) {
555
556 /* get exclusive access to operate on recycle list */
557 pthread_mutex_lock( &mutex_recycle_list );
558
559 /* check if amount of lua processes currently on recycle list is greater than
560 or equal to the maximum amount of lua processes that should be recycled */
561 if ( eina_clist_count( &recyclelp ) >= recyclemax ) {
562 /* free access to operate on recycle list */
563 pthread_mutex_unlock( &mutex_recycle_list );
564 /* if so, lua process should NOT be recycled and should be destroyed */
565 return FALSE;
566 }
567 /* otherwise, lua process should be added to recycle list */
568 eina_clist_add_tail( &recyclelp, &(lp->node) );
569 /* free access to operate on recycle list */
570 pthread_mutex_unlock( &mutex_recycle_list );
571 /* since lua process will be recycled, it should not be destroyed */
572 return TRUE;
573}
574
575/* create new luaproc */
576static luaproc luaproc_new( const char *code, int destroyflag, int file) {
577
578 luaproc lp;
579 int ret;
580 /* create new lua state */
581 lua_State *lpst = luaL_newstate( );
582 /* store the luaproc struct in its own lua state */
583 lp = (luaproc )lua_newuserdata( lpst, sizeof( struct stluaproc ));
584 lua_setfield( lpst, LUA_REGISTRYINDEX, "_SELF" );
585
586 eina_clist_element_init(&(lp->node));
587 lp->lstate = lpst;
588 lp->stat = LUAPROC_STAT_IDLE;
589 lp->args = 0;
590 lp->chan = NULL;
591 lp->destroyworker = destroyflag;
592
593 /* load standard libraries */
594 openlibs( lpst );
595
596 /* register luaproc's own functions */
597 luaL_register( lpst, "luaproc", luaproc_funcs_child );
598
599 /* load process' code */
600 if (file)
601 ret = luaL_loadfile( lpst, code );
602 else
603 ret = luaL_loadstring( lpst, code );
604 /* in case of errors, destroy recently created lua process */
605 if ( ret != 0 ) {
606 lua_close( lpst );
607 return NULL;
608 }
609
610 /* return recently created lua process */
611 return lp;
612}
613
614/* synchronize worker threads and exit */
615static int luaproc_exit( lua_State *L ) {
616 sched_join_workerthreads();
617 return 0;
618}
619
620/* create a new worker pthread */
621static int luaproc_create_worker( lua_State *L ) {
622
623 if ( sched_create_worker( ) != LUAPROC_SCHED_OK ) {
624 lua_pushnil( L );
625 lua_pushstring( L, "error creating worker" );
626 return 2;
627 }
628
629 lua_pushboolean( L, TRUE );
630 return 1;
631}
632
633/* set amount of lua processes that should be recycled (ie, reused) */
634static int luaproc_recycle_set( lua_State *L ) {
635
636 luaproc lp;
637 int max = luaL_checkint( L, 1 );
638
639 /* check if function argument represents a reasonable value */
640 if ( max < 0 ) {
641 /* in case of errors return nil + error msg */
642 lua_pushnil( L );
643 lua_pushstring( L, "error setting recycle limit to negative value" );
644 return 2;
645 }
646
647 /* get exclusive access to operate on recycle list */
648 pthread_mutex_lock( &mutex_recycle_list );
649
650 /* set maximum lua processes that should be recycled */
651 recyclemax = max;
652
653 /* destroy recycle list excessive nodes (and corresponding lua processes) */
654 while ( eina_clist_count( &recyclelp ) > max ) {
655 if ((lp = (luaproc) eina_clist_head(&recyclelp)))
656 eina_clist_remove(&(lp->node));
657 /* close associated lua_State */
658 lua_close( lp->lstate );
659 }
660
661 /* free access to operate on recycle list */
662 pthread_mutex_unlock( &mutex_recycle_list );
663
664 lua_pushboolean( L, TRUE );
665 return 1;
666}
667
668
669/* destroy a worker pthread */
670static int luaproc_destroy_worker( lua_State *L ) {
671
672 /* new lua process pointer */
673 luaproc lp;
674
675 /* create new lua process with empty code and destroy worker flag set to true
676 (ie, conclusion of lua process WILL result in worker thread destruction */
677 lp = luaproc_new( "", TRUE, FALSE );
678
679 /* ensure process creation was successfull */
680 if ( lp == NULL ) {
681 /* in case of errors return nil + error msg */
682 lua_pushnil( L );
683 lua_pushstring( L, "error destroying worker" );
684 return 2;
685 }
686
687 /* increase active luaproc count */
688 sched_lpcount_inc();
689
690 /* schedule luaproc */
691 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
692 printf( "[luaproc] error queueing Lua process\n" );
693 /* decrease active luaproc count */
694 sched_lpcount_dec();
695 /* close lua_State */
696 lua_close( lp->lstate );
697 /* return nil + error msg */
698 lua_pushnil( L );
699 lua_pushstring( L, "error destroying worker" );
700 return 2;
701 }
702
703 lua_pushboolean( L, TRUE );
704 return 1;
705}
706
707/* recycle a lua process */
708static luaproc luaproc_recycle( luaproc lp, const char *code, int file ) {
709
710 int ret;
711
712 /* reset struct members */
713 lp->stat = LUAPROC_STAT_IDLE;
714 lp->args = 0;
715 lp->chan = NULL;
716 lp->destroyworker = FALSE;
717
718 /* load process' code */
719 ret = luaL_loadstring( lp->lstate, code );
720
721 /* in case of errors, destroy lua process */
722 if ( ret != 0 ) {
723 lua_close( lp->lstate );
724 return NULL;
725 }
726
727 /* return recycled lua process */
728 return lp;
729}
730
731
732int newProc(const char *code, int file, Ecore_Cb callback, void *data)
733{
734 /* new lua process pointer */
735 luaproc lp;
736
737 /* check if existing lua process should be recycled to avoid new creation */
738 lp = luaproc_recycle_pop( );
739
740 /* if there is a lua process available on the recycle queue, recycle it */
741 if ( lp != NULL ) {
742 lp = luaproc_recycle( lp, code, file );
743 }
744 /* otherwise create a new one from scratch */
745 else {
746 /* create new lua process with destroy worker flag set to false
747 (ie, conclusion of lua process will NOT result in worker thread destruction */
748 lp = luaproc_new( code, FALSE, file );
749 }
750
751 /* ensure process creation was successfull */
752 if ( lp == NULL ) {
753 return 1;
754 }
755
756 /* Stash any data and callback given to us. */
757 lp->data = data;
758 lp->callback = callback;
759
760 /* increase active luaproc count */
761 sched_lpcount_inc();
762
763 /* schedule luaproc */
764 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
765 printf( "[luaproc] error queueing Lua process\n" );
766 /* decrease active luaproc count */
767 sched_lpcount_dec();
768 /* close lua_State */
769 lua_close( lp->lstate );
770 return 2;
771 }
772
773 return 0;
774}
775
776/* create and schedule a new lua process (luaproc.newproc) */
777static int luaproc_create_newproc( lua_State *L ) {
778
779 /* check if first argument is a string (lua code) */
780 const char *code = luaL_checkstring( L, 1 );
781
782 switch (newProc(code, FALSE, NULL, NULL))
783 {
784 case 1 :
785 /* in case of errors return nil + error msg */
786 lua_pushnil( L );
787 lua_pushstring( L, "error loading code string" );
788 return 2;
789 case 2 :
790 /* return nil + error msg */
791 lua_pushnil( L );
792 lua_pushstring( L, "error queuing process" );
793 return 2;
794 }
795
796 lua_pushboolean( L, TRUE );
797 return 1;
798}
799
800/* queue a lua process sending a message without a matching receiver */
801void luaproc_queue_sender( luaproc lp ) {
802 /* add the sending process to this process' send queue */
803 eina_clist_add_tail( channel_get_sendq( lp->chan ), &(lp->node));
804}
805
806/* dequeue a lua process sending a message with a receiver match */
807luaproc luaproc_dequeue_sender( channel chan ) {
808
809 luaproc lp;
810
811 if ( eina_clist_count( channel_get_sendq( chan )) > 0 ) {
812 /* get first node from channel's send queue */
813 if ((lp = (luaproc) eina_clist_head(channel_get_sendq( chan ))))
814 eina_clist_remove(&(lp->node));
815 /* return associated luaproc */
816 return lp;
817 }
818
819 return NULL;
820}
821
822/* queue a luc process receiving a message without a matching sender */
823void luaproc_queue_receiver( luaproc lp ) {
824 /* add the receiving process to this process' receive queue */
825 eina_clist_add_tail( channel_get_recvq( lp->chan ), &(lp->node));
826}
827
828/* dequeue a lua process receiving a message with a sender match */
829luaproc luaproc_dequeue_receiver( channel chan ) {
830
831 luaproc lp;
832
833 if ( eina_clist_count( channel_get_recvq( chan )) > 0 ) {
834 /* get first node from channel's recv queue */
835 if ((lp = (luaproc) eina_clist_head(channel_get_recvq( chan ))))
836 eina_clist_remove(&(lp->node));
837 /* return associated luaproc */
838 return lp;
839 }
840
841 return NULL;
842}
843
844/* moves values between lua states' stacks */
845void luaproc_movevalues( lua_State *Lfrom, lua_State *Lto ) {
846
847 int i;
848 int n = lua_gettop( Lfrom );
849
850 /* move values between lua states' stacks */
851 for ( i = 2; i <= n; i++ ) {
852 lua_pushstring( Lto, lua_tostring( Lfrom, i ));
853 }
854}
855
856/* return the lua process associated with a given lua state */
857luaproc luaproc_getself( lua_State *L ) {
858 luaproc lp;
859 lua_getfield( L, LUA_REGISTRYINDEX, "_SELF" );
860 lp = (luaproc )lua_touserdata( L, -1 );
861 lua_pop( L, 1 );
862 return lp;
863}
864
865/* send a message to a lua process */
866static int luaproc_send_back( lua_State *L ) {
867
868 luaproc self;
869 const char *message = luaL_checkstring( L, 1 );
870
871 self = luaproc_getself( L );
872 if (self && self->callback && self->data)
873 {
874 scriptMessage *sm = calloc(1, sizeof(scriptMessage));
875
876 if (sm)
877 {
878 sm->script = self->data;
879 strcpy((char *) sm->message, message);
880 ecore_main_loop_thread_safe_call_async(self->callback, sm);
881 }
882 }
883
884 return 0;
885}
886
887/* error messages for the sendToChannel function */
888const char *sendToChannelErrors[] =
889{
890 "non-existent channel",
891 "error scheduling process"
892};
893
894// TODO - If these come in too quick, then messages might get lost. Also, in at least one case, it locked up this thread I think.
895
896/* send a message to a lua process */
897const char *sendToChannel(const char *chname, const char *message, luaproc *dst, channel *chn)
898{
899 const char *result = NULL;
900 channel chan;
901 luaproc dstlp;
902
903 /* get exclusive access to operate on channels */
904 pthread_mutex_lock(&mutex_channel);
905
906 /* wait until channel is not in use */
907 while( ((chan = channel_search(chname)) != NULL) && (pthread_mutex_trylock(channel_get_mutex(chan)) != 0 ))
908 {
909 pthread_cond_wait(channel_get_cond(chan), &mutex_channel);
910 }
911
912 /* free access to operate on channels */
913 pthread_mutex_unlock(&mutex_channel);
914
915 /* if channel is not found, return an error */
916 if (chan == NULL)
917 return sendToChannelErrors[0];
918
919 /* try to find a matching receiver */
920 dstlp = luaproc_dequeue_receiver(chan);
921
922 /* if a match is found, send the message to it and (queue) wake it */
923 if (dstlp != NULL)
924 {
925 /* push the message onto the receivers stack */
926 lua_pushstring( dstlp->lstate, message);
927
928 dstlp->args = lua_gettop(dstlp->lstate) - 1;
929
930 if (sched_queue_proc(dstlp) != LUAPROC_SCHED_QUEUE_PROC_OK)
931 {
932 /* unlock channel access */
933 luaproc_unlock_channel(chan);
934
935 /* decrease active luaproc count */
936 sched_lpcount_dec();
937
938 /* close lua_State */
939 lua_close(dstlp->lstate);
940 return sendToChannelErrors[1];
941 }
942
943 /* unlock channel access */
944 luaproc_unlock_channel(chan);
945 }
946 else if (dst)
947 dst = &dstlp;
948
949 if (chn)
950 chn = &chan;
951 return result;
952}
953
954/* send a message to a lua process */
955static int luaproc_send( lua_State *L ) {
956
957 channel chan;
958 luaproc dstlp, self;
959 const char *chname = luaL_checkstring( L, 1 );
960 const char *message = luaL_checkstring( L, 2 );
961 const char *result = sendToChannel(chname, message, &dstlp, &chan);
962
963 if (result) {
964 lua_pushnil( L );
965 lua_pushstring( L, result );
966 return 2;
967 }
968
969 if ( dstlp == NULL ) {
970
971 self = luaproc_getself( L );
972
973 if ( self != NULL ) {
974 self->stat = LUAPROC_STAT_BLOCKED_SEND;
975 self->chan = chan;
976 }
977
978 /* just yield the lua process, channel unlocking will be done by the scheduler */
979 return lua_yield( L, lua_gettop( L ));
980 }
981
982 lua_pushboolean( L, TRUE );
983 return 1;
984}
985
986/* receive a message from a lua process */
987static int luaproc_receive( lua_State *L ) {
988
989 channel chan;
990 luaproc srclp, self;
991 const char *chname = luaL_checkstring( L, 1 );
992
993 /* get exclusive access to operate on channels */
994 pthread_mutex_lock( &mutex_channel );
995
996 /* wait until channel is not in use */
997 while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) {
998 pthread_cond_wait( channel_get_cond( chan ), &mutex_channel );
999 }
1000
1001 /* free access to operate on channels */
1002 pthread_mutex_unlock( &mutex_channel );
1003
1004 /* if channel is not found, free access to operate on channels and return an error to Lua */
1005 if ( chan == NULL ) {
1006 lua_pushnil( L );
1007 lua_pushstring( L, "non-existent channel" );
1008 return 2;
1009 }
1010
1011 /* try to find a matching sender */
1012 srclp = luaproc_dequeue_sender( chan );
1013
1014 /* if a match is found, get values from it and (queue) wake it */
1015 if ( srclp != NULL ) {
1016
1017 /* move values between Lua states' stacks */
1018 luaproc_movevalues( srclp->lstate, L );
1019
1020 /* return to sender indicanting message was sent */
1021 lua_pushboolean( srclp->lstate, TRUE );
1022 srclp->args = 1;
1023
1024 if ( sched_queue_proc( srclp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
1025
1026 /* unlock channel access */
1027 luaproc_unlock_channel( chan );
1028
1029 /* decrease active luaproc count */
1030 sched_lpcount_dec();
1031
1032 /* close lua_State */
1033 lua_close( srclp->lstate );
1034 lua_pushnil( L );
1035 lua_pushstring( L, "error scheduling process" );
1036 return 2;
1037 }
1038
1039 /* unlock channel access */
1040 luaproc_unlock_channel( chan );
1041
1042 return lua_gettop( L ) - 1;
1043 }
1044
1045 /* otherwise queue (block) the receiving process (sync) or return immediatly (async) */
1046 else {
1047
1048 /* if trying an asynchronous receive, unlock channel access and return an error */
1049 if ( lua_toboolean( L, 2 )) {
1050 /* unlock channel access */
1051 luaproc_unlock_channel( chan );
1052 /* return an error */
1053 lua_pushnil( L );
1054 lua_pushfstring( L, "no senders waiting on channel %s", chname );
1055 return 2;
1056 }
1057
1058 /* otherwise (synchronous receive) simply block process */
1059 else {
1060 self = luaproc_getself( L );
1061
1062 if ( self != NULL ) {
1063 self->stat = LUAPROC_STAT_BLOCKED_RECV;
1064 self->chan = chan;
1065 }
1066
1067 /* just yield the lua process, channel unlocking will be done by the scheduler */
1068 return lua_yield( L, lua_gettop( L ));
1069 }
1070 }
1071}
1072
1073void luaprocInit(void)
1074{
1075 /* initialize recycle list */
1076 eina_clist_init(&recyclelp);
1077
1078 /* initialize local scheduler */
1079 sched_init_local( LUAPROC_SCHED_DEFAULT_WORKER_THREADS );
1080}
1081
1082void luaprocRegister(lua_State *L)
1083{
1084 /* register luaproc functions */
1085 luaL_register( L, "luaproc", luaproc_funcs_parent );
1086}
1087
1088LUALIB_API int luaopen_luaproc( lua_State *L ) {
1089 luaprocRegister(L);
1090 luaprocInit();
1091 return 0;
1092}
1093
1094/* return a process' status */
1095int luaproc_get_status( luaproc lp ) {
1096 return lp->stat;
1097}
1098
1099/* set a process' status */
1100void luaproc_set_status( luaproc lp, int status ) {
1101 lp->stat = status;
1102}
1103
1104/* return a process' state */
1105lua_State *luaproc_get_state( luaproc lp ) {
1106 return lp->lstate;
1107}
1108
1109/* return the number of arguments expected by a given process */
1110int luaproc_get_args( luaproc lp ) {
1111 return lp->args;
1112}
1113
1114/* set the number of arguments expected by a given process */
1115void luaproc_set_args( luaproc lp, int n ) {
1116 lp->args = n;
1117}
1118
1119/* create a new channel */
1120static int luaproc_create_channel( lua_State *L ) {
1121
1122 const char *chname = luaL_checkstring( L, 1 );
1123
1124 /* get exclusive access to operate on channels */
1125 pthread_mutex_lock( &mutex_channel );
1126
1127 /* check if channel exists */
1128 if ( channel_search( chname ) != NULL ) {
1129 /* free access to operate on channels */
1130 pthread_mutex_unlock( &mutex_channel );
1131 /* return an error to lua */
1132 lua_pushnil( L );
1133 lua_pushstring( L, "channel already exists" );
1134 return 2;
1135 }
1136
1137 channel_create( chname );
1138
1139 /* free access to operate on channels */
1140 pthread_mutex_unlock( &mutex_channel );
1141
1142 lua_pushboolean( L, TRUE );
1143
1144 return 1;
1145
1146}
1147
1148/* destroy a channel */
1149static int luaproc_destroy_channel( lua_State *L ) {
1150
1151 channel chan;
1152 luaproc lp;
1153 pthread_mutex_t *chmutex;
1154 pthread_cond_t *chcond;
1155 const char *chname = luaL_checkstring( L, 1 );
1156
1157
1158 /* get exclusive access to operate on channels */
1159 pthread_mutex_lock( &mutex_channel );
1160
1161 /* wait until channel is not in use */
1162 while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) {
1163 pthread_cond_wait( channel_get_cond( chan ), &mutex_channel );
1164 }
1165
1166 /* free access to operate on channels */
1167 pthread_mutex_unlock( &mutex_channel );
1168
1169 /* if channel is not found, return an error to Lua */
1170 if ( chan == NULL ) {
1171 lua_pushnil( L );
1172 lua_pushstring( L, "non-existent channel" );
1173 return 2;
1174 }
1175
1176 /* get channel's mutex and conditional pointers */
1177 chmutex = channel_get_mutex( chan );
1178 chcond = channel_get_cond( chan );
1179
1180 /* search for processes waiting to send a message on this channel */
1181 while (( lp = (luaproc) eina_clist_head( channel_get_sendq( chan ))) != NULL ) {
1182 eina_clist_remove(&(lp->node));
1183
1184 /* return an error so the processe knows the channel was destroyed before the message was sent */
1185 lua_settop( lp->lstate, 0 );
1186 lua_pushnil( lp->lstate );
1187 lua_pushstring( lp->lstate, "channel destroyed while waiting for receiver" );
1188 lp->args = 2;
1189
1190 /* schedule the process for execution */
1191 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
1192
1193 /* decrease active luaproc count */
1194 sched_lpcount_dec();
1195
1196 /* close lua_State */
1197 lua_close( lp->lstate );
1198 }
1199 }
1200
1201 /* search for processes waiting to receive a message on this channel */
1202 while (( lp = (luaproc) eina_clist_head( channel_get_recvq( chan ))) != NULL ) {
1203 eina_clist_remove(&(lp->node));
1204
1205 /* return an error so the processe knows the channel was destroyed before the message was received */
1206 lua_settop( lp->lstate, 0 );
1207 lua_pushnil( lp->lstate );
1208 lua_pushstring( lp->lstate, "channel destroyed while waiting for sender" );
1209 lp->args = 2;
1210
1211 /* schedule the process for execution */
1212 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
1213
1214 /* decrease active luaproc count */
1215 sched_lpcount_dec();
1216
1217 /* close lua_State */
1218 lua_close( lp->lstate );
1219 }
1220 }
1221
1222 /* get exclusive access to operate on channels */
1223 pthread_mutex_lock( &mutex_channel );
1224 /* destroy channel */
1225 channel_destroy( chan, chname );
1226 /* broadcast channel not in use */
1227 pthread_cond_broadcast( chcond );
1228 /* unlock channel access */
1229 pthread_mutex_unlock( chmutex );
1230 /* destroy channel mutex and conditional */
1231 pthread_mutex_destroy( chmutex );
1232 pthread_cond_destroy( chcond );
1233 /* free memory used by channel mutex and conditional */
1234 free( chmutex );
1235 free( chcond );
1236 /* free access to operate on channels */
1237 pthread_mutex_unlock( &mutex_channel );
1238
1239 lua_pushboolean( L, TRUE );
1240
1241 return 1;
1242}
1243
1244/* register luaproc's functions in a lua_State */
1245void luaproc_register_funcs( lua_State *L ) {
1246 luaL_register( L, "luaproc", luaproc_funcs_child );
1247}
1248
1249/* return the channel where the corresponding luaproc is blocked at */
1250channel luaproc_get_channel( luaproc lp ) {
1251 return lp->chan;
1252}
1253
1254/* unlock access to a channel */
1255void luaproc_unlock_channel( channel chan ) {
1256 /* get exclusive access to operate on channels */
1257 pthread_mutex_lock( &mutex_channel );
1258 /* unlock channel access */
1259 pthread_mutex_unlock( channel_get_mutex( chan ));
1260 /* signal channel not in use */
1261 pthread_cond_signal( channel_get_cond( chan ));
1262 /* free access to operate on channels */
1263 pthread_mutex_unlock( &mutex_channel );
1264}
1265
1266/* return status (boolean) indicating if worker thread should be destroyed after luaproc execution */
1267int luaproc_get_destroyworker( luaproc lp ) {
1268 return lp->destroyworker;
1269}