From de650eb6965e6c6798f404084c34dca0644c39ff Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Sat, 25 Feb 2012 03:36:02 +1000 Subject: Strip out some of the waste of space, including unacademicifying luaproc. Also, replace the channel functions with a real hash. --- LuaSL/src/LuaSL_threads.c | 555 +++++++++++++--------------------------------- LuaSL/src/LuaSL_threads.h | 2 +- 2 files changed, 160 insertions(+), 397 deletions(-) (limited to 'LuaSL') diff --git a/LuaSL/src/LuaSL_threads.c b/LuaSL/src/LuaSL_threads.c index a8a1681..399d077 100644 --- a/LuaSL/src/LuaSL_threads.c +++ b/LuaSL/src/LuaSL_threads.c @@ -70,8 +70,6 @@ THE SOFTWARE. #define LUAPROC_STAT_BLOCKED_SEND 2 /* process is blocked on receive */ #define LUAPROC_STAT_BLOCKED_RECV 3 -/* process is finished */ -#define LUAPROC_STAT_FINISHED 4 /* message channel */ @@ -86,7 +84,7 @@ struct stchannel { struct stluaproc { Eina_Clist node; lua_State *lstate; - int stat; + int status; int args; channel chan; void *data; @@ -100,8 +98,8 @@ struct stluaproc { /* global channel lua_State mutex */ pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER; -/* global lua_State where channel hash table will be stored */ -lua_State *chanls = NULL; +/* global where channels will be stored */ +Eina_Hash *channels; /* ready process list */ Eina_Clist lpready; @@ -133,9 +131,6 @@ pthread_mutex_t mutex_recycle_list = PTHREAD_MUTEX_INITIALIZER; /* recycled lua process list */ Eina_Clist recyclelp; -/* maximum lua processes to recycle */ -int recyclemax = 0; - /****************************** * library functions prototypes @@ -165,241 +160,70 @@ static const struct luaL_reg luaproc_funcs_child[] = { }; - - -/* initialize channel table */ -static void channel_init( void ) { - chanls = luaL_newstate(); - lua_newtable( chanls ); - lua_setglobal( chanls, "channeltb" ); -} - -/* create new channel */ -static channel channel_create( const char *cname ) { - - channel chan; - - /* get exclusive access to the channel table */ - pthread_mutex_lock( &mutex_channel_lstate ); - - /* create a new channel */ - lua_getglobal( chanls, "channeltb"); - lua_pushstring( chanls, cname ); - chan = (channel )lua_newuserdata( chanls, sizeof( struct stchannel )); - eina_clist_init(&(chan->send)); - eina_clist_init(&(chan->recv)); - chan->mutex = (pthread_mutex_t *)malloc( sizeof( pthread_mutex_t )); - pthread_mutex_init( chan->mutex, NULL ); - chan->in_use = (pthread_cond_t *)malloc( sizeof( pthread_cond_t )); - pthread_cond_init( chan->in_use, NULL ); - lua_settable( chanls, -3 ); - lua_pop( chanls, 1 ); - - /* let others access the channel table */ - pthread_mutex_unlock( &mutex_channel_lstate ); - - return chan; -} - -/* search for and return a channel with a given name */ -static channel channel_search( const char *cname ) { - - channel chan; - - /* get exclusive access to the channel table */ - pthread_mutex_lock( &mutex_channel_lstate ); - - /* search for channel */ - lua_getglobal( chanls, "channeltb"); - lua_getfield( chanls, -1, cname ); - if (( lua_type( chanls, -1 )) == LUA_TUSERDATA ) { - chan = (channel )lua_touserdata( chanls, -1 ); - } else { - chan = NULL; - } - lua_pop( chanls, 2 ); - - /* let others access channel table */ - pthread_mutex_unlock( &mutex_channel_lstate ); - - return chan; -} - -/* return a channel's send queue */ -static Eina_Clist *channel_get_sendq( channel chan ) { - return &chan->send; -} - -/* return a channel's receive queue */ -static Eina_Clist *channel_get_recvq( channel chan ) { - return &chan->recv; -} - -/* return a channel's mutex */ -static pthread_mutex_t *channel_get_mutex( channel chan ) { - return chan->mutex; -} - -/* return a channel's conditional variable */ -static pthread_cond_t *channel_get_cond( channel chan ) { - return chan->in_use; -} - - - -/* return status (boolean) indicating if lua process should be recycled */ -static luaproc luaproc_recycle_pop( void ) { - - luaproc lp; - - /* get exclusive access to operate on recycle list */ - pthread_mutex_lock( &mutex_recycle_list ); - - /* check if there are any lua processes on recycle list */ - if ( eina_clist_count( &recyclelp ) > 0 ) { - /* pop list head */ - if ((lp = (luaproc) eina_clist_head(&recyclelp))) - eina_clist_remove(&(lp->node)); - /* free access to operate on recycle list */ - pthread_mutex_unlock( &mutex_recycle_list ); - /* return associated luaproc */ - return lp; - } - - /* free access to operate on recycle list */ - pthread_mutex_unlock( &mutex_recycle_list ); - - /* if no lua processes are available simply return null */ - return NULL; -} - -/* check if lua process should be recycled and, in case so, add it to the recycle list */ -static int luaproc_recycle_push( luaproc lp ) { - - /* get exclusive access to operate on recycle list */ - pthread_mutex_lock( &mutex_recycle_list ); - - /* check if amount of lua processes currently on recycle list is greater than - or equal to the maximum amount of lua processes that should be recycled */ - if ( eina_clist_count( &recyclelp ) >= recyclemax ) { - /* free access to operate on recycle list */ - pthread_mutex_unlock( &mutex_recycle_list ); - /* if so, lua process should NOT be recycled and should be destroyed */ - return FALSE; - } - /* otherwise, lua process should be added to recycle list */ - eina_clist_add_tail( &recyclelp, &(lp->node) ); - /* free access to operate on recycle list */ - pthread_mutex_unlock( &mutex_recycle_list ); - /* since lua process will be recycled, it should not be destroyed */ - return TRUE; -} - /* queue a lua process sending a message without a matching receiver */ -static void luaproc_queue_sender( luaproc lp ) { - /* add the sending process to this process' send queue */ - eina_clist_add_tail( channel_get_sendq( lp->chan ), &(lp->node)); +static void luaproc_queue_sender(luaproc lp) +{ + eina_clist_add_tail(&(lp->chan->send), &(lp->node)); } /* dequeue a lua process sending a message with a receiver match */ -static luaproc luaproc_dequeue_sender( channel chan ) { - - luaproc lp; +static luaproc luaproc_dequeue_sender(channel chan) +{ + luaproc lp; - if ( eina_clist_count( channel_get_sendq( chan )) > 0 ) { - /* get first node from channel's send queue */ - if ((lp = (luaproc) eina_clist_head(channel_get_sendq( chan )))) - eina_clist_remove(&(lp->node)); - /* return associated luaproc */ - return lp; - } + if ((lp = (luaproc) eina_clist_head(&(chan->send)))) + eina_clist_remove(&(lp->node)); - return NULL; + return lp; } /* queue a luc process receiving a message without a matching sender */ -static void luaproc_queue_receiver( luaproc lp ) { - /* add the receiving process to this process' receive queue */ - eina_clist_add_tail( channel_get_recvq( lp->chan ), &(lp->node)); +static void luaproc_queue_receiver(luaproc lp) +{ + eina_clist_add_tail(&(lp->chan->recv), &(lp->node)); } /* dequeue a lua process receiving a message with a sender match */ -static luaproc luaproc_dequeue_receiver( channel chan ) { - - luaproc lp; - - if ( eina_clist_count( channel_get_recvq( chan )) > 0 ) { - /* get first node from channel's recv queue */ - if ((lp = (luaproc) eina_clist_head(channel_get_recvq( chan )))) - eina_clist_remove(&(lp->node)); - /* return associated luaproc */ - return lp; - } - - return NULL; -} - -/* return a process' status */ -static int luaproc_get_status( luaproc lp ) { - return lp->stat; -} - -/* set a process' status */ -static void luaproc_set_status( luaproc lp, int status ) { - lp->stat = status; -} - -/* return a process' state */ -static lua_State *luaproc_get_state( luaproc lp ) { - return lp->lstate; -} - -/* return the number of arguments expected by a given process */ -static int luaproc_get_args( luaproc lp ) { - return lp->args; -} - -/* set the number of arguments expected by a given process */ -static void luaproc_set_args( luaproc lp, int n ) { - lp->args = n; -} +static luaproc luaproc_dequeue_receiver(channel chan) +{ + luaproc lp; + if ((lp = (luaproc) eina_clist_head(&(chan->recv)))) + eina_clist_remove(&(lp->node)); -/* return the channel where the corresponding luaproc is blocked at */ -static channel luaproc_get_channel( luaproc lp ) { - return lp->chan; + return lp; } /* unlock access to a channel */ -static void luaproc_unlock_channel( channel chan ) { - /* get exclusive access to operate on channels */ - pthread_mutex_lock( &mutex_channel ); - /* unlock channel access */ - pthread_mutex_unlock( channel_get_mutex( chan )); - /* signal channel not in use */ - pthread_cond_signal( channel_get_cond( chan )); - /* free access to operate on channels */ - pthread_mutex_unlock( &mutex_channel ); +static void luaproc_unlock_channel(channel chan) +{ + /* get exclusive access to operate on channels */ + pthread_mutex_lock(&mutex_channel); + /* unlock channel access */ + pthread_mutex_unlock(chan->mutex); + /* signal channel not in use */ + pthread_cond_signal(chan->in_use); + /* free access to operate on channels */ + pthread_mutex_unlock(&mutex_channel); } - - /* increase active lua process count */ -static void sched_lpcount_inc( void ) { - pthread_mutex_lock( &mutex_lp_count ); - lpcount++; - pthread_mutex_unlock( &mutex_lp_count ); +static void sched_lpcount_inc(void) +{ + pthread_mutex_lock(&mutex_lp_count); + lpcount++; + pthread_mutex_unlock(&mutex_lp_count); } /* decrease active lua process count */ -static void sched_lpcount_dec( void ) { - pthread_mutex_lock( &mutex_lp_count ); - lpcount--; - /* if count reaches zero, signal there are no more active processes */ - if ( lpcount == 0 ) { - pthread_cond_signal( &cond_no_active_lp ); - } - pthread_mutex_unlock( &mutex_lp_count ); +static void sched_lpcount_dec(void) +{ + pthread_mutex_lock(&mutex_lp_count); + lpcount--; + /* if count reaches zero, signal there are no more active processes */ + if (lpcount == 0) + pthread_cond_signal(&cond_no_active_lp); + pthread_mutex_unlock(&mutex_lp_count); } /* worker thread main function */ @@ -436,23 +260,17 @@ static void *workermain( void *args ) { pthread_mutex_unlock( &mutex_queue_access ); /* execute the lua code specified in the lua process struct */ - procstat = lua_resume( luaproc_get_state( lp ), luaproc_get_args( lp )); + procstat = lua_resume(lp->lstate, lp->args); /* reset the process argument count */ - luaproc_set_args( lp, 0 ); + lp->args = 0; - /* check if process finished its whole execution */ + /* check if process finished its whole execution, then recycle it */ if ( procstat == 0 ) { - /* set process status to finished */ - luaproc_set_status( lp, LUAPROC_STAT_FINISHED ); - - /* check if lua process should be recycled and, if not, destroy it */ - if ( luaproc_recycle_push( lp ) == FALSE ) { - lua_close( luaproc_get_state( lp )); - } - - /* decrease active lua process count */ + pthread_mutex_lock(&mutex_recycle_list); + eina_clist_add_tail(&recyclelp, &(lp->node)); + pthread_mutex_unlock(&mutex_recycle_list); sched_lpcount_dec(); } @@ -461,18 +279,18 @@ static void *workermain( void *args ) { else if ( procstat == LUA_YIELD ) { /* if so, further check if yield originated from an unmatched send/recv operation */ - if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_SEND ) { + if ( lp->status == LUAPROC_STAT_BLOCKED_SEND ) { /* queue blocked lua process on corresponding channel */ luaproc_queue_sender( lp ); /* unlock channel access */ - luaproc_unlock_channel( luaproc_get_channel( lp )); + luaproc_unlock_channel(lp->chan); } - else if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_RECV ) { + else if ( lp->status == LUAPROC_STAT_BLOCKED_RECV ) { /* queue blocked lua process on corresponding channel */ luaproc_queue_receiver( lp ); /* unlock channel access */ - luaproc_unlock_channel( luaproc_get_channel( lp )); + luaproc_unlock_channel(lp->chan); } /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */ @@ -489,42 +307,15 @@ static void *workermain( void *args ) { /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */ else { /* print error message */ - fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring( luaproc_get_state( lp ), -1 )); + fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring(lp->lstate, -1 )); /* close lua state */ - lua_close( luaproc_get_state( lp )); + lua_close(lp->lstate); /* decrease active lua process count */ sched_lpcount_dec(); } } } -/* local scheduler initialization */ -static int sched_init_local( int numworkers ) { - - int tid; - int workercount = 0; - pthread_t worker; - - /* initialize ready process list */ - eina_clist_init(&lpready); - - /* initialize channels */ - channel_init(); - - /* create initial worker threads */ - for ( tid = 0; tid < numworkers; tid++ ) { - if ( pthread_create( &worker, NULL, workermain, NULL ) == 0 ) { - workercount++; - } - } - - if ( workercount != numworkers ) { - return LUAPROC_SCHED_INIT_ERROR; - } - - return LUAPROC_SCHED_OK; -} - /* move process to ready queue (ie, schedule process) */ static int sched_queue_proc( luaproc lp ) { @@ -534,8 +325,7 @@ static int sched_queue_proc( luaproc lp ) { /* add process to ready queue */ eina_clist_add_tail(&lpready, &(lp->node)); - /* set process status to ready */ - luaproc_set_status( lp, LUAPROC_STAT_READY ); + lp->status = LUAPROC_STAT_READY; /* wake worker up */ pthread_cond_signal( &cond_wakeup_worker ); @@ -584,112 +374,63 @@ int sched_create_worker( void ) { return LUAPROC_SCHED_OK; } -static void openlibs( lua_State *L ) { - luaL_openlibs(L); -} - -/* create new luaproc */ -static luaproc luaproc_new( const char *code, int destroyflag, int file) { - - luaproc lp; - int ret; - /* create new lua state */ - lua_State *lpst = luaL_newstate( ); - /* store the luaproc struct in its own lua state */ - lp = (luaproc )lua_newuserdata( lpst, sizeof( struct stluaproc )); - lua_setfield( lpst, LUA_REGISTRYINDEX, "_SELF" ); - - eina_clist_element_init(&(lp->node)); - lp->lstate = lpst; - lp->stat = LUAPROC_STAT_IDLE; - lp->args = 0; - lp->chan = NULL; - - /* load standard libraries */ - openlibs( lpst ); - - /* register luaproc's own functions */ - luaL_register( lpst, "luaproc", luaproc_funcs_child ); - - /* load process' code */ - if (file) - ret = luaL_loadfile( lpst, code ); - else - ret = luaL_loadstring( lpst, code ); - /* in case of errors, destroy recently created lua process */ - if ( ret != 0 ) { - lua_close( lpst ); - return NULL; - } - - /* return recently created lua process */ - return lp; -} - -/* recycle a lua process */ -static luaproc luaproc_recycle( luaproc lp, const char *code, int file ) { - int ret; - /* reset struct members */ - lp->stat = LUAPROC_STAT_IDLE; - lp->args = 0; - lp->chan = NULL; - - /* load process' code */ - ret = luaL_loadstring( lp->lstate, code ); +void newProc(const char *code, int file, script *data) +{ + int ret; + luaproc lp; - /* in case of errors, destroy lua process */ - if ( ret != 0 ) { - lua_close( lp->lstate ); - return NULL; - } + // Try to recycle a Lua state, otherwise create one from scratch. + pthread_mutex_lock(&mutex_recycle_list); + /* pop list head */ + if ((lp = (luaproc) eina_clist_head(&recyclelp))) + eina_clist_remove(&(lp->node)); + pthread_mutex_unlock(&mutex_recycle_list); - /* return recycled lua process */ - return lp; -} + if (lp == NULL) + { + lua_State *lpst = luaL_newstate(); -int newProc(const char *code, int file, void *data) -{ - /* new lua process pointer */ - luaproc lp; + /* store the luaproc struct in its own Lua state */ + lp = (luaproc) lua_newuserdata(lpst, sizeof(struct stluaproc)); + lp->lstate = lpst; + lua_setfield(lp->lstate, LUA_REGISTRYINDEX, "_SELF"); + luaL_openlibs(lp->lstate); + luaL_register(lp->lstate, "luaproc", luaproc_funcs_child); + eina_clist_element_init(&(lp->node)); + } - /* check if existing lua process should be recycled to avoid new creation */ - lp = luaproc_recycle_pop( ); + lp->status = LUAPROC_STAT_IDLE; + lp->args = 0; + lp->chan = NULL; - /* if there is a lua process available on the recycle queue, recycle it */ - if ( lp != NULL ) { - lp = luaproc_recycle( lp, code, file ); - } - /* otherwise create a new one from scratch */ - else { - /* create new lua process with destroy worker flag set to false - (ie, conclusion of lua process will NOT result in worker thread destruction */ - lp = luaproc_new( code, FALSE, file ); - } + /* load process' code */ + if (file) + ret = luaL_loadfile(lp->lstate, code); + else + ret = luaL_loadstring(lp->lstate, code); - /* ensure process creation was successfull */ - if ( lp == NULL ) { - return 1; - } + /* in case of errors, destroy Lua process */ + if (ret != 0) + { + lua_close(lp->lstate); + lp = NULL; + } - /* Stash any data given to us. */ + if (lp) + { lp->data = data; - - /* increase active luaproc count */ sched_lpcount_inc(); /* schedule luaproc */ - if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { - printf( "[luaproc] error queueing Lua process\n" ); - /* decrease active luaproc count */ - sched_lpcount_dec(); - /* close lua_State */ - lua_close( lp->lstate ); - return 2; + if (sched_queue_proc(lp) != LUAPROC_SCHED_QUEUE_PROC_OK) + { + printf( "[luaproc] error queueing Lua process\n" ); + sched_lpcount_dec(); + lua_close(lp->lstate); } - - return 0; + } } /* moves values between lua states' stacks */ @@ -713,6 +454,49 @@ static luaproc luaproc_getself( lua_State *L ) { return lp; } +/* create a new channel */ +static int luaproc_create_channel(lua_State *L) +{ + const char *name = luaL_checkstring(L, 1); + channel chan; + + /* get exclusive access to operate on channels */ + pthread_mutex_lock(&mutex_channel); + + /* check if channel exists */ + if (eina_hash_find(channels, name) != NULL) + { + /* free access to operate on channels */ + pthread_mutex_unlock(&mutex_channel); + /* return an error to lua */ + lua_pushnil(L); + lua_pushstring(L, "channel already exists"); + return 2; + } + + /* get exclusive access to the channel table */ + pthread_mutex_lock(&mutex_channel_lstate); + + /* create a new channel */ + chan = (channel) calloc(1, sizeof(struct stchannel)); + eina_clist_init(&(chan->send)); + eina_clist_init(&(chan->recv)); + chan->mutex = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init( chan->mutex, NULL ); + chan->in_use = (pthread_cond_t *) malloc(sizeof(pthread_cond_t)); + pthread_cond_init(chan->in_use, NULL); + eina_hash_add(channels, name, chan); + + /* let others access the channel table */ + pthread_mutex_unlock(&mutex_channel_lstate); + + /* free access to operate on channels */ + pthread_mutex_unlock(&mutex_channel); + + lua_pushboolean(L, TRUE); + return 1; +} + /* send a message to a lua process */ static int luaproc_send_back( lua_State *L ) { @@ -755,9 +539,9 @@ const char *sendToChannel(const char *chname, const char *message, luaproc *dst, pthread_mutex_lock(&mutex_channel); /* wait until channel is not in use */ - while( ((chan = channel_search(chname)) != NULL) && (pthread_mutex_trylock(channel_get_mutex(chan)) != 0 )) + while( ((chan = eina_hash_find(channels, chname)) != NULL) && (pthread_mutex_trylock(chan->mutex) != 0 )) { - pthread_cond_wait(channel_get_cond(chan), &mutex_channel); + pthread_cond_wait(chan->in_use, &mutex_channel); } /* free access to operate on channels */ @@ -822,7 +606,7 @@ static int luaproc_send( lua_State *L ) { self = luaproc_getself( L ); if ( self != NULL ) { - self->stat = LUAPROC_STAT_BLOCKED_SEND; + self->status = LUAPROC_STAT_BLOCKED_SEND; self->chan = chan; } @@ -845,8 +629,8 @@ static int luaproc_receive( lua_State *L ) { pthread_mutex_lock( &mutex_channel ); /* wait until channel is not in use */ - while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) { - pthread_cond_wait( channel_get_cond( chan ), &mutex_channel ); + while((( chan = eina_hash_find(channels, chname)) != NULL ) && ( pthread_mutex_trylock(chan->mutex) != 0 )) { + pthread_cond_wait(chan->in_use, &mutex_channel ); } /* free access to operate on channels */ @@ -911,7 +695,7 @@ static int luaproc_receive( lua_State *L ) { self = luaproc_getself( L ); if ( self != NULL ) { - self->stat = LUAPROC_STAT_BLOCKED_RECV; + self->status = LUAPROC_STAT_BLOCKED_RECV; self->chan = chan; } @@ -923,38 +707,17 @@ static int luaproc_receive( lua_State *L ) { void luaprocInit(void) { - /* initialize recycle list */ - eina_clist_init(&recyclelp); - - /* initialize local scheduler */ - sched_init_local( LUAPROC_SCHED_DEFAULT_WORKER_THREADS ); -} - -/* create a new channel */ -static int luaproc_create_channel( lua_State *L ) { - - const char *chname = luaL_checkstring( L, 1 ); - - /* get exclusive access to operate on channels */ - pthread_mutex_lock( &mutex_channel ); - - /* check if channel exists */ - if ( channel_search( chname ) != NULL ) { - /* free access to operate on channels */ - pthread_mutex_unlock( &mutex_channel ); - /* return an error to lua */ - lua_pushnil( L ); - lua_pushstring( L, "channel already exists" ); - return 2; - } + eina_clist_init(&recyclelp); - channel_create( chname ); + int tid; + pthread_t worker; - /* free access to operate on channels */ - pthread_mutex_unlock( &mutex_channel ); - - lua_pushboolean( L, TRUE ); - - return 1; + eina_clist_init(&lpready); + channels = eina_hash_string_superfast_new(NULL); + /* create initial worker threads */ + for (tid = 0; tid < LUAPROC_SCHED_DEFAULT_WORKER_THREADS; tid++) + { + pthread_create( &worker, NULL, workermain, NULL); + } } diff --git a/LuaSL/src/LuaSL_threads.h b/LuaSL/src/LuaSL_threads.h index 9fc38ea..47f5a69 100644 --- a/LuaSL/src/LuaSL_threads.h +++ b/LuaSL/src/LuaSL_threads.h @@ -55,7 +55,7 @@ void luaprocInit(void); /* create a new worker pthread */ int sched_create_worker( void ); -int newProc(const char *code, int file, void *data); +void newProc(const char *code, int file, script *data); const char *sendToChannel(const char *chname, const char *message, luaproc *dst, channel *chn); /* join all worker threads and exit */ -- cgit v1.1