diff options
Diffstat (limited to 'LuaSL/src')
-rw-r--r-- | LuaSL/src/LuaSL_threads.c | 555 | ||||
-rw-r--r-- | LuaSL/src/LuaSL_threads.h | 2 |
2 files changed, 160 insertions, 397 deletions
diff --git a/LuaSL/src/LuaSL_threads.c b/LuaSL/src/LuaSL_threads.c index a8a1681..399d077 100644 --- a/LuaSL/src/LuaSL_threads.c +++ b/LuaSL/src/LuaSL_threads.c | |||
@@ -70,8 +70,6 @@ THE SOFTWARE. | |||
70 | #define LUAPROC_STAT_BLOCKED_SEND 2 | 70 | #define LUAPROC_STAT_BLOCKED_SEND 2 |
71 | /* process is blocked on receive */ | 71 | /* process is blocked on receive */ |
72 | #define LUAPROC_STAT_BLOCKED_RECV 3 | 72 | #define LUAPROC_STAT_BLOCKED_RECV 3 |
73 | /* process is finished */ | ||
74 | #define LUAPROC_STAT_FINISHED 4 | ||
75 | 73 | ||
76 | 74 | ||
77 | /* message channel */ | 75 | /* message channel */ |
@@ -86,7 +84,7 @@ struct stchannel { | |||
86 | struct stluaproc { | 84 | struct stluaproc { |
87 | Eina_Clist node; | 85 | Eina_Clist node; |
88 | lua_State *lstate; | 86 | lua_State *lstate; |
89 | int stat; | 87 | int status; |
90 | int args; | 88 | int args; |
91 | channel chan; | 89 | channel chan; |
92 | void *data; | 90 | void *data; |
@@ -100,8 +98,8 @@ struct stluaproc { | |||
100 | /* global channel lua_State mutex */ | 98 | /* global channel lua_State mutex */ |
101 | pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER; | 99 | pthread_mutex_t mutex_channel_lstate = PTHREAD_MUTEX_INITIALIZER; |
102 | 100 | ||
103 | /* global lua_State where channel hash table will be stored */ | 101 | /* global where channels will be stored */ |
104 | lua_State *chanls = NULL; | 102 | Eina_Hash *channels; |
105 | 103 | ||
106 | /* ready process list */ | 104 | /* ready process list */ |
107 | Eina_Clist lpready; | 105 | Eina_Clist lpready; |
@@ -133,9 +131,6 @@ pthread_mutex_t mutex_recycle_list = PTHREAD_MUTEX_INITIALIZER; | |||
133 | /* recycled lua process list */ | 131 | /* recycled lua process list */ |
134 | Eina_Clist recyclelp; | 132 | Eina_Clist recyclelp; |
135 | 133 | ||
136 | /* maximum lua processes to recycle */ | ||
137 | int recyclemax = 0; | ||
138 | |||
139 | 134 | ||
140 | /****************************** | 135 | /****************************** |
141 | * library functions prototypes | 136 | * library functions prototypes |
@@ -165,241 +160,70 @@ static const struct luaL_reg luaproc_funcs_child[] = { | |||
165 | }; | 160 | }; |
166 | 161 | ||
167 | 162 | ||
168 | |||
169 | |||
170 | /* initialize channel table */ | ||
171 | static void channel_init( void ) { | ||
172 | chanls = luaL_newstate(); | ||
173 | lua_newtable( chanls ); | ||
174 | lua_setglobal( chanls, "channeltb" ); | ||
175 | } | ||
176 | |||
177 | /* create new channel */ | ||
178 | static channel channel_create( const char *cname ) { | ||
179 | |||
180 | channel chan; | ||
181 | |||
182 | /* get exclusive access to the channel table */ | ||
183 | pthread_mutex_lock( &mutex_channel_lstate ); | ||
184 | |||
185 | /* create a new channel */ | ||
186 | lua_getglobal( chanls, "channeltb"); | ||
187 | lua_pushstring( chanls, cname ); | ||
188 | chan = (channel )lua_newuserdata( chanls, sizeof( struct stchannel )); | ||
189 | eina_clist_init(&(chan->send)); | ||
190 | eina_clist_init(&(chan->recv)); | ||
191 | chan->mutex = (pthread_mutex_t *)malloc( sizeof( pthread_mutex_t )); | ||
192 | pthread_mutex_init( chan->mutex, NULL ); | ||
193 | chan->in_use = (pthread_cond_t *)malloc( sizeof( pthread_cond_t )); | ||
194 | pthread_cond_init( chan->in_use, NULL ); | ||
195 | lua_settable( chanls, -3 ); | ||
196 | lua_pop( chanls, 1 ); | ||
197 | |||
198 | /* let others access the channel table */ | ||
199 | pthread_mutex_unlock( &mutex_channel_lstate ); | ||
200 | |||
201 | return chan; | ||
202 | } | ||
203 | |||
204 | /* search for and return a channel with a given name */ | ||
205 | static channel channel_search( const char *cname ) { | ||
206 | |||
207 | channel chan; | ||
208 | |||
209 | /* get exclusive access to the channel table */ | ||
210 | pthread_mutex_lock( &mutex_channel_lstate ); | ||
211 | |||
212 | /* search for channel */ | ||
213 | lua_getglobal( chanls, "channeltb"); | ||
214 | lua_getfield( chanls, -1, cname ); | ||
215 | if (( lua_type( chanls, -1 )) == LUA_TUSERDATA ) { | ||
216 | chan = (channel )lua_touserdata( chanls, -1 ); | ||
217 | } else { | ||
218 | chan = NULL; | ||
219 | } | ||
220 | lua_pop( chanls, 2 ); | ||
221 | |||
222 | /* let others access channel table */ | ||
223 | pthread_mutex_unlock( &mutex_channel_lstate ); | ||
224 | |||
225 | return chan; | ||
226 | } | ||
227 | |||
228 | /* return a channel's send queue */ | ||
229 | static Eina_Clist *channel_get_sendq( channel chan ) { | ||
230 | return &chan->send; | ||
231 | } | ||
232 | |||
233 | /* return a channel's receive queue */ | ||
234 | static Eina_Clist *channel_get_recvq( channel chan ) { | ||
235 | return &chan->recv; | ||
236 | } | ||
237 | |||
238 | /* return a channel's mutex */ | ||
239 | static pthread_mutex_t *channel_get_mutex( channel chan ) { | ||
240 | return chan->mutex; | ||
241 | } | ||
242 | |||
243 | /* return a channel's conditional variable */ | ||
244 | static pthread_cond_t *channel_get_cond( channel chan ) { | ||
245 | return chan->in_use; | ||
246 | } | ||
247 | |||
248 | |||
249 | |||
250 | /* return status (boolean) indicating if lua process should be recycled */ | ||
251 | static luaproc luaproc_recycle_pop( void ) { | ||
252 | |||
253 | luaproc lp; | ||
254 | |||
255 | /* get exclusive access to operate on recycle list */ | ||
256 | pthread_mutex_lock( &mutex_recycle_list ); | ||
257 | |||
258 | /* check if there are any lua processes on recycle list */ | ||
259 | if ( eina_clist_count( &recyclelp ) > 0 ) { | ||
260 | /* pop list head */ | ||
261 | if ((lp = (luaproc) eina_clist_head(&recyclelp))) | ||
262 | eina_clist_remove(&(lp->node)); | ||
263 | /* free access to operate on recycle list */ | ||
264 | pthread_mutex_unlock( &mutex_recycle_list ); | ||
265 | /* return associated luaproc */ | ||
266 | return lp; | ||
267 | } | ||
268 | |||
269 | /* free access to operate on recycle list */ | ||
270 | pthread_mutex_unlock( &mutex_recycle_list ); | ||
271 | |||
272 | /* if no lua processes are available simply return null */ | ||
273 | return NULL; | ||
274 | } | ||
275 | |||
276 | /* check if lua process should be recycled and, in case so, add it to the recycle list */ | ||
277 | static int luaproc_recycle_push( luaproc lp ) { | ||
278 | |||
279 | /* get exclusive access to operate on recycle list */ | ||
280 | pthread_mutex_lock( &mutex_recycle_list ); | ||
281 | |||
282 | /* check if amount of lua processes currently on recycle list is greater than | ||
283 | or equal to the maximum amount of lua processes that should be recycled */ | ||
284 | if ( eina_clist_count( &recyclelp ) >= recyclemax ) { | ||
285 | /* free access to operate on recycle list */ | ||
286 | pthread_mutex_unlock( &mutex_recycle_list ); | ||
287 | /* if so, lua process should NOT be recycled and should be destroyed */ | ||
288 | return FALSE; | ||
289 | } | ||
290 | /* otherwise, lua process should be added to recycle list */ | ||
291 | eina_clist_add_tail( &recyclelp, &(lp->node) ); | ||
292 | /* free access to operate on recycle list */ | ||
293 | pthread_mutex_unlock( &mutex_recycle_list ); | ||
294 | /* since lua process will be recycled, it should not be destroyed */ | ||
295 | return TRUE; | ||
296 | } | ||
297 | |||
298 | /* queue a lua process sending a message without a matching receiver */ | 163 | /* queue a lua process sending a message without a matching receiver */ |
299 | static void luaproc_queue_sender( luaproc lp ) { | 164 | static void luaproc_queue_sender(luaproc lp) |
300 | /* add the sending process to this process' send queue */ | 165 | { |
301 | eina_clist_add_tail( channel_get_sendq( lp->chan ), &(lp->node)); | 166 | eina_clist_add_tail(&(lp->chan->send), &(lp->node)); |
302 | } | 167 | } |
303 | 168 | ||
304 | /* dequeue a lua process sending a message with a receiver match */ | 169 | /* dequeue a lua process sending a message with a receiver match */ |
305 | static luaproc luaproc_dequeue_sender( channel chan ) { | 170 | static luaproc luaproc_dequeue_sender(channel chan) |
306 | 171 | { | |
307 | luaproc lp; | 172 | luaproc lp; |
308 | 173 | ||
309 | if ( eina_clist_count( channel_get_sendq( chan )) > 0 ) { | 174 | if ((lp = (luaproc) eina_clist_head(&(chan->send)))) |
310 | /* get first node from channel's send queue */ | 175 | eina_clist_remove(&(lp->node)); |
311 | if ((lp = (luaproc) eina_clist_head(channel_get_sendq( chan )))) | ||
312 | eina_clist_remove(&(lp->node)); | ||
313 | /* return associated luaproc */ | ||
314 | return lp; | ||
315 | } | ||
316 | 176 | ||
317 | return NULL; | 177 | return lp; |
318 | } | 178 | } |
319 | 179 | ||
320 | /* queue a luc process receiving a message without a matching sender */ | 180 | /* queue a luc process receiving a message without a matching sender */ |
321 | static void luaproc_queue_receiver( luaproc lp ) { | 181 | static void luaproc_queue_receiver(luaproc lp) |
322 | /* add the receiving process to this process' receive queue */ | 182 | { |
323 | eina_clist_add_tail( channel_get_recvq( lp->chan ), &(lp->node)); | 183 | eina_clist_add_tail(&(lp->chan->recv), &(lp->node)); |
324 | } | 184 | } |
325 | 185 | ||
326 | /* dequeue a lua process receiving a message with a sender match */ | 186 | /* dequeue a lua process receiving a message with a sender match */ |
327 | static luaproc luaproc_dequeue_receiver( channel chan ) { | 187 | static luaproc luaproc_dequeue_receiver(channel chan) |
328 | 188 | { | |
329 | luaproc lp; | 189 | luaproc lp; |
330 | |||
331 | if ( eina_clist_count( channel_get_recvq( chan )) > 0 ) { | ||
332 | /* get first node from channel's recv queue */ | ||
333 | if ((lp = (luaproc) eina_clist_head(channel_get_recvq( chan )))) | ||
334 | eina_clist_remove(&(lp->node)); | ||
335 | /* return associated luaproc */ | ||
336 | return lp; | ||
337 | } | ||
338 | |||
339 | return NULL; | ||
340 | } | ||
341 | |||
342 | /* return a process' status */ | ||
343 | static int luaproc_get_status( luaproc lp ) { | ||
344 | return lp->stat; | ||
345 | } | ||
346 | |||
347 | /* set a process' status */ | ||
348 | static void luaproc_set_status( luaproc lp, int status ) { | ||
349 | lp->stat = status; | ||
350 | } | ||
351 | |||
352 | /* return a process' state */ | ||
353 | static lua_State *luaproc_get_state( luaproc lp ) { | ||
354 | return lp->lstate; | ||
355 | } | ||
356 | |||
357 | /* return the number of arguments expected by a given process */ | ||
358 | static int luaproc_get_args( luaproc lp ) { | ||
359 | return lp->args; | ||
360 | } | ||
361 | |||
362 | /* set the number of arguments expected by a given process */ | ||
363 | static void luaproc_set_args( luaproc lp, int n ) { | ||
364 | lp->args = n; | ||
365 | } | ||
366 | 190 | ||
191 | if ((lp = (luaproc) eina_clist_head(&(chan->recv)))) | ||
192 | eina_clist_remove(&(lp->node)); | ||
367 | 193 | ||
368 | /* return the channel where the corresponding luaproc is blocked at */ | 194 | return lp; |
369 | static channel luaproc_get_channel( luaproc lp ) { | ||
370 | return lp->chan; | ||
371 | } | 195 | } |
372 | 196 | ||
373 | /* unlock access to a channel */ | 197 | /* unlock access to a channel */ |
374 | static void luaproc_unlock_channel( channel chan ) { | 198 | static void luaproc_unlock_channel(channel chan) |
375 | /* get exclusive access to operate on channels */ | 199 | { |
376 | pthread_mutex_lock( &mutex_channel ); | 200 | /* get exclusive access to operate on channels */ |
377 | /* unlock channel access */ | 201 | pthread_mutex_lock(&mutex_channel); |
378 | pthread_mutex_unlock( channel_get_mutex( chan )); | 202 | /* unlock channel access */ |
379 | /* signal channel not in use */ | 203 | pthread_mutex_unlock(chan->mutex); |
380 | pthread_cond_signal( channel_get_cond( chan )); | 204 | /* signal channel not in use */ |
381 | /* free access to operate on channels */ | 205 | pthread_cond_signal(chan->in_use); |
382 | pthread_mutex_unlock( &mutex_channel ); | 206 | /* free access to operate on channels */ |
207 | pthread_mutex_unlock(&mutex_channel); | ||
383 | } | 208 | } |
384 | 209 | ||
385 | |||
386 | |||
387 | /* increase active lua process count */ | 210 | /* increase active lua process count */ |
388 | static void sched_lpcount_inc( void ) { | 211 | static void sched_lpcount_inc(void) |
389 | pthread_mutex_lock( &mutex_lp_count ); | 212 | { |
390 | lpcount++; | 213 | pthread_mutex_lock(&mutex_lp_count); |
391 | pthread_mutex_unlock( &mutex_lp_count ); | 214 | lpcount++; |
215 | pthread_mutex_unlock(&mutex_lp_count); | ||
392 | } | 216 | } |
393 | 217 | ||
394 | /* decrease active lua process count */ | 218 | /* decrease active lua process count */ |
395 | static void sched_lpcount_dec( void ) { | 219 | static void sched_lpcount_dec(void) |
396 | pthread_mutex_lock( &mutex_lp_count ); | 220 | { |
397 | lpcount--; | 221 | pthread_mutex_lock(&mutex_lp_count); |
398 | /* if count reaches zero, signal there are no more active processes */ | 222 | lpcount--; |
399 | if ( lpcount == 0 ) { | 223 | /* if count reaches zero, signal there are no more active processes */ |
400 | pthread_cond_signal( &cond_no_active_lp ); | 224 | if (lpcount == 0) |
401 | } | 225 | pthread_cond_signal(&cond_no_active_lp); |
402 | pthread_mutex_unlock( &mutex_lp_count ); | 226 | pthread_mutex_unlock(&mutex_lp_count); |
403 | } | 227 | } |
404 | 228 | ||
405 | /* worker thread main function */ | 229 | /* worker thread main function */ |
@@ -436,23 +260,17 @@ static void *workermain( void *args ) { | |||
436 | pthread_mutex_unlock( &mutex_queue_access ); | 260 | pthread_mutex_unlock( &mutex_queue_access ); |
437 | 261 | ||
438 | /* execute the lua code specified in the lua process struct */ | 262 | /* execute the lua code specified in the lua process struct */ |
439 | procstat = lua_resume( luaproc_get_state( lp ), luaproc_get_args( lp )); | 263 | procstat = lua_resume(lp->lstate, lp->args); |
440 | 264 | ||
441 | /* reset the process argument count */ | 265 | /* reset the process argument count */ |
442 | luaproc_set_args( lp, 0 ); | 266 | lp->args = 0; |
443 | 267 | ||
444 | /* check if process finished its whole execution */ | 268 | /* check if process finished its whole execution, then recycle it */ |
445 | if ( procstat == 0 ) { | 269 | if ( procstat == 0 ) { |
446 | 270 | ||
447 | /* set process status to finished */ | 271 | pthread_mutex_lock(&mutex_recycle_list); |
448 | luaproc_set_status( lp, LUAPROC_STAT_FINISHED ); | 272 | eina_clist_add_tail(&recyclelp, &(lp->node)); |
449 | 273 | pthread_mutex_unlock(&mutex_recycle_list); | |
450 | /* check if lua process should be recycled and, if not, destroy it */ | ||
451 | if ( luaproc_recycle_push( lp ) == FALSE ) { | ||
452 | lua_close( luaproc_get_state( lp )); | ||
453 | } | ||
454 | |||
455 | /* decrease active lua process count */ | ||
456 | sched_lpcount_dec(); | 274 | sched_lpcount_dec(); |
457 | 275 | ||
458 | } | 276 | } |
@@ -461,18 +279,18 @@ static void *workermain( void *args ) { | |||
461 | else if ( procstat == LUA_YIELD ) { | 279 | else if ( procstat == LUA_YIELD ) { |
462 | 280 | ||
463 | /* if so, further check if yield originated from an unmatched send/recv operation */ | 281 | /* if so, further check if yield originated from an unmatched send/recv operation */ |
464 | if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_SEND ) { | 282 | if ( lp->status == LUAPROC_STAT_BLOCKED_SEND ) { |
465 | /* queue blocked lua process on corresponding channel */ | 283 | /* queue blocked lua process on corresponding channel */ |
466 | luaproc_queue_sender( lp ); | 284 | luaproc_queue_sender( lp ); |
467 | /* unlock channel access */ | 285 | /* unlock channel access */ |
468 | luaproc_unlock_channel( luaproc_get_channel( lp )); | 286 | luaproc_unlock_channel(lp->chan); |
469 | } | 287 | } |
470 | 288 | ||
471 | else if ( luaproc_get_status( lp ) == LUAPROC_STAT_BLOCKED_RECV ) { | 289 | else if ( lp->status == LUAPROC_STAT_BLOCKED_RECV ) { |
472 | /* queue blocked lua process on corresponding channel */ | 290 | /* queue blocked lua process on corresponding channel */ |
473 | luaproc_queue_receiver( lp ); | 291 | luaproc_queue_receiver( lp ); |
474 | /* unlock channel access */ | 292 | /* unlock channel access */ |
475 | luaproc_unlock_channel( luaproc_get_channel( lp )); | 293 | luaproc_unlock_channel(lp->chan); |
476 | } | 294 | } |
477 | 295 | ||
478 | /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */ | 296 | /* or if yield resulted from an explicit call to coroutine.yield in the lua code being executed */ |
@@ -489,42 +307,15 @@ static void *workermain( void *args ) { | |||
489 | /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */ | 307 | /* check if there was any execution error (LUA_ERRRUN, LUA_ERRSYNTAX, LUA_ERRMEM or LUA_ERRERR) */ |
490 | else { | 308 | else { |
491 | /* print error message */ | 309 | /* print error message */ |
492 | fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring( luaproc_get_state( lp ), -1 )); | 310 | fprintf( stderr, "close lua_State (error: %s)\n", luaL_checkstring(lp->lstate, -1 )); |
493 | /* close lua state */ | 311 | /* close lua state */ |
494 | lua_close( luaproc_get_state( lp )); | 312 | lua_close(lp->lstate); |
495 | /* decrease active lua process count */ | 313 | /* decrease active lua process count */ |
496 | sched_lpcount_dec(); | 314 | sched_lpcount_dec(); |
497 | } | 315 | } |
498 | } | 316 | } |
499 | } | 317 | } |
500 | 318 | ||
501 | /* local scheduler initialization */ | ||
502 | static int sched_init_local( int numworkers ) { | ||
503 | |||
504 | int tid; | ||
505 | int workercount = 0; | ||
506 | pthread_t worker; | ||
507 | |||
508 | /* initialize ready process list */ | ||
509 | eina_clist_init(&lpready); | ||
510 | |||
511 | /* initialize channels */ | ||
512 | channel_init(); | ||
513 | |||
514 | /* create initial worker threads */ | ||
515 | for ( tid = 0; tid < numworkers; tid++ ) { | ||
516 | if ( pthread_create( &worker, NULL, workermain, NULL ) == 0 ) { | ||
517 | workercount++; | ||
518 | } | ||
519 | } | ||
520 | |||
521 | if ( workercount != numworkers ) { | ||
522 | return LUAPROC_SCHED_INIT_ERROR; | ||
523 | } | ||
524 | |||
525 | return LUAPROC_SCHED_OK; | ||
526 | } | ||
527 | |||
528 | /* move process to ready queue (ie, schedule process) */ | 319 | /* move process to ready queue (ie, schedule process) */ |
529 | static int sched_queue_proc( luaproc lp ) { | 320 | static int sched_queue_proc( luaproc lp ) { |
530 | 321 | ||
@@ -534,8 +325,7 @@ static int sched_queue_proc( luaproc lp ) { | |||
534 | /* add process to ready queue */ | 325 | /* add process to ready queue */ |
535 | eina_clist_add_tail(&lpready, &(lp->node)); | 326 | eina_clist_add_tail(&lpready, &(lp->node)); |
536 | 327 | ||
537 | /* set process status to ready */ | 328 | lp->status = LUAPROC_STAT_READY; |
538 | luaproc_set_status( lp, LUAPROC_STAT_READY ); | ||
539 | 329 | ||
540 | /* wake worker up */ | 330 | /* wake worker up */ |
541 | pthread_cond_signal( &cond_wakeup_worker ); | 331 | pthread_cond_signal( &cond_wakeup_worker ); |
@@ -584,112 +374,63 @@ int sched_create_worker( void ) { | |||
584 | return LUAPROC_SCHED_OK; | 374 | return LUAPROC_SCHED_OK; |
585 | } | 375 | } |
586 | 376 | ||
587 | static void openlibs( lua_State *L ) { | ||
588 | luaL_openlibs(L); | ||
589 | } | ||
590 | |||
591 | /* create new luaproc */ | ||
592 | static luaproc luaproc_new( const char *code, int destroyflag, int file) { | ||
593 | |||
594 | luaproc lp; | ||
595 | int ret; | ||
596 | /* create new lua state */ | ||
597 | lua_State *lpst = luaL_newstate( ); | ||
598 | /* store the luaproc struct in its own lua state */ | ||
599 | lp = (luaproc )lua_newuserdata( lpst, sizeof( struct stluaproc )); | ||
600 | lua_setfield( lpst, LUA_REGISTRYINDEX, "_SELF" ); | ||
601 | |||
602 | eina_clist_element_init(&(lp->node)); | ||
603 | lp->lstate = lpst; | ||
604 | lp->stat = LUAPROC_STAT_IDLE; | ||
605 | lp->args = 0; | ||
606 | lp->chan = NULL; | ||
607 | |||
608 | /* load standard libraries */ | ||
609 | openlibs( lpst ); | ||
610 | |||
611 | /* register luaproc's own functions */ | ||
612 | luaL_register( lpst, "luaproc", luaproc_funcs_child ); | ||
613 | |||
614 | /* load process' code */ | ||
615 | if (file) | ||
616 | ret = luaL_loadfile( lpst, code ); | ||
617 | else | ||
618 | ret = luaL_loadstring( lpst, code ); | ||
619 | /* in case of errors, destroy recently created lua process */ | ||
620 | if ( ret != 0 ) { | ||
621 | lua_close( lpst ); | ||
622 | return NULL; | ||
623 | } | ||
624 | |||
625 | /* return recently created lua process */ | ||
626 | return lp; | ||
627 | } | ||
628 | |||
629 | /* recycle a lua process */ | ||
630 | static luaproc luaproc_recycle( luaproc lp, const char *code, int file ) { | ||
631 | 377 | ||
632 | int ret; | ||
633 | 378 | ||
634 | /* reset struct members */ | 379 | void newProc(const char *code, int file, script *data) |
635 | lp->stat = LUAPROC_STAT_IDLE; | 380 | { |
636 | lp->args = 0; | 381 | int ret; |
637 | lp->chan = NULL; | 382 | luaproc lp; |
638 | |||
639 | /* load process' code */ | ||
640 | ret = luaL_loadstring( lp->lstate, code ); | ||
641 | 383 | ||
642 | /* in case of errors, destroy lua process */ | 384 | // Try to recycle a Lua state, otherwise create one from scratch. |
643 | if ( ret != 0 ) { | 385 | pthread_mutex_lock(&mutex_recycle_list); |
644 | lua_close( lp->lstate ); | 386 | /* pop list head */ |
645 | return NULL; | 387 | if ((lp = (luaproc) eina_clist_head(&recyclelp))) |
646 | } | 388 | eina_clist_remove(&(lp->node)); |
389 | pthread_mutex_unlock(&mutex_recycle_list); | ||
647 | 390 | ||
648 | /* return recycled lua process */ | 391 | if (lp == NULL) |
649 | return lp; | 392 | { |
650 | } | 393 | lua_State *lpst = luaL_newstate(); |
651 | 394 | ||
652 | int newProc(const char *code, int file, void *data) | 395 | /* store the luaproc struct in its own Lua state */ |
653 | { | 396 | lp = (luaproc) lua_newuserdata(lpst, sizeof(struct stluaproc)); |
654 | /* new lua process pointer */ | 397 | lp->lstate = lpst; |
655 | luaproc lp; | 398 | lua_setfield(lp->lstate, LUA_REGISTRYINDEX, "_SELF"); |
399 | luaL_openlibs(lp->lstate); | ||
400 | luaL_register(lp->lstate, "luaproc", luaproc_funcs_child); | ||
401 | eina_clist_element_init(&(lp->node)); | ||
402 | } | ||
656 | 403 | ||
657 | /* check if existing lua process should be recycled to avoid new creation */ | 404 | lp->status = LUAPROC_STAT_IDLE; |
658 | lp = luaproc_recycle_pop( ); | 405 | lp->args = 0; |
406 | lp->chan = NULL; | ||
659 | 407 | ||
660 | /* if there is a lua process available on the recycle queue, recycle it */ | 408 | /* load process' code */ |
661 | if ( lp != NULL ) { | 409 | if (file) |
662 | lp = luaproc_recycle( lp, code, file ); | 410 | ret = luaL_loadfile(lp->lstate, code); |
663 | } | 411 | else |
664 | /* otherwise create a new one from scratch */ | 412 | ret = luaL_loadstring(lp->lstate, code); |
665 | else { | ||
666 | /* create new lua process with destroy worker flag set to false | ||
667 | (ie, conclusion of lua process will NOT result in worker thread destruction */ | ||
668 | lp = luaproc_new( code, FALSE, file ); | ||
669 | } | ||
670 | 413 | ||
671 | /* ensure process creation was successfull */ | 414 | /* in case of errors, destroy Lua process */ |
672 | if ( lp == NULL ) { | 415 | if (ret != 0) |
673 | return 1; | 416 | { |
674 | } | 417 | lua_close(lp->lstate); |
418 | lp = NULL; | ||
419 | } | ||
675 | 420 | ||
676 | /* Stash any data given to us. */ | 421 | if (lp) |
422 | { | ||
677 | lp->data = data; | 423 | lp->data = data; |
678 | |||
679 | /* increase active luaproc count */ | ||
680 | sched_lpcount_inc(); | 424 | sched_lpcount_inc(); |
681 | 425 | ||
682 | /* schedule luaproc */ | 426 | /* schedule luaproc */ |
683 | if ( sched_queue_proc( lp ) != LUAPROC_SCHED_QUEUE_PROC_OK ) { | 427 | if (sched_queue_proc(lp) != LUAPROC_SCHED_QUEUE_PROC_OK) |
684 | printf( "[luaproc] error queueing Lua process\n" ); | 428 | { |
685 | /* decrease active luaproc count */ | 429 | printf( "[luaproc] error queueing Lua process\n" ); |
686 | sched_lpcount_dec(); | 430 | sched_lpcount_dec(); |
687 | /* close lua_State */ | 431 | lua_close(lp->lstate); |
688 | lua_close( lp->lstate ); | ||
689 | return 2; | ||
690 | } | 432 | } |
691 | 433 | } | |
692 | return 0; | ||
693 | } | 434 | } |
694 | 435 | ||
695 | /* moves values between lua states' stacks */ | 436 | /* moves values between lua states' stacks */ |
@@ -713,6 +454,49 @@ static luaproc luaproc_getself( lua_State *L ) { | |||
713 | return lp; | 454 | return lp; |
714 | } | 455 | } |
715 | 456 | ||
457 | /* create a new channel */ | ||
458 | static int luaproc_create_channel(lua_State *L) | ||
459 | { | ||
460 | const char *name = luaL_checkstring(L, 1); | ||
461 | channel chan; | ||
462 | |||
463 | /* get exclusive access to operate on channels */ | ||
464 | pthread_mutex_lock(&mutex_channel); | ||
465 | |||
466 | /* check if channel exists */ | ||
467 | if (eina_hash_find(channels, name) != NULL) | ||
468 | { | ||
469 | /* free access to operate on channels */ | ||
470 | pthread_mutex_unlock(&mutex_channel); | ||
471 | /* return an error to lua */ | ||
472 | lua_pushnil(L); | ||
473 | lua_pushstring(L, "channel already exists"); | ||
474 | return 2; | ||
475 | } | ||
476 | |||
477 | /* get exclusive access to the channel table */ | ||
478 | pthread_mutex_lock(&mutex_channel_lstate); | ||
479 | |||
480 | /* create a new channel */ | ||
481 | chan = (channel) calloc(1, sizeof(struct stchannel)); | ||
482 | eina_clist_init(&(chan->send)); | ||
483 | eina_clist_init(&(chan->recv)); | ||
484 | chan->mutex = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t)); | ||
485 | pthread_mutex_init( chan->mutex, NULL ); | ||
486 | chan->in_use = (pthread_cond_t *) malloc(sizeof(pthread_cond_t)); | ||
487 | pthread_cond_init(chan->in_use, NULL); | ||
488 | eina_hash_add(channels, name, chan); | ||
489 | |||
490 | /* let others access the channel table */ | ||
491 | pthread_mutex_unlock(&mutex_channel_lstate); | ||
492 | |||
493 | /* free access to operate on channels */ | ||
494 | pthread_mutex_unlock(&mutex_channel); | ||
495 | |||
496 | lua_pushboolean(L, TRUE); | ||
497 | return 1; | ||
498 | } | ||
499 | |||
716 | /* send a message to a lua process */ | 500 | /* send a message to a lua process */ |
717 | static int luaproc_send_back( lua_State *L ) { | 501 | static int luaproc_send_back( lua_State *L ) { |
718 | 502 | ||
@@ -755,9 +539,9 @@ const char *sendToChannel(const char *chname, const char *message, luaproc *dst, | |||
755 | pthread_mutex_lock(&mutex_channel); | 539 | pthread_mutex_lock(&mutex_channel); |
756 | 540 | ||
757 | /* wait until channel is not in use */ | 541 | /* wait until channel is not in use */ |
758 | while( ((chan = channel_search(chname)) != NULL) && (pthread_mutex_trylock(channel_get_mutex(chan)) != 0 )) | 542 | while( ((chan = eina_hash_find(channels, chname)) != NULL) && (pthread_mutex_trylock(chan->mutex) != 0 )) |
759 | { | 543 | { |
760 | pthread_cond_wait(channel_get_cond(chan), &mutex_channel); | 544 | pthread_cond_wait(chan->in_use, &mutex_channel); |
761 | } | 545 | } |
762 | 546 | ||
763 | /* free access to operate on channels */ | 547 | /* free access to operate on channels */ |
@@ -822,7 +606,7 @@ static int luaproc_send( lua_State *L ) { | |||
822 | self = luaproc_getself( L ); | 606 | self = luaproc_getself( L ); |
823 | 607 | ||
824 | if ( self != NULL ) { | 608 | if ( self != NULL ) { |
825 | self->stat = LUAPROC_STAT_BLOCKED_SEND; | 609 | self->status = LUAPROC_STAT_BLOCKED_SEND; |
826 | self->chan = chan; | 610 | self->chan = chan; |
827 | } | 611 | } |
828 | 612 | ||
@@ -845,8 +629,8 @@ static int luaproc_receive( lua_State *L ) { | |||
845 | pthread_mutex_lock( &mutex_channel ); | 629 | pthread_mutex_lock( &mutex_channel ); |
846 | 630 | ||
847 | /* wait until channel is not in use */ | 631 | /* wait until channel is not in use */ |
848 | while((( chan = channel_search( chname )) != NULL ) && ( pthread_mutex_trylock( channel_get_mutex( chan )) != 0 )) { | 632 | while((( chan = eina_hash_find(channels, chname)) != NULL ) && ( pthread_mutex_trylock(chan->mutex) != 0 )) { |
849 | pthread_cond_wait( channel_get_cond( chan ), &mutex_channel ); | 633 | pthread_cond_wait(chan->in_use, &mutex_channel ); |
850 | } | 634 | } |
851 | 635 | ||
852 | /* free access to operate on channels */ | 636 | /* free access to operate on channels */ |
@@ -911,7 +695,7 @@ static int luaproc_receive( lua_State *L ) { | |||
911 | self = luaproc_getself( L ); | 695 | self = luaproc_getself( L ); |
912 | 696 | ||
913 | if ( self != NULL ) { | 697 | if ( self != NULL ) { |
914 | self->stat = LUAPROC_STAT_BLOCKED_RECV; | 698 | self->status = LUAPROC_STAT_BLOCKED_RECV; |
915 | self->chan = chan; | 699 | self->chan = chan; |
916 | } | 700 | } |
917 | 701 | ||
@@ -923,38 +707,17 @@ static int luaproc_receive( lua_State *L ) { | |||
923 | 707 | ||
924 | void luaprocInit(void) | 708 | void luaprocInit(void) |
925 | { | 709 | { |
926 | /* initialize recycle list */ | 710 | eina_clist_init(&recyclelp); |
927 | eina_clist_init(&recyclelp); | ||
928 | |||
929 | /* initialize local scheduler */ | ||
930 | sched_init_local( LUAPROC_SCHED_DEFAULT_WORKER_THREADS ); | ||
931 | } | ||
932 | |||
933 | /* create a new channel */ | ||
934 | static int luaproc_create_channel( lua_State *L ) { | ||
935 | |||
936 | const char *chname = luaL_checkstring( L, 1 ); | ||
937 | |||
938 | /* get exclusive access to operate on channels */ | ||
939 | pthread_mutex_lock( &mutex_channel ); | ||
940 | |||
941 | /* check if channel exists */ | ||
942 | if ( channel_search( chname ) != NULL ) { | ||
943 | /* free access to operate on channels */ | ||
944 | pthread_mutex_unlock( &mutex_channel ); | ||
945 | /* return an error to lua */ | ||
946 | lua_pushnil( L ); | ||
947 | lua_pushstring( L, "channel already exists" ); | ||
948 | return 2; | ||
949 | } | ||
950 | 711 | ||
951 | channel_create( chname ); | 712 | int tid; |
713 | pthread_t worker; | ||
952 | 714 | ||
953 | /* free access to operate on channels */ | 715 | eina_clist_init(&lpready); |
954 | pthread_mutex_unlock( &mutex_channel ); | 716 | channels = eina_hash_string_superfast_new(NULL); |
955 | |||
956 | lua_pushboolean( L, TRUE ); | ||
957 | |||
958 | return 1; | ||
959 | 717 | ||
718 | /* create initial worker threads */ | ||
719 | for (tid = 0; tid < LUAPROC_SCHED_DEFAULT_WORKER_THREADS; tid++) | ||
720 | { | ||
721 | pthread_create( &worker, NULL, workermain, NULL); | ||
722 | } | ||
960 | } | 723 | } |
diff --git a/LuaSL/src/LuaSL_threads.h b/LuaSL/src/LuaSL_threads.h index 9fc38ea..47f5a69 100644 --- a/LuaSL/src/LuaSL_threads.h +++ b/LuaSL/src/LuaSL_threads.h | |||
@@ -55,7 +55,7 @@ void luaprocInit(void); | |||
55 | /* create a new worker pthread */ | 55 | /* create a new worker pthread */ |
56 | int sched_create_worker( void ); | 56 | int sched_create_worker( void ); |
57 | 57 | ||
58 | int newProc(const char *code, int file, void *data); | 58 | void newProc(const char *code, int file, script *data); |
59 | const char *sendToChannel(const char *chname, const char *message, luaproc *dst, channel *chn); | 59 | const char *sendToChannel(const char *chname, const char *message, luaproc *dst, channel *chn); |
60 | 60 | ||
61 | /* join all worker threads and exit */ | 61 | /* join all worker threads and exit */ |