aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/LuaSL/src
diff options
context:
space:
mode:
Diffstat (limited to 'LuaSL/src')
-rw-r--r--LuaSL/src/LuaSL_threads.c555
-rw-r--r--LuaSL/src/LuaSL_threads.h2
2 files changed, 160 insertions, 397 deletions
diff --git a/LuaSL/src/LuaSL_threads.c b/LuaSL/src/LuaSL_threads.c
index a8a1681..399d077 100644
--- a/LuaSL/src/LuaSL_threads.c
+++ b/LuaSL/src/LuaSL_threads.c
@@ -70,8 +70,6 @@ THE SOFTWARE.
70#define LUAPROC_STAT_BLOCKED_SEND 2 70#define LUAPROC_STAT_BLOCKED_SEND 2
71/* process is blocked on receive */ 71/* process is blocked on receive */
72#define LUAPROC_STAT_BLOCKED_RECV 3 72#define LUAPROC_STAT_BLOCKED_RECV 3
73/* process is finished */
74#define LUAPROC_STAT_FINISHED 4
75 73
76 74
77/* message channel */ 75/* message channel */
@@ -86,7 +84,7 @@ struct stchannel {
86struct stluaproc { 84struct stluaproc {
87 Eina_Clist node; 85 Eina_Clist node;
88 lua_State *lstate; 86 lua_State *lstate;
89 int stat; 87 int status;
90 int args; 88 int args;
91 channel chan; 89 channel chan;
92 void *data; 90 void *data;
@@ -100,8 +98,8 @@ struct stluaproc {
100/* global channel lua_State mutex */ 98/* global channel lua_State mutex */
101pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER; 99pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER;
102 100
103/* global lua_State where channel hash table will be stored */ 101/* global where channels will be stored */
104lua_State *chanls = NULL; 102Eina_Hash *channels;
105 103
106/* ready process list */ 104/* ready process list */
107Eina_Clist lpready; 105Eina_Clist lpready;
@@ -133,9 +131,6 @@ pthread_mutex_t mutex_recycle_list = PTHREAD_MUTEX_INITIALIZER;
133/* recycled lua process list */ 131/* recycled lua process list */
134Eina_Clist recyclelp; 132Eina_Clist recyclelp;
135 133
136/* maximum lua processes to recycle */
137int recyclemax = 0;
138
139 134
140/****************************** 135/******************************
141* library functions prototypes 136* library functions prototypes
@@ -165,241 +160,70 @@ static const struct luaL_reg luaproc_funcs_child[] = {
165}; 160};
166 161
167 162
168
169
170/* initialize channel table */
171static void channel_init( void ) {
172 chanls = luaL_newstate();
173 lua_newtable( chanls );
174 lua_setglobal( chanls, "channeltb" );
175}
176
177/* create new channel */
178static channel channel_create( const char *cname ) {
179
180 channel chan;
181
182 /* get exclusive access to the channel table */
183 pthread_mutex_lock( &mutex_channel_lstate );
184
185 /* create a new channel */
186 lua_getglobal( chanls, "channeltb");
187 lua_pushstring( chanls, cname );
188 chan = (channel )lua_newuserdata( chanls, sizeof( struct stchannel ));
189 eina_clist_init(&(chan->send));
190 eina_clist_init(&(chan->recv));
191 chan->mutex = (pthread_mutex_t *)malloc( sizeof( pthread_mutex_t ));
192 pthread_mutex_init( chan->mutex, NULL );
193 chan->in_use = (pthread_cond_t *)malloc( sizeof( pthread_cond_t ));
194 pthread_cond_init( chan->in_use, NULL );
195 lua_settable( chanls, -3 );
196 lua_pop( chanls, 1 );
197
198 /* let others access the channel table */
199 pthread_mutex_unlock( &mutex_channel_lstate );
200
201 return chan;
202}
203
204/* search for and return a channel with a given name */
205static channel channel_search( const char *cname ) {
206
207 channel chan;
208
209 /* get exclusive access to the channel table */
210 pthread_mutex_lock( &mutex_channel_lstate );
211
212 /* search for channel */
213 lua_getglobal( chanls, "channeltb");
214 lua_getfield( chanls, -1, cname );
215 if (( lua_type( chanls, -1 )) == LUA_TUSERDATA ) {
216 chan = (channel )lua_touserdata( chanls, -1 );
217 } else {
218 chan = NULL;
219 }
220 lua_pop( chanls, 2 );
221
222 /* let others access channel table */
223 pthread_mutex_unlock( &mutex_channel_lstate );
224
225 return chan;
226}
227
228/* return a channel's send queue */
229static Eina_Clist *channel_get_sendq( channel chan ) {
230 return &chan->send;
231}
232
233/* return a channel's receive queue */
234static Eina_Clist *channel_get_recvq( channel chan ) {
235 return &chan->recv;
236}
237
238/* return a channel's mutex */
239static pthread_mutex_t *channel_get_mutex( channel chan ) {
240 return chan->mutex;
241}
242
243/* return a channel's conditional variable */
244static pthread_cond_t *channel_get_cond( channel chan ) {
245 return chan->in_use;
246}
247
248
249
250/* return status (boolean) indicating if lua process should be recycled */
251static luaproc luaproc_recycle_pop( void ) {
252
253 luaproc lp;
254
255 /* get exclusive access to operate on recycle list */
256 pthread_mutex_lock( &mutex_recycle_list );
257
258 /* check if there are any lua processes on recycle list */
259 if ( eina_clist_count( &recyclelp ) > 0 ) {
260 /* pop list head */
261 if ((lp = (luaproc) eina_clist_head(&recyclelp)))
262 eina_clist_remove(&(lp->node));
263 /* free access to operate on recycle list */
264 pthread_mutex_unlock( &mutex_recycle_list );
265 /* return associated luaproc */
266 return lp;
267 }
268
269 /* free access to operate on recycle list */
270 pthread_mutex_unlock( &mutex_recycle_list );
271
272 /* if no lua processes are available simply return null */
273 return NULL;
274}
275
276/* check if lua process should be recycled and, in case so, add it to the recycle list */
277static int luaproc_recycle_push( luaproc lp ) {
278
279 /* get exclusive access to operate on recycle list */
280 pthread_mutex_lock( &mutex_recycle_list );
281
282 /* check if amount of lua processes currently on recycle list is greater than
283 or equal to the maximum amount of lua processes that should be recycled */
284 if ( eina_clist_count( &recyclelp ) >= recyclemax ) {
285 /* free access to operate on recycle list */
286 pthread_mutex_unlock( &mutex_recycle_list );
287 /* if so, lua process should NOT be recycled and should be destroyed */
288 return FALSE;
289 }
290 /* otherwise, lua process should be added to recycle list */
291 eina_clist_add_tail( &recyclelp, &(lp->node) );
292 /* free access to operate on recycle list */
293 pthread_mutex_unlock( &mutex_recycle_list );
294 /* since lua process will be recycled, it should not be destroyed */
295 return TRUE;
296}
297
298/* queue a lua process sending a message without a matching receiver */ 163/* queue a lua process sending a message without a matching receiver */
299static void luaproc_queue_sender( luaproc lp ) { 164static void luaproc_queue_sender(luaproc lp)
300 /* add the sending process to this process' send queue */ 165{
301 eina_clist_add_tail( channel_get_sendq( lp->chan ), &(lp->node)); 166 eina_clist_add_tail(&(lp->chan->send), &(lp->node));
302} 167}
303 168
304/* dequeue a lua process sending a message with a receiver match */ 169/* dequeue a lua process sending a message with a receiver match */
305static luaproc luaproc_dequeue_sender( channel chan ) { 170static luaproc luaproc_dequeue_sender(channel chan)
306 171{
307 luaproc lp; 172 luaproc lp;
308 173
309 if ( eina_clist_count( channel_get_sendq( chan )) > 0 ) { 174 if ((lp = (luaproc) eina_clist_head(&(chan->send))))
310 /* get first node from channel's send queue */ 175 eina_clist_remove(&(lp->node));
311 if ((lp = (luaproc) eina_clist_head(channel_get_sendq( chan ))))
312 eina_clist_remove(&(lp->node));
313 /* return associated luaproc */
314 return lp;
315 }
316 176
317 return NULL; 177 return lp;
318} 178}
319 179
320/* queue a luc process receiving a message without a matching sender */ 180/* queue a luc process receiving a message without a matching sender */
321static void luaproc_queue_receiver( luaproc lp ) { 181static void luaproc_queue_receiver(luaproc lp)
322 /* add the receiving process to this process' receive queue */ 182{
323 eina_clist_add_tail( channel_get_recvq( lp->chan ), &(lp->node)); 183 eina_clist_add_tail(&(lp->chan->recv), &(lp->node));
324} 184}
325 185
326/* dequeue a lua process receiving a message with a sender match */ 186/* dequeue a lua process receiving a message with a sender match */
327static luaproc luaproc_dequeue_receiver( channel chan ) { 187static luaproc luaproc_dequeue_receiver(channel chan)
328 188{
329 luaproc lp; 189 luaproc lp;
330
331 if ( eina_clist_count( channel_get_recvq( chan )) > 0 ) {
332 /* get first node from channel's recv queue */
333 if ((lp = (luaproc) eina_clist_head(channel_get_recvq( chan ))))
334 eina_clist_remove(&(lp->node));
335 /* return associated luaproc */
336 return lp;
337 }
338
339 return NULL;
340}
341
342/* return a process' status */
343static int luaproc_get_status( luaproc lp ) {
344 return lp->stat;
345}
346
347/* set a process' status */
348static void luaproc_set_status( luaproc lp, int status ) {
349 lp->stat = status;
350}
351
352/* return a process' state */
353static lua_State *luaproc_get_state( luaproc lp ) {
354 return lp->lstate;
355}
356
357/* return the number of arguments expected by a given process */
358static int luaproc_get_args( luaproc lp ) {
359 return lp->args;
360}
361
362/* set the number of arguments expected by a given process */
363static void luaproc_set_args( luaproc lp, int n ) {
364 lp->args = n;
365}
366 190
191 if ((lp = (luaproc) eina_clist_head(&(chan->recv))))
192 eina_clist_remove(&(lp->node));
367 193
368/* return the channel where the corresponding luaproc is blocked at */ 194 return lp;
369static channel luaproc_get_channel( luaproc lp ) {
370 return lp->chan;
371} 195}
372 196
373/* unlock access to a channel */ 197/* unlock access to a channel */
374static void luaproc_unlock_channel( channel chan ) { 198static void luaproc_unlock_channel(channel chan)
375 /* get exclusive access to operate on channels */ 199{
376 pthread_mutex_lock( &mutex_channel ); 200 /* get exclusive access to operate on channels */
377 /* unlock channel access */ 201 pthread_mutex_lock(&mutex_channel);
378 pthread_mutex_unlock( channel_get_mutex( chan )); 202 /* unlock channel access */
379 /* signal channel not in use */ 203 pthread_mutex_unlock(chan->mutex);
380 pthread_cond_signal( channel_get_cond( chan )); 204 /* signal channel not in use */
381 /* free access to operate on channels */ 205 pthread_cond_signal(chan->in_use);
382 pthread_mutex_unlock( &mutex_channel ); 206 /* free access to operate on channels */
207 pthread_mutex_unlock(&mutex_channel);
383} 208}
384 209
385
386
387/* increase active lua process count */ 210/* increase active lua process count */
388static void sched_lpcount_inc( void ) { 211static void sched_lpcount_inc(void)
389 pthread_mutex_lock( &mutex_lp_count ); 212{
390 lpcount++; 213 pthread_mutex_lock(&mutex_lp_count);
391 pthread_mutex_unlock( &mutex_lp_count ); 214 lpcount++;
215 pthread_mutex_unlock(&mutex_lp_count);
392} 216}
393 217
394/* decrease active lua process count */ 218/* decrease active lua process count */
395static void sched_lpcount_dec( void ) { 219static void sched_lpcount_dec(void)
396 pthread_mutex_lock( &mutex_lp_count ); 220{
397 lpcount--; 221 pthread_mutex_lock(&mutex_lp_count);
398 /* if count reaches zero, signal there are no more active processes */ 222 lpcount--;
399 if ( lpcount == 0 ) { 223 /* if count reaches zero, signal there are no more active processes */
400 pthread_cond_signal( &cond_no_active_lp ); 224 if (lpcount == 0)
401 } 225 pthread_cond_signal(&cond_no_active_lp);
402 pthread_mutex_unlock( &mutex_lp_count ); 226 pthread_mutex_unlock(&mutex_lp_count);
403} 227}
404 228
405/* worker thread main function */ 229/* worker thread main function */
@@ -436,23 +260,17 @@ static void *workermain( void *args ) {
436 pthread_mutex_unlock( &mutex_queue_access ); 260 pthread_mutex_unlock( &mutex_queue_access );
437 261
438 /* execute the lua code specified in the lua process struct */ 262 /* execute the lua code specified in the lua process struct */
439 procstat = lua_resume( luaproc_get_state( lp ), luaproc_get_args( lp )); 263 procstat = lua_resume(lp->lstate, lp->args);
440 264
441 /* reset the process argument count */ 265 /* reset the process argument count */
442 luaproc_set_args( lp, 0 ); 266 lp->args = 0;
443 267
444 /* check if process finished its whole execution */ 268 /* check if process finished its whole execution, then recycle it */
445 if ( procstat == 0 ) { 269 if ( procstat == 0 ) {
446 270
447 /* set process status to finished */ 271 pthread_mutex_lock(&mutex_recycle_list);
448 luaproc_set_status( lp, LUAPROC_STAT_FINISHED ); 272 eina_clist_add_tail(&recyclelp, &(lp->node));
449 273 pthread_mutex_unlock(&mutex_recycle_list);
450 /* check if lua process should be recycled and, if not, destroy it */
451 if ( luaproc_recycle_push( lp ) == FALSE ) {
452 lua_close( luaproc_get_state( lp ));
453 }
454
455 /* decrease active lua process count */
456 sched_lpcount_dec(); 274 sched_lpcount_dec();
457 275
458 } 276 }
@@ -461,18 +279,18 @@ static void *workermain( void *args ) {
461 else if ( procstat == LUA_YIELD ) { 279 else if ( procstat == LUA_YIELD ) {
462 280
463 /* if so, further check if yield originated from an unmatched send/recv operation */ 281 /* if so, further check if yield originated from an unmatched send/recv operation */
464 if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_SEND ) { 282 if ( lp->status == LUAPROC_STAT_BLOCKED_SEND ) {
465 /* queue blocked lua process on corresponding channel */ 283 /* queue blocked lua process on corresponding channel */
466 luaproc_queue_sender( lp ); 284 luaproc_queue_sender( lp );
467 /* unlock channel access */ 285 /* unlock channel access */
468 luaproc_unlock_channel( luaproc_get_channel( lp )); 286 luaproc_unlock_channel(lp->chan);
469 } 287 }
470 288
471 else if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_RECV ) { 289 else if ( lp->status == LUAPROC_STAT_BLOCKED_RECV ) {
472 /* queue blocked lua process on corresponding channel */ 290 /* queue blocked lua process on corresponding channel */
473 luaproc_queue_receiver( lp ); 291 luaproc_queue_receiver( lp );
474 /* unlock channel access */ 292 /* unlock channel access */
475 luaproc_unlock_channel( luaproc_get_channel( lp )); 293 luaproc_unlock_channel(lp->chan);
476 } 294 }
477 295
478 /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */ 296 /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */
@@ -489,42 +307,15 @@ static void *workermain( void *args ) {
489 /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */ 307 /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */
490 else { 308 else {
491 /* print error message */ 309 /* print error message */
492 fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring( luaproc_get_state( lp ), -1 )); 310 fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring(lp->lstate, -1 ));
493 /* close lua state */ 311 /* close lua state */
494 lua_close( luaproc_get_state( lp )); 312 lua_close(lp->lstate);
495 /* decrease active lua process count */ 313 /* decrease active lua process count */
496 sched_lpcount_dec(); 314 sched_lpcount_dec();
497 } 315 }
498 } 316 }
499} 317}
500 318
501/* local scheduler initialization */
502static int sched_init_local( int numworkers ) {
503
504 int tid;
505 int workercount = 0;
506 pthread_t worker;
507
508 /* initialize ready process list */
509 eina_clist_init(&lpready);
510
511 /* initialize channels */
512 channel_init();
513
514 /* create initial worker threads */
515 for ( tid = 0; tid < numworkers; tid++ ) {
516 if ( pthread_create( &worker, NULL, workermain, NULL ) == 0 ) {
517 workercount++;
518 }
519 }
520
521 if ( workercount != numworkers ) {
522 return LUAPROC_SCHED_INIT_ERROR;
523 }
524
525 return LUAPROC_SCHED_OK;
526}
527
528/* move process to ready queue (ie, schedule process) */ 319/* move process to ready queue (ie, schedule process) */
529static int sched_queue_proc( luaproc lp ) { 320static int sched_queue_proc( luaproc lp ) {
530 321
@@ -534,8 +325,7 @@ static int sched_queue_proc( luaproc lp ) {
534 /* add process to ready queue */ 325 /* add process to ready queue */
535 eina_clist_add_tail(&lpready, &(lp->node)); 326 eina_clist_add_tail(&lpready, &(lp->node));
536 327
537 /* set process status to ready */ 328 lp->status = LUAPROC_STAT_READY;
538 luaproc_set_status( lp, LUAPROC_STAT_READY );
539 329
540 /* wake worker up */ 330 /* wake worker up */
541 pthread_cond_signal( &cond_wakeup_worker ); 331 pthread_cond_signal( &cond_wakeup_worker );
@@ -584,112 +374,63 @@ int sched_create_worker( void ) {
584 return LUAPROC_SCHED_OK; 374 return LUAPROC_SCHED_OK;
585} 375}
586 376
587static void openlibs( lua_State *L ) {
588 luaL_openlibs(L);
589}
590
591/* create new luaproc */
592static luaproc luaproc_new( const char *code, int destroyflag, int file) {
593
594 luaproc lp;
595 int ret;
596 /* create new lua state */
597 lua_State *lpst = luaL_newstate( );
598 /* store the luaproc struct in its own lua state */
599 lp = (luaproc )lua_newuserdata( lpst, sizeof( struct stluaproc ));
600 lua_setfield( lpst, LUA_REGISTRYINDEX, "_SELF" );
601
602 eina_clist_element_init(&(lp->node));
603 lp->lstate = lpst;
604 lp->stat = LUAPROC_STAT_IDLE;
605 lp->args = 0;
606 lp->chan = NULL;
607
608 /* load standard libraries */
609 openlibs( lpst );
610
611 /* register luaproc's own functions */
612 luaL_register( lpst, "luaproc", luaproc_funcs_child );
613
614 /* load process' code */
615 if (file)
616 ret = luaL_loadfile( lpst, code );
617 else
618 ret = luaL_loadstring( lpst, code );
619 /* in case of errors, destroy recently created lua process */
620 if ( ret != 0 ) {
621 lua_close( lpst );
622 return NULL;
623 }
624
625 /* return recently created lua process */
626 return lp;
627}
628
629/* recycle a lua process */
630static luaproc luaproc_recycle( luaproc lp, const char *code, int file ) {
631 377
632 int ret;
633 378
634 /* reset struct members */ 379void newProc(const char *code, int file, script *data)
635 lp->stat = LUAPROC_STAT_IDLE; 380{
636 lp->args = 0; 381 int ret;
637 lp->chan = NULL; 382 luaproc lp;
638
639 /* load process' code */
640 ret = luaL_loadstring( lp->lstate, code );
641 383
642 /* in case of errors, destroy lua process */ 384 // Try to recycle a Lua state, otherwise create one from scratch.
643 if ( ret != 0 ) { 385 pthread_mutex_lock(&mutex_recycle_list);
644 lua_close( lp->lstate ); 386 /* pop list head */
645 return NULL; 387 if ((lp = (luaproc) eina_clist_head(&recyclelp)))
646 } 388 eina_clist_remove(&(lp->node));
389 pthread_mutex_unlock(&mutex_recycle_list);
647 390
648 /* return recycled lua process */ 391 if (lp == NULL)
649 return lp; 392 {
650} 393 lua_State *lpst = luaL_newstate();
651 394
652int newProc(const char *code, int file, void *data) 395 /* store the luaproc struct in its own Lua state */
653{ 396 lp = (luaproc) lua_newuserdata(lpst, sizeof(struct stluaproc));
654 /* new lua process pointer */ 397 lp->lstate = lpst;
655 luaproc lp; 398 lua_setfield(lp->lstate, LUA_REGISTRYINDEX, "_SELF");
399 luaL_openlibs(lp->lstate);
400 luaL_register(lp->lstate, "luaproc", luaproc_funcs_child);
401 eina_clist_element_init(&(lp->node));
402 }
656 403
657 /* check if existing lua process should be recycled to avoid new creation */ 404 lp->status = LUAPROC_STAT_IDLE;
658 lp = luaproc_recycle_pop( ); 405 lp->args = 0;
406 lp->chan = NULL;
659 407
660 /* if there is a lua process available on the recycle queue, recycle it */ 408 /* load process' code */
661 if ( lp != NULL ) { 409 if (file)
662 lp = luaproc_recycle( lp, code, file ); 410 ret = luaL_loadfile(lp->lstate, code);
663 } 411 else
664 /* otherwise create a new one from scratch */ 412 ret = luaL_loadstring(lp->lstate, code);
665 else {
666 /* create new lua process with destroy worker flag set to false
667 (ie, conclusion of lua process will NOT result in worker thread destruction */
668 lp = luaproc_new( code, FALSE, file );
669 }
670 413
671 /* ensure process creation was successfull */ 414 /* in case of errors, destroy Lua process */
672 if ( lp == NULL ) { 415 if (ret != 0)
673 return 1; 416 {
674 } 417 lua_close(lp->lstate);
418 lp = NULL;
419 }
675 420
676 /* Stash any data given to us. */ 421 if (lp)
422 {
677 lp->data = data; 423 lp->data = data;
678
679 /* increase active luaproc count */
680 sched_lpcount_inc(); 424 sched_lpcount_inc();
681 425
682 /* schedule luaproc */ 426 /* schedule luaproc */
683 if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { 427 if (sched_queue_proc(lp) != LUAPROC_SCHED_QUEUE_PROC_OK)
684 printf( "[luaproc] error queueing Lua process\n" ); 428 {
685 /* decrease active luaproc count */ 429 printf( "[luaproc] error queueing Lua process\n" );
686 sched_lpcount_dec(); 430 sched_lpcount_dec();
687 /* close lua_State */ 431 lua_close(lp->lstate);
688 lua_close( lp->lstate );
689 return 2;
690 } 432 }
691 433 }
692 return 0;
693} 434}
694 435
695/* moves values between lua states' stacks */ 436/* moves values between lua states' stacks */
@@ -713,6 +454,49 @@ static luaproc luaproc_getself( lua_State *L ) {
713 return lp; 454 return lp;
714} 455}
715 456
457/* create a new channel */
458static int luaproc_create_channel(lua_State *L)
459{
460 const char *name = luaL_checkstring(L, 1);
461 channel chan;
462
463 /* get exclusive access to operate on channels */
464 pthread_mutex_lock(&mutex_channel);
465
466 /* check if channel exists */
467 if (eina_hash_find(channels, name) != NULL)
468 {
469 /* free access to operate on channels */
470 pthread_mutex_unlock(&mutex_channel);
471 /* return an error to lua */
472 lua_pushnil(L);
473 lua_pushstring(L, "channel already exists");
474 return 2;
475 }
476
477 /* get exclusive access to the channel table */
478 pthread_mutex_lock(&mutex_channel_lstate);
479
480 /* create a new channel */
481 chan = (channel) calloc(1, sizeof(struct stchannel));
482 eina_clist_init(&(chan->send));
483 eina_clist_init(&(chan->recv));
484 chan->mutex = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t));
485 pthread_mutex_init( chan->mutex, NULL );
486 chan->in_use = (pthread_cond_t *) malloc(sizeof(pthread_cond_t));
487 pthread_cond_init(chan->in_use, NULL);
488 eina_hash_add(channels, name, chan);
489
490 /* let others access the channel table */
491 pthread_mutex_unlock(&mutex_channel_lstate);
492
493 /* free access to operate on channels */
494 pthread_mutex_unlock(&mutex_channel);
495
496 lua_pushboolean(L, TRUE);
497 return 1;
498}
499
716/* send a message to a lua process */ 500/* send a message to a lua process */
717static int luaproc_send_back( lua_State *L ) { 501static int luaproc_send_back( lua_State *L ) {
718 502
@@ -755,9 +539,9 @@ const char *sendToChannel(const char *chname, const char *message, luaproc *dst,
755 pthread_mutex_lock(&mutex_channel); 539 pthread_mutex_lock(&mutex_channel);
756 540
757 /* wait until channel is not in use */ 541 /* wait until channel is not in use */
758 while( ((chan = channel_search(chname)) != NULL) && (pthread_mutex_trylock(channel_get_mutex(chan)) != 0 )) 542 while( ((chan = eina_hash_find(channels, chname)) != NULL) && (pthread_mutex_trylock(chan->mutex) != 0 ))
759 { 543 {
760 pthread_cond_wait(channel_get_cond(chan), &mutex_channel); 544 pthread_cond_wait(chan->in_use, &mutex_channel);
761 } 545 }
762 546
763 /* free access to operate on channels */ 547 /* free access to operate on channels */
@@ -822,7 +606,7 @@ static int luaproc_send( lua_State *L ) {
822 self = luaproc_getself( L ); 606 self = luaproc_getself( L );
823 607
824 if ( self != NULL ) { 608 if ( self != NULL ) {
825 self->stat = LUAPROC_STAT_BLOCKED_SEND; 609 self->status = LUAPROC_STAT_BLOCKED_SEND;
826 self->chan = chan; 610 self->chan = chan;
827 } 611 }
828 612
@@ -845,8 +629,8 @@ static int luaproc_receive( lua_State *L ) {
845 pthread_mutex_lock( &mutex_channel ); 629 pthread_mutex_lock( &mutex_channel );
846 630
847 /* wait until channel is not in use */ 631 /* wait until channel is not in use */
848 while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) { 632 while((( chan = eina_hash_find(channels, chname)) != NULL ) && ( pthread_mutex_trylock(chan->mutex) != 0 )) {
849 pthread_cond_wait( channel_get_cond( chan ), &mutex_channel ); 633 pthread_cond_wait(chan->in_use, &mutex_channel );
850 } 634 }
851 635
852 /* free access to operate on channels */ 636 /* free access to operate on channels */
@@ -911,7 +695,7 @@ static int luaproc_receive( lua_State *L ) {
911 self = luaproc_getself( L ); 695 self = luaproc_getself( L );
912 696
913 if ( self != NULL ) { 697 if ( self != NULL ) {
914 self->stat = LUAPROC_STAT_BLOCKED_RECV; 698 self->status = LUAPROC_STAT_BLOCKED_RECV;
915 self->chan = chan; 699 self->chan = chan;
916 } 700 }
917 701
@@ -923,38 +707,17 @@ static int luaproc_receive( lua_State *L ) {
923 707
924void luaprocInit(void) 708void luaprocInit(void)
925{ 709{
926 /* initialize recycle list */ 710 eina_clist_init(&recyclelp);
927 eina_clist_init(&recyclelp);
928
929 /* initialize local scheduler */
930 sched_init_local( LUAPROC_SCHED_DEFAULT_WORKER_THREADS );
931}
932
933/* create a new channel */
934static int luaproc_create_channel( lua_State *L ) {
935
936 const char *chname = luaL_checkstring( L, 1 );
937
938 /* get exclusive access to operate on channels */
939 pthread_mutex_lock( &mutex_channel );
940
941 /* check if channel exists */
942 if ( channel_search( chname ) != NULL ) {
943 /* free access to operate on channels */
944 pthread_mutex_unlock( &mutex_channel );
945 /* return an error to lua */
946 lua_pushnil( L );
947 lua_pushstring( L, "channel already exists" );
948 return 2;
949 }
950 711
951 channel_create( chname ); 712 int tid;
713 pthread_t worker;
952 714
953 /* free access to operate on channels */ 715 eina_clist_init(&lpready);
954 pthread_mutex_unlock( &mutex_channel ); 716 channels = eina_hash_string_superfast_new(NULL);
955
956 lua_pushboolean( L, TRUE );
957
958 return 1;
959 717
718 /* create initial worker threads */
719 for (tid = 0; tid < LUAPROC_SCHED_DEFAULT_WORKER_THREADS; tid++)
720 {
721 pthread_create( &worker, NULL, workermain, NULL);
722 }
960} 723}
diff --git a/LuaSL/src/LuaSL_threads.h b/LuaSL/src/LuaSL_threads.h
index 9fc38ea..47f5a69 100644
--- a/LuaSL/src/LuaSL_threads.h
+++ b/LuaSL/src/LuaSL_threads.h
@@ -55,7 +55,7 @@ void luaprocInit(void);
55/* create a new worker pthread */ 55/* create a new worker pthread */
56int sched_create_worker( void ); 56int sched_create_worker( void );
57 57
58int newProc(const char *code, int file, void *data); 58void newProc(const char *code, int file, script *data);
59const char *sendToChannel(const char *chname, const char *message, luaproc *dst, channel *chn); 59const char *sendToChannel(const char *chname, const char *message, luaproc *dst, channel *chn);
60 60
61/* join all worker threads and exit */ 61/* join all worker threads and exit */