Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use custom setImmediate to avoid conflict with Node.js immediate loop #10

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
},
"scripts": {
"lint": "eslint {src,test}/**/*.ts",
"test": "npm run lint && npm run build && nyc npm run test-only",
"test": "npm run lint && npm run build && npm run test-only",
"test-nolint": "npm run build && npm run test-only",
"test-only": "mocha --colors -r ts-node/register test/*.ts",
"test-only": "mocha --trace-uncaught --colors -r ts-node/register test/*.ts",
"build": "node-gyp rebuild && npm run compile-ts && gen-esm-wrapper . ./.esm-wrapper.mjs",
"prepack": "npm run build",
"compile-ts": "tsc -p tsconfig.json"
Expand Down
79 changes: 79 additions & 0 deletions src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,79 @@ bool Worker::IsLoopAlive() {
return uv_loop_alive(&loop_);
}

class StandaloneImmediate {
public:
StandaloneImmediate(Local<Context> context, Local<Function> function);
~StandaloneImmediate();

private:
void OnTimeout();
static void Cleanup(void* arg, void (*cb)(void*), void* cbarg);

struct CloseCbData { void (*cb)(void*); void* cbarg; };
std::vector<CloseCbData> close_cbs_;
Isolate* isolate_;
Global<Context> context_;
Global<Function> function_;
std::unique_ptr<AsyncResource> async_resource_;
uv_check_t check_;
AsyncCleanupHookHandle async_cleanup_handle_;
};

StandaloneImmediate::StandaloneImmediate(Local<Context> context, Local<Function> function) {
Isolate* isolate = context->GetIsolate();
uv_loop_t* loop = GetCurrentEventLoop(isolate);
uv_check_init(loop, &check_);
uv_check_start(&check_, [](uv_check_t* check) {
static_cast<StandaloneImmediate*>(check->data)->OnTimeout();
});
check_.data = this;
async_cleanup_handle_ = AddEnvironmentCleanupHook(isolate, Cleanup, this);

Local<Object> resource = Object::New(isolate);
async_resource_ = std::make_unique<AsyncResource>(isolate, resource, "synchronous-worker:Immediate");
function_.Reset(isolate, function);
context_.Reset(isolate, isolate->GetCurrentContext());
isolate_ = isolate;
}

StandaloneImmediate::~StandaloneImmediate() {
if (async_cleanup_handle_)
RemoveEnvironmentCleanupHook(std::move(async_cleanup_handle_));
}

void StandaloneImmediate::OnTimeout() {
uv_check_stop(&check_);
HandleScope handle_scope(isolate_);
Context::Scope context_scope(context_.Get(isolate_));
async_resource_->MakeCallback(function_.Get(isolate_), 0, nullptr);
Cleanup(this, [](void* arg) {}, nullptr);
}

void StandaloneImmediate::Cleanup(void* arg, void (*cb)(void*), void* cbarg) {
StandaloneImmediate* self = static_cast<StandaloneImmediate*>(arg);
self->close_cbs_.emplace_back(CloseCbData { cb, cbarg });
if (self->close_cbs_.size() > 1) {
return;
}

uv_check_stop(&self->check_);
uv_close(reinterpret_cast<uv_handle_t*>(&self->check_), [](uv_handle_t* handle) {
StandaloneImmediate* self = static_cast<StandaloneImmediate*>(handle->data);
HandleScope handle_scope(self->isolate_);
Context::Scope context_scope(self->context_.Get(self->isolate_));

for (const auto& close_cb : self->close_cbs_) {
close_cb.cb(close_cb.cbarg);
}
delete self;
});
}

void StandaloneSetImmediate(const FunctionCallbackInfo<Value>& args) {
new StandaloneImmediate(args.GetIsolate()->GetCurrentContext(), args[0].As<Function>());
}

NODE_MODULE_INIT() {
Isolate* isolate = context->GetIsolate();
Local<FunctionTemplate> templ = FunctionTemplate::New(isolate, Worker::New);
Expand Down Expand Up @@ -414,6 +487,12 @@ NODE_MODULE_INIT() {
USE(exports->Set(context,
String::NewFromUtf8Literal(isolate, "SynchronousWorkerImpl"),
worker_fn));
Local<Function> set_immediate_fn;
if (!Function::New(context, StandaloneSetImmediate).ToLocal(&set_immediate_fn))
return;
USE(exports->Set(context,
String::NewFromUtf8Literal(isolate, "standaloneSetImmediate"),
set_immediate_fn));

NODE_DEFINE_CONSTANT(exports, UV_RUN_DEFAULT);
NODE_DEFINE_CONSTANT(exports, UV_RUN_ONCE);
Expand Down
5 changes: 3 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const {
SynchronousWorkerImpl,
UV_RUN_DEFAULT,
UV_RUN_ONCE,
UV_RUN_NOWAIT
UV_RUN_NOWAIT,
standaloneSetImmediate
} = bindings('synchronous_worker');
const kHandle = Symbol('kHandle');
const kProcess = Symbol('kProcess');
Expand Down Expand Up @@ -123,7 +124,7 @@ class SynchronousWorker extends EventEmitter {
async stop(): Promise<void> {
return this[kStoppedPromise] ??= new Promise(resolve => {
this[kHandle].signalStop();
setImmediate(() => {
standaloneSetImmediate(() => {
this[kHandle].stop();
resolve();
});
Expand Down
14 changes: 14 additions & 0 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,18 @@ describe('SynchronousWorker allows running Node.js code', () => {
});
await w.stop();
});

it('properly handles immediates when FreeEnvironment() is called on a shared event loop', async() => {
const w = new SynchronousWorker({
sharedEventLoop: true,
sharedMicrotaskQueue: true
});

setImmediate(() => {
setImmediate(() => {
setImmediate(() => {});
});
});
await w.stop();
});
});