From bd82e0ebd6db7ab7a9577c9ec6d9a9a9c2d73db8 Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Sat, 25 Feb 2012 09:43:05 +1000 Subject: Beat the script messsage passing into shape, mostly by knocking large chunks off. lol --- LuaSL/src/LSL.lua | 12 +- LuaSL/src/LuaSL.h | 1 - LuaSL/src/LuaSL_main.c | 10 +- LuaSL/src/LuaSL_test.c | 12 +- LuaSL/src/LuaSL_threads.c | 451 ++++++++++------------------------------------ LuaSL/src/LuaSL_threads.h | 9 +- 6 files changed, 108 insertions(+), 387 deletions(-) diff --git a/LuaSL/src/LSL.lua b/LuaSL/src/LSL.lua index 3810abc..662c880 100644 --- a/LuaSL/src/LSL.lua +++ b/LuaSL/src/LSL.lua @@ -82,6 +82,7 @@ end events = {} +function start() paused = false end function stop() paused = true end function quit() running = false end @@ -761,26 +762,18 @@ function LSL.stateChange(x) end; function LSL.mainLoop(sid, name, x) - local status, errorMsg = luaproc.newchannel(sid) + local status, errorMsg local result SID = sid scriptName = name - LSL.EOF = "\n\n\n" -- Fix this up now. - if not status then - msg("Can't open the luaproc channel " .. sid .. " ERROR MESSAGE: " .. errorMsg) - return - end - LSL.stateChange(x); waitAndProcess(false) msg("Script quitting.") end --- Need a FIFO queue of incoming events. Which will be in the C main thread, coz that's listening on the socket for us. --- The ecore_con stuff ends up being a FIFO queue of the commands coming from OpenSim. So no worries. function waitAndProcess(returnWanted) local Type = "event" @@ -798,6 +791,7 @@ function waitAndProcess(returnWanted) else -- Set the functions environment to ours, for the protection of the script, coz loadstring sets it to the global environment instead. -- TODO - On the other hand, we will need the global environment when we call event handlers. So we should probably stash it around here somewhere. + -- Meh, seems to be working fine as it is. setfenv(result, getfenv(1)) status, result = pcall(result) if not status then diff --git a/LuaSL/src/LuaSL.h b/LuaSL/src/LuaSL.h index 057788b..086d1b7 100644 --- a/LuaSL/src/LuaSL.h +++ b/LuaSL/src/LuaSL.h @@ -84,7 +84,6 @@ struct _script boolean running; int status; int args; - channel chan; Eina_Clist messages; Ecore_Con_Client *client; Ecore_Timer *timer; diff --git a/LuaSL/src/LuaSL_main.c b/LuaSL/src/LuaSL_main.c index c03a827..f15ae4a 100644 --- a/LuaSL/src/LuaSL_main.c +++ b/LuaSL/src/LuaSL_main.c @@ -12,7 +12,7 @@ static Eina_Bool _sleep_timer_cb(void *data) gameGlobals *game = script->game; PD("Waking up %s", script->SID); - sendToChannel(game, script->SID, "return 0.0", NULL, NULL); + sendToChannel(game, script->SID, "return 0.0"); return ECORE_CALLBACK_CANCEL; } @@ -22,7 +22,7 @@ static Eina_Bool _timer_timer_cb(void *data) gameGlobals *game = script->game; PD("Timer for %s", script->SID); - sendToChannel(game, script->SID, "events.timer()", NULL, NULL); + sendToChannel(game, script->SID, "events.timer()"); return ECORE_CALLBACK_RENEW; } @@ -82,9 +82,9 @@ void scriptSendBack(void * data) while (isspace(*temp)) temp++; if ('1' == *temp) - sendToChannel(game, them->SID, "start()", NULL, NULL); + sendToChannel(game, them->SID, "start()"); else - sendToChannel(game, them->SID, "stop()", NULL, NULL); + sendToChannel(game, them->SID, "stop()"); PD("Stopped %s", them->fileName); } else @@ -191,7 +191,7 @@ static Eina_Bool _data(void *data, int type __UNUSED__, Ecore_Con_Event_Client_D { const char *status = NULL; - status = sendToChannel(game, SID, command, NULL, NULL); + status = sendToChannel(game, SID, command); if (status) PE("Error sending command %s to script %s : %s", command, SID, status); } diff --git a/LuaSL/src/LuaSL_test.c b/LuaSL/src/LuaSL_test.c index ec76a32..463ae3b 100644 --- a/LuaSL/src/LuaSL_test.c +++ b/LuaSL/src/LuaSL_test.c @@ -112,19 +112,17 @@ static Eina_Bool _timer_cb(void *data) { case 5 : { - // TODO - do it as one line, coz sendToChannel() locks up if I do them one at a time too quickly. - sendForth(game, me->SID, "events.detectedKeys({\"%s\"}); events.detectedNames({\"%s\"}); events.touch_start(1)", ownerKey, ownerName); -// sendForth(game, me->SID, "events.detectedKeys({\"%s\"})", ownerKey); -// sendForth(game, me->SID, "events.detectedNames({\"%s\"})", ownerName); -// sendForth(game, me->SID, "events.touch_start(1)"); + sendForth(game, me->SID, "events.detectedKeys({\"%s\"})", ownerKey); + sendForth(game, me->SID, "events.detectedNames({\"%s\"})", ownerName); + sendForth(game, me->SID, "events.touch_start(1)"); break; } - case 9+3 : + case 9 : { sendForth(game, me->SID, "quit()"); break; } - case 11+3 : + case 11 : { exit = TRUE; break; diff --git a/LuaSL/src/LuaSL_threads.c b/LuaSL/src/LuaSL_threads.c index 4a7397e..aec5ed6 100644 --- a/LuaSL/src/LuaSL_threads.c +++ b/LuaSL/src/LuaSL_threads.c @@ -33,10 +33,6 @@ THE SOFTWARE. /* This is a redesign of luaproc. The design goals and notes - * * In general use EFL where it is useful. - * One fixed unique message channel per script. - * No need for channel.c / .h, we are not using that sort of arbitrary channels. - * FIFO queue on message channels, seems the C socket queue is not enough. - * On the other hand, could just peel messages of the socket queue, then shove them on the scripts queue. * Probably one fixed unique message channel per object, which each script in the object shares. * But might be better to handle that C side anyway. * Better integration with LuaSL. @@ -50,16 +46,9 @@ THE SOFTWARE. #include "LuaSL.h" -#define CHANNEL_MAX_NAME_LENGTH 255 - -#define CHANNEL_DESTROYED 0 - /* ready process queue insertion status */ #define LUAPROC_SCHED_QUEUE_PROC_OK 0 -#define LUAPROC_SCHED_QUEUE_PROC_ERR -1 - -/* scheduler default number of worker threads */ -#define LUAPROC_SCHED_DEFAULT_WORKER_THREADS 1 +//#define LUAPROC_SCHED_QUEUE_PROC_ERR -1 /* process is idle */ #define LUAPROC_STAT_IDLE 0 @@ -71,14 +60,6 @@ THE SOFTWARE. #define LUAPROC_STAT_BLOCKED_RECV 3 -/* message channel */ -struct stchannel { - Eina_Clist send; - Eina_Clist recv; - pthread_mutex_t *mutex; - pthread_cond_t *in_use; -}; - typedef struct { Eina_Clist node; @@ -89,12 +70,6 @@ typedef struct * globals *********/ -/* global channel lua_State mutex */ -pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER; - -/* global where channels will be stored */ -Eina_Hash *channels; - /* ready process list */ Eina_Clist lpready; @@ -133,8 +108,6 @@ Eina_Clist recyclelp; static int luaproc_send( lua_State *L ); /* receive a message from a lua process */ static int luaproc_receive( lua_State *L ); -/* create a new channel */ -static int luaproc_create_channel( lua_State *L ); /* send a message back to the main loop */ static int luaproc_send_back( lua_State *L ); @@ -148,59 +121,11 @@ static const struct luaL_reg luaproc_funcs_parent[] = { static const struct luaL_reg luaproc_funcs_child[] = { { "send", luaproc_send }, { "receive", luaproc_receive }, - { "newchannel", luaproc_create_channel }, { "sendback", luaproc_send_back }, { NULL, NULL } }; -/* queue a lua process sending a message without a matching receiver */ -static void luaproc_queue_sender(script *lp) -{ - eina_clist_add_tail(&(lp->chan->send), &(lp->node)); -} - -/* dequeue a lua process sending a message with a receiver match */ -static script *luaproc_dequeue_sender(channel chan) -{ - script *lp; - - if ((lp = (script *) eina_clist_head(&(chan->send)))) - eina_clist_remove(&(lp->node)); - - return lp; -} - -/* queue a luc process receiving a message without a matching sender */ -static void luaproc_queue_receiver(script *lp) -{ - eina_clist_add_tail(&(lp->chan->recv), &(lp->node)); -} - -/* dequeue a lua process receiving a message with a sender match */ -static script *luaproc_dequeue_receiver(channel chan) -{ - script *lp; - - if ((lp = (script *) eina_clist_head(&(chan->recv)))) - eina_clist_remove(&(lp->node)); - - return lp; -} - -/* unlock access to a channel */ -static void luaproc_unlock_channel(channel chan) -{ - /* get exclusive access to operate on channels */ - pthread_mutex_lock(&mutex_channel); - /* unlock channel access */ - pthread_mutex_unlock(chan->mutex); - /* signal channel not in use */ - pthread_cond_signal(chan->in_use); - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); -} - /* increase active lua process count */ static void sched_lpcount_inc(void) { @@ -255,7 +180,6 @@ static void *workermain( void *args ) { /* execute the lua code specified in the lua process struct */ procstat = lua_resume(lp->L, lp->args); - /* reset the process argument count */ lp->args = 0; @@ -273,8 +197,8 @@ static void *workermain( void *args ) { sched_lpcount_dec(); } lua_close(lp->L); -// if (lp->timer) -// ecore_timer_del(lp->timer); + if (lp->timer) + ecore_timer_del(lp->timer); free(lp); } @@ -282,22 +206,15 @@ static void *workermain( void *args ) { else if ( procstat == LUA_YIELD ) { /* if so, further check if yield originated from an unmatched send/recv operation */ - if ( lp->status == LUAPROC_STAT_BLOCKED_SEND ) { - /* queue blocked lua process on corresponding channel */ - luaproc_queue_sender( lp ); - /* unlock channel access */ - luaproc_unlock_channel(lp->chan); + if (lp->status == LUAPROC_STAT_BLOCKED_SEND) + { } - - else if ( lp->status == LUAPROC_STAT_BLOCKED_RECV ) { - /* queue blocked lua process on corresponding channel */ - luaproc_queue_receiver( lp ); - /* unlock channel access */ - luaproc_unlock_channel(lp->chan); + else if (lp->status == LUAPROC_STAT_BLOCKED_RECV) + { } - /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */ - else { + else + { /* get exclusive access to the ready process queue */ pthread_mutex_lock( &mutex_queue_access ); /* re-insert the job at the end of the ready process queue */ @@ -306,9 +223,9 @@ static void *workermain( void *args ) { pthread_mutex_unlock( &mutex_queue_access ); } } - /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */ - else { + else + { /* print error message */ fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring(lp->L, -1 )); /* close lua state */ @@ -365,20 +282,16 @@ void sched_join_workerthreads( void ) { } /* create a new worker pthread */ -int sched_create_worker( void ) { - - pthread_t worker; - - /* create a new pthread */ - if ( pthread_create( &worker, NULL, workermain, NULL ) != 0 ) { - return LUAPROC_SCHED_PTHREAD_ERROR; - } +int sched_create_worker(void) +{ + pthread_t worker; - return LUAPROC_SCHED_OK; + /* create a new pthread */ + if (pthread_create( &worker, NULL, workermain, NULL ) != 0) + return LUAPROC_SCHED_PTHREAD_ERROR; + return LUAPROC_SCHED_OK; } - - void newProc(const char *code, int file, script *lp) { int ret; @@ -408,7 +321,6 @@ void newProc(const char *code, int file, script *lp) lp->status = LUAPROC_STAT_IDLE; lp->args = 0; - lp->chan = NULL; eina_clist_element_init(&(lp->node)); eina_clist_init(&(lp->messages)); @@ -439,18 +351,6 @@ void newProc(const char *code, int file, script *lp) } } -/* moves values between lua states' stacks */ -static void luaproc_movevalues( lua_State *Lfrom, lua_State *Lto ) { - - int i; - int n = lua_gettop( Lfrom ); - - /* move values between lua states' stacks */ - for ( i = 2; i <= n; i++ ) { - lua_pushstring( Lto, lua_tostring( Lfrom, i )); - } -} - /* return the lua process associated with a given lua state */ static script *luaproc_getself(lua_State *L) { @@ -462,70 +362,26 @@ static script *luaproc_getself(lua_State *L) return lp; } -/* create a new channel */ -static int luaproc_create_channel(lua_State *L) +/* send a message to the client process */ +static int luaproc_send_back(lua_State *L) { - const char *name = luaL_checkstring(L, 1); - channel chan; + script *self = luaproc_getself(L); + const char *message = luaL_checkstring(L, 1); - /* get exclusive access to operate on channels */ - pthread_mutex_lock(&mutex_channel); - - /* check if channel exists */ - if (eina_hash_find(channels, name) != NULL) + if (self) { - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); - /* return an error to lua */ - lua_pushnil(L); - lua_pushstring(L, "channel already exists"); - return 2; - } - - /* get exclusive access to the channel table */ - pthread_mutex_lock(&mutex_channel_lstate); - - /* create a new channel */ - chan = (channel) calloc(1, sizeof(struct stchannel)); - eina_clist_init(&(chan->send)); - eina_clist_init(&(chan->recv)); - chan->mutex = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t)); - pthread_mutex_init( chan->mutex, NULL ); - chan->in_use = (pthread_cond_t *) malloc(sizeof(pthread_cond_t)); - pthread_cond_init(chan->in_use, NULL); - eina_hash_add(channels, name, chan); - - /* let others access the channel table */ - pthread_mutex_unlock(&mutex_channel_lstate); - - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); - - lua_pushboolean(L, TRUE); - return 1; -} + scriptMessage *sm = calloc(1, sizeof(scriptMessage)); -/* send a message to the client process */ -static int luaproc_send_back( lua_State *L ) { - - script *self; - const char *message = luaL_checkstring( L, 1 ); - - self = luaproc_getself(L); - if (self) + if (sm) { - scriptMessage *sm = calloc(1, sizeof(scriptMessage)); - - if (sm) - { - eina_clist_element_init(&(sm->node)); - sm->script = self; - strcpy((char *) sm->message, message); - ecore_main_loop_thread_safe_call_async(scriptSendBack, sm); - } + eina_clist_element_init(&(sm->node)); + sm->script = self; + strcpy((char *) sm->message, message); + ecore_main_loop_thread_safe_call_async(scriptSendBack, sm); } + } - return 0; + return 0; } /* error messages for the sendToChannel function */ @@ -536,224 +392,105 @@ const char *sendToChannelErrors[] = }; /* send a message to a lua process */ -const char *sendToChannel(gameGlobals *game, const char *chname, const char *message, script **dst, channel *chn) +const char *sendToChannel(gameGlobals *game, const char *SID, const char *message) { const char *result = NULL; - channel chan; script *dstlp; - scriptMessage *sm = NULL; /* get exclusive access to operate on channels */ pthread_mutex_lock(&mutex_channel); // Add the message to the queue. - if ((dstlp = eina_hash_find(game->scripts, chname))) + if ((dstlp = eina_hash_find(game->scripts, SID))) { + scriptMessage *sm = NULL; + if ((sm = malloc(sizeof(scriptMessage)))) { sm->script = dstlp; strcpy((char *) sm->message, message); - eina_clist_add_tail(&(sm->script->messages), &(sm->node)); + eina_clist_add_tail(&(dstlp->messages), &(sm->node)); } - } - - /* wait until channel is not in use */ - while( ((chan = eina_hash_find(channels, chname)) != NULL) && (pthread_mutex_trylock(chan->mutex) != 0 )) - { - pthread_cond_wait(chan->in_use, &mutex_channel); - } - - /* free access to operate on channels */ - pthread_mutex_unlock(&mutex_channel); - - /* if channel is not found, return an error */ - if (chan == NULL) - return sendToChannelErrors[0]; - - /* try to find a matching receiver */ - dstlp = luaproc_dequeue_receiver(chan); - - /* if a match is found, send the message to it and (queue) wake it */ - if (dstlp != NULL) - { - scriptMessage *msg = (scriptMessage *) eina_clist_head(&(dstlp->messages)); - // See if there's a message on the queue. Note, this may not be the same as the incoming message, if there was already a queue. - if (msg) + /* if it's already waiting, send the next message to it and (queue) wake it */ + if (dstlp->status == LUAPROC_STAT_BLOCKED_RECV) { - eina_clist_remove(&(msg->node)); - message = msg->message; - } - /* push the message onto the receivers stack */ - lua_pushstring(dstlp->L, message); - dstlp->args = lua_gettop(dstlp->L) - 1; - if (msg) - free(msg); - - if (sched_queue_proc(dstlp) != LUAPROC_SCHED_QUEUE_PROC_OK) - { - /* unlock channel access */ - luaproc_unlock_channel(chan); + scriptMessage *msg = (scriptMessage *) eina_clist_head(&(dstlp->messages)); - /* decrease active luaproc count */ - sched_lpcount_dec(); + // See if there's a message on the queue. Note, this may not be the same as the incoming message, if there was already a queue. + if (msg) + { + eina_clist_remove(&(msg->node)); + message = msg->message; + } + /* push the message onto the receivers stack */ + lua_pushstring(dstlp->L, message); + dstlp->args = lua_gettop(dstlp->L) - 1; + if (msg) + free(msg); - /* close lua_State */ - lua_close(dstlp->L); - return sendToChannelErrors[1]; + if (sched_queue_proc(dstlp) != LUAPROC_SCHED_QUEUE_PROC_OK) + { + sched_lpcount_dec(); + lua_close(dstlp->L); + result = sendToChannelErrors[1]; + } } - - /* unlock channel access */ - luaproc_unlock_channel(chan); } - else if (dst) - *dst = dstlp; - if (chn) - chn = &chan; + pthread_mutex_unlock(&mutex_channel); + return result; } /* send a message to a lua process */ -static int luaproc_send( lua_State *L ) { - - channel chan; - script *dstlp, *self = luaproc_getself(L); - const char *chname = luaL_checkstring( L, 1 ); - const char *message = luaL_checkstring( L, 2 ); - const char *result = sendToChannel(self->game, chname, message, &dstlp, &chan); - - if (result) { - lua_pushnil( L ); - lua_pushstring( L, result ); - return 2; - } - - if ( dstlp == NULL ) { - - if ( self != NULL ) { - self->status = LUAPROC_STAT_BLOCKED_SEND; - self->chan = chan; - } +static int luaproc_send(lua_State *L) +{ + script *self = luaproc_getself(L); + const char *result = sendToChannel(self->game, luaL_checkstring(L, 1), luaL_checkstring(L, 2)); - /* just yield the lua process, channel unlocking will be done by the scheduler */ - return lua_yield( L, lua_gettop( L )); - } + if (result) + { + lua_pushnil(L); + lua_pushstring(L, result); + return 2; + } - lua_pushboolean( L, TRUE ); - return 1; + lua_pushboolean(L, TRUE); + return 1; } /* receive a message from a lua process */ -static int luaproc_receive( lua_State *L ) { - - channel chan; - script *srclp, *self; - const char *chname = luaL_checkstring( L, 1 ); - scriptMessage *msg; - - // First check if there are queued messages, and grab one. - self = luaproc_getself(L); - if ((msg = (scriptMessage *) eina_clist_head(&(self->messages)))) - { - eina_clist_remove(&(msg->node)); - lua_pushstring(L, msg->message); - free(msg); - return lua_gettop(L) - 1; - } - - /* get exclusive access to operate on channels */ - pthread_mutex_lock( &mutex_channel ); - - /* wait until channel is not in use */ - while((( chan = eina_hash_find(channels, chname)) != NULL ) && ( pthread_mutex_trylock(chan->mutex) != 0 )) { - pthread_cond_wait(chan->in_use, &mutex_channel ); - } - - /* free access to operate on channels */ - pthread_mutex_unlock( &mutex_channel ); - - /* if channel is not found, free access to operate on channels and return an error to Lua */ - if ( chan == NULL ) { - lua_pushnil( L ); - lua_pushstring( L, "non-existent channel" ); - return 2; - } - - /* try to find a matching sender */ - srclp = luaproc_dequeue_sender( chan ); - - /* if a match is found, get values from it and (queue) wake it */ - if ( srclp != NULL ) { - - /* move values between Lua states' stacks */ - luaproc_movevalues( srclp->L, L ); - - /* return to sender indicanting message was sent */ - lua_pushboolean( srclp->L, TRUE ); - srclp->args = 1; - - if ( sched_queue_proc( srclp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { - - /* unlock channel access */ - luaproc_unlock_channel( chan ); - - /* decrease active luaproc count */ - sched_lpcount_dec(); - - /* close lua_State */ - lua_close( srclp->L ); - lua_pushnil( L ); - lua_pushstring( L, "error scheduling process" ); - return 2; - } - - /* unlock channel access */ - luaproc_unlock_channel( chan ); - - return lua_gettop( L ) - 1; - } - - /* otherwise queue (block) the receiving process (sync) or return immediatly (async) */ - else { - - /* if trying an asynchronous receive, unlock channel access and return an error */ - if ( lua_toboolean( L, 2 )) { - /* unlock channel access */ - luaproc_unlock_channel( chan ); - /* return an error */ - lua_pushnil( L ); - lua_pushfstring( L, "no senders waiting on channel %s", chname ); - return 2; - } - - /* otherwise (synchronous receive) simply block process */ - else { +static int luaproc_receive(lua_State *L) +{ + script *self; + const char *chname = luaL_checkstring(L, 1); + scriptMessage *msg; - if ( self != NULL ) { - self->status = LUAPROC_STAT_BLOCKED_RECV; - self->chan = chan; - } + // First check if there are queued messages, and grab one. + self = luaproc_getself(L); + if ((msg = (scriptMessage *) eina_clist_head(&(self->messages)))) + { + eina_clist_remove(&(msg->node)); + lua_pushstring(L, msg->message); + free(msg); + return lua_gettop(L) - 1; + } - /* just yield the lua process, channel unlocking will be done by the scheduler */ - return lua_yield( L, lua_gettop( L )); - } - } + /* if trying an asynchronous receive, return an error */ + if ( lua_toboolean( L, 2 )) + { + lua_pushnil(L); + lua_pushfstring(L, "no senders waiting on channel %s", chname); + return 2; + } + /* otherwise (synchronous receive) simply block process */ + self->status = LUAPROC_STAT_BLOCKED_RECV; + return lua_yield(L, lua_gettop(L)); } void luaprocInit(void) { eina_clist_init(&recyclelp); - - int tid; - pthread_t worker; - eina_clist_init(&lpready); - channels = eina_hash_string_superfast_new(NULL); - - /* create initial worker threads */ - for (tid = 0; tid < LUAPROC_SCHED_DEFAULT_WORKER_THREADS; tid++) - { - pthread_create( &worker, NULL, workermain, NULL); - } } diff --git a/LuaSL/src/LuaSL_threads.h b/LuaSL/src/LuaSL_threads.h index b7f6449..78c6639 100644 --- a/LuaSL/src/LuaSL_threads.h +++ b/LuaSL/src/LuaSL_threads.h @@ -43,17 +43,10 @@ THE SOFTWARE. #define LUAPROC_SCHED_INIT_ERROR -7 -/* message channel pointer type */ -typedef struct stchannel *channel; - - void luaprocInit(void); - -/* create a new worker pthread */ int sched_create_worker(void); - void newProc(const char *code, int file, script *lp); -const char *sendToChannel(gameGlobals *game, const char *chname, const char *message, script **dst, channel *chn); +const char *sendToChannel(gameGlobals *game, const char *SID, const char *message); /* join all worker threads and exit */ void sched_join_workerthreads(void); -- cgit v1.1