aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/libraries/luaproc/luaproc.c
diff options
context:
space:
mode:
Diffstat (limited to 'libraries/luaproc/luaproc.c')
-rw-r--r--libraries/luaproc/luaproc.c931
1 files changed, 0 insertions, 931 deletions
diff --git a/libraries/luaproc/luaproc.c b/libraries/luaproc/luaproc.c
deleted file mode 100644
index 78c713f..0000000
--- a/libraries/luaproc/luaproc.c
+++ /dev/null
@@ -1,931 +0,0 @@
1/***************************************************
2
3Copyright 2008 Alexandre Skyrme, Noemi Rodriguez, Roberto Ierusalimschy
4
5Permission is hereby granted, free of charge, to any person obtaining a copy
6of this software and associated documentation files (the "Software"), to deal
7in the Software without restriction, including without limitation the rights
8to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9copies of the Software, and to permit persons to whom the Software is
10furnished to do so, subject to the following conditions:
11
12The above copyright notice and this permission notice shall be included in
13all copies or substantial portions of the Software.
14
15THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21THE SOFTWARE.
22
23*****************************************************
24
25[luaproc.c]
26
27****************************************************/
28
29#include <netdb.h>
30#include <pthread.h>
31#include <stdio.h>
32#include <stdlib.h>
33#include <string.h>
34#include <unistd.h>
35#include <lua.h>
36#include <lauxlib.h>
37#include <lualib.h>
38
39#include "luaproc.h"
40#include "list.h"
41#include "sched.h"
42#include "channel.h"
43
44
45#define FALSE 0
46#define TRUE 1
47
48/*********
49* globals
50*********/
51
52/* channel operations mutex */
53pthread_mutex_t mutex_channel = PTHREAD_MUTEX_INITIALIZER;
54
55/* recycle list mutex */
56pthread_mutex_t mutex_recycle_list = PTHREAD_MUTEX_INITIALIZER;
57
58/* recycled lua process list */
59list recyclelp = NULL;
60
61/* maximum lua processes to recycle */
62int recyclemax = 0;
63
64/* lua process */
65struct stluaproc {
66 lua_State *lstate;
67 int stat;
68 int args;
69 channel chan;
70 int destroyworker;
71 void *data;
72 Ecore_Cb callback;
73};
74
75/* TODO - hack, duplicating something from LuaSL for now. */
76typedef struct
77{
78 void *script;
79 char message[PATH_MAX];
80} scriptMessage;
81
82
83/******************************
84* library functions prototypes
85******************************/
86/* create a new lua process */
87static int luaproc_create_newproc( lua_State *L );
88/* send a message to a lua process */
89static int luaproc_send( lua_State *L );
90/* receive a message from a lua process */
91static int luaproc_receive( lua_State *L );
92/* create a new channel */
93static int luaproc_create_channel( lua_State *L );
94/* destroy a channel */
95static int luaproc_destroy_channel( lua_State *L );
96/* wait until all luaprocs have finished and exit */
97static int luaproc_exit( lua_State *L );
98/* create a new worker */
99static int luaproc_create_worker( lua_State *L );
100/* destroy a worker */
101static int luaproc_destroy_worker( lua_State *L );
102/* set amount of lua processes that should be recycled (ie, reused) */
103static int luaproc_recycle_set( lua_State *L );
104/* send a message back to the main loop */
105static int luaproc_send_back( lua_State *L );
106
107/* luaproc function registration array - main (parent) functions */
108static const struct luaL_reg luaproc_funcs_parent[] = {
109 { "newproc", luaproc_create_newproc },
110 { "exit", luaproc_exit },
111 { "createworker", luaproc_create_worker },
112 { "destroyworker", luaproc_destroy_worker },
113 { "recycle", luaproc_recycle_set },
114 { "sendback", luaproc_send_back },
115 { NULL, NULL }
116};
117
118/* luaproc function registration array - newproc (child) functions */
119static const struct luaL_reg luaproc_funcs_child[] = {
120 { "newproc", luaproc_create_newproc },
121 { "send", luaproc_send },
122 { "receive", luaproc_receive },
123 { "newchannel", luaproc_create_channel },
124 { "delchannel", luaproc_destroy_channel },
125 { "createworker", luaproc_create_worker },
126 { "destroyworker", luaproc_destroy_worker },
127 { "recycle", luaproc_recycle_set },
128 { "sendback", luaproc_send_back },
129 { NULL, NULL }
130};
131
132/*
133static void registerlib( lua_State *L, const char *name, lua_CFunction f ) {
134 lua_getglobal( L, "package" );
135 lua_getfield( L, -1, "preload" );
136 lua_pushcfunction( L, f );
137 lua_setfield( L, -2, name );
138 lua_pop( L, 2 );
139}
140*/
141static void openlibs( lua_State *L ) {
142/*
143 lua_cpcall( L, luaopen_base, NULL );
144 lua_cpcall( L, luaopen_package, NULL );
145 registerlib( L, "io", luaopen_io );
146 registerlib( L, "os", luaopen_os );
147 registerlib( L, "table", luaopen_table );
148 registerlib( L, "string", luaopen_string );
149 registerlib( L, "math", luaopen_math );
150 registerlib( L, "debug", luaopen_debug );
151*/
152 luaL_openlibs(L);
153}
154
155/* return status (boolean) indicating if lua process should be recycled */
156luaproc luaproc_recycle_pop( void ) {
157
158 luaproc lp;
159 node n;
160
161 /* get exclusive access to operate on recycle list */
162 pthread_mutex_lock( &mutex_recycle_list );
163
164 /* check if there are any lua processes on recycle list */
165 if ( list_node_count( recyclelp ) > 0 ) {
166 /* pop list head */
167 n = list_pop_head( recyclelp );
168 /* free access to operate on recycle list */
169 pthread_mutex_unlock( &mutex_recycle_list );
170 /* find associated luaproc */
171 lp = (luaproc )list_data( n );
172 /* destroy node (but not associated luaproc) */
173 list_destroy_node( n );
174 /* return associated luaproc */
175 return lp;
176 }
177
178 /* free access to operate on recycle list */
179 pthread_mutex_unlock( &mutex_recycle_list );
180
181 /* if no lua processes are available simply return null */
182 return NULL;
183}
184
185/* check if lua process should be recycled and, in case so, add it to the recycle list */
186int luaproc_recycle_push( luaproc lp ) {
187
188 node n;
189
190 /* get exclusive access to operate on recycle list */
191 pthread_mutex_lock( &mutex_recycle_list );
192
193 /* check if amount of lua processes currently on recycle list is greater than
194 or equal to the maximum amount of lua processes that should be recycled */
195 if ( list_node_count( recyclelp ) >= recyclemax ) {
196 /* free access to operate on recycle list */
197 pthread_mutex_unlock( &mutex_recycle_list );
198 /* if so, lua process should NOT be recycled and should be destroyed */
199 return FALSE;
200 }
201 /* otherwise, lua process should be added to recycle list */
202 n = list_new_node( lp );
203 if ( n == NULL ) {
204 /* free access to operate on recycle list */
205 pthread_mutex_unlock( &mutex_recycle_list );
206 /* in case of errors, lua process should be destroyed */
207 return FALSE;
208 }
209 list_add( recyclelp, n );
210 /* free access to operate on recycle list */
211 pthread_mutex_unlock( &mutex_recycle_list );
212 /* since lua process will be recycled, it should not be destroyed */
213 return TRUE;
214}
215
216/* create new luaproc */
217static luaproc luaproc_new( const char *code, int destroyflag, int file) {
218
219 luaproc lp;
220 int ret;
221 /* create new lua state */
222 lua_State *lpst = luaL_newstate( );
223 /* store the luaproc struct in its own lua state */
224 lp = (luaproc )lua_newuserdata( lpst, sizeof( struct stluaproc ));
225 lua_setfield( lpst, LUA_REGISTRYINDEX, "_SELF" );
226
227 lp->lstate = lpst;
228 lp->stat = LUAPROC_STAT_IDLE;
229 lp->args = 0;
230 lp->chan = NULL;
231 lp->destroyworker = destroyflag;
232
233 /* load standard libraries */
234 openlibs( lpst );
235
236 /* register luaproc's own functions */
237 luaL_register( lpst, "luaproc", luaproc_funcs_child );
238
239 /* load process' code */
240 if (file)
241 ret = luaL_loadfile( lpst, code );
242 else
243 ret = luaL_loadstring( lpst, code );
244 /* in case of errors, destroy recently created lua process */
245 if ( ret != 0 ) {
246 lua_close( lpst );
247 return NULL;
248 }
249
250 /* return recently created lua process */
251 return lp;
252}
253
254/* synchronize worker threads and exit */
255static int luaproc_exit( lua_State *L ) {
256 sched_join_workerthreads();
257 return 0;
258}
259
260/* create a new worker pthread */
261static int luaproc_create_worker( lua_State *L ) {
262
263 if ( sched_create_worker( ) != LUAPROC_SCHED_OK ) {
264 lua_pushnil( L );
265 lua_pushstring( L, "error creating worker" );
266 return 2;
267 }
268
269 lua_pushboolean( L, TRUE );
270 return 1;
271}
272
273/* set amount of lua processes that should be recycled (ie, reused) */
274static int luaproc_recycle_set( lua_State *L ) {
275
276 node n;
277 luaproc lp;
278 int max = luaL_checkint( L, 1 );
279
280 /* check if function argument represents a reasonable value */
281 if ( max < 0 ) {
282 /* in case of errors return nil + error msg */
283 lua_pushnil( L );
284 lua_pushstring( L, "error setting recycle limit to negative value" );
285 return 2;
286 }
287
288 /* get exclusive access to operate on recycle list */
289 pthread_mutex_lock( &mutex_recycle_list );
290
291 /* set maximum lua processes that should be recycled */
292 recyclemax = max;
293
294 /* destroy recycle list excessive nodes (and corresponding lua processes) */
295 while ( list_node_count( recyclelp ) > max ) {
296 /* get first node from recycle list */
297 n = list_pop_head( recyclelp );
298 /* find associated luaproc */
299 lp = (luaproc )list_data( n );
300 /* destroy node */
301 list_destroy_node( n );
302 /* close associated lua_State */
303 lua_close( lp->lstate );
304 }
305
306 /* free access to operate on recycle list */
307 pthread_mutex_unlock( &mutex_recycle_list );
308
309 lua_pushboolean( L, TRUE );
310 return 1;
311}
312
313
314/* destroy a worker pthread */
315static int luaproc_destroy_worker( lua_State *L ) {
316
317 /* new lua process pointer */
318 luaproc lp;
319
320 /* create new lua process with empty code and destroy worker flag set to true
321 (ie, conclusion of lua process WILL result in worker thread destruction */
322 lp = luaproc_new( "", TRUE, FALSE );
323
324 /* ensure process creation was successfull */
325 if ( lp == NULL ) {
326 /* in case of errors return nil + error msg */
327 lua_pushnil( L );
328 lua_pushstring( L, "error destroying worker" );
329 return 2;
330 }
331
332 /* increase active luaproc count */
333 sched_lpcount_inc();
334
335 /* schedule luaproc */
336 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
337 printf( "[luaproc] error queueing Lua process\n" );
338 /* decrease active luaproc count */
339 sched_lpcount_dec();
340 /* close lua_State */
341 lua_close( lp->lstate );
342 /* return nil + error msg */
343 lua_pushnil( L );
344 lua_pushstring( L, "error destroying worker" );
345 return 2;
346 }
347
348 lua_pushboolean( L, TRUE );
349 return 1;
350}
351
352/* recycle a lua process */
353static luaproc luaproc_recycle( luaproc lp, const char *code, int file ) {
354
355 int ret;
356
357 /* reset struct members */
358 lp->stat = LUAPROC_STAT_IDLE;
359 lp->args = 0;
360 lp->chan = NULL;
361 lp->destroyworker = FALSE;
362
363 /* load process' code */
364 ret = luaL_loadstring( lp->lstate, code );
365
366 /* in case of errors, destroy lua process */
367 if ( ret != 0 ) {
368 lua_close( lp->lstate );
369 return NULL;
370 }
371
372 /* return recycled lua process */
373 return lp;
374}
375
376
377int newProc(const char *code, int file, Ecore_Cb callback, void *data)
378{
379 /* new lua process pointer */
380 luaproc lp;
381
382 /* check if existing lua process should be recycled to avoid new creation */
383 lp = luaproc_recycle_pop( );
384
385 /* if there is a lua process available on the recycle queue, recycle it */
386 if ( lp != NULL ) {
387 lp = luaproc_recycle( lp, code, file );
388 }
389 /* otherwise create a new one from scratch */
390 else {
391 /* create new lua process with destroy worker flag set to false
392 (ie, conclusion of lua process will NOT result in worker thread destruction */
393 lp = luaproc_new( code, FALSE, file );
394 }
395
396 /* ensure process creation was successfull */
397 if ( lp == NULL ) {
398 return 1;
399 }
400
401 /* Stash any data and callback given to us. */
402 lp->data = data;
403 lp->callback = callback;
404
405 /* increase active luaproc count */
406 sched_lpcount_inc();
407
408 /* schedule luaproc */
409 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
410 printf( "[luaproc] error queueing Lua process\n" );
411 /* decrease active luaproc count */
412 sched_lpcount_dec();
413 /* close lua_State */
414 lua_close( lp->lstate );
415 return 2;
416 }
417
418 return 0;
419}
420
421/* create and schedule a new lua process (luaproc.newproc) */
422static int luaproc_create_newproc( lua_State *L ) {
423
424 /* check if first argument is a string (lua code) */
425 const char *code = luaL_checkstring( L, 1 );
426
427 switch (newProc(code, FALSE, NULL, NULL))
428 {
429 case 1 :
430 /* in case of errors return nil + error msg */
431 lua_pushnil( L );
432 lua_pushstring( L, "error loading code string" );
433 return 2;
434 case 2 :
435 /* return nil + error msg */
436 lua_pushnil( L );
437 lua_pushstring( L, "error queuing process" );
438 return 2;
439 }
440
441 lua_pushboolean( L, TRUE );
442 return 1;
443}
444
445/* queue a lua process sending a message without a matching receiver */
446void luaproc_queue_sender( luaproc lp ) {
447 /* add the sending process to this process' send queue */
448 list_add( channel_get_sendq( lp->chan ), list_new_node( lp ));
449}
450
451/* dequeue a lua process sending a message with a receiver match */
452luaproc luaproc_dequeue_sender( channel chan ) {
453
454 node n;
455 luaproc lp;
456
457 if ( list_node_count( channel_get_sendq( chan )) > 0 ) {
458 /* get first node from channel's send queue */
459 n = list_pop_head( channel_get_sendq( chan ));
460 /* find associated luaproc */
461 lp = (luaproc )list_data( n );
462 /* destroy node (but not associated luaproc) */
463 list_destroy_node( n );
464 /* return associated luaproc */
465 return lp;
466 }
467
468 return NULL;
469}
470
471/* queue a luc process receiving a message without a matching sender */
472void luaproc_queue_receiver( luaproc lp ) {
473 /* add the receiving process to this process' receive queue */
474 list_add( channel_get_recvq( lp->chan ), list_new_node( lp ));
475}
476
477/* dequeue a lua process receiving a message with a sender match */
478luaproc luaproc_dequeue_receiver( channel chan ) {
479
480 node n;
481 luaproc lp;
482
483 if ( list_node_count( channel_get_recvq( chan )) > 0 ) {
484 /* get first node from channel's recv queue */
485 n = list_pop_head( channel_get_recvq( chan ));
486 /* find associated luaproc */
487 lp = (luaproc )list_data( n );
488 /* destroy node (but not associated luaproc) */
489 list_destroy_node( n );
490 /* return associated luaproc */
491 return lp;
492 }
493
494 return NULL;
495}
496
497/* moves values between lua states' stacks */
498void luaproc_movevalues( lua_State *Lfrom, lua_State *Lto ) {
499
500 int i;
501 int n = lua_gettop( Lfrom );
502
503 /* move values between lua states' stacks */
504 for ( i = 2; i <= n; i++ ) {
505 lua_pushstring( Lto, lua_tostring( Lfrom, i ));
506 }
507}
508
509/* return the lua process associated with a given lua state */
510luaproc luaproc_getself( lua_State *L ) {
511 luaproc lp;
512 lua_getfield( L, LUA_REGISTRYINDEX, "_SELF" );
513 lp = (luaproc )lua_touserdata( L, -1 );
514 lua_pop( L, 1 );
515 return lp;
516}
517
518/* send a message to a lua process */
519static int luaproc_send_back( lua_State *L ) {
520
521 luaproc self;
522 const char *message = luaL_checkstring( L, 1 );
523
524 self = luaproc_getself( L );
525 if (self && self->callback && self->data)
526 {
527 scriptMessage *sm = calloc(1, sizeof(scriptMessage));
528
529 if (sm)
530 {
531 sm->script = self->data;
532 strcpy(sm->message, message);
533 ecore_main_loop_thread_safe_call_async(self->callback, sm);
534 }
535 }
536
537 return 0;
538}
539
540/* error messages for the sendToChannel function */
541const char *sendToChannelErrors[] =
542{
543 "non-existent channel",
544 "error scheduling process"
545};
546
547// TODO - If these come in too quick, then messages might get lost. Also, in at least one case, it locked up this thread I think.
548
549/* send a message to a lua process */
550const char *sendToChannel(const char *chname, const char *message, luaproc *dst, channel *chn)
551{
552 const char *result = NULL;
553 channel chan;
554 luaproc dstlp;
555
556 /* get exclusive access to operate on channels */
557 pthread_mutex_lock(&mutex_channel);
558
559 /* wait until channel is not in use */
560 while( ((chan = channel_search(chname)) != NULL) && (pthread_mutex_trylock(channel_get_mutex(chan)) != 0 ))
561 {
562 pthread_cond_wait(channel_get_cond(chan), &mutex_channel);
563 }
564
565 /* free access to operate on channels */
566 pthread_mutex_unlock(&mutex_channel);
567
568 /* if channel is not found, return an error */
569 if (chan == NULL)
570 return sendToChannelErrors[0];
571
572 /* try to find a matching receiver */
573 dstlp = luaproc_dequeue_receiver(chan);
574
575 /* if a match is found, send the message to it and (queue) wake it */
576 if (dstlp != NULL)
577 {
578 /* push the message onto the receivers stack */
579 lua_pushstring( dstlp->lstate, message);
580
581 dstlp->args = lua_gettop(dstlp->lstate) - 1;
582
583 if (sched_queue_proc(dstlp) != LUAPROC_SCHED_QUEUE_PROC_OK)
584 {
585 /* unlock channel access */
586 luaproc_unlock_channel(chan);
587
588 /* decrease active luaproc count */
589 sched_lpcount_dec();
590
591 /* close lua_State */
592 lua_close(dstlp->lstate);
593 return sendToChannelErrors[1];
594 }
595
596 /* unlock channel access */
597 luaproc_unlock_channel(chan);
598 }
599 else if (dst)
600 dst = &dstlp;
601
602 if (chn)
603 chn = &chan;
604 return result;
605}
606
607/* send a message to a lua process */
608static int luaproc_send( lua_State *L ) {
609
610 channel chan;
611 luaproc dstlp, self;
612 const char *chname = luaL_checkstring( L, 1 );
613 const char *message = luaL_checkstring( L, 2 );
614 const char *result = sendToChannel(chname, message, &dstlp, &chan);
615
616 if (result) {
617 lua_pushnil( L );
618 lua_pushstring( L, result );
619 return 2;
620 }
621
622 if ( dstlp == NULL ) {
623
624 self = luaproc_getself( L );
625
626 if ( self != NULL ) {
627 self->stat = LUAPROC_STAT_BLOCKED_SEND;
628 self->chan = chan;
629 }
630
631 /* just yield the lua process, channel unlocking will be done by the scheduler */
632 return lua_yield( L, lua_gettop( L ));
633 }
634
635 lua_pushboolean( L, TRUE );
636 return 1;
637}
638
639/* receive a message from a lua process */
640static int luaproc_receive( lua_State *L ) {
641
642 channel chan;
643 luaproc srclp, self;
644 const char *chname = luaL_checkstring( L, 1 );
645
646 /* get exclusive access to operate on channels */
647 pthread_mutex_lock( &mutex_channel );
648
649 /* wait until channel is not in use */
650 while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) {
651 pthread_cond_wait( channel_get_cond( chan ), &mutex_channel );
652 }
653
654 /* free access to operate on channels */
655 pthread_mutex_unlock( &mutex_channel );
656
657 /* if channel is not found, free access to operate on channels and return an error to Lua */
658 if ( chan == NULL ) {
659 lua_pushnil( L );
660 lua_pushstring( L, "non-existent channel" );
661 return 2;
662 }
663
664 /* try to find a matching sender */
665 srclp = luaproc_dequeue_sender( chan );
666
667 /* if a match is found, get values from it and (queue) wake it */
668 if ( srclp != NULL ) {
669
670 /* move values between Lua states' stacks */
671 luaproc_movevalues( srclp->lstate, L );
672
673 /* return to sender indicanting message was sent */
674 lua_pushboolean( srclp->lstate, TRUE );
675 srclp->args = 1;
676
677 if ( sched_queue_proc( srclp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
678
679 /* unlock channel access */
680 luaproc_unlock_channel( chan );
681
682 /* decrease active luaproc count */
683 sched_lpcount_dec();
684
685 /* close lua_State */
686 lua_close( srclp->lstate );
687 lua_pushnil( L );
688 lua_pushstring( L, "error scheduling process" );
689 return 2;
690 }
691
692 /* unlock channel access */
693 luaproc_unlock_channel( chan );
694
695 return lua_gettop( L ) - 1;
696 }
697
698 /* otherwise queue (block) the receiving process (sync) or return immediatly (async) */
699 else {
700
701 /* if trying an asynchronous receive, unlock channel access and return an error */
702 if ( lua_toboolean( L, 2 )) {
703 /* unlock channel access */
704 luaproc_unlock_channel( chan );
705 /* return an error */
706 lua_pushnil( L );
707 lua_pushfstring( L, "no senders waiting on channel %s", chname );
708 return 2;
709 }
710
711 /* otherwise (synchronous receive) simply block process */
712 else {
713 self = luaproc_getself( L );
714
715 if ( self != NULL ) {
716 self->stat = LUAPROC_STAT_BLOCKED_RECV;
717 self->chan = chan;
718 }
719
720 /* just yield the lua process, channel unlocking will be done by the scheduler */
721 return lua_yield( L, lua_gettop( L ));
722 }
723 }
724}
725
726void luaprocInit(void)
727{
728 /* initialize recycle list */
729 recyclelp = list_new();
730
731 /* initialize local scheduler */
732 sched_init_local( LUAPROC_SCHED_DEFAULT_WORKER_THREADS );
733}
734
735void luaprocRegister(lua_State *L)
736{
737 /* register luaproc functions */
738 luaL_register( L, "luaproc", luaproc_funcs_parent );
739}
740
741LUALIB_API int luaopen_luaproc( lua_State *L ) {
742 luaprocRegister(L);
743 luaprocInit();
744 return 0;
745}
746
747/* return a process' status */
748int luaproc_get_status( luaproc lp ) {
749 return lp->stat;
750}
751
752/* set a process' status */
753void luaproc_set_status( luaproc lp, int status ) {
754 lp->stat = status;
755}
756
757/* return a process' state */
758lua_State *luaproc_get_state( luaproc lp ) {
759 return lp->lstate;
760}
761
762/* return the number of arguments expected by a given process */
763int luaproc_get_args( luaproc lp ) {
764 return lp->args;
765}
766
767/* set the number of arguments expected by a given process */
768void luaproc_set_args( luaproc lp, int n ) {
769 lp->args = n;
770}
771
772/* create a new channel */
773static int luaproc_create_channel( lua_State *L ) {
774
775 const char *chname = luaL_checkstring( L, 1 );
776
777 /* get exclusive access to operate on channels */
778 pthread_mutex_lock( &mutex_channel );
779
780 /* check if channel exists */
781 if ( channel_search( chname ) != NULL ) {
782 /* free access to operate on channels */
783 pthread_mutex_unlock( &mutex_channel );
784 /* return an error to lua */
785 lua_pushnil( L );
786 lua_pushstring( L, "channel already exists" );
787 return 2;
788 }
789
790 channel_create( chname );
791
792 /* free access to operate on channels */
793 pthread_mutex_unlock( &mutex_channel );
794
795 lua_pushboolean( L, TRUE );
796
797 return 1;
798
799}
800
801/* destroy a channel */
802static int luaproc_destroy_channel( lua_State *L ) {
803
804 channel chan;
805 luaproc lp;
806 node nitr;
807 pthread_mutex_t *chmutex;
808 pthread_cond_t *chcond;
809 const char *chname = luaL_checkstring( L, 1 );
810
811
812 /* get exclusive access to operate on channels */
813 pthread_mutex_lock( &mutex_channel );
814
815 /* wait until channel is not in use */
816 while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) {
817 pthread_cond_wait( channel_get_cond( chan ), &mutex_channel );
818 }
819
820 /* free access to operate on channels */
821 pthread_mutex_unlock( &mutex_channel );
822
823 /* if channel is not found, return an error to Lua */
824 if ( chan == NULL ) {
825 lua_pushnil( L );
826 lua_pushstring( L, "non-existent channel" );
827 return 2;
828 }
829
830 /* get channel's mutex and conditional pointers */
831 chmutex = channel_get_mutex( chan );
832 chcond = channel_get_cond( chan );
833
834 /* search for processes waiting to send a message on this channel */
835 while (( nitr = list_pop_head( channel_get_sendq( chan ))) != NULL ) {
836
837 lp = (luaproc )list_data( nitr );
838
839 /* destroy node (but not associated luaproc) */
840 list_destroy_node( nitr );
841
842 /* return an error so the processe knows the channel was destroyed before the message was sent */
843 lua_settop( lp->lstate, 0 );
844 lua_pushnil( lp->lstate );
845 lua_pushstring( lp->lstate, "channel destroyed while waiting for receiver" );
846 lp->args = 2;
847
848 /* schedule the process for execution */
849 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
850
851 /* decrease active luaproc count */
852 sched_lpcount_dec();
853
854 /* close lua_State */
855 lua_close( lp->lstate );
856 }
857 }
858
859 /* search for processes waiting to receive a message on this channel */
860 while (( nitr = list_pop_head( channel_get_recvq( chan ))) != NULL ) {
861
862 lp = (luaproc )list_data( nitr );
863
864 /* destroy node (but not associated luaproc) */
865 list_destroy_node( nitr );
866
867 /* return an error so the processe knows the channel was destroyed before the message was received */
868 lua_settop( lp->lstate, 0 );
869 lua_pushnil( lp->lstate );
870 lua_pushstring( lp->lstate, "channel destroyed while waiting for sender" );
871 lp->args = 2;
872
873 /* schedule the process for execution */
874 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) {
875
876 /* decrease active luaproc count */
877 sched_lpcount_dec();
878
879 /* close lua_State */
880 lua_close( lp->lstate );
881 }
882 }
883
884 /* get exclusive access to operate on channels */
885 pthread_mutex_lock( &mutex_channel );
886 /* destroy channel */
887 channel_destroy( chan, chname );
888 /* broadcast channel not in use */
889 pthread_cond_broadcast( chcond );
890 /* unlock channel access */
891 pthread_mutex_unlock( chmutex );
892 /* destroy channel mutex and conditional */
893 pthread_mutex_destroy( chmutex );
894 pthread_cond_destroy( chcond );
895 /* free memory used by channel mutex and conditional */
896 free( chmutex );
897 free( chcond );
898 /* free access to operate on channels */
899 pthread_mutex_unlock( &mutex_channel );
900
901 lua_pushboolean( L, TRUE );
902
903 return 1;
904}
905
906/* register luaproc's functions in a lua_State */
907void luaproc_register_funcs( lua_State *L ) {
908 luaL_register( L, "luaproc", luaproc_funcs_child );
909}
910
911/* return the channel where the corresponding luaproc is blocked at */
912channel luaproc_get_channel( luaproc lp ) {
913 return lp->chan;
914}
915
916/* unlock access to a channel */
917void luaproc_unlock_channel( channel chan ) {
918 /* get exclusive access to operate on channels */
919 pthread_mutex_lock( &mutex_channel );
920 /* unlock channel access */
921 pthread_mutex_unlock( channel_get_mutex( chan ));
922 /* signal channel not in use */
923 pthread_cond_signal( channel_get_cond( chan ));
924 /* free access to operate on channels */
925 pthread_mutex_unlock( &mutex_channel );
926}
927
928/* return status (boolean) indicating if worker thread should be destroyed after luaproc execution */
929int luaproc_get_destroyworker( luaproc lp ) {
930 return lp->destroyworker;
931}