Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proxying] Use Atomics.waitAsync to receive proxying notifications #18861

Merged
merged 4 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions emcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1710,6 +1710,7 @@ def setup_pthreads(target):
'__emscripten_thread_init',
'__emscripten_thread_exit',
'__emscripten_thread_crashed',
'__emscripten_thread_mailbox_await',
'__emscripten_tls_init',
'_pthread_self',
'checkMailbox',
Expand Down
34 changes: 28 additions & 6 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -1219,18 +1219,40 @@ var LibraryPThread = {
},
#endif // MAIN_MODULE

$checkMailbox__deps: ['$callUserCallback'],
$checkMailbox__deps: ['$callUserCallback',
'_emscripten_thread_mailbox_await'],
$checkMailbox: function() {
// Only check the mailbox if we have a live pthread runtime. We implement
// pthread_self to return 0 if there is no live runtime.
if (_pthread_self()) {
var pthread_ptr = _pthread_self();
if (pthread_ptr) {
// If we are using Atomics.waitAsync as our notification mechanism, wait for
// a notification before processing the mailbox to avoid missing any work.
__emscripten_thread_mailbox_await(pthread_ptr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems cyclical, since emscripten_thread_mailbox_await then triggers call to checkMailbox, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but only as a callback to a promise, so this doesn't cause an infinite recursion. This is how we set up for the next notification when we receive a first notification.

callUserCallback(() => __emscripten_check_mailbox());
}
},

_emscripten_notify_mailbox__deps: ['$checkMailbox'],
_emscripten_notify_mailbox__sig: 'vppp',
_emscripten_notify_mailbox: function(targetThreadId, currThreadId, mainThreadId) {
_emscripten_thread_mailbox_await__deps: ['$checkMailbox'],
_emscripten_thread_mailbox_await__sig: 'vp',
_emscripten_thread_mailbox_await: function(pthread_ptr) {
if (typeof Atomics.waitAsync === 'function') {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an assert instead? Are you supposed to be able to call this function at all on browsers that don't support this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we unconditionally call this function and handle the case where Atomics.waitAsync is not supported internally in this function. Handling that case in the caller would require determining whether Atomics.waitAsync is supported from inside the Wasm module, which would either require an additional import or a new global, so this way seems simpler.

// TODO: How to make this work with wasm64?
var wait = Atomics.waitAsync(HEAP32, pthread_ptr >> 2, pthread_ptr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the waiting is don't on the pthread pointer itself? (not some member?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It doesn't really matter what address we wait on as long as we know what its value will be when we execute Atomics.waitAsync. The first member of the pthread struct is conveniently a pointer to itself, so we know what value it will have when we wait on it.

#if ASSERTIONS
assert(wait.async);
#endif
wait.value.then(checkMailbox);
var waitingAsync = pthread_ptr + {{{ C_STRUCTS.pthread.waiting_async }}};
Atomics.store(HEAP32, waitingAsync >> 2, 1);
}
},

_emscripten_notify_mailbox_postmessage__deps: ['$checkMailbox'],
_emscripten_notify_mailbox_postmessage__sig: 'vppp',
_emscripten_notify_mailbox_postmessage: function(targetThreadId,
currThreadId,
mainThreadId) {
if (targetThreadId == currThreadId) {
setTimeout(() => checkMailbox());
} else if (ENVIRONMENT_IS_PTHREAD) {
Expand All @@ -1241,7 +1263,7 @@ var LibraryPThread = {
#if ASSERTIONS
err('Cannot send message to thread with ID ' + targetThreadId + ', unknown thread ID!');
#endif
return /*0*/;
return;
}
worker.postMessage({'cmd' : 'checkMailbox'});
}
Expand Down
3 changes: 2 additions & 1 deletion src/struct_info_internal.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"pthread": [
"profilerBlock",
"stack",
"stack_size"
"stack_size",
"waiting_async"
],
"pthread_attr_t#": [
"_a_transferredcanvases"
Expand Down
4 changes: 4 additions & 0 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ function handleMessage(e) {
// Pass the thread address to wasm to store it for fast access.
Module['__emscripten_thread_init'](e.data.pthread_ptr, /*isMainBrowserThread=*/0, /*isMainRuntimeThread=*/0, /*canBlock=*/1);

// Await mailbox notifications with `Atomics.waitAsync` so we can start
// using the fast `Atomics.notify` notification path.
Module['__emscripten_thread_mailbox_await'](e.data.pthread_ptr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just make this function always operate on the calling thread? Saved a couple of bytes here but not passing this argument?

Oh I see that this is implemented in JS which doesn't have easy direct access to the self pointer so maybe this makes sense actually.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this argument avoids having to call back into Wasm just to get the current pthread pointer.


#if ASSERTIONS
assert(e.data.pthread_ptr);
#endif
Expand Down
4 changes: 4 additions & 0 deletions system/lib/libc/musl/src/internal/pthread_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ struct pthread {
// wait until it reaches 0, at which point the mailbox is considered
// closed and no further messages will be enqueued.
_Atomic int mailbox_refcount;
// Whether the thread has executed a `waitAsync` on this pthread struct
// and can be notified of new mailbox messages via `Atomics.notify`.
// Otherwise the notification has to fall back to the postMessage path.
_Atomic int waiting_async;
#endif
#if _REENTRANT
_Atomic char sleeping;
Expand Down
1 change: 1 addition & 0 deletions system/lib/pthread/library_pthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,5 @@ void __emscripten_init_main_thread(void) {
__main_pthread.tsd = (void **)__pthread_tsd_main;

_emscripten_thread_mailbox_init(&__main_pthread);
_emscripten_thread_mailbox_await(&__main_pthread);
}
16 changes: 10 additions & 6 deletions system/lib/pthread/thread_mailbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ void _emscripten_thread_mailbox_shutdown(pthread_t thread) {
void _emscripten_thread_mailbox_init(pthread_t thread) {
thread->mailbox = em_task_queue_create(thread);
thread->mailbox_refcount = 1;
thread->waiting_async = 0;
}

// Exported for use in worker.js, but otherwise an internal function.
Expand All @@ -92,9 +93,9 @@ void _emscripten_check_mailbox() {
// Send a postMessage notification telling the target thread to check its
// mailbox when it returns to its event loop. Pass in the current thread and
// main thread ids to minimize calls back into Wasm.
void _emscripten_notify_mailbox(pthread_t target_thread,
pthread_t curr_thread,
pthread_t main_thread);
void _emscripten_notify_mailbox_postmessage(pthread_t target_thread,
pthread_t curr_thread,
pthread_t main_thread);

void emscripten_thread_mailbox_send(pthread_t thread, task t) {
assert(thread->mailbox_refcount > 0);
Expand All @@ -112,8 +113,11 @@ void emscripten_thread_mailbox_send(pthread_t thread, task t) {
notification_state previous =
atomic_exchange(&thread->mailbox->notification, NOTIFICATION_PENDING);
if (previous != NOTIFICATION_PENDING) {
_emscripten_notify_mailbox(thread,
pthread_self(),
emscripten_main_runtime_thread_id());
if (thread->waiting_async) {
__builtin_wasm_memory_atomic_notify((int*)thread, -1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess this system doesn't actually write anything to the thread pointer here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see so we keep waiting in a loop for the next thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. At no point does the first member of the pthread struct hold anything other than a pointer to itself.

} else {
_emscripten_notify_mailbox_postmessage(
thread, pthread_self(), emscripten_main_runtime_thread_id());
}
}
}
2 changes: 2 additions & 0 deletions system/lib/pthread/thread_mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ void emscripten_thread_mailbox_send(pthread_t thread, task t);
// Initialize the mailbox on a pthread struct. Called during `pthread_create`.
void _emscripten_thread_mailbox_init(pthread_t thread);

void _emscripten_thread_mailbox_await(pthread_t thread);

// Close the mailbox and cancel any pending messages.
void _emscripten_thread_mailbox_shutdown(pthread_t thread);
3 changes: 2 additions & 1 deletion system/lib/wasmfs/thread_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class ProxyWorker {
// progress even if the main thread is blocked.
//
// TODO: Remove this once we can postMessage directly between workers
// without involving the main thread.
// without involving the main thread or once all browsers ship
// Atomics.waitAsync.
//
// Note that this requires adding _emscripten_proxy_execute_queue to
// EXPORTED_FUNCTIONS.
Expand Down
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.exports
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ B
C
D
E
o
F
p
q
r
Expand Down
1 change: 1 addition & 0 deletions test/other/metadce/test_metadce_minimal_pthreads.imports
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ a.k
a.l
a.m
a.n
a.o
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.jssize
Original file line number Diff line number Diff line change
@@ -1 +1 @@
15320
15563
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumable these losses could be more than regained if/when we stop including the old postMessage mechanism?

I guess we should add an option for that at some point? Perhaps doing it automatically on browser version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now it's possible to proxy a message to a worker before it has called Atomics.waitAsync to start listening for notifications, so we still have to use the old code path occasionally even on browsers that support Atomics.waitAsync. We could fix that in principle, but it would require waiting for the new thread to execute Atomics.waitAsync before returning from pthread_create, so it would at least moving thread lifetime management off the main thread.

1 change: 1 addition & 0 deletions test/other/metadce/test_metadce_minimal_pthreads.sent
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ k
l
m
n
o
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.size
Original file line number Diff line number Diff line change
@@ -1 +1 @@
18922
18954
21 changes: 6 additions & 15 deletions test/pthread/test_pthread_proxying_refcount.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ void* execute_and_free_queue(void* arg) {
em_proxying_queue_destroy(queues[i]);
}

// Wrap the normal worker event listener so that we can determine when our
// proxying events have been received and handled.
EM_ASM({
var oldOnMessage = onmessage;
onmessage = (e) => {
oldOnMessage(e);
if (e.data.cmd == 'checkMailbox') {
_register_processed();
}
};
});

// Exit with a live runtime so the queued work notification is received and we
// try to execute the queue again, even though we already executed all its
// work and we are now just waiting for the notifications to be received so we
Expand Down Expand Up @@ -90,9 +78,12 @@ int main() {
while (!executed[0] || !executed[1]) {
}

// Wait for the postMessage notification to be received.
while (processed < 1) {
}
// Wait a bit (20 ms) for the notification to be received.
struct timespec time = {
.tv_sec = 0,
.tv_nsec = 20 * 1000 * 1000,
};
nanosleep(&time, NULL);

#ifndef SANITIZER
// Our zombies should not have been freed yet.
Expand Down
5 changes: 3 additions & 2 deletions test/reference_struct_info.json
Original file line number Diff line number Diff line change
Expand Up @@ -1351,10 +1351,11 @@
"p_proto": 8
},
"pthread": {
"__size__": 128,
"__size__": 132,
"profilerBlock": 112,
"stack": 52,
"stack_size": 56
"stack_size": 56,
"waiting_async": 128
},
"pthread_attr_t": {
"__size__": 44,
Expand Down