Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add test pass uv_asyn_t as arg to threadpool #468

Merged
merged 2 commits into from
Mar 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions src/work.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ typedef struct {
char* code; /* thread entry code */
size_t len;

uv_async_t async;
int async_cb; /* ref, run in main, call when async message received */
int after_work_cb; /* ref, run in main ,call after work cb*/
int pool_ref; /* ref of lua_State cache array */
} luv_work_ctx_t;
Expand All @@ -47,7 +45,6 @@ static int luv_work_ctx_gc(lua_State *L) {
luv_work_ctx_t* ctx = luv_check_work_ctx(L, 1);
free(ctx->code);
luaL_unref(L, LUA_REGISTRYINDEX, ctx->after_work_cb);
luaL_unref(L, LUA_REGISTRYINDEX, ctx->async_cb);

lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->pool_ref);
n = lua_rawlen(L, -1);
Expand Down Expand Up @@ -98,13 +95,13 @@ static void luv_work_cb(uv_work_t* req) {
if (lua_isfunction(L, -1)) {
int i = luv_thread_arg_push(L, &work->args, LUVF_THREAD_SIDE_CHILD);
i = luv_cfpcall(L, i, LUA_MULTRET, 0);
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD);
if ( i>=0 ) {
//clear in main threads, luv_after_work_cb
i = luv_thread_arg_set(L, &work->rets, top + 1, lua_gettop(L), LUVF_THREAD_SIDE_CHILD);
lua_pop(L, i); // pop all returned value
luv_thread_arg_clear(L, &work->rets, LUVF_THREAD_SIDE_CHILD);
}
luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_CHILD);
} else {
fprintf(stderr, "Uncaught Error: %s can't be work entry\n",
lua_typename(L, lua_type(L,-1)));
Expand Down Expand Up @@ -149,7 +146,6 @@ static void async_cb(uv_async_t *handle) {
luv_work_ctx_t* ctx = work->ctx;
lua_State* L = ctx->L;

lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->async_cb);
luv_cfpcall(L, 0, 0, 0);
}

Expand All @@ -172,14 +168,7 @@ static int luv_new_work(lua_State* L) {

lua_pushvalue(L, 2);
ctx->after_work_cb = luaL_ref(L, LUA_REGISTRYINDEX);
if (lua_gettop(L) == 4)
{
lua_pushvalue(L, 3);
ctx->async_cb = luaL_ref(L, LUA_REGISTRYINDEX);
uv_async_init(luv_loop(L), &ctx->async, async_cb);
}
else
ctx->async_cb = LUA_REFNIL;

ctx->L = L;
luaL_getmetatable(L, "luv_work_ctx");
lua_setmetatable(L, -2);
Expand Down
34 changes: 33 additions & 1 deletion tests/test-work.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ return require('lib/tap')(function (test)
local uv = require('luv')
local t = uv.thread_self()
uv.sleep(10)
return n,n*n, tostring(uv.thread_self()),s
return n, n*n, t, s
end,
function(n,r,id, s)
assert(n*n==r)
Expand Down Expand Up @@ -48,4 +48,36 @@ return require('lib/tap')(function (test)
_uv.queue_work(ctx,-2,ls)
_uv.queue_work(ctx,-11,ls)
end)

test("test threadpool with async", function(print,p,expect,_uv)
local ctx, async
async = _uv.new_async(expect(function (a,b,c)
p('in async notify callback')
p(a,b,c)
assert(a=='a')
assert(b==true)
assert(c==250)
end))

ctx = _uv.new_work(
function(n, s, a) --work,in threadpool
local uv = require('luv')
local t = tostring(uv.thread_self())
if a then
assert(uv.async_send(a,'a',true,250)==0)
end
uv.sleep(10)
return n, n*n, t, s
end,
function(n,r,id, s) --after work, in loop thread
p(n, r, id, s)
assert(n*n==r)
if async then
_uv.close(async)
end
print(id, 'finish', s)
end
)
_uv.queue_work(ctx,2,'hello',async)
end)
end)