From bd82e0ebd6db7ab7a9577c9ec6d9a9a9c2d73db8 Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Sat, 25 Feb 2012 09:43:05 +1000 Subject: Beat the script messsage passing into shape, mostly by knocking large chunks off. lol --- LuaSL/src/LuaSL_threads.c | 451 ++++++++++------------------------------------ 1 file changed, 94 insertions(+), 357 deletions(-) (limited to 'LuaSL/src/LuaSL_threads.c') diff --git a/LuaSL/src/LuaSL_threads.c b/LuaSL/src/LuaSL_threads.c index 4a7397e..aec5ed6 100644 --- a/LuaSL/src/LuaSL_threads.c +++ b/LuaSL/src/LuaSL_threads.c @@ -33,10 +33,6 @@ THE SOFTWARE. /* This is a redesign of luaproc. The design goals and notes - * * In general use EFL where it is useful. - * One fixed unique message channel per script. - * No need for channel.c / .h, we are not using that sort of arbitrary channels. - * FIFO queue on message channels, seems the C socket queue is not enough. - * On the other hand, could just peel messages of the socket queue, then shove them on the scripts queue. * Probably one fixed unique message channel per object, which each script in the object shares. * But might be better to handle that C side anyway. * Better integration with LuaSL. @@ -50,16 +46,9 @@ THE SOFTWARE. #include "LuaSL.h" -#define CHANNEL_MAX_NAME_LENGTH 255 - -#define CHANNEL_DESTROYED 0 - /* ready process queue insertion status */ #define LUAPROC_SCHED_QUEUE_PROC_OK 0 -#define LUAPROC_SCHED_QUEUE_PROC_ERR -1 - -/* scheduler default number of worker threads */ -#define LUAPROC_SCHED_DEFAULT_WORKER_THREADS 1 +//#define LUAPROC_SCHED_QUEUE_PROC_ERR -1 /* process is idle */ #define LUAPROC_STAT_IDLE 0 @@ -71,14 +60,6 @@ THE SOFTWARE. #define LUAPROC_STAT_BLOCKED_RECV 3 -/* message channel */ -struct stchannel { - Eina_Clist send; - Eina_Clist recv; - pthread_mutex_t *mutex; - pthread_cond_t *in_use; -}; - typedef struct { Eina_Clist node; @@ -89,12 +70,6 @@ typedef struct * globals *********/ -/* global channel lua_State mutex */ -pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER; - -/* global where channels will be stored */ -Eina_Hash *channels; - /* ready process list */ Eina_Clist lpready; @@ -133,8 +108,6 @@ Eina_Clist recyclelp; static int luaproc_send( lua_State *L ); /* receive a message from a lua process */ static int luaproc_receive( lua_State *L ); -/* create a new channel */ -static int luaproc_create_channel( lua_State *L ); /* send a message back to the main loop */ static int luaproc_send_back( lua_State *L ); @@ -148,59 +121,11 @@ static const struct luaL_reg luaproc_funcs_parent[] = { static const struct luaL_reg luaproc_funcs_child[] = { { "send", luaproc_send }, { "receive", luaproc_receive }, - { "newchannel", luaproc_create_channel }, { "sendback", luaproc_send_back }, { NULL, NULL } }; -/* queue a lua process sending a message without a matching receiver */ -static void luaproc_queue_sender(script *lp) -{ - eina_clist_add_tail(&(lp->chan->send), &(lp->node)); -} - -/* dequeue a lua process sending a message with a receiver match */ -static script *luaproc_dequeue_sender(channel chan) -{ - script *lp; - - if ((lp = (script *) eina_clist_head(&(chan->send)))) - eina_clist_remove(&(lp->node)); - - return lp; -} - -/* queue a luc process receiving a message without a matching sender */ -static void luaproc_queue_receiver(script *lp) -{ - eina_clist_add_tail(&(lp->chan->recv), &(lp->node)); -} - -/* dequeue a lua process receiving a message with a sender match */ -static script *luaproc_dequeue_receiver(channel chan) -{ - script *lp; - - if ((lp = (script *) eina_clist_head(&(chan->recv)))) - eina_clist_remove(&(lp->node)); - - return lp; -} - -/* unlock access to a channel */ -static void luaproc_unlock_channel(channel chan) -{ - /* get exclusive access to operate on channels */ - pthread_mutex_lock(&mutex_channel); - /* unlock channel access */ - pthread_mutex_unlock(chan->mutex); - /* signal channel not in use */ - pthread_cond_signal(chan->in_use); - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); -} - /* increase active lua process count */ static void sched_lpcount_inc(void) { @@ -255,7 +180,6 @@ static void *workermain( void *args ) { /* execute the lua code specified in the lua process struct */ procstat = lua_resume(lp->L, lp->args); - /* reset the process argument count */ lp->args = 0; @@ -273,8 +197,8 @@ static void *workermain( void *args ) { sched_lpcount_dec(); } lua_close(lp->L); -// if (lp->timer) -// ecore_timer_del(lp->timer); + if (lp->timer) + ecore_timer_del(lp->timer); free(lp); } @@ -282,22 +206,15 @@ static void *workermain( void *args ) { else if ( procstat == LUA_YIELD ) { /* if so, further check if yield originated from an unmatched send/recv operation */ - if ( lp->status == LUAPROC_STAT_BLOCKED_SEND ) { - /* queue blocked lua process on corresponding channel */ - luaproc_queue_sender( lp ); - /* unlock channel access */ - luaproc_unlock_channel(lp->chan); + if (lp->status == LUAPROC_STAT_BLOCKED_SEND) + { } - - else if ( lp->status == LUAPROC_STAT_BLOCKED_RECV ) { - /* queue blocked lua process on corresponding channel */ - luaproc_queue_receiver( lp ); - /* unlock channel access */ - luaproc_unlock_channel(lp->chan); + else if (lp->status == LUAPROC_STAT_BLOCKED_RECV) + { } - /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */ - else { + else + { /* get exclusive access to the ready process queue */ pthread_mutex_lock( &mutex_queue_access ); /* re-insert the job at the end of the ready process queue */ @@ -306,9 +223,9 @@ static void *workermain( void *args ) { pthread_mutex_unlock( &mutex_queue_access ); } } - /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */ - else { + else + { /* print error message */ fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring(lp->L, -1 )); /* close lua state */ @@ -365,20 +282,16 @@ void sched_join_workerthreads( void ) { } /* create a new worker pthread */ -int sched_create_worker( void ) { - - pthread_t worker; - - /* create a new pthread */ - if ( pthread_create( &worker, NULL, workermain, NULL ) != 0 ) { - return LUAPROC_SCHED_PTHREAD_ERROR; - } +int sched_create_worker(void) +{ + pthread_t worker; - return LUAPROC_SCHED_OK; + /* create a new pthread */ + if (pthread_create( &worker, NULL, workermain, NULL ) != 0) + return LUAPROC_SCHED_PTHREAD_ERROR; + return LUAPROC_SCHED_OK; } - - void newProc(const char *code, int file, script *lp) { int ret; @@ -408,7 +321,6 @@ void newProc(const char *code, int file, script *lp) lp->status = LUAPROC_STAT_IDLE; lp->args = 0; - lp->chan = NULL; eina_clist_element_init(&(lp->node)); eina_clist_init(&(lp->messages)); @@ -439,18 +351,6 @@ void newProc(const char *code, int file, script *lp) } } -/* moves values between lua states' stacks */ -static void luaproc_movevalues( lua_State *Lfrom, lua_State *Lto ) { - - int i; - int n = lua_gettop( Lfrom ); - - /* move values between lua states' stacks */ - for ( i = 2; i <= n; i++ ) { - lua_pushstring( Lto, lua_tostring( Lfrom, i )); - } -} - /* return the lua process associated with a given lua state */ static script *luaproc_getself(lua_State *L) { @@ -462,70 +362,26 @@ static script *luaproc_getself(lua_State *L) return lp; } -/* create a new channel */ -static int luaproc_create_channel(lua_State *L) +/* send a message to the client process */ +static int luaproc_send_back(lua_State *L) { - const char *name = luaL_checkstring(L, 1); - channel chan; + script *self = luaproc_getself(L); + const char *message = luaL_checkstring(L, 1); - /* get exclusive access to operate on channels */ - pthread_mutex_lock(&mutex_channel); - - /* check if channel exists */ - if (eina_hash_find(channels, name) != NULL) + if (self) { - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); - /* return an error to lua */ - lua_pushnil(L); - lua_pushstring(L, "channel already exists"); - return 2; - } - - /* get exclusive access to the channel table */ - pthread_mutex_lock(&mutex_channel_lstate); - - /* create a new channel */ - chan = (channel) calloc(1, sizeof(struct stchannel)); - eina_clist_init(&(chan->send)); - eina_clist_init(&(chan->recv)); - chan->mutex = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t)); - pthread_mutex_init( chan->mutex, NULL ); - chan->in_use = (pthread_cond_t *) malloc(sizeof(pthread_cond_t)); - pthread_cond_init(chan->in_use, NULL); - eina_hash_add(channels, name, chan); - - /* let others access the channel table */ - pthread_mutex_unlock(&mutex_channel_lstate); - - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); - - lua_pushboolean(L, TRUE); - return 1; -} + scriptMessage *sm = calloc(1, sizeof(scriptMessage)); -/* send a message to the client process */ -static int luaproc_send_back( lua_State *L ) { - - script *self; - const char *message = luaL_checkstring( L, 1 ); - - self = luaproc_getself(L); - if (self) + if (sm) { - scriptMessage *sm = calloc(1, sizeof(scriptMessage)); - - if (sm) - { - eina_clist_element_init(&(sm->node)); - sm->script = self; - strcpy((char *) sm->message, message); - ecore_main_loop_thread_safe_call_async(scriptSendBack, sm); - } + eina_clist_element_init(&(sm->node)); + sm->script = self; + strcpy((char *) sm->message, message); + ecore_main_loop_thread_safe_call_async(scriptSendBack, sm); } + } - return 0; + return 0; } /* error messages for the sendToChannel function */ @@ -536,224 +392,105 @@ const char *sendToChannelErrors[] = }; /* send a message to a lua process */ -const char *sendToChannel(gameGlobals *game, const char *chname, const char *message, script **dst, channel *chn) +const char *sendToChannel(gameGlobals *game, const char *SID, const char *message) { const char *result = NULL; - channel chan; script *dstlp; - scriptMessage *sm = NULL; /* get exclusive access to operate on channels */ pthread_mutex_lock(&mutex_channel); // Add the message to the queue. - if ((dstlp = eina_hash_find(game->scripts, chname))) + if ((dstlp = eina_hash_find(game->scripts, SID))) { + scriptMessage *sm = NULL; + if ((sm = malloc(sizeof(scriptMessage)))) { sm->script = dstlp; strcpy((char *) sm->message, message); - eina_clist_add_tail(&(sm->script->messages), &(sm->node)); + eina_clist_add_tail(&(dstlp->messages), &(sm->node)); } - } - - /* wait until channel is not in use */ - while( ((chan = eina_hash_find(channels, chname)) != NULL) && (pthread_mutex_trylock(chan->mutex) != 0 )) - { - pthread_cond_wait(chan->in_use, &mutex_channel); - } - - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); - - /* if channel is not found, return an error */ - if (chan == NULL) - return sendToChannelErrors[0]; - - /* try to find a matching receiver */ - dstlp = luaproc_dequeue_receiver(chan); - - /* if a match is found, send the message to it and (queue) wake it */ - if (dstlp != NULL) - { - scriptMessage *msg = (scriptMessage *) eina_clist_head(&(dstlp->messages)); - // See if there's a message on the queue. Note, this may not be the same as the incoming message, if there was already a queue. - if (msg) + /* if it's already waiting, send the next message to it and (queue) wake it */ + if (dstlp->status == LUAPROC_STAT_BLOCKED_RECV) { - eina_clist_remove(&(msg->node)); - message = msg->message; - } - /* push the message onto the receivers stack */ - lua_pushstring(dstlp->L, message); - dstlp->args = lua_gettop(dstlp->L) - 1; - if (msg) - free(msg); - - if (sched_queue_proc(dstlp) != LUAPROC_SCHED_QUEUE_PROC_OK) - { - /* unlock channel access */ - luaproc_unlock_channel(chan); + scriptMessage *msg = (scriptMessage *) eina_clist_head(&(dstlp->messages)); - /* decrease active luaproc count */ - sched_lpcount_dec(); + // See if there's a message on the queue. Note, this may not be the same as the incoming message, if there was already a queue. + if (msg) + { + eina_clist_remove(&(msg->node)); + message = msg->message; + } + /* push the message onto the receivers stack */ + lua_pushstring(dstlp->L, message); + dstlp->args = lua_gettop(dstlp->L) - 1; + if (msg) + free(msg); - /* close lua_State */ - lua_close(dstlp->L); - return sendToChannelErrors[1]; + if (sched_queue_proc(dstlp) != LUAPROC_SCHED_QUEUE_PROC_OK) + { + sched_lpcount_dec(); + lua_close(dstlp->L); + result = sendToChannelErrors[1]; + } } - - /* unlock channel access */ - luaproc_unlock_channel(chan); } - else if (dst) - *dst = dstlp; - if (chn) - chn = &chan; + pthread_mutex_unlock(&mutex_channel); + return result; } /* send a message to a lua process */ -static int luaproc_send( lua_State *L ) { - - channel chan; - script *dstlp, *self = luaproc_getself(L); - const char *chname = luaL_checkstring( L, 1 ); - const char *message = luaL_checkstring( L, 2 ); - const char *result = sendToChannel(self->game, chname, message, &dstlp, &chan); - - if (result) { - lua_pushnil( L ); - lua_pushstring( L, result ); - return 2; - } - - if ( dstlp == NULL ) { - - if ( self != NULL ) { - self->status = LUAPROC_STAT_BLOCKED_SEND; - self->chan = chan; - } +static int luaproc_send(lua_State *L) +{ + script *self = luaproc_getself(L); + const char *result = sendToChannel(self->game, luaL_checkstring(L, 1), luaL_checkstring(L, 2)); - /* just yield the lua process, channel unlocking will be done by the scheduler */ - return lua_yield( L, lua_gettop( L )); - } + if (result) + { + lua_pushnil(L); + lua_pushstring(L, result); + return 2; + } - lua_pushboolean( L, TRUE ); - return 1; + lua_pushboolean(L, TRUE); + return 1; } /* receive a message from a lua process */ -static int luaproc_receive( lua_State *L ) { - - channel chan; - script *srclp, *self; - const char *chname = luaL_checkstring( L, 1 ); - scriptMessage *msg; - - // First check if there are queued messages, and grab one. - self = luaproc_getself(L); - if ((msg = (scriptMessage *) eina_clist_head(&(self->messages)))) - { - eina_clist_remove(&(msg->node)); - lua_pushstring(L, msg->message); - free(msg); - return lua_gettop(L) - 1; - } - - /* get exclusive access to operate on channels */ - pthread_mutex_lock( &mutex_channel ); - - /* wait until channel is not in use */ - while((( chan = eina_hash_find(channels, chname)) != NULL ) && ( pthread_mutex_trylock(chan->mutex) != 0 )) { - pthread_cond_wait(chan->in_use, &mutex_channel ); - } - - /* free access to operate on channels */ - pthread_mutex_unlock( &mutex_channel ); - - /* if channel is not found, free access to operate on channels and return an error to Lua */ - if ( chan == NULL ) { - lua_pushnil( L ); - lua_pushstring( L, "non-existent channel" ); - return 2; - } - - /* try to find a matching sender */ - srclp = luaproc_dequeue_sender( chan ); - - /* if a match is found, get values from it and (queue) wake it */ - if ( srclp != NULL ) { - - /* move values between Lua states' stacks */ - luaproc_movevalues( srclp->L, L ); - - /* return to sender indicanting message was sent */ - lua_pushboolean( srclp->L, TRUE ); - srclp->args = 1; - - if ( sched_queue_proc( srclp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { - - /* unlock channel access */ - luaproc_unlock_channel( chan ); - - /* decrease active luaproc count */ - sched_lpcount_dec(); - - /* close lua_State */ - lua_close( srclp->L ); - lua_pushnil( L ); - lua_pushstring( L, "error scheduling process" ); - return 2; - } - - /* unlock channel access */ - luaproc_unlock_channel( chan ); - - return lua_gettop( L ) - 1; - } - - /* otherwise queue (block) the receiving process (sync) or return immediatly (async) */ - else { - - /* if trying an asynchronous receive, unlock channel access and return an error */ - if ( lua_toboolean( L, 2 )) { - /* unlock channel access */ - luaproc_unlock_channel( chan ); - /* return an error */ - lua_pushnil( L ); - lua_pushfstring( L, "no senders waiting on channel %s", chname ); - return 2; - } - - /* otherwise (synchronous receive) simply block process */ - else { +static int luaproc_receive(lua_State *L) +{ + script *self; + const char *chname = luaL_checkstring(L, 1); + scriptMessage *msg; - if ( self != NULL ) { - self->status = LUAPROC_STAT_BLOCKED_RECV; - self->chan = chan; - } + // First check if there are queued messages, and grab one. + self = luaproc_getself(L); + if ((msg = (scriptMessage *) eina_clist_head(&(self->messages)))) + { + eina_clist_remove(&(msg->node)); + lua_pushstring(L, msg->message); + free(msg); + return lua_gettop(L) - 1; + } - /* just yield the lua process, channel unlocking will be done by the scheduler */ - return lua_yield( L, lua_gettop( L )); - } - } + /* if trying an asynchronous receive, return an error */ + if ( lua_toboolean( L, 2 )) + { + lua_pushnil(L); + lua_pushfstring(L, "no senders waiting on channel %s", chname); + return 2; + } + /* otherwise (synchronous receive) simply block process */ + self->status = LUAPROC_STAT_BLOCKED_RECV; + return lua_yield(L, lua_gettop(L)); } void luaprocInit(void) { eina_clist_init(&recyclelp); - - int tid; - pthread_t worker; - eina_clist_init(&lpready); - channels = eina_hash_string_superfast_new(NULL); - - /* create initial worker threads */ - for (tid = 0; tid < LUAPROC_SCHED_DEFAULT_WORKER_THREADS; tid++) - { - pthread_create( &worker, NULL, workermain, NULL); - } } -- cgit v1.1