diff --git a/src/work.c b/src/work.c index 22c8e23a..a89b6bb2 100644 --- a/src/work.c +++ b/src/work.c @@ -22,8 +22,6 @@ typedef struct { char* code; /* thread entry code */ size_t len; - uv_async_t async; - int async_cb; /* ref, run in main, call when async message received */ int after_work_cb; /* ref, run in main ,call after work cb*/ int pool_ref; /* ref of lua_State cache array */ } luv_work_ctx_t; @@ -47,7 +45,6 @@ static int luv_work_ctx_gc(lua_State *L) { luv_work_ctx_t* ctx = luv_check_work_ctx(L, 1); free(ctx->code); luaL_unref(L, LUA_REGISTRYINDEX, ctx->after_work_cb); - luaL_unref(L, LUA_REGISTRYINDEX, ctx->async_cb); lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->pool_ref); n = lua_rawlen(L, -1); @@ -98,13 +95,13 @@ static void luv_work_cb(uv_work_t* req) { if (lua_isfunction(L, -1)) { int i = luv_thread_arg_push(L, &work->args, LUVF_THREAD_SIDE_CHILD); i = luv_cfpcall(L, i, LUA_MULTRET, 0); - luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD); if ( i>=0 ) { //clear in main threads, luv_after_work_cb i = luv_thread_arg_set(L, &work->rets, top + 1, lua_gettop(L), LUVF_THREAD_SIDE_CHILD); lua_pop(L, i); // pop all returned value luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_SIDE_CHILD); } + luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD); } else { fprintf(stderr, "Uncaught Error: %s can't be work entry\n", lua_typename(L, lua_type(L,-1))); @@ -149,7 +146,6 @@ static void async_cb(uv_async_t *handle) { luv_work_ctx_t* ctx = work->ctx; lua_State* L = ctx->L; - lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->async_cb); luv_cfpcall(L, 0, 0, 0); } @@ -172,14 +168,7 @@ static int luv_new_work(lua_State* L) { lua_pushvalue(L, 2); ctx->after_work_cb = luaL_ref(L, LUA_REGISTRYINDEX); - if (lua_gettop(L) == 4) - { - lua_pushvalue(L, 3); - ctx->async_cb = luaL_ref(L, LUA_REGISTRYINDEX); - uv_async_init(luv_loop(L), &ctx->async, async_cb); - } - else - ctx->async_cb = LUA_REFNIL; + ctx->L = L; luaL_getmetatable(L, "luv_work_ctx"); lua_setmetatable(L, -2); diff --git a/tests/test-work.lua b/tests/test-work.lua index b9d1aa79..ec50a97b 100644 --- a/tests/test-work.lua +++ b/tests/test-work.lua @@ -9,7 +9,7 @@ return require('lib/tap')(function (test) local uv = require('luv') local t = uv.thread_self() uv.sleep(10) - return n,n*n, tostring(uv.thread_self()),s + return n, n*n, t, s end, function(n,r,id, s) assert(n*n==r) @@ -48,4 +48,36 @@ return require('lib/tap')(function (test) _uv.queue_work(ctx,-2,ls) _uv.queue_work(ctx,-11,ls) end) + + test("test threadpool with async", function(print,p,expect,_uv) + local ctx, async + async = _uv.new_async(expect(function (a,b,c) + p('in async notify callback') + p(a,b,c) + assert(a=='a') + assert(b==true) + assert(c==250) + end)) + + ctx = _uv.new_work( + function(n, s, a) --work,in threadpool + local uv = require('luv') + local t = tostring(uv.thread_self()) + if a then + assert(uv.async_send(a,'a',true,250)==0) + end + uv.sleep(10) + return n, n*n, t, s + end, + function(n,r,id, s) --after work, in loop thread + p(n, r, id, s) + assert(n*n==r) + if async then + _uv.close(async) + end + print(id, 'finish', s) + end + ) + _uv.queue_work(ctx,2,'hello',async) + end) end)