Skip to content

Commit

Permalink
[Proxying] Use Atomics.waitAsync to receive proxying notifications (e…
Browse files Browse the repository at this point in the history
…mscripten-core#18861)

When it is available, register to be notified of new proxying messages using
`Atomics.waitAsync` and perform the notification using `Atomics.notify`. This
new notification mechanism does not depend on the main thread forwarding
postMessage notifications, so its performance should not degrade when the main
thread is busy.

Update a test that depended on intercepting postMessage notifications to
conservatively wait for 20ms instead.
  • Loading branch information
tlively authored and impact-maker committed Mar 17, 2023
1 parent bce0a28 commit 1202f72
Show file tree
Hide file tree
Showing 16 changed files with 68 additions and 34 deletions.
1 change: 1 addition & 0 deletions emcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1714,6 +1714,7 @@ def setup_pthreads(target):
'__emscripten_thread_init',
'__emscripten_thread_exit',
'__emscripten_thread_crashed',
'__emscripten_thread_mailbox_await',
'__emscripten_tls_init',
'_pthread_self',
'checkMailbox',
Expand Down
34 changes: 28 additions & 6 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -1219,18 +1219,40 @@ var LibraryPThread = {
},
#endif // MAIN_MODULE

$checkMailbox__deps: ['$callUserCallback'],
$checkMailbox__deps: ['$callUserCallback',
'_emscripten_thread_mailbox_await'],
$checkMailbox: function() {
// Only check the mailbox if we have a live pthread runtime. We implement
// pthread_self to return 0 if there is no live runtime.
if (_pthread_self()) {
var pthread_ptr = _pthread_self();
if (pthread_ptr) {
// If we are using Atomics.waitAsync as our notification mechanism, wait for
// a notification before processing the mailbox to avoid missing any work.
__emscripten_thread_mailbox_await(pthread_ptr);
callUserCallback(() => __emscripten_check_mailbox());
}
},

_emscripten_notify_mailbox__deps: ['$checkMailbox'],
_emscripten_notify_mailbox__sig: 'vppp',
_emscripten_notify_mailbox: function(targetThreadId, currThreadId, mainThreadId) {
_emscripten_thread_mailbox_await__deps: ['$checkMailbox'],
_emscripten_thread_mailbox_await__sig: 'vp',
_emscripten_thread_mailbox_await: function(pthread_ptr) {
if (typeof Atomics.waitAsync === 'function') {
// TODO: How to make this work with wasm64?
var wait = Atomics.waitAsync(HEAP32, pthread_ptr >> 2, pthread_ptr);
#if ASSERTIONS
assert(wait.async);
#endif
wait.value.then(checkMailbox);
var waitingAsync = pthread_ptr + {{{ C_STRUCTS.pthread.waiting_async }}};
Atomics.store(HEAP32, waitingAsync >> 2, 1);
}
},

_emscripten_notify_mailbox_postmessage__deps: ['$checkMailbox'],
_emscripten_notify_mailbox_postmessage__sig: 'vppp',
_emscripten_notify_mailbox_postmessage: function(targetThreadId,
currThreadId,
mainThreadId) {
if (targetThreadId == currThreadId) {
setTimeout(() => checkMailbox());
} else if (ENVIRONMENT_IS_PTHREAD) {
Expand All @@ -1241,7 +1263,7 @@ var LibraryPThread = {
#if ASSERTIONS
err('Cannot send message to thread with ID ' + targetThreadId + ', unknown thread ID!');
#endif
return /*0*/;
return;
}
worker.postMessage({'cmd' : 'checkMailbox'});
}
Expand Down
3 changes: 2 additions & 1 deletion src/struct_info_internal.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"pthread": [
"profilerBlock",
"stack",
"stack_size"
"stack_size",
"waiting_async"
],
"pthread_attr_t#": [
"_a_transferredcanvases"
Expand Down
4 changes: 4 additions & 0 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ function handleMessage(e) {
// Pass the thread address to wasm to store it for fast access.
Module['__emscripten_thread_init'](e.data.pthread_ptr, /*isMainBrowserThread=*/0, /*isMainRuntimeThread=*/0, /*canBlock=*/1);

// Await mailbox notifications with `Atomics.waitAsync` so we can start
// using the fast `Atomics.notify` notification path.
Module['__emscripten_thread_mailbox_await'](e.data.pthread_ptr);

#if ASSERTIONS
assert(e.data.pthread_ptr);
#endif
Expand Down
4 changes: 4 additions & 0 deletions system/lib/libc/musl/src/internal/pthread_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ struct pthread {
// wait until it reaches 0, at which point the mailbox is considered
// closed and no further messages will be enqueued.
_Atomic int mailbox_refcount;
// Whether the thread has executed a `waitAsync` on this pthread struct
// and can be notified of new mailbox messages via `Atomics.notify`.
// Otherwise the notification has to fall back to the postMessage path.
_Atomic int waiting_async;
#endif
#if _REENTRANT
_Atomic char sleeping;
Expand Down
1 change: 1 addition & 0 deletions system/lib/pthread/library_pthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,5 @@ void __emscripten_init_main_thread(void) {
__main_pthread.tsd = (void **)__pthread_tsd_main;

_emscripten_thread_mailbox_init(&__main_pthread);
_emscripten_thread_mailbox_await(&__main_pthread);
}
16 changes: 10 additions & 6 deletions system/lib/pthread/thread_mailbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ void _emscripten_thread_mailbox_shutdown(pthread_t thread) {
void _emscripten_thread_mailbox_init(pthread_t thread) {
thread->mailbox = em_task_queue_create(thread);
thread->mailbox_refcount = 1;
thread->waiting_async = 0;
}

// Exported for use in worker.js, but otherwise an internal function.
Expand All @@ -92,9 +93,9 @@ void _emscripten_check_mailbox() {
// Send a postMessage notification telling the target thread to check its
// mailbox when it returns to its event loop. Pass in the current thread and
// main thread ids to minimize calls back into Wasm.
void _emscripten_notify_mailbox(pthread_t target_thread,
pthread_t curr_thread,
pthread_t main_thread);
void _emscripten_notify_mailbox_postmessage(pthread_t target_thread,
pthread_t curr_thread,
pthread_t main_thread);

void emscripten_thread_mailbox_send(pthread_t thread, task t) {
assert(thread->mailbox_refcount > 0);
Expand All @@ -112,8 +113,11 @@ void emscripten_thread_mailbox_send(pthread_t thread, task t) {
notification_state previous =
atomic_exchange(&thread->mailbox->notification, NOTIFICATION_PENDING);
if (previous != NOTIFICATION_PENDING) {
_emscripten_notify_mailbox(thread,
pthread_self(),
emscripten_main_runtime_thread_id());
if (thread->waiting_async) {
__builtin_wasm_memory_atomic_notify((int*)thread, -1);
} else {
_emscripten_notify_mailbox_postmessage(
thread, pthread_self(), emscripten_main_runtime_thread_id());
}
}
}
2 changes: 2 additions & 0 deletions system/lib/pthread/thread_mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ void emscripten_thread_mailbox_send(pthread_t thread, task t);
// Initialize the mailbox on a pthread struct. Called during `pthread_create`.
void _emscripten_thread_mailbox_init(pthread_t thread);

void _emscripten_thread_mailbox_await(pthread_t thread);

// Close the mailbox and cancel any pending messages.
void _emscripten_thread_mailbox_shutdown(pthread_t thread);
3 changes: 2 additions & 1 deletion system/lib/wasmfs/thread_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class ProxyWorker {
// progress even if the main thread is blocked.
//
// TODO: Remove this once we can postMessage directly between workers
// without involving the main thread.
// without involving the main thread or once all browsers ship
// Atomics.waitAsync.
//
// Note that this requires adding _emscripten_proxy_execute_queue to
// EXPORTED_FUNCTIONS.
Expand Down
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.exports
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ B
C
D
E
o
F
p
q
r
Expand Down
1 change: 1 addition & 0 deletions test/other/metadce/test_metadce_minimal_pthreads.imports
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ a.k
a.l
a.m
a.n
a.o
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.jssize
Original file line number Diff line number Diff line change
@@ -1 +1 @@
15320
15563
1 change: 1 addition & 0 deletions test/other/metadce/test_metadce_minimal_pthreads.sent
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ k
l
m
n
o
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.size
Original file line number Diff line number Diff line change
@@ -1 +1 @@
18922
18954
21 changes: 6 additions & 15 deletions test/pthread/test_pthread_proxying_refcount.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ void* execute_and_free_queue(void* arg) {
em_proxying_queue_destroy(queues[i]);
}

// Wrap the normal worker event listener so that we can determine when our
// proxying events have been received and handled.
EM_ASM({
var oldOnMessage = onmessage;
onmessage = (e) => {
oldOnMessage(e);
if (e.data.cmd == 'checkMailbox') {
_register_processed();
}
};
});

// Exit with a live runtime so the queued work notification is received and we
// try to execute the queue again, even though we already executed all its
// work and we are now just waiting for the notifications to be received so we
Expand Down Expand Up @@ -90,9 +78,12 @@ int main() {
while (!executed[0] || !executed[1]) {
}

// Wait for the postMessage notification to be received.
while (processed < 1) {
}
// Wait a bit (20 ms) for the notification to be received.
struct timespec time = {
.tv_sec = 0,
.tv_nsec = 20 * 1000 * 1000,
};
nanosleep(&time, NULL);

#ifndef SANITIZER
// Our zombies should not have been freed yet.
Expand Down
5 changes: 3 additions & 2 deletions test/reference_struct_info.json
Original file line number Diff line number Diff line change
Expand Up @@ -1351,10 +1351,11 @@
"p_proto": 8
},
"pthread": {
"__size__": 128,
"__size__": 132,
"profilerBlock": 112,
"stack": 52,
"stack_size": 56
"stack_size": 56,
"waiting_async": 128
},
"pthread_attr_t": {
"__size__": 44,
Expand Down

0 comments on commit 1202f72

Please sign in to comment.