From 5cb7113a64e39c2d20da6fdb96c6b5f973ae0997 Mon Sep 17 00:00:00 2001 From: Thomas Lively Date: Thu, 16 Feb 2023 16:31:05 -0800 Subject: [PATCH] [Proxying] Add `emscripten_proxy_callback_with_ctx` This new proxying function is like `emscripten_proxy_callback`, but supports proxying of asynchronous work just like `emscripten_proxy_sync_with_ctx`. It uses the existing `em_proxying_ctx` type and `emscripten_proxy_finish` function to mark the work done, and that type and function are internally augmented to handle both synchronous and callback-based proxying. `emscripten_proxy_callback` is reimplemented to share most of its implementation with `emscripten_proxy_callback_with_ctx`, but it does break the abstraction slightly to avoid having to make an extra allocation. --- site/source/docs/api_reference/proxying.h.rst | 41 ++- system/include/emscripten/proxying.h | 97 +++++-- system/lib/pthread/proxying.c | 269 ++++++++++++------ test/pthread/test_pthread_proxying.c | 53 ++++ test/pthread/test_pthread_proxying.out | 4 + .../test_pthread_proxying_canceled_work.c | 93 ++++-- .../test_pthread_proxying_canceled_work.out | 25 ++ test/pthread/test_pthread_proxying_cpp.cpp | 87 ++++-- test/pthread/test_pthread_proxying_cpp.out | 1 + 9 files changed, 505 insertions(+), 165 deletions(-) diff --git a/site/source/docs/api_reference/proxying.h.rst b/site/source/docs/api_reference/proxying.h.rst index ea6588aff8afd..c6a42b8b00339 100644 --- a/site/source/docs/api_reference/proxying.h.rst +++ b/site/source/docs/api_reference/proxying.h.rst @@ -90,9 +90,9 @@ Functions The same as ``emscripten_proxy_sync`` except that instead of waiting for the proxied function to return, it waits for the proxied task to be explicitly - marked finished with ``emscripten_proxying_finish``. ``func`` need not call - ``emscripten_proxying_finish`` itself; it could instead store the context - pointer and call ``emscripten_proxying_finish`` at an arbitrary later time. + marked finished with ``emscripten_proxy_finish``. ``func`` need not call + ``emscripten_proxy_finish`` itself; it could instead store the context pointer + and call ``emscripten_proxy_finish`` at an arbitrary later time. .. c:function:: int emscripten_proxy_callback(em_proxying_queue* q, pthread_t target_thread, void (*func)(void*), void (*callback)(void*), void (*cancel)(void*), void* arg) @@ -103,6 +103,16 @@ Functions receive the same argument, ``arg``. Returns 1 if ``func`` was successfully enqueued and the target thread notified or 0 otherwise. +.. c:function:: int emscripten_proxy_callback_with_ctx(em_proxying_queue* q, pthread_t target_thread, void (*func)(em_proxying_ctx*, void*), void (*callback)(void*), void (*cancel)(void*), void* arg) + + Enqueue ``func`` on the given queue and thread. Once (and if) it finishes the + task by calling ``emscripten_proxy_finish`` on the given ``em_proxying_ctx``, + it will asynchronously proxy ``callback`` back to the current thread on the + same queue, or if the target thread dies before the work can be completed, + ``cancel`` will be proxied back instead. All three function will receive the + same argument, ``arg``. Returns 1 if ``func`` was successfully enqueued and + the target thread notified or 0 otherwise. + C++ API ------- @@ -134,12 +144,6 @@ defined within namespace ``emscripten``. Calls ``emscripten_proxy_async`` to execute ``func``, returning ``true`` if the function was successfully enqueued and ``false`` otherwise. - .. cpp:member:: bool proxyCallback(pthread_t target, std::function&& func, std::function&& callback, std::function&& cancel) - - Calls ``emscripten_proxy_callback`` to execute ``func`` and schedule either - ``callback`` or ``cancel``, returning ``true`` if the function was - successfully enqueued and ``false`` otherwise. - .. cpp:member:: bool proxySync(const pthread_t target, const std::function& func) Calls ``emscripten_proxy_sync`` to execute ``func``, returning ``true`` if the @@ -147,9 +151,22 @@ defined within namespace ``emscripten``. .. cpp:member:: bool proxySyncWithCtx(const pthread_t target, const std::function& func) - Calls ``emscripten_proxy_sync_with_ctx`` to execute ``func``, returning ``true`` - if the function was successfully marked done with - ``emscripten_proxying_finish`` or ``ProxyingCtx::finish`` and ``false`` otherwise. + Calls ``emscripten_proxy_sync_with_ctx`` to execute ``func``, returning + ``true`` if the function was successfully marked done with + ``emscripten_proxy_finish`` or ``ProxyingCtx::finish`` and ``false`` + otherwise. + + .. cpp:member:: bool proxyCallback(pthread_t target, std::function&& func, std::function&& callback, std::function&& cancel) + + Calls ``emscripten_proxy_callback`` to execute ``func`` and schedule either + ``callback`` or ``cancel``, returning ``true`` if the function was + successfully enqueued and ``false`` otherwise. + + .. cpp:member:: bool proxyCallbackWithCtx(pthread_t target, std::function&& func, std::function&& callback, std::function&& cancel) + + Calls ``emscripten_proxy_callback_with_ctx`` to execute ``func`` and + schedule either ``callback`` or ``cancel``, returning ``true`` if the + function was successfully enqueued and ``false`` otherwise. .. _proxying.h: https://github.com/emscripten-core/emscripten/blob/main/system/include/emscripten/proxying.h .. _test_pthread_proxying.c: https://github.com/emscripten-core/emscripten/blob/main/test/pthread/test_pthread_proxying.c diff --git a/system/include/emscripten/proxying.h b/system/include/emscripten/proxying.h index e2c24b50d73af..bdcc328a55e8a 100644 --- a/system/include/emscripten/proxying.h +++ b/system/include/emscripten/proxying.h @@ -64,12 +64,12 @@ int emscripten_proxy_sync(em_proxying_queue* q, void* arg); // Enqueue `func` on the given queue and thread and wait for it to be executed -// and for the task to be marked finished with `emscripten_proxying_finish` -// before returning. `func` need not call `emscripten_proxying_finish` itself; -// it could instead store the context pointer and call -// `emscripten_proxying_finish` at an arbitrary later time. Returns 1 if the -// task was successfully completed and 0 otherwise, including if the target -// thread is canceled or exits before the work is completed. +// and for the task to be marked finished with `emscripten_proxy_finish` before +// returning. `func` need not call `emscripten_proxy_finish` itself; it could +// instead store the context pointer and call `emscripten_proxy_finish` at an +// arbitrary later time. Returns 1 if the task was successfully completed and 0 +// otherwise, including if the target thread is canceled or exits before the +// work is completed. int emscripten_proxy_sync_with_ctx(em_proxying_queue* q, pthread_t target_thread, void (*func)(em_proxying_ctx*, void*), @@ -88,6 +88,20 @@ int emscripten_proxy_callback(em_proxying_queue* q, void (*cancel)(void*), void* arg); +// Enqueue `func` on the given queue and thread. Once (and if) it finishes the +// task by calling `emscripten_proxy_finish` on the given `em_proxying_ctx`, it +// will asynchronously proxy `callback` back to the current thread on the same +// queue, or if the target thread dies before the work can be completed, +// `cancel` will be proxied back instead. All three function will receive the +// same argument, `arg`. Returns 1 if `func` was successfully enqueued and the +// target thread notified or 0 otherwise. +int emscripten_proxy_callback_with_ctx(em_proxying_queue* q, + pthread_t target_thread, + void (*func)(em_proxying_ctx*, void*), + void (*callback)(void*), + void (*cancel)(void*), + void* arg); + #ifdef __cplusplus } // extern "C" @@ -103,6 +117,18 @@ namespace emscripten { // A thin C++ wrapper around the underlying C API. class ProxyingQueue { +public: + // Simple wrapper around `em_proxying_ctx*` providing a `finish` method as an + // alternative to `emscripten_proxy_finish`. + struct ProxyingCtx { + em_proxying_ctx* ctx; + + ProxyingCtx() = default; + ProxyingCtx(em_proxying_ctx* ctx) : ctx(ctx) {} + void finish() { emscripten_proxy_finish(ctx); } + }; + +private: static void runAndFree(void* arg) { auto* f = (std::function*)arg; (*f)(); @@ -150,6 +176,37 @@ class ProxyingQueue { delete info; } + struct CallbackWithCtxFuncs { + std::function func; + std::function callback; + std::function cancel; + + CallbackWithCtxFuncs(std::function&& func, + std::function&& callback, + std::function&& cancel) + : func(std::move(func)), callback(std::move(callback)), + cancel(std::move(cancel)) {} + }; + + static void runFuncWithCtx(em_proxying_ctx* ctx, void* arg) { + auto* info = (CallbackWithCtxFuncs*)arg; + info->func(ProxyingCtx{ctx}); + } + + static void runCallbackWithCtx(void* arg) { + auto* info = (CallbackWithCtxFuncs*)arg; + info->callback(); + delete info; + } + + static void runCancelWithCtx(void* arg) { + auto* info = (CallbackWithCtxFuncs*)arg; + if (info->cancel) { + info->cancel(); + } + delete info; + } + public: em_proxying_queue* queue = em_proxying_queue_create(); @@ -179,16 +236,6 @@ class ProxyingQueue { } } - // Simple wrapper around `em_proxying_ctx*` providing a `finish` method as an - // alternative to `emscripten_proxy_finish`. - struct ProxyingCtx { - em_proxying_ctx* ctx; - - ProxyingCtx() = default; - ProxyingCtx(em_proxying_ctx* ctx) : ctx(ctx) {} - void finish() { emscripten_proxy_finish(ctx); } - }; - void execute() { emscripten_proxy_execute_queue(queue); } // Return true if the work was successfully enqueued and false otherwise. @@ -225,6 +272,24 @@ class ProxyingQueue { } return true; } + + bool proxyCallbackWithCtx(pthread_t target, + std::function&& func, + std::function&& callback, + std::function&& cancel) { + CallbackWithCtxFuncs* info = new CallbackWithCtxFuncs( + std::move(func), std::move(callback), std::move(cancel)); + if (!emscripten_proxy_callback_with_ctx(queue, + target, + runFuncWithCtx, + runCallbackWithCtx, + runCancelWithCtx, + info)) { + delete info; + return false; + } + return true; + } }; } // namespace emscripten diff --git a/system/lib/pthread/proxying.c b/system/lib/pthread/proxying.c index 5614fb635714d..1fdd69c38d895 100644 --- a/system/lib/pthread/proxying.c +++ b/system/lib/pthread/proxying.c @@ -154,17 +154,35 @@ int emscripten_proxy_async(em_proxying_queue* q, return do_proxy(q, target_thread, (task){func, NULL, arg}); } +enum ctx_kind { SYNC, CALLBACK }; + enum ctx_state { PENDING, DONE, CANCELED }; struct em_proxying_ctx { // The user-provided function and argument. void (*func)(em_proxying_ctx*, void*); void* arg; - // Update `state` and signal the condition variable once the proxied task is - // done or canceled. - enum ctx_state state; - pthread_mutex_t mutex; - pthread_cond_t cond; + + enum ctx_kind kind; + union { + // Context for synchronous proxying. + struct { + // Update `state` and signal the condition variable once the proxied task + // is done or canceled. + enum ctx_state state; + pthread_mutex_t mutex; + pthread_cond_t cond; + } sync; + + // Context for proxying with callbacks. + struct { + em_proxying_queue* queue; + pthread_t caller_thread; + void (*callback)(void*); + void (*cancel)(void*); + } cb; + }; + // A doubly linked list of contexts associated with active work on a single // thread. If the thread is canceled, it will traverse this list to find // contexts that need to be canceled. @@ -175,6 +193,7 @@ struct em_proxying_ctx { // The key that `cancel_active_ctxs` is bound to so that it runs when a thread // is canceled or exits. static pthread_key_t active_ctxs; +static pthread_once_t active_ctxs_once = PTHREAD_ONCE_INIT; static void cancel_ctx(void* arg); static void cancel_active_ctxs(void* arg); @@ -235,39 +254,109 @@ static void cancel_active_ctxs(void* arg) { } while (curr != head); } -static void em_proxying_ctx_init(em_proxying_ctx* ctx, - void (*func)(em_proxying_ctx*, void*), - void* arg) { - static pthread_once_t once = PTHREAD_ONCE_INIT; - pthread_once(&once, init_active_ctxs); +static void em_proxying_ctx_init_sync(em_proxying_ctx* ctx, + void (*func)(em_proxying_ctx*, void*), + void* arg) { + pthread_once(&active_ctxs_once, init_active_ctxs); *ctx = (em_proxying_ctx){ .func = func, .arg = arg, - .state = PENDING, - .mutex = PTHREAD_MUTEX_INITIALIZER, - .cond = PTHREAD_COND_INITIALIZER, + .kind = SYNC, + .sync = + { + .state = PENDING, + .mutex = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER, + }, + }; +} + +static void em_proxying_ctx_init_callback(em_proxying_ctx* ctx, + em_proxying_queue* queue, + pthread_t caller_thread, + void (*func)(em_proxying_ctx*, void*), + void (*callback)(void*), + void (*cancel)(void*), + void* arg) { + pthread_once(&active_ctxs_once, init_active_ctxs); + *ctx = (em_proxying_ctx){ + .func = func, + .arg = arg, + .kind = CALLBACK, + .cb = + { + .queue = queue, + .caller_thread = caller_thread, + .callback = callback, + .cancel = cancel, + }, }; } static void em_proxying_ctx_deinit(em_proxying_ctx* ctx) { - pthread_mutex_destroy(&ctx->mutex); - pthread_cond_destroy(&ctx->cond); + if (ctx->kind == SYNC) { + pthread_mutex_destroy(&ctx->sync.mutex); + pthread_cond_destroy(&ctx->sync.cond); + } + // TODO: We should probably have some kind of refcounting scheme to keep + // `queue` alive for callback ctxs. +} + +static void free_ctx(void* arg) { + em_proxying_ctx* ctx = arg; + em_proxying_ctx_deinit(ctx); + free(ctx); +} + +// Free the callback info on the same thread it was originally allocated on. +// This may be more efficient. +static void call_callback_then_free_ctx(void* arg) { + em_proxying_ctx* ctx = arg; + ctx->cb.callback(ctx->arg); + free_ctx(ctx); } void emscripten_proxy_finish(em_proxying_ctx* ctx) { - pthread_mutex_lock(&ctx->mutex); - ctx->state = DONE; - remove_active_ctx(ctx); - pthread_mutex_unlock(&ctx->mutex); - pthread_cond_signal(&ctx->cond); + if (ctx->kind == SYNC) { + pthread_mutex_lock(&ctx->sync.mutex); + ctx->sync.state = DONE; + remove_active_ctx(ctx); + pthread_mutex_unlock(&ctx->sync.mutex); + pthread_cond_signal(&ctx->sync.cond); + } else { + // Schedule the callback on the caller thread. If the caller thread has + // already died or dies before the callback is executed, then at least make + // sure the context is freed. + remove_active_ctx(ctx); + if (!do_proxy(ctx->cb.queue, + ctx->cb.caller_thread, + (task){call_callback_then_free_ctx, free_ctx, ctx})) { + free_ctx(ctx); + } + } +} + +static void call_cancel_then_free_ctx(void* arg) { + em_proxying_ctx* ctx = arg; + ctx->cb.cancel(ctx->arg); + free_ctx(ctx); } static void cancel_ctx(void* arg) { em_proxying_ctx* ctx = arg; - pthread_mutex_lock(&ctx->mutex); - ctx->state = CANCELED; - pthread_mutex_unlock(&ctx->mutex); - pthread_cond_signal(&ctx->cond); + if (ctx->kind == SYNC) { + pthread_mutex_lock(&ctx->sync.mutex); + ctx->sync.state = CANCELED; + pthread_mutex_unlock(&ctx->sync.mutex); + pthread_cond_signal(&ctx->sync.cond); + } else { + if (ctx->cb.cancel == NULL || + !do_proxy(ctx->cb.queue, + ctx->cb.caller_thread, + (task){call_cancel_then_free_ctx, free_ctx, ctx})) { + free_ctx(ctx); + } + } } // Helper for wrapping the call with ctx as a `void (*)(void*)`. @@ -284,23 +373,23 @@ int emscripten_proxy_sync_with_ctx(em_proxying_queue* q, assert(!pthread_equal(target_thread, pthread_self()) && "Cannot synchronously wait for work proxied to the current thread"); em_proxying_ctx ctx; - em_proxying_ctx_init(&ctx, func, arg); + em_proxying_ctx_init_sync(&ctx, func, arg); if (!do_proxy(q, target_thread, (task){call_with_ctx, cancel_ctx, &ctx})) { em_proxying_ctx_deinit(&ctx); return 0; } - pthread_mutex_lock(&ctx.mutex); - while (ctx.state == PENDING) { - pthread_cond_wait(&ctx.cond, &ctx.mutex); + pthread_mutex_lock(&ctx.sync.mutex); + while (ctx.sync.state == PENDING) { + pthread_cond_wait(&ctx.sync.cond, &ctx.sync.mutex); } - pthread_mutex_unlock(&ctx.mutex); - int ret = ctx.state == DONE; + pthread_mutex_unlock(&ctx.sync.mutex); + int ret = ctx.sync.state == DONE; em_proxying_ctx_deinit(&ctx); return ret; } // Helper for signaling the end of the task after the user function returns. -static void call_then_finish(em_proxying_ctx* ctx, void* arg) { +static void call_then_finish_sync(em_proxying_ctx* ctx, void* arg) { task* t = arg; t->func(t->arg); emscripten_proxy_finish(ctx); @@ -311,59 +400,62 @@ int emscripten_proxy_sync(em_proxying_queue* q, void (*func)(void*), void* arg) { task t = {.func = func, .arg = arg}; - return emscripten_proxy_sync_with_ctx(q, target_thread, call_then_finish, &t); + return emscripten_proxy_sync_with_ctx( + q, target_thread, call_then_finish_sync, &t); } -// Helper struct for organizing a proxied call and its callback on the original -// thread. -struct callback { - em_proxying_queue* queue; - pthread_t caller_thread; +static int do_proxy_callback(em_proxying_queue* q, + pthread_t target_thread, + void (*func)(em_proxying_ctx* ctx, void*), + void (*callback)(void*), + void (*cancel)(void*), + void* arg, + em_proxying_ctx* ctx) { + em_proxying_ctx_init_callback( + ctx, q, pthread_self(), func, callback, cancel, arg); + if (!do_proxy(q, target_thread, (task){call_with_ctx, cancel_ctx, ctx})) { + free_ctx(ctx); + return 0; + } + return 1; +} + +int emscripten_proxy_callback_with_ctx(em_proxying_queue* q, + pthread_t target_thread, + void (*func)(em_proxying_ctx* ctx, + void*), + void (*callback)(void*), + void (*cancel)(void*), + void* arg) { + em_proxying_ctx* ctx = malloc(sizeof(*ctx)); + if (ctx == NULL) { + return 0; + } + return do_proxy_callback(q, target_thread, func, callback, cancel, arg, ctx); +} + +typedef struct callback_ctx { void (*func)(void*); void (*callback)(void*); void (*cancel)(void*); void* arg; -}; +} callback_ctx; -static void free_callback(void* arg) { - struct callback* info = arg; - free(info); -} - -// Free the callback info on the same thread it was originally allocated on. -// This may be more efficient. -static void call_callback_then_free(void* arg) { - struct callback* info = arg; - info->callback(info->arg); - free(info); -} - -static void call_then_schedule_callback(void* arg) { - struct callback* info = arg; - info->func(info->arg); - // Schedule the callback on the caller thread. If the caller thread has - // already died or dies before the callback is executed, then at least make - // sure the callback info is freed. - if (!do_proxy(info->queue, - info->caller_thread, - (task){call_callback_then_free, free_callback, info})) { - free(info); - } +static void call_then_finish_callback(em_proxying_ctx* ctx, void* arg) { + callback_ctx* cb_ctx = arg; + cb_ctx->func(cb_ctx->arg); + emscripten_proxy_finish(ctx); } -static void cancel_callback_then_free(void* arg) { - struct callback* info = arg; - info->cancel(info->arg); - free(info); +static void callback_call(void* arg) { + callback_ctx* cb_ctx = arg; + cb_ctx->callback(cb_ctx->arg); } -static void schedule_cancel_callback(void* arg) { - struct callback* info = arg; - if (info->cancel == NULL || - !do_proxy(info->queue, - info->caller_thread, - (task){cancel_callback_then_free, free_callback, info})) { - free(info); +static void callback_cancel(void* arg) { + callback_ctx* cb_ctx = arg; + if (cb_ctx->cancel != NULL) { + cb_ctx->cancel(cb_ctx->arg); } } @@ -373,24 +465,21 @@ int emscripten_proxy_callback(em_proxying_queue* q, void (*callback)(void*), void (*cancel)(void*), void* arg) { - struct callback* info = malloc(sizeof(*info)); - if (info == NULL) { - return 0; - } - *info = (struct callback){ - .queue = q, - .caller_thread = pthread_self(), - .func = func, - .callback = callback, - .cancel = cancel, - .arg = arg, + // Allocate the em_proxying_ctx and the user ctx as a single block. + struct block { + em_proxying_ctx ctx; + callback_ctx cb_ctx; }; - if (!do_proxy( - q, - target_thread, - (task){call_then_schedule_callback, schedule_cancel_callback, info})) { - free(info); + struct block* block = malloc(sizeof(*block)); + if (block == NULL) { return 0; } - return 1; + block->cb_ctx = (callback_ctx){func, callback, cancel, arg}; + return do_proxy_callback(q, + target_thread, + call_then_finish_callback, + callback_call, + callback_cancel, + &block->cb_ctx, + &block->ctx); } diff --git a/test/pthread/test_pthread_proxying.c b/test/pthread/test_pthread_proxying.c index 1cee32618b430..2e931f64e47c4 100644 --- a/test/pthread/test_pthread_proxying.c +++ b/test/pthread/test_pthread_proxying.c @@ -262,6 +262,58 @@ void test_proxy_callback(void) { destroy_widget(&w10); } +void test_proxy_callback_with_ctx(void) { + printf("Testing callback_with_ctx proxying\n"); + + int i = 0; + widget w11, w12, w13; + init_widget(&w11, &i, 11); + init_widget(&w12, &i, 12); + init_widget(&w13, &i, 13); + + // Proxy to ourselves. + + emscripten_proxy_callback_with_ctx( + proxy_queue, pthread_self(), start_and_finish_running_widget, set_j, NULL, &w11); + assert(!w11.done); + assert(j == 10); + emscripten_proxy_execute_queue(proxy_queue); + assert(i == 11); + assert(w11.done); + assert(pthread_equal(w11.thread, pthread_self())); + assert(j == 11); + + // Proxy to looper. + emscripten_proxy_callback_with_ctx( + proxy_queue, looper, start_and_finish_running_widget, set_j, NULL, &w12); + await_widget(&w12); + assert(i == 12); + assert(w12.done); + assert(pthread_equal(w12.thread, looper)); + assert(j == 11); + // TODO: Add a way to wait for work before executing it. + while (j != 12) { + emscripten_proxy_execute_queue(proxy_queue); + } + + // Proxy to returner. + emscripten_proxy_callback_with_ctx( + proxy_queue, returner, start_running_widget, set_j, NULL, &w13); + await_widget(&w13); + assert(i == 13); + assert(w13.done); + assert(pthread_equal(w13.thread, returner)); + assert(j == 12); + // TODO: Add a way to wait for work before executing it. + while (j != 13) { + emscripten_proxy_execute_queue(proxy_queue); + } + + destroy_widget(&w11); + destroy_widget(&w12); + destroy_widget(&w13); +} + typedef struct increment_to_arg { em_proxying_queue* queue; int* ip; @@ -393,6 +445,7 @@ int main(int argc, char* argv[]) { test_proxy_sync(); test_proxy_sync_with_ctx(); test_proxy_callback(); + test_proxy_callback_with_ctx(); should_quit = 1; pthread_join(looper, NULL); diff --git a/test/pthread/test_pthread_proxying.out b/test/pthread/test_pthread_proxying.out index edfb2beeccf57..9b445fbb4342e 100644 --- a/test/pthread/test_pthread_proxying.out +++ b/test/pthread/test_pthread_proxying.out @@ -12,6 +12,10 @@ Testing callback proxying running widget 8 on main running widget 9 on looper running widget 10 on returner +Testing callback_with_ctx proxying +running widget 11 on main +running widget 12 on looper +running widget 13 on returner Testing tasks queue growth Testing proxying queue growth work diff --git a/test/pthread/test_pthread_proxying_canceled_work.c b/test/pthread/test_pthread_proxying_canceled_work.c index 3a15c9514f566..8e8661a45acea 100644 --- a/test/pthread/test_pthread_proxying_canceled_work.c +++ b/test/pthread/test_pthread_proxying_canceled_work.c @@ -233,7 +233,10 @@ void test_proxy_callback_then_exit() { pthread_join(info.proxier, NULL); } +enum proxy_kind { SYNC, CALLBACK }; + struct in_progress_state { + enum proxy_kind kind; int pattern_index; int proxier_index; _Atomic int running_count; @@ -243,9 +246,10 @@ struct in_progress_state { _Atomic int ctx_count; }; -struct state_index { +struct worker_args { int index; struct in_progress_state* state; + int* ret; }; // The patterns of work completion to test so we sufficiently exercise the @@ -259,7 +263,7 @@ int patterns[][5] = {{1, 2, 3, 4, 5}, {4, 2, 3, 0, 1}}; void receive_ctx(em_proxying_ctx* ctx, void* arg) { - struct state_index* args = arg; + struct worker_args* args = arg; args->state->ctxs[args->index] = ctx; args->state->ctx_count++; } @@ -284,14 +288,43 @@ void* in_progress_worker(void* arg) { return NULL; } +void callback_ok(void* arg) { + struct worker_args* args = arg; + *args->ret = 1; +} + +void callback_cancel(void* arg) { + struct worker_args* args = arg; + *args->ret = 0; +} + void* in_progress_proxier(void* arg) { struct in_progress_state* state = arg; int index = state->proxier_index; state->running_count++; - struct state_index proxy_args = {index, state}; - int ret = emscripten_proxy_sync_with_ctx( - queue, state->worker, receive_ctx, &proxy_args); + int ret = -1; + struct worker_args worker_args = {index, state, &ret}; + if (state->kind == SYNC) { + // Synchronously wait for the work to be completed or canceled. + ret = emscripten_proxy_sync_with_ctx( + queue, state->worker, receive_ctx, &worker_args); + } else if (state->kind == CALLBACK) { + // Spin until we execute a callback telling us whether the work was + // completed or canceled. + int proxied = emscripten_proxy_callback_with_ctx(queue, + state->worker, + receive_ctx, + callback_ok, + callback_cancel, + &worker_args); + assert(proxied == 1); + while (ret == -1) { + // TODO: Add a way to wait for work to be executed. + emscripten_proxy_execute_queue(queue); + } + } + // The expected result value depends on the pattern we are executing. assert(ret == (0 != patterns[state->pattern_index][index])); return NULL; @@ -303,33 +336,35 @@ void test_cancel_in_progress() { int num_patterns = sizeof(patterns) / sizeof(patterns[0]); struct in_progress_state state; - for (state.pattern_index = 0; state.pattern_index < num_patterns; - state.pattern_index++) { - state.running_count = 0; - state.ctx_count = 0; - - printf("checking pattern %d\n", state.pattern_index); - - // Spawn the worker thread. - pthread_create(&state.worker, NULL, in_progress_worker, &state); - - // Spawn the proxier threads. - for (state.proxier_index = 0; state.proxier_index < 5; - state.proxier_index++) { - pthread_create(&state.proxiers[state.proxier_index], - NULL, - in_progress_proxier, - &state); - // Wait for the new proxier to start running so it gets the right index. - while (state.running_count == state.proxier_index) { + for (state.kind = 0; state.kind <= CALLBACK; state.kind++) { + for (state.pattern_index = 0; state.pattern_index < num_patterns; + state.pattern_index++) { + state.running_count = 0; + state.ctx_count = 0; + + printf("checking pattern %d\n", state.pattern_index); + + // Spawn the worker thread. + pthread_create(&state.worker, NULL, in_progress_worker, &state); + + // Spawn the proxier threads. + for (state.proxier_index = 0; state.proxier_index < 5; + state.proxier_index++) { + pthread_create(&state.proxiers[state.proxier_index], + NULL, + in_progress_proxier, + &state); + // Wait for the new proxier to start running so it gets the right index. + while (state.running_count == state.proxier_index) { + } } - } - // Wait for all the threads to finish. - for (int i = 0; i < 5; i++) { - pthread_join(state.proxiers[i], NULL); + // Wait for all the threads to finish. + for (int i = 0; i < 5; i++) { + pthread_join(state.proxiers[i], NULL); + } + pthread_join(state.worker, NULL); } - pthread_join(state.worker, NULL); } } diff --git a/test/pthread/test_pthread_proxying_canceled_work.out b/test/pthread/test_pthread_proxying_canceled_work.out index e97a06ffe98e7..07e0a9bc8d04c 100644 --- a/test/pthread/test_pthread_proxying_canceled_work.out +++ b/test/pthread/test_pthread_proxying_canceled_work.out @@ -30,4 +30,29 @@ finishing task 4 finishing task 1 finishing task 2 finishing task 0 +checking pattern 0 +finishing task 0 +finishing task 1 +finishing task 2 +finishing task 3 +finishing task 4 +checking pattern 1 +finishing task 4 +finishing task 3 +finishing task 2 +finishing task 1 +finishing task 0 +checking pattern 2 +checking pattern 3 +finishing task 0 +finishing task 2 +finishing task 4 +checking pattern 4 +finishing task 1 +finishing task 3 +checking pattern 5 +finishing task 4 +finishing task 1 +finishing task 2 +finishing task 0 done diff --git a/test/pthread/test_pthread_proxying_cpp.cpp b/test/pthread/test_pthread_proxying_cpp.cpp index 5dba8ded46567..d7fd6da1be27f 100644 --- a/test/pthread/test_pthread_proxying_cpp.cpp +++ b/test/pthread/test_pthread_proxying_cpp.cpp @@ -146,9 +146,6 @@ void test_proxy_callback(void) { int i = 0; thread_local int j = 0; - - std::mutex mutex; - std::condition_variable cond; std::thread::id executor; // Proxy to ourselves. @@ -171,22 +168,17 @@ void test_proxy_callback(void) { queue.proxyCallback( looper.native_handle(), [&]() { - { - std::unique_lock lock(mutex); - i = 2; - } + i = 2; executor = std::this_thread::get_id(); - cond.notify_one(); }, [&]() { j = 2; }, {}); - std::unique_lock lock(mutex); - cond.wait(lock, [&]() { return i == 2; }); - assert(executor == looper.get_id()); // TODO: Add a way to wait for work before executing it. while (j != 2) { queue.execute(); } + assert(i == 2); + assert(executor == looper.get_id()); } // Proxy to returner. @@ -194,22 +186,80 @@ void test_proxy_callback(void) { queue.proxyCallback( returner.native_handle(), [&]() { - { - std::unique_lock lock(mutex); - i = 3; - } + i = 3; executor = std::this_thread::get_id(); - cond.notify_one(); }, [&]() { j = 3; }, {}); - std::unique_lock lock(mutex); - cond.wait(lock, [&]() { return i == 3; }); + // TODO: Add a way to wait for work before executing it. + while (j != 3) { + queue.execute(); + } + assert(i == 3); assert(executor == returner.get_id()); + } +} + +void test_proxy_callback_with_ctx(void) { + std::cout << "Testing callback_with_ctx proxying\n"; + + int i = 0; + thread_local int j = 0; + std::thread::id executor; + + // Proxy to ourselves. + queue.proxyCallbackWithCtx( + pthread_self(), + [&](auto ctx) { + i = 1; + executor = std::this_thread::get_id(); + ctx.finish(); + }, + [&]() { j = 1; }, + {}); + assert(i == 0); + queue.execute(); + assert(i == 1); + assert(executor == std::this_thread::get_id()); + assert(j == 1); + + // Proxy to looper. + { + queue.proxyCallbackWithCtx( + looper.native_handle(), + [&](auto ctx) { + i = 2; + executor = std::this_thread::get_id(); + ctx.finish(); + }, + [&]() { j = 2; }, + {}); + // TODO: Add a way to wait for work before executing it. + while (j != 2) { + queue.execute(); + } + assert(i == 2); + assert(executor == looper.get_id()); + } + + // Proxy to returner. + { + queue.proxyCallbackWithCtx( + returner.native_handle(), + [&](auto ctx) { + i = 3; + executor = std::this_thread::get_id(); + auto finish = (void (*)(void*))emscripten_proxy_finish; + emscripten_async_call(finish, ctx.ctx, 0); + }, + [&]() { j = 3; }, + {}); // TODO: Add a way to wait for work before executing it. while (j != 3) { queue.execute(); } + assert(i == 3); + assert(executor == returner.get_id()); } } @@ -221,6 +271,7 @@ int main(int argc, char* argv[]) { test_proxy_sync(); test_proxy_sync_with_ctx(); test_proxy_callback(); + test_proxy_callback_with_ctx(); should_quit = true; looper.join(); diff --git a/test/pthread/test_pthread_proxying_cpp.out b/test/pthread/test_pthread_proxying_cpp.out index 6dd1dabc0983e..8198ba7070767 100644 --- a/test/pthread/test_pthread_proxying_cpp.out +++ b/test/pthread/test_pthread_proxying_cpp.out @@ -2,4 +2,5 @@ Testing async proxying Testing sync proxying Testing sync_with_ctx proxying Testing callback proxying +Testing callback_with_ctx proxying done