Skip to content

Commit

Permalink
Assert we drained the microtask queue since the last time we called i…
Browse files Browse the repository at this point in the history
…nto JavaScript (#8646)

* Add checks that we drain the microtask queue whenever we call into JavaScript

* Drain microtasks in more places

* Tweak assertions for entering/exiting the event loop

* Drain more

---------

Co-authored-by: Jarred Sumner <[email protected]>
  • Loading branch information
Jarred-Sumner and Jarred-Sumner authored Feb 3, 2024
1 parent 7d86f9e commit c75e768
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 80 deletions.
4 changes: 2 additions & 2 deletions src/bun.js/api/bun/dns_resolver.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1935,8 +1935,8 @@ pub const DNSResolver = struct {
poll: *Async.FilePoll,
) void {
var vm = this.vm;
defer vm.drainMicrotasks();

vm.eventLoop().enter();
defer vm.eventLoop().exit();
var channel = this.channel orelse {
_ = this.polls.orderedRemove(poll.fd.int());
poll.deinit();
Expand Down
22 changes: 1 addition & 21 deletions src/bun.js/api/bun/h2_frame_parser.zig
Original file line number Diff line number Diff line change
Expand Up @@ -433,27 +433,7 @@ const Handlers = struct {
return false;
}

const result = callback.callWithThis(this.globalObject, thisValue, data);
if (result.isAnyError()) {
this.vm.onUnhandledError(this.globalObject, result);
}

return true;
}

pub fn callErrorHandler(this: *Handlers, thisValue: JSValue, err: []const JSValue) bool {
const onError = this.onError;
if (onError == .zero) {
if (err.len > 0)
this.vm.onUnhandledError(this.globalObject, err[0]);

return false;
}

const result = onError.callWithThis(this.globalObject, thisValue, err);
if (result.isAnyError()) {
this.vm.onUnhandledError(this.globalObject, result);
}
this.vm.eventLoop().runCallback(callback, this.globalObject, thisValue, data);

return true;
}
Expand Down
6 changes: 3 additions & 3 deletions src/bun.js/api/bun/lshpack.translated.zig
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub const enum_lsxpack_flag = c_uint;
// /// limitation: we currently does not support total header size > 64KB.
pub const struct_lsxpack_header = extern struct {
/// the buffer for headers
buf: [*]u8 = undefined,
buf: ?[*]u8 = null,
/// hash value for name
name_hash: __uint32_t = 0,
/// hash value for name + value
Expand Down Expand Up @@ -205,11 +205,11 @@ pub fn lsxpack_header_prepare_decode(arg_hdr: *lsxpack_header_t, arg_out: [*c]u8
}

pub fn lsxpack_header_get_name(hdr: *lsxpack_header_t) []const u8 {
if (hdr.name_len != 0) return hdr.buf[@as(usize, @intCast(hdr.name_offset)) .. @as(usize, @intCast(hdr.name_offset)) + hdr.name_len];
if (hdr.name_len != 0) return hdr.buf.?[@as(usize, @intCast(hdr.name_offset)) .. @as(usize, @intCast(hdr.name_offset)) + hdr.name_len];
return "";
}
pub fn lsxpack_header_get_value(hdr: *lsxpack_header_t) []const u8 {
if (hdr.val_len != 0) return hdr.buf[@as(usize, @intCast(hdr.val_offset)) .. @as(usize, @intCast(hdr.val_offset)) + hdr.val_len];
if (hdr.val_len != 0) return hdr.buf.?[@as(usize, @intCast(hdr.val_offset)) .. @as(usize, @intCast(hdr.val_offset)) + hdr.val_len];
return "";
}
pub fn lsxpack_header_get_dec_size(hdr: ?*const lsxpack_header_t) callconv(.C) usize {
Expand Down
16 changes: 12 additions & 4 deletions src/bun.js/api/bun/socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,14 @@ const Handlers = struct {

pub fn exit(this: *Scope, ssl: bool, wrapped: WrappedType) void {
var vm = this.handlers.vm;
defer vm.drainMicrotasks();
defer vm.eventLoop().exit();
this.handlers.markInactive(ssl, this.socket_context, wrapped);
}
};

pub fn enter(this: *Handlers, context: ?*uws.SocketContext) Scope {
this.markActive();
this.vm.eventLoop().enter();
return .{
.handlers = this,
.socket_context = context,
Expand Down Expand Up @@ -1202,7 +1203,8 @@ fn NewSocket(comptime ssl: bool) type {
const callback = handlers.onWritable;
if (callback == .zero) return;
var vm = handlers.vm;
defer vm.drainMicrotasks();
vm.eventLoop().enter();
defer vm.eventLoop().exit();

const globalObject = handlers.globalObject;
const this_value = this.getThisValue(globalObject);
Expand Down Expand Up @@ -1249,12 +1251,11 @@ fn NewSocket(comptime ssl: bool) type {
defer this.markInactive();

const handlers = this.handlers;
var vm = handlers.vm;
const vm = handlers.vm;
this.poll_ref.unrefOnNextTick(vm);

const callback = handlers.onConnectError;
const globalObject = handlers.globalObject;
defer vm.drainMicrotasks();
const err = JSC.SystemError{
.errno = errno,
.message = bun.String.static("Failed to connect"),
Expand All @@ -1266,6 +1267,10 @@ fn NewSocket(comptime ssl: bool) type {
// .code = bun.String.static(@tagName(bun.sys.getErrno(errno))),
// .code = bun.String.static(@tagName(@as(bun.C.E, @enumFromInt(errno)))),
};
vm.eventLoop().enter();
defer {
vm.eventLoop().exit();
}

if (callback == .zero) {
if (handlers.promise.trySwap()) |promise| {
Expand Down Expand Up @@ -1388,6 +1393,9 @@ fn NewSocket(comptime ssl: bool) type {
} else {
if (callback == .zero) return;
}
const vm = handlers.vm;
vm.eventLoop().enter();
defer vm.eventLoop().exit();
const result = callback.callWithThis(globalObject, this_value, &[_]JSValue{
this_value,
});
Expand Down
21 changes: 7 additions & 14 deletions src/bun.js/api/bun/subprocess.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2796,10 +2796,9 @@ pub const Subprocess = struct {
var vm = this.globalThis.bunVM();
const is_sync = this.flags.is_sync;

defer {
if (!is_sync)
vm.drainMicrotasks();
}
if (!is_sync) vm.eventLoop().enter();
defer if (!is_sync) vm.eventLoop().exit();

this.wait(false);
}

Expand Down Expand Up @@ -2965,15 +2964,12 @@ pub const Subprocess = struct {
waitpid_value,
};

const result = callback.callWithThis(
globalThis.bunVM().eventLoop().runCallback(
callback,
globalThis,
this_value,
&args,
);

if (result.isAnyError()) {
globalThis.bunVM().onUnhandledError(globalThis, result);
}
}
}

Expand Down Expand Up @@ -3386,15 +3382,12 @@ pub const Subprocess = struct {
.data => |data| {
IPC.log("Received IPC message from child", .{});
if (this.ipc_callback.get()) |cb| {
const result = cb.callWithThis(
this.globalThis.bunVM().eventLoop().runCallback(
cb,
this.globalThis,
this.this_jsvalue,
&[_]JSValue{ data, this.this_jsvalue },
);
data.ensureStillAlive();
if (result.isAnyError()) {
this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
}
}
},
}
Expand Down
40 changes: 32 additions & 8 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3205,7 +3205,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (readable.ptr == .Bytes) {
std.debug.assert(this.request_body_buf.items.len == 0);
var vm = this.server.vm;
defer vm.drainMicrotasks();
vm.eventLoop().enter();
defer vm.eventLoop().exit();

if (!last) {
readable.ptr.Bytes.onData(
Expand Down Expand Up @@ -3750,6 +3751,9 @@ pub const ServerWebSocket = struct {
if (onOpenHandler.isEmptyOrUndefinedOrNull()) return;
const this_value = this.getThisValue();
var args = [_]JSValue{this_value};
const loop = globalObject.bunVM().eventLoop();
loop.enter();
defer loop.exit();

var corker = Corker{
.args = &args,
Expand Down Expand Up @@ -3810,7 +3814,9 @@ pub const ServerWebSocket = struct {
if (onMessageHandler.isEmptyOrUndefinedOrNull()) return;
var globalObject = this.handler.globalObject;
// This is the start of a task.
defer globalObject.bunVM().drainMicrotasks();
const loop = globalObject.bunVM().eventLoop();
loop.enter();
defer loop.exit();

const arguments = [_]JSValue{
this.getThisValue(),
Expand Down Expand Up @@ -3893,7 +3899,9 @@ pub const ServerWebSocket = struct {
.globalObject = globalObject,
.callback = handler.onDrain,
};

const loop = globalObject.bunVM().eventLoop();
loop.enter();
defer loop.exit();
this.websocket.cork(&corker, Corker.run);
const result = corker.result;

Expand Down Expand Up @@ -3921,7 +3929,9 @@ pub const ServerWebSocket = struct {
var globalThis = handler.globalObject;

// This is the start of a task.
defer globalThis.bunVM().drainMicrotasks();
const loop = globalThis.bunVM().eventLoop();
loop.enter();
defer loop.exit();

const result = cb.call(
globalThis,
Expand Down Expand Up @@ -3961,7 +3971,9 @@ pub const ServerWebSocket = struct {
var globalThis = handler.globalObject;

// This is the start of a task.
defer globalThis.bunVM().drainMicrotasks();
const loop = globalThis.bunVM().eventLoop();
loop.enter();
defer loop.exit();

const result = cb.call(
globalThis,
Expand Down Expand Up @@ -4004,15 +4016,19 @@ pub const ServerWebSocket = struct {

if (!handler.onClose.isEmptyOrUndefinedOrNull()) {
var str = ZigString.init(message);
const globalObject = handler.globalObject;
const loop = globalObject.bunVM().eventLoop();
loop.enter();
defer loop.exit();
str.markUTF8();
const result = handler.onClose.call(
handler.globalObject,
&[_]JSC.JSValue{ this.this_value, JSValue.jsNumber(code), str.toValueGC(handler.globalObject) },
globalObject,
&[_]JSC.JSValue{ this.this_value, JSValue.jsNumber(code), str.toValueGC(globalObject) },
);

if (result.toError()) |err| {
log("onClose error", .{});
handler.globalObject.bunVM().runErrorHandler(err, null);
globalObject.bunVM().runErrorHandler(err, null);
}
}

Expand Down Expand Up @@ -5813,6 +5829,14 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
) void {
JSC.markBinding(@src());
this.pending_requests += 1;
if (comptime Environment.isDebug) {
this.vm.eventLoop().debug.enter();
}
defer {
if (comptime Environment.isDebug) {
this.vm.eventLoop().debug.exit();
}
}

req.setYield(false);
var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory");
Expand Down
17 changes: 16 additions & 1 deletion src/bun.js/bindings/bindings.zig
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@ pub const ZigString = extern struct {

pub const byteSlice = Slice.slice;


pub fn fromUTF8NeverFree(input: []const u8) Slice {
return .{
.ptr = input.ptr,
Expand Down Expand Up @@ -3565,6 +3564,14 @@ pub const JSValue = enum(JSValueReprInt) {

pub fn callWithGlobalThis(this: JSValue, globalThis: *JSGlobalObject, args: []const JSC.JSValue) JSC.JSValue {
JSC.markBinding(@src());
if (comptime bun.Environment.isDebug) {
const loop = JSC.VirtualMachine.get().eventLoop();
loop.debug.js_call_count_outside_tick_queue += @as(usize, @intFromBool(!loop.debug.is_inside_tick_queue));
if (loop.debug.track_last_fn_name and !loop.debug.is_inside_tick_queue) {
loop.debug.last_fn_name.deref();
loop.debug.last_fn_name = this.getName(globalThis);
}
}
return JSC.C.JSObjectCallAsFunctionReturnValue(
globalThis,
this,
Expand All @@ -3576,6 +3583,14 @@ pub const JSValue = enum(JSValueReprInt) {

pub fn callWithThis(this: JSValue, globalThis: *JSGlobalObject, thisValue: JSC.JSValue, args: []const JSC.JSValue) JSC.JSValue {
JSC.markBinding(@src());
if (comptime bun.Environment.isDebug) {
const loop = JSC.VirtualMachine.get().eventLoop();
loop.debug.js_call_count_outside_tick_queue += @as(usize, @intFromBool(!loop.debug.is_inside_tick_queue));
if (loop.debug.track_last_fn_name and !loop.debug.is_inside_tick_queue) {
loop.debug.last_fn_name.deref();
loop.debug.last_fn_name = this.getName(globalThis);
}
}
return JSC.C.JSObjectCallAsFunctionReturnValue(
globalThis,
this,
Expand Down
Loading

0 comments on commit c75e768

Please sign in to comment.