From 6523585c66c04cea54df50013df8886b589847d8 Mon Sep 17 00:00:00 2001 From: David Walter Seikel Date: Mon, 23 Jan 2012 23:36:30 +1000 Subject: Add luaproc and LuaJIT libraries. Two versions of LuaJIT, the stable release, and the dev version. Try the dev version first, until ih fails badly. --- libraries/luaproc/COPYRIGHT | 32 + libraries/luaproc/Lua_multithreading_ry08-05.pdf | Bin 0 -> 135164 bytes libraries/luaproc/Makefile | 64 ++ libraries/luaproc/README | 97 +++ libraries/luaproc/channel.c | 151 ++++ libraries/luaproc/channel.h | 67 ++ libraries/luaproc/list.c | 241 +++++++ libraries/luaproc/list.h | 76 ++ libraries/luaproc/luaproc.c | 837 +++++++++++++++++++++++ libraries/luaproc/luaproc.h | 92 +++ libraries/luaproc/luaproc.lua | 34 + libraries/luaproc/sched.c | 314 +++++++++ libraries/luaproc/sched.h | 78 +++ libraries/luaproc/test.lua | 39 ++ 14 files changed, 2122 insertions(+) create mode 100644 libraries/luaproc/COPYRIGHT create mode 100644 libraries/luaproc/Lua_multithreading_ry08-05.pdf create mode 100644 libraries/luaproc/Makefile create mode 100644 libraries/luaproc/README create mode 100644 libraries/luaproc/channel.c create mode 100644 libraries/luaproc/channel.h create mode 100644 libraries/luaproc/list.c create mode 100644 libraries/luaproc/list.h create mode 100644 libraries/luaproc/luaproc.c create mode 100644 libraries/luaproc/luaproc.h create mode 100644 libraries/luaproc/luaproc.lua create mode 100644 libraries/luaproc/sched.c create mode 100644 libraries/luaproc/sched.h create mode 100644 libraries/luaproc/test.lua (limited to 'libraries/luaproc') diff --git a/libraries/luaproc/COPYRIGHT b/libraries/luaproc/COPYRIGHT new file mode 100644 index 0000000..5f3c3ab --- /dev/null +++ b/libraries/luaproc/COPYRIGHT @@ -0,0 +1,32 @@ +luaproc License +--------------- + +luaproc is licensed under the terms of the MIT license reproduced below. +This means that luaproc is free software and can be used for both academic +and commercial purposes at absolutely no cost. + +=============================================================================== + +Copyright (C) 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +=============================================================================== + +(end of COPYRIGHT) diff --git a/libraries/luaproc/Lua_multithreading_ry08-05.pdf b/libraries/luaproc/Lua_multithreading_ry08-05.pdf new file mode 100644 index 0000000..5caf694 Binary files /dev/null and b/libraries/luaproc/Lua_multithreading_ry08-05.pdf differ diff --git a/libraries/luaproc/Makefile b/libraries/luaproc/Makefile new file mode 100644 index 0000000..1efc2d5 --- /dev/null +++ b/libraries/luaproc/Makefile @@ -0,0 +1,64 @@ +#################################################### +# +# Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +###################################################### +# +# [Makefile] +# +###################################################### + +# path to lua header files +LUA_INC_PATH=/usr/include/lua5.1 +# path to lua library +LUA_LIB_PATH=/usr/lib/lua5.1 + +# standard makefile variables +CC=gcc +CFLAGS=-c -Wall -fPIC -I${LUA_INC_PATH} +LDFLAGS=-shared -L${LUA_LIB_PATH} -lpthread +SOURCES=sched.c list.c luaproc.c channel.c +OBJECTS=${SOURCES:.c=.o} +LIB=luaproc.so + +all: ${SOURCES} ${LIB} + +${LIB}: ${OBJECTS} + ${CC} ${OBJECTS} -o $@ ${LDFLAGS} + +sched.o: sched.c sched.h list.h luaproc.h channel.h + ${CC} ${CFLAGS} sched.c + +list.o: list.c list.h + ${CC} ${CFLAGS} list.c + +luaproc.o: luaproc.c luaproc.h list.h sched.h channel.h + ${CC} ${CFLAGS} luaproc.c + +channel.o: channel.c channel.h list.h + ${CC} ${CFLAGS} channel.c + +clean: + rm -f ${OBJECTS} ${LIB} + +test: + lua test.lua + diff --git a/libraries/luaproc/README b/libraries/luaproc/README new file mode 100644 index 0000000..bc37dfe --- /dev/null +++ b/libraries/luaproc/README @@ -0,0 +1,97 @@ +*************************************************** +* +* Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in +* all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +* THE SOFTWARE. +* +***************************************************** +* +* [README] +* +**************************************************** + + +************** +* PARENT API * +************** + +-- Create a new lua process +-- Returns true if sucessful or nil, error_message if failed +luaproc.newproc( ) + +-- Create a new worker (pthread) +-- Returns true if sucessful or nil, error_message if failed +luaproc.createworker( ) + +-- Destroy a worker (pthread) +-- Returns true if sucessful or nil, error_message if failed +luaproc.destroyworker( ) + +-- Synchronize workers (pthreads) and exit after all lua processes have ended +-- No return, finishes execution. +luaproc.exit( ) + +-- Set maximum lua processes that should be recycled (default = 0) +-- Returns true if sucessful or nil, error_message if failed +luaproc.recycle( ) + +************************************************************ +* CHILD API * +* Available only to processes spawned * +* with luaproc.newproc * +************************************************************ + +-- Create a new lua process +-- Returns true if sucessful or nil, error_message if failed +luaproc.newproc( ) + +-- Create a new worker (pthread) +-- Returns true if sucessful or nil, error_message if failed +luaproc.createworker( ) + +-- Destroy a worker (pthread) +-- Returns true if sucessful or nil, error_message if failed +luaproc.destroyworker( ) + +-- Send a message on a channel +-- Returns true if sucessful or nil, error_message if failed +-- Results in blocking if there is no matching receive +luaproc.send( , , + [string msg2], [string msg3], ... ) + +-- Receive a message on a channel +-- Returns message string(s) if sucessful or nil, error_message if failed +-- Results in blocking if there is no matching send +-- and the asynchronous flag is not set (nil) or set to false +luaproc.receive( , [boolean asynchronous] ) + +-- Create a new channel +-- Returns true if sucessful or nil, error_message if failed +luaproc.newchannel( ) + +-- Destroy a channel +-- Returns true if sucessful or nil, error_message if failed +luaproc.delchannel( ) + +-- Set maximum lua processes that should be recycled (default = 0) +-- Returns true if sucessful or nil, error_message if failed +luaproc.recycle( ) + +<> = mandatory arguments +[] = optional arguments diff --git a/libraries/luaproc/channel.c b/libraries/luaproc/channel.c new file mode 100644 index 0000000..ef4ab4b --- /dev/null +++ b/libraries/luaproc/channel.c @@ -0,0 +1,151 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[channel.c] + +****************************************************/ + +#include +#include +#include +#include +#include +#include + +#include "channel.h" +#include "list.h" + +/* global channel lua_State mutex */ +pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER; + +/* global lua_State where channel hash table will be stored */ +lua_State *chanls = NULL; + +/* message channel */ +struct stchannel { + list send; + list recv; + pthread_mutex_t *mutex; + pthread_cond_t *in_use; +}; + +/* initialize channel table */ +void channel_init( void ) { + chanls = luaL_newstate(); + lua_newtable( chanls ); + lua_setglobal( chanls, "channeltb" ); +} + +/* create new channel */ +channel channel_create( const char *cname ) { + + channel chan; + + /* get exclusive access to the channel table */ + pthread_mutex_lock( &mutex_channel_lstate ); + + /* create a new channel */ + lua_getglobal( chanls, "channeltb"); + lua_pushstring( chanls, cname ); + chan = (channel )lua_newuserdata( chanls, sizeof( struct stchannel )); + chan->send = list_new(); + chan->recv = list_new(); + chan->mutex = (pthread_mutex_t *)malloc( sizeof( pthread_mutex_t )); + pthread_mutex_init( chan->mutex, NULL ); + chan->in_use = (pthread_cond_t *)malloc( sizeof( pthread_cond_t )); + pthread_cond_init( chan->in_use, NULL ); + lua_settable( chanls, -3 ); + lua_pop( chanls, 1 ); + + /* let others access the channel table */ + pthread_mutex_unlock( &mutex_channel_lstate ); + + return chan; +} + +/* destroy a channel */ +int channel_destroy( channel chan, const char *chname ) { + + /* get exclusive access to the channel table */ + pthread_mutex_lock( &mutex_channel_lstate ); + + list_destroy( chan->send ); + list_destroy( chan->recv ); + + lua_getglobal( chanls, "channeltb"); + lua_pushstring( chanls, chname ); + lua_pushnil( chanls ); + lua_settable( chanls, -3 ); + lua_pop( chanls, 1 ); + + /* let others access the channel table */ + pthread_mutex_unlock( &mutex_channel_lstate ); + + return CHANNEL_DESTROYED; +} + +/* search for and return a channel with a given name */ +channel channel_search( const char *cname ) { + + channel chan; + + /* get exclusive access to the channel table */ + pthread_mutex_lock( &mutex_channel_lstate ); + + /* search for channel */ + lua_getglobal( chanls, "channeltb"); + lua_getfield( chanls, -1, cname ); + if (( lua_type( chanls, -1 )) == LUA_TUSERDATA ) { + chan = (channel )lua_touserdata( chanls, -1 ); + } else { + chan = NULL; + } + lua_pop( chanls, 2 ); + + /* let others access channel table */ + pthread_mutex_unlock( &mutex_channel_lstate ); + + return chan; +} + +/* return a channel's send queue */ +list channel_get_sendq( channel chan ) { + return chan->send; +} + +/* return a channel's receive queue */ +list channel_get_recvq( channel chan ) { + return chan->recv; +} + +/* return a channel's mutex */ +pthread_mutex_t *channel_get_mutex( channel chan ) { + return chan->mutex; +} + +/* return a channel's conditional variable */ +pthread_cond_t *channel_get_cond( channel chan ) { + return chan->in_use; +} + diff --git a/libraries/luaproc/channel.h b/libraries/luaproc/channel.h new file mode 100644 index 0000000..1cced9e --- /dev/null +++ b/libraries/luaproc/channel.h @@ -0,0 +1,67 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[channel.h] + +****************************************************/ + +#ifndef _CHANNEL_H_ +#define _CHANNEL_H_ + +#include + +#include "list.h" + +#define CHANNEL_MAX_NAME_LENGTH 255 + +#define CHANNEL_DESTROYED 0 + +/* message channel pointer type */ +typedef struct stchannel *channel; + +/* initialize channels */ +void channel_init( void ); + +/* create new channel */ +channel channel_create( const char *cname ); + +/* destroy a channel */ +int channel_destroy( channel chan, const char *chname ); + +/* search for and return a channel with a given name */ +channel channel_search( const char *cname ); + +/* return a channel's send queue */ +list channel_get_sendq( channel chan ); + +/* return a channel's receive queue */ +list channel_get_recvq( channel chan ); + +/* return a channel's mutex */ +pthread_mutex_t *channel_get_mutex( channel chan ); + +/* return a channel's conditional variable */ +pthread_cond_t *channel_get_cond( channel chan ); + +#endif diff --git a/libraries/luaproc/list.c b/libraries/luaproc/list.c new file mode 100644 index 0000000..0088695 --- /dev/null +++ b/libraries/luaproc/list.c @@ -0,0 +1,241 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[list.c] + +****************************************************/ +#include +#include + +#include "list.h" + +/* linked list node */ +struct stnode { + void *data; + struct stnode *next; + struct stnode *prev; +}; + +/* linked list */ +struct stlist { + node head; + node tail; + int nodes; +}; + +/* create new (empty) list */ +list list_new( void ) { + list lst; + lst = (list )malloc( sizeof( struct stlist )); + if ( lst == NULL ) { + return lst; + } + lst->head = NULL; + lst->tail = NULL; + lst->nodes = 0; + return lst; +} + +/* create new node */ +node list_new_node( void *data ) { + node n; + n = (node )malloc( sizeof( struct stnode )); + if ( n == NULL ) { + return n; + } + n->data = data; + n->next = NULL; + n->prev = NULL; + return n; +} + +/* add node to list */ +node list_add( list lst, node n ) { + + /* void list or node */ + if (( lst == NULL ) || ( n == NULL )) { + return NULL; + } + + /* list is empty */ + if ( lst->head == NULL ) { + lst->head = n; + lst->tail = n; + } + + /* list is _not_ empty */ + else { + lst->tail->next = n; + n->prev = lst->tail; + lst->tail = n; + } + + lst->nodes++; + return n; +} + +/* search for a node */ +node list_search( list lst, void *data ) { + + node nitr; + + /* check if list is null or empty */ + if (( lst == NULL ) || ( lst->head == NULL )) { + return NULL; + } + + /* look for node between first and last nodes (first and last are included) */ + for ( nitr = lst->head; nitr != NULL; nitr = nitr->next ) { + if ( nitr->data == data ) { + /* node found, return it */ + return nitr; + } + } + + /* node not found */ + return NULL; +} + +/* remove node from list */ +void list_remove( list lst, node n ) { + + node nitr; + + /* check if list or node are null and if list is empty */ + if (( lst == NULL ) || ( n == NULL ) || ( lst->head == NULL )) { + return; + } + + /* check if node is list's head */ + if ( lst->head == n ) { + lst->head = n->next; + /* if so, also check if it's the only node in the list */ + if ( lst->tail == n ) { + lst->tail = n->next; + } + else { + lst->head->prev = NULL; + } + free( n ); + lst->nodes--; + return; + } + + /* look for node between first and last nodes (first and last are excluded) */ + for ( nitr = lst->head->next; nitr != lst->tail; nitr = nitr->next ) { + if ( nitr == n ) { + n->prev->next = n->next; + n->next->prev = n->prev; + free( n ); + lst->nodes--; + return; + } + } + + /* check if node is list's tail */ + if ( lst->tail == n ) { + lst->tail = n->prev; + n->prev->next = n->next; + free( n ); + lst->nodes--; + return; + } + + return; +} + +/* list_destroy */ +void list_destroy( list lst ) { + + /* empty list */ + if ( lst == NULL ) { + return; + } + + /* non-empty list */ + while ( lst->head != NULL ) { + list_remove( lst, lst->head ); + } + + free( lst ); +} + +/* return list's first node */ +node list_head( list lst ) { + if ( lst != NULL ) { + return lst->head; + } + return NULL; +} + +/* return node's next node */ +node list_next( node n ) { + if ( n != NULL ) { + return n->next; + } + return NULL; +} + +/* return a node's data */ +void *list_data( node n ) { + if ( n != NULL ) { + return n->data; + } + return NULL; +} + +/* pop the head node from the list */ +node list_pop_head( list lst ) { + node ntmp; + if (( lst == NULL ) || ( lst->head == NULL )) { + return NULL; + } + + ntmp = lst->head; + if ( lst->head == lst->tail ) { + lst->head = NULL; + lst->tail = NULL; + } else { + lst->head = ntmp->next; + ntmp->next->prev = NULL; + } + ntmp->next = NULL; + ntmp->prev = NULL; + lst->nodes--; + return ntmp; +} + +/* destroy a node */ +void list_destroy_node( node n ) { + free( n ); +} + +/* return a list's node count */ +int list_node_count( list lst ) { + if ( lst != NULL ) { + return lst->nodes; + } + return LIST_COUNT_ERROR; +} + diff --git a/libraries/luaproc/list.h b/libraries/luaproc/list.h new file mode 100644 index 0000000..6aa21c0 --- /dev/null +++ b/libraries/luaproc/list.h @@ -0,0 +1,76 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[list.h] + +****************************************************/ + +#ifndef _LIST_H_ +#define _LIST_H_ + +#define LIST_COUNT_ERROR -1 + +/* node pointer type */ +typedef struct stnode *node; + +/* list pointer type */ +typedef struct stlist *list; + +/* create new (empty) list */ +list list_new( void ); + +/* create new node */ +node list_new_node( void *data ); + +/* add node to list */ +node list_add( list lst, node n ); + +/* search for a node */ +node list_search( list lst, void *data ); + +/* remove node from list */ +void list_remove( list lst, node n ); + +/* destroy list */ +void list_destroy( list lst ); + +/* return list's first node */ +node list_head( list lst ); + +/* return node's next node */ +node list_next( node n ); + +/* return a node's data */ +void *list_data( node n ); + +/* pop the head node from the list */ +node list_pop_head( list lst ); + +/* destroy a node */ +void list_destroy_node( node n ); + +/* return a list's node count */ +int list_node_count( list lst ); + +#endif diff --git a/libraries/luaproc/luaproc.c b/libraries/luaproc/luaproc.c new file mode 100644 index 0000000..2ec5b31 --- /dev/null +++ b/libraries/luaproc/luaproc.c @@ -0,0 +1,837 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[luaproc.c] + +****************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "luaproc.h" +#include "list.h" +#include "sched.h" +#include "channel.h" + +#define FALSE 0 +#define TRUE 1 + +/********* +* globals +*********/ + +/* channel operations mutex */ +pthread_mutex_t mutex_channel = PTHREAD_MUTEX_INITIALIZER; + +/* recycle list mutex */ +pthread_mutex_t mutex_recycle_list = PTHREAD_MUTEX_INITIALIZER; + +/* recycled lua process list */ +list recyclelp = NULL; + +/* maximum lua processes to recycle */ +int recyclemax = 0; + +/* lua process */ +struct stluaproc { + lua_State *lstate; + int stat; + int args; + channel chan; + int destroyworker; +}; + +/****************************** +* library functions prototypes +******************************/ +/* create a new lua process */ +static int luaproc_create_newproc( lua_State *L ); +/* send a message to a lua process */ +static int luaproc_send( lua_State *L ); +/* receive a message from a lua process */ +static int luaproc_receive( lua_State *L ); +/* create a new channel */ +static int luaproc_create_channel( lua_State *L ); +/* destroy a channel */ +static int luaproc_destroy_channel( lua_State *L ); +/* wait until all luaprocs have finished and exit */ +static int luaproc_exit( lua_State *L ); +/* create a new worker */ +static int luaproc_create_worker( lua_State *L ); +/* destroy a worker */ +static int luaproc_destroy_worker( lua_State *L ); +/* set amount of lua processes that should be recycled (ie, reused) */ +static int luaproc_recycle_set( lua_State *L ); + +/* luaproc function registration array - main (parent) functions */ +static const struct luaL_reg luaproc_funcs_parent[] = { + { "newproc", luaproc_create_newproc }, + { "exit", luaproc_exit }, + { "createworker", luaproc_create_worker }, + { "destroyworker", luaproc_destroy_worker }, + { "recycle", luaproc_recycle_set }, + { NULL, NULL } +}; + +/* luaproc function registration array - newproc (child) functions */ +static const struct luaL_reg luaproc_funcs_child[] = { + { "newproc", luaproc_create_newproc }, + { "send", luaproc_send }, + { "receive", luaproc_receive }, + { "newchannel", luaproc_create_channel }, + { "delchannel", luaproc_destroy_channel }, + { "createworker", luaproc_create_worker }, + { "destroyworker", luaproc_destroy_worker }, + { "recycle", luaproc_recycle_set }, + { NULL, NULL } +}; + +static void registerlib( lua_State *L, const char *name, lua_CFunction f ) { + lua_getglobal( L, "package" ); + lua_getfield( L, -1, "preload" ); + lua_pushcfunction( L, f ); + lua_setfield( L, -2, name ); + lua_pop( L, 2 ); +} + +static void openlibs( lua_State *L ) { + lua_cpcall( L, luaopen_base, NULL ); + lua_cpcall( L, luaopen_package, NULL ); + registerlib( L, "io", luaopen_io ); + registerlib( L, "os", luaopen_os ); + registerlib( L, "table", luaopen_table ); + registerlib( L, "string", luaopen_string ); + registerlib( L, "math", luaopen_math ); + registerlib( L, "debug", luaopen_debug ); +} + +/* return status (boolean) indicating if lua process should be recycled */ +luaproc luaproc_recycle_pop( void ) { + + luaproc lp; + node n; + + /* get exclusive access to operate on recycle list */ + pthread_mutex_lock( &mutex_recycle_list ); + + /* check if there are any lua processes on recycle list */ + if ( list_node_count( recyclelp ) > 0 ) { + /* pop list head */ + n = list_pop_head( recyclelp ); + /* free access to operate on recycle list */ + pthread_mutex_unlock( &mutex_recycle_list ); + /* find associated luaproc */ + lp = (luaproc )list_data( n ); + /* destroy node (but not associated luaproc) */ + list_destroy_node( n ); + /* return associated luaproc */ + return lp; + } + + /* free access to operate on recycle list */ + pthread_mutex_unlock( &mutex_recycle_list ); + + /* if no lua processes are available simply return null */ + return NULL; +} + +/* check if lua process should be recycled and, in case so, add it to the recycle list */ +int luaproc_recycle_push( luaproc lp ) { + + node n; + + /* get exclusive access to operate on recycle list */ + pthread_mutex_lock( &mutex_recycle_list ); + + /* check if amount of lua processes currently on recycle list is greater than + or equal to the maximum amount of lua processes that should be recycled */ + if ( list_node_count( recyclelp ) >= recyclemax ) { + /* free access to operate on recycle list */ + pthread_mutex_unlock( &mutex_recycle_list ); + /* if so, lua process should NOT be recycled and should be destroyed */ + return FALSE; + } + /* otherwise, lua process should be added to recycle list */ + n = list_new_node( lp ); + if ( n == NULL ) { + /* free access to operate on recycle list */ + pthread_mutex_unlock( &mutex_recycle_list ); + /* in case of errors, lua process should be destroyed */ + return FALSE; + } + list_add( recyclelp, n ); + /* free access to operate on recycle list */ + pthread_mutex_unlock( &mutex_recycle_list ); + /* since lua process will be recycled, it should not be destroyed */ + return TRUE; +} + +/* create new luaproc */ +luaproc luaproc_new( const char *code, int destroyflag ) { + + luaproc lp; + int ret; + /* create new lua state */ + lua_State *lpst = luaL_newstate( ); + /* store the luaproc struct in its own lua state */ + lp = (luaproc )lua_newuserdata( lpst, sizeof( struct stluaproc )); + lua_setfield( lpst, LUA_REGISTRYINDEX, "_SELF" ); + + lp->lstate = lpst; + lp->stat = LUAPROC_STAT_IDLE; + lp->args = 0; + lp->chan = NULL; + lp->destroyworker = destroyflag; + + /* load standard libraries */ + openlibs( lpst ); + + /* register luaproc's own functions */ + luaL_register( lpst, "luaproc", luaproc_funcs_child ); + + /* load process' code */ + ret = luaL_loadstring( lpst, code ); + /* in case of errors, destroy recently created lua process */ + if ( ret != 0 ) { + lua_close( lpst ); + return NULL; + } + + /* return recently created lua process */ + return lp; +} + +/* synchronize worker threads and exit */ +static int luaproc_exit( lua_State *L ) { + sched_join_workerthreads( ); + return 0; +} + +/* create a new worker pthread */ +static int luaproc_create_worker( lua_State *L ) { + + if ( sched_create_worker( ) != LUAPROC_SCHED_OK ) { + lua_pushnil( L ); + lua_pushstring( L, "error creating worker" ); + return 2; + } + + lua_pushboolean( L, TRUE ); + return 1; +} + +/* set amount of lua processes that should be recycled (ie, reused) */ +static int luaproc_recycle_set( lua_State *L ) { + + node n; + luaproc lp; + int max = luaL_checkint( L, 1 ); + + /* check if function argument represents a reasonable value */ + if ( max < 0 ) { + /* in case of errors return nil + error msg */ + lua_pushnil( L ); + lua_pushstring( L, "error setting recycle limit to negative value" ); + return 2; + } + + /* get exclusive access to operate on recycle list */ + pthread_mutex_lock( &mutex_recycle_list ); + + /* set maximum lua processes that should be recycled */ + recyclemax = max; + + /* destroy recycle list excessive nodes (and corresponding lua processes) */ + while ( list_node_count( recyclelp ) > max ) { + /* get first node from recycle list */ + n = list_pop_head( recyclelp ); + /* find associated luaproc */ + lp = (luaproc )list_data( n ); + /* destroy node */ + list_destroy_node( n ); + /* close associated lua_State */ + lua_close( lp->lstate ); + } + + /* free access to operate on recycle list */ + pthread_mutex_unlock( &mutex_recycle_list ); + + lua_pushboolean( L, TRUE ); + return 1; +} + + +/* destroy a worker pthread */ +static int luaproc_destroy_worker( lua_State *L ) { + + /* new lua process pointer */ + luaproc lp; + + /* create new lua process with empty code and destroy worker flag set to true + (ie, conclusion of lua process WILL result in worker thread destruction */ + lp = luaproc_new( "", TRUE ); + + /* ensure process creation was successfull */ + if ( lp == NULL ) { + /* in case of errors return nil + error msg */ + lua_pushnil( L ); + lua_pushstring( L, "error destroying worker" ); + return 2; + } + + /* increase active luaproc count */ + sched_lpcount_inc(); + + /* schedule luaproc */ + if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { + printf( "[luaproc] error queueing Lua process\n" ); + /* decrease active luaproc count */ + sched_lpcount_dec(); + /* close lua_State */ + lua_close( lp->lstate ); + /* return nil + error msg */ + lua_pushnil( L ); + lua_pushstring( L, "error destroying worker" ); + return 2; + } + + lua_pushboolean( L, TRUE ); + return 1; +} + +/* recycle a lua process */ +luaproc luaproc_recycle( luaproc lp, const char *code ) { + + int ret; + + /* reset struct members */ + lp->stat = LUAPROC_STAT_IDLE; + lp->args = 0; + lp->chan = NULL; + lp->destroyworker = FALSE; + + /* load process' code */ + ret = luaL_loadstring( lp->lstate, code ); + + /* in case of errors, destroy lua process */ + if ( ret != 0 ) { + lua_close( lp->lstate ); + return NULL; + } + + /* return recycled lua process */ + return lp; +} + +/* create and schedule a new lua process (luaproc.newproc) */ +static int luaproc_create_newproc( lua_State *L ) { + + /* check if first argument is a string (lua code) */ + const char *code = luaL_checkstring( L, 1 ); + + /* new lua process pointer */ + luaproc lp; + + /* check if existing lua process should be recycled to avoid new creation */ + lp = luaproc_recycle_pop( ); + + /* if there is a lua process available on the recycle queue, recycle it */ + if ( lp != NULL ) { + lp = luaproc_recycle( lp, code ); + } + /* otherwise create a new one from scratch */ + else { + /* create new lua process with destroy worker flag set to false + (ie, conclusion of lua process will NOT result in worker thread destruction */ + lp = luaproc_new( code, FALSE ); + } + + /* ensure process creation was successfull */ + if ( lp == NULL ) { + /* in case of errors return nil + error msg */ + lua_pushnil( L ); + lua_pushstring( L, "error loading code string" ); + return 2; + } + + /* increase active luaproc count */ + sched_lpcount_inc(); + + /* schedule luaproc */ + if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { + printf( "[luaproc] error queueing Lua process\n" ); + /* decrease active luaproc count */ + sched_lpcount_dec(); + /* close lua_State */ + lua_close( lp->lstate ); + /* return nil + error msg */ + lua_pushnil( L ); + lua_pushstring( L, "error queuing process" ); + return 2; + } + + lua_pushboolean( L, TRUE ); + return 1; +} + +/* queue a lua process sending a message without a matching receiver */ +void luaproc_queue_sender( luaproc lp ) { + /* add the sending process to this process' send queue */ + list_add( channel_get_sendq( lp->chan ), list_new_node( lp )); +} + +/* dequeue a lua process sending a message with a receiver match */ +luaproc luaproc_dequeue_sender( channel chan ) { + + node n; + luaproc lp; + + if ( list_node_count( channel_get_sendq( chan )) > 0 ) { + /* get first node from channel's send queue */ + n = list_pop_head( channel_get_sendq( chan )); + /* find associated luaproc */ + lp = (luaproc )list_data( n ); + /* destroy node (but not associated luaproc) */ + list_destroy_node( n ); + /* return associated luaproc */ + return lp; + } + + return NULL; +} + +/* queue a luc process receiving a message without a matching sender */ +void luaproc_queue_receiver( luaproc lp ) { + /* add the receiving process to this process' receive queue */ + list_add( channel_get_recvq( lp->chan ), list_new_node( lp )); +} + +/* dequeue a lua process receiving a message with a sender match */ +luaproc luaproc_dequeue_receiver( channel chan ) { + + node n; + luaproc lp; + + if ( list_node_count( channel_get_recvq( chan )) > 0 ) { + /* get first node from channel's recv queue */ + n = list_pop_head( channel_get_recvq( chan )); + /* find associated luaproc */ + lp = (luaproc )list_data( n ); + /* destroy node (but not associated luaproc) */ + list_destroy_node( n ); + /* return associated luaproc */ + return lp; + } + + return NULL; +} + +/* moves values between lua states' stacks */ +void luaproc_movevalues( lua_State *Lfrom, lua_State *Lto ) { + + int i; + int n = lua_gettop( Lfrom ); + + /* move values between lua states' stacks */ + for ( i = 2; i <= n; i++ ) { + lua_pushstring( Lto, lua_tostring( Lfrom, i )); + } +} + +/* return the lua process associated with a given lua state */ +luaproc luaproc_getself( lua_State *L ) { + luaproc lp; + lua_getfield( L, LUA_REGISTRYINDEX, "_SELF" ); + lp = (luaproc )lua_touserdata( L, -1 ); + lua_pop( L, 1 ); + return lp; +} + +/* send a message to a lua process */ +static int luaproc_send( lua_State *L ) { + + channel chan; + luaproc dstlp, self; + const char *chname = luaL_checkstring( L, 1 ); + + /* get exclusive access to operate on channels */ + pthread_mutex_lock( &mutex_channel ); + + /* wait until channel is not in use */ + while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) { + pthread_cond_wait( channel_get_cond( chan ), &mutex_channel ); + } + + /* free access to operate on channels */ + pthread_mutex_unlock( &mutex_channel ); + + /* if channel is not found, return an error to Lua */ + if ( chan == NULL ) { + lua_pushnil( L ); + lua_pushstring( L, "non-existent channel" ); + return 2; + } + + /* try to find a matching receiver */ + dstlp = luaproc_dequeue_receiver( chan ); + + /* if a match is found, move values to it and (queue) wake it */ + if ( dstlp != NULL ) { + + /* move values between Lua states' stacks */ + luaproc_movevalues( L, dstlp->lstate ); + + dstlp->args = lua_gettop( dstlp->lstate ) - 1; + + if ( sched_queue_proc( dstlp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { + + /* unlock channel access */ + luaproc_unlock_channel( chan ); + + /* decrease active luaproc count */ + sched_lpcount_dec(); + + /* close lua_State */ + lua_close( dstlp->lstate ); + lua_pushnil( L ); + lua_pushstring( L, "error scheduling process" ); + return 2; + } + + /* unlock channel access */ + luaproc_unlock_channel( chan ); + } + + /* otherwise queue (block) the sending process */ + else { + + self = luaproc_getself( L ); + + if ( self != NULL ) { + self->stat = LUAPROC_STAT_BLOCKED_SEND; + self->chan = chan; + } + + /* just yield the lua process, channel unlocking will be done by the scheduler */ + return lua_yield( L, lua_gettop( L )); + } + + lua_pushboolean( L, TRUE ); + return 1; +} + +/* receive a message from a lua process */ +static int luaproc_receive( lua_State *L ) { + + channel chan; + luaproc srclp, self; + const char *chname = luaL_checkstring( L, 1 ); + + /* get exclusive access to operate on channels */ + pthread_mutex_lock( &mutex_channel ); + + /* wait until channel is not in use */ + while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) { + pthread_cond_wait( channel_get_cond( chan ), &mutex_channel ); + } + + /* free access to operate on channels */ + pthread_mutex_unlock( &mutex_channel ); + + /* if channel is not found, free access to operate on channels and return an error to Lua */ + if ( chan == NULL ) { + lua_pushnil( L ); + lua_pushstring( L, "non-existent channel" ); + return 2; + } + + /* try to find a matching sender */ + srclp = luaproc_dequeue_sender( chan ); + + /* if a match is found, get values from it and (queue) wake it */ + if ( srclp != NULL ) { + + /* move values between Lua states' stacks */ + luaproc_movevalues( srclp->lstate, L ); + + /* return to sender indicanting message was sent */ + lua_pushboolean( srclp->lstate, TRUE ); + srclp->args = 1; + + if ( sched_queue_proc( srclp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { + + /* unlock channel access */ + luaproc_unlock_channel( chan ); + + /* decrease active luaproc count */ + sched_lpcount_dec(); + + /* close lua_State */ + lua_close( srclp->lstate ); + lua_pushnil( L ); + lua_pushstring( L, "error scheduling process" ); + return 2; + } + + /* unlock channel access */ + luaproc_unlock_channel( chan ); + + return lua_gettop( L ) - 1; + } + + /* otherwise queue (block) the receiving process (sync) or return immediatly (async) */ + else { + + /* if trying an asynchronous receive, unlock channel access and return an error */ + if ( lua_toboolean( L, 2 )) { + /* unlock channel access */ + luaproc_unlock_channel( chan ); + /* return an error */ + lua_pushnil( L ); + lua_pushfstring( L, "no senders waiting on channel %s", chname ); + return 2; + } + + /* otherwise (synchronous receive) simply block process */ + else { + self = luaproc_getself( L ); + + if ( self != NULL ) { + self->stat = LUAPROC_STAT_BLOCKED_RECV; + self->chan = chan; + } + + /* just yield the lua process, channel unlocking will be done by the scheduler */ + return lua_yield( L, lua_gettop( L )); + } + } +} + +LUALIB_API int luaopen_luaproc( lua_State *L ) { + + /* register luaproc functions */ + luaL_register( L, "luaproc", luaproc_funcs_parent ); + + /* initialize recycle list */ + recyclelp = list_new(); + + /* initialize local scheduler */ + sched_init_local( LUAPROC_SCHED_DEFAULT_WORKER_THREADS ); + + return 0; +} + +/* return a process' status */ +int luaproc_get_status( luaproc lp ) { + return lp->stat; +} + +/* set a process' status */ +void luaproc_set_status( luaproc lp, int status ) { + lp->stat = status; +} + +/* return a process' state */ +lua_State *luaproc_get_state( luaproc lp ) { + return lp->lstate; +} + +/* return the number of arguments expected by a given process */ +int luaproc_get_args( luaproc lp ) { + return lp->args; +} + +/* set the number of arguments expected by a given process */ +void luaproc_set_args( luaproc lp, int n ) { + lp->args = n; +} + +/* create a new channel */ +static int luaproc_create_channel( lua_State *L ) { + + const char *chname = luaL_checkstring( L, 1 ); + + /* get exclusive access to operate on channels */ + pthread_mutex_lock( &mutex_channel ); + + /* check if channel exists */ + if ( channel_search( chname ) != NULL ) { + /* free access to operate on channels */ + pthread_mutex_unlock( &mutex_channel ); + /* return an error to lua */ + lua_pushnil( L ); + lua_pushstring( L, "channel already exists" ); + return 2; + } + + channel_create( chname ); + + /* free access to operate on channels */ + pthread_mutex_unlock( &mutex_channel ); + + lua_pushboolean( L, TRUE ); + + return 1; + +} + +/* destroy a channel */ +static int luaproc_destroy_channel( lua_State *L ) { + + channel chan; + luaproc lp; + node nitr; + pthread_mutex_t *chmutex; + pthread_cond_t *chcond; + const char *chname = luaL_checkstring( L, 1 ); + + + /* get exclusive access to operate on channels */ + pthread_mutex_lock( &mutex_channel ); + + /* wait until channel is not in use */ + while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) { + pthread_cond_wait( channel_get_cond( chan ), &mutex_channel ); + } + + /* free access to operate on channels */ + pthread_mutex_unlock( &mutex_channel ); + + /* if channel is not found, return an error to Lua */ + if ( chan == NULL ) { + lua_pushnil( L ); + lua_pushstring( L, "non-existent channel" ); + return 2; + } + + /* get channel's mutex and conditional pointers */ + chmutex = channel_get_mutex( chan ); + chcond = channel_get_cond( chan ); + + /* search for processes waiting to send a message on this channel */ + while (( nitr = list_pop_head( channel_get_sendq( chan ))) != NULL ) { + + lp = (luaproc )list_data( nitr ); + + /* destroy node (but not associated luaproc) */ + list_destroy_node( nitr ); + + /* return an error so the processe knows the channel was destroyed before the message was sent */ + lua_settop( lp->lstate, 0 ); + lua_pushnil( lp->lstate ); + lua_pushstring( lp->lstate, "channel destroyed while waiting for receiver" ); + lp->args = 2; + + /* schedule the process for execution */ + if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { + + /* decrease active luaproc count */ + sched_lpcount_dec(); + + /* close lua_State */ + lua_close( lp->lstate ); + } + } + + /* search for processes waiting to receive a message on this channel */ + while (( nitr = list_pop_head( channel_get_recvq( chan ))) != NULL ) { + + lp = (luaproc )list_data( nitr ); + + /* destroy node (but not associated luaproc) */ + list_destroy_node( nitr ); + + /* return an error so the processe knows the channel was destroyed before the message was received */ + lua_settop( lp->lstate, 0 ); + lua_pushnil( lp->lstate ); + lua_pushstring( lp->lstate, "channel destroyed while waiting for sender" ); + lp->args = 2; + + /* schedule the process for execution */ + if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { + + /* decrease active luaproc count */ + sched_lpcount_dec(); + + /* close lua_State */ + lua_close( lp->lstate ); + } + } + + /* get exclusive access to operate on channels */ + pthread_mutex_lock( &mutex_channel ); + /* destroy channel */ + channel_destroy( chan, chname ); + /* broadcast channel not in use */ + pthread_cond_broadcast( chcond ); + /* unlock channel access */ + pthread_mutex_unlock( chmutex ); + /* destroy channel mutex and conditional */ + pthread_mutex_destroy( chmutex ); + pthread_cond_destroy( chcond ); + /* free memory used by channel mutex and conditional */ + free( chmutex ); + free( chcond ); + /* free access to operate on channels */ + pthread_mutex_unlock( &mutex_channel ); + + lua_pushboolean( L, TRUE ); + + return 1; +} + +/* register luaproc's functions in a lua_State */ +void luaproc_register_funcs( lua_State *L ) { + luaL_register( L, "luaproc", luaproc_funcs_child ); +} + +/* return the channel where the corresponding luaproc is blocked at */ +channel luaproc_get_channel( luaproc lp ) { + return lp->chan; +} + +/* unlock access to a channel */ +void luaproc_unlock_channel( channel chan ) { + /* get exclusive access to operate on channels */ + pthread_mutex_lock( &mutex_channel ); + /* unlock channel access */ + pthread_mutex_unlock( channel_get_mutex( chan )); + /* signal channel not in use */ + pthread_cond_signal( channel_get_cond( chan )); + /* free access to operate on channels */ + pthread_mutex_unlock( &mutex_channel ); +} + +/* return status (boolean) indicating if worker thread should be destroyed after luaproc execution */ +int luaproc_get_destroyworker( luaproc lp ) { + return lp->destroyworker; +} + diff --git a/libraries/luaproc/luaproc.h b/libraries/luaproc/luaproc.h new file mode 100644 index 0000000..86617a9 --- /dev/null +++ b/libraries/luaproc/luaproc.h @@ -0,0 +1,92 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[luaproc.h] + +****************************************************/ +#ifndef _LUAPROC_H_ +#define _LUAPROC_H_ + +#include "channel.h" + +/* process is idle */ +#define LUAPROC_STAT_IDLE 0 +/* process is ready to run */ +#define LUAPROC_STAT_READY 1 +/* process is blocked on send */ +#define LUAPROC_STAT_BLOCKED_SEND 2 +/* process is blocked on receive */ +#define LUAPROC_STAT_BLOCKED_RECV 3 +/* process is finished */ +#define LUAPROC_STAT_FINISHED 4 + +/* lua process pointer type */ +typedef struct stluaproc *luaproc; + +/* return a process' status */ +int luaproc_get_status( luaproc lp ); + +/* set a process' status */ +void luaproc_set_status( luaproc lp, int status ); + +/* return a process' state */ +lua_State *luaproc_get_state( luaproc lp ); + +/* return the number of arguments expected by a given a process */ +int luaproc_get_args( luaproc lp ); + +/* set the number of arguments expected by a given process */ +void luaproc_set_args( luaproc lp, int n ); + +/* create luaproc (from scheduler) */ +luaproc luaproc_create_sched( char *code ); + +/* register luaproc's functions in a lua_State */ +void luaproc_register_funcs( lua_State *L ); + +/* allow registering of luaproc's functions in c main prog */ +void luaproc_register_lib( lua_State *L ); + +/* queue a luaproc that tried to send a message */ +void luaproc_queue_sender( luaproc lp ); + +/* queue a luaproc that tried to receive a message */ +void luaproc_queue_receiver( luaproc lp ); + +/* unlock a channel's access */ +void luaproc_unlock_channel( channel chan ); + +/* return a luaproc's channel */ +channel luaproc_get_channel( luaproc lp ); + +/* return status (boolean) indicating if worker thread should be destroyed after luaproc execution */ +int luaproc_get_destroyworker( luaproc lp ); + +/* return status (boolean) indicating if lua process should be recycled */ +luaproc luaproc_recycle_pop( void ); + +/* add a lua process to the recycle list */ +int luaproc_recycle_push( luaproc lp ); + +#endif diff --git a/libraries/luaproc/luaproc.lua b/libraries/luaproc/luaproc.lua new file mode 100644 index 0000000..a1a73e4 --- /dev/null +++ b/libraries/luaproc/luaproc.lua @@ -0,0 +1,34 @@ +---------------------------------------------------- +-- +-- Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. +-- +----------------------------------------------------- +-- +-- luaproc.lua +-- +----------------------------------------------------- + +local function so( objname ) + local SOPATH = os.getenv( "LUA_SOPATH" ) or "./" + assert( package.loadlib( SOPATH .. objname .. ".so" , "luaopen_" .. objname ))( ) +end + +so( "luaproc" ) diff --git a/libraries/luaproc/sched.c b/libraries/luaproc/sched.c new file mode 100644 index 0000000..f19beeb --- /dev/null +++ b/libraries/luaproc/sched.c @@ -0,0 +1,314 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[sched.c] + +****************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "list.h" +#include "sched.h" +#include "luaproc.h" +#include "channel.h" + +#define TRUE 1 +#define FALSE 0 + +/********* +* globals +*********/ + +/* ready process list */ +list lpready = NULL; + +/* ready process queue access mutex */ +pthread_mutex_t mutex_queue_access = PTHREAD_MUTEX_INITIALIZER; + +/* wake worker up conditional variable */ +pthread_cond_t cond_wakeup_worker = PTHREAD_COND_INITIALIZER; + +/* active luaproc count access mutex */ +pthread_mutex_t mutex_lp_count = PTHREAD_MUTEX_INITIALIZER; + +/* no active luaproc conditional variable */ +pthread_cond_t cond_no_active_lp = PTHREAD_COND_INITIALIZER; + +/* number of active luaprocs */ +int lpcount = 0; + +/* no more lua processes flag */ +int no_more_processes = FALSE; + +/* worker thread main function */ +void *workermain( void *args ) { + + node n; + luaproc lp; + int procstat; + int destroyworker; + + /* detach thread so resources are freed as soon as thread exits (no further joining) */ + pthread_detach( pthread_self( )); + + /* main worker loop */ + while ( 1 ) { + + /* get exclusive access to the ready process queue */ + pthread_mutex_lock( &mutex_queue_access ); + + /* wait until instructed to wake up (because there's work to do or because its time to finish) */ + while (( list_node_count( lpready ) == 0 ) && ( no_more_processes == FALSE )) { + pthread_cond_wait( &cond_wakeup_worker, &mutex_queue_access ); + } + + /* pop the first node from the ready process queue */ + n = list_pop_head( lpready ); + + /* ensure list pop succeeded before proceeding */ + if ( n != NULL ) { + /* get the popped node's data content (ie, the lua process struct) */ + lp = (luaproc )list_data( n ); + } + else { + /* free access to the process ready queue */ + pthread_mutex_unlock( &mutex_queue_access ); + /* finished thread */ + pthread_exit( NULL ); + } + + /* free access to the process ready queue */ + pthread_mutex_unlock( &mutex_queue_access ); + + /* execute the lua code specified in the lua process struct */ + procstat = lua_resume( luaproc_get_state( lp ), luaproc_get_args( lp )); + + /* reset the process argument count */ + luaproc_set_args( lp, 0 ); + + /* check if process finished its whole execution */ + if ( procstat == 0 ) { + + /* destroy the corresponding list node */ + list_destroy_node( n ); + + /* check if worker thread should be destroyed */ + destroyworker = luaproc_get_destroyworker( lp ); + + /* set process status to finished */ + luaproc_set_status( lp, LUAPROC_STAT_FINISHED ); + + /* check if lua process should be recycled and, if not, destroy it */ + if ( luaproc_recycle_push( lp ) == FALSE ) { + lua_close( luaproc_get_state( lp )); + } + + /* decrease active lua process count */ + sched_lpcount_dec(); + + /* check if thread should be finished after lua process conclusion */ + if ( destroyworker ) { + /* if so, finish thread */ + pthread_exit( NULL ); + } + } + + /* check if process yielded */ + else if ( procstat == LUA_YIELD ) { + + /* if so, further check if yield originated from an unmatched send/recv operation */ + if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_SEND ) { + /* queue blocked lua process on corresponding channel */ + luaproc_queue_sender( lp ); + /* unlock channel access */ + luaproc_unlock_channel( luaproc_get_channel( lp )); + /* destroy node (but not the associated Lua process) */ + list_destroy_node( n ); + } + + else if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_RECV ) { + /* queue blocked lua process on corresponding channel */ + luaproc_queue_receiver( lp ); + /* unlock channel access */ + luaproc_unlock_channel( luaproc_get_channel( lp )); + /* destroy node (but not the associated Lua process) */ + list_destroy_node( n ); + } + + /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */ + else { + /* get exclusive access to the ready process queue */ + pthread_mutex_lock( &mutex_queue_access ); + /* re-insert the job at the end of the ready process queue */ + list_add( lpready, n ); + /* free access to the process ready queue */ + pthread_mutex_unlock( &mutex_queue_access ); + } + } + + /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */ + else { + /* destroy the corresponding node */ + list_destroy_node( n ); + /* print error message */ + fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring( luaproc_get_state( lp ), -1 )); + /* close lua state */ + lua_close( luaproc_get_state( lp )); + /* decrease active lua process count */ + sched_lpcount_dec(); + } + } +} + +/* local scheduler initialization */ +int sched_init_local( int numworkers ) { + + int tid; + int workercount = 0; + pthread_t worker; + + /* initialize ready process list */ + lpready = list_new(); + + /* initialize channels */ + channel_init(); + + /* create initial worker threads */ + for ( tid = 0; tid < numworkers; tid++ ) { + if ( pthread_create( &worker, NULL, workermain, NULL ) == 0 ) { + workercount++; + } + } + + if ( workercount != numworkers ) { + return LUAPROC_SCHED_INIT_ERROR; + } + + return LUAPROC_SCHED_OK; +} + +/* exit scheduler */ +void sched_exit( void ) { + + /* get exclusive access to the ready process queue */ + pthread_mutex_lock( &mutex_queue_access ); + /* destroy the ready process list */ + list_destroy( lpready ); + /* free access to the process ready queue */ + pthread_mutex_unlock( &mutex_queue_access ); +} + +/* move process to ready queue (ie, schedule process) */ +int sched_queue_proc( luaproc lp ) { + + /* get exclusive access to the ready process queue */ + pthread_mutex_lock( &mutex_queue_access ); + + /* add process to ready queue */ + if ( list_add( lpready, list_new_node( lp )) != NULL ) { + + /* set process status to ready */ + luaproc_set_status( lp, LUAPROC_STAT_READY ); + + /* wake worker up */ + pthread_cond_signal( &cond_wakeup_worker ); + /* free access to the process ready queue */ + pthread_mutex_unlock( &mutex_queue_access ); + + return LUAPROC_SCHED_QUEUE_PROC_OK; + } + else { + /* free access to the process ready queue */ + pthread_mutex_unlock( &mutex_queue_access ); + + return LUAPROC_SCHED_QUEUE_PROC_ERR; + } +} + +/* synchronize worker threads */ +void sched_join_workerthreads( void ) { + + pthread_mutex_lock( &mutex_lp_count ); + + /* wait until there is no more active lua processes */ + while( lpcount != 0 ) { + pthread_cond_wait( &cond_no_active_lp, &mutex_lp_count ); + } + /* get exclusive access to the ready process queue */ + pthread_mutex_lock( &mutex_queue_access ); + /* set the no more active lua processes flag to true */ + no_more_processes = TRUE; + /* wake ALL workers up */ + pthread_cond_broadcast( &cond_wakeup_worker ); + /* free access to the process ready queue */ + pthread_mutex_unlock( &mutex_queue_access ); + /* wait for (join) worker threads */ + pthread_exit( NULL ); + + pthread_mutex_unlock( &mutex_lp_count ); + +} + +/* increase active lua process count */ +void sched_lpcount_inc( void ) { + pthread_mutex_lock( &mutex_lp_count ); + lpcount++; + pthread_mutex_unlock( &mutex_lp_count ); +} + +/* decrease active lua process count */ +void sched_lpcount_dec( void ) { + pthread_mutex_lock( &mutex_lp_count ); + lpcount--; + /* if count reaches zero, signal there are no more active processes */ + if ( lpcount == 0 ) { + pthread_cond_signal( &cond_no_active_lp ); + } + pthread_mutex_unlock( &mutex_lp_count ); +} + +/* create a new worker pthread */ +int sched_create_worker( void ) { + + pthread_t worker; + + /* create a new pthread */ + if ( pthread_create( &worker, NULL, workermain, NULL ) != 0 ) { + return LUAPROC_SCHED_PTHREAD_ERROR; + } + + return LUAPROC_SCHED_OK; +} + diff --git a/libraries/luaproc/sched.h b/libraries/luaproc/sched.h new file mode 100644 index 0000000..c03e6ea --- /dev/null +++ b/libraries/luaproc/sched.h @@ -0,0 +1,78 @@ +/*************************************************** + +Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +***************************************************** + +[sched.h] + +****************************************************/ +#ifndef _SCHED_H_ +#define _SCHED_H_ + +#include "luaproc.h" + +/* scheduler function return constants */ +#define LUAPROC_SCHED_OK 0 +#define LUAPROC_SCHED_SOCKET_ERROR -1 +#define LUAPROC_SCHED_SETSOCKOPT_ERROR -2 +#define LUAPROC_SCHED_BIND_ERROR -3 +#define LUAPROC_SCHED_LISTEN_ERROR -4 +#define LUAPROC_SCHED_FORK_ERROR -5 +#define LUAPROC_SCHED_PTHREAD_ERROR -6 +#define LUAPROC_SCHED_INIT_ERROR -7 + +/* ready process queue insertion status */ +#define LUAPROC_SCHED_QUEUE_PROC_OK 0 +#define LUAPROC_SCHED_QUEUE_PROC_ERR -1 + +/* scheduler listener service default hostname and port */ +#define LUAPROC_SCHED_DEFAULT_HOST "127.0.0.1" +#define LUAPROC_SCHED_DEFAULT_PORT 3133 + +/* scheduler default number of worker threads */ +#define LUAPROC_SCHED_DEFAULT_WORKER_THREADS 1 + +/* initialize local scheduler */ +int sched_init_local( int numworkers ); + +/* initialize socket enabled scheduler */ +int sched_init_socket( int numworkers, const char *host, int port ); + +/* exit scheduler */ +void sched_exit( void ); + +/* move process to ready queue (ie, schedule process) */ +int sched_queue_proc( luaproc lp ); + +/* join all worker threads and exit */ +void sched_join_workerthreads( void ); + +/* increase active luaproc count */ +void sched_lpcount_inc( void ); + +/* decrease active luaproc count */ +void sched_lpcount_dec( void ); + +/* create a new worker pthread */ +int sched_create_worker( void ); + +#endif diff --git a/libraries/luaproc/test.lua b/libraries/luaproc/test.lua new file mode 100644 index 0000000..5868d6f --- /dev/null +++ b/libraries/luaproc/test.lua @@ -0,0 +1,39 @@ +---------------------------------------------------- +-- +-- Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. +-- +----------------------------------------------------- +-- +-- test.lua +-- +----------------------------------------------------- + +require "luaproc" + +luaproc.createworker() + +luaproc.newproc( [=[ + luaproc.newchannel( "testchannel" ) + luaproc.newproc( "luaproc.send( 'testchannel', 'luaproc is working fine!' )" ) + luaproc.newproc( "print( luaproc.receive( 'testchannel' ))" ) +]=] ) + +luaproc.exit() -- cgit v1.1