Skip to content

Commit

Permalink
feat: context aware native module
Browse files Browse the repository at this point in the history
PR-URL: #148
Reviewed-BY: hyj1991 <[email protected]>
  • Loading branch information
legendecas authored and hyj1991 committed Mar 19, 2022
1 parent c114f45 commit 4a5ed5f
Show file tree
Hide file tree
Showing 16 changed files with 187 additions and 18 deletions.
15 changes: 15 additions & 0 deletions lib/worker_threads.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

let isMainThread = true;
let threadId = 0;
try {
const workerThreads = require('worker_threads');
isMainThread = workerThreads.isMainThread;
threadId = workerThreads.threadId;
/** Node.js v8.x compat: remove the unused catch-binding */
} catch (_) { /** ignore */ }

module.exports = {
isMainThread,
threadId,
};
2 changes: 2 additions & 0 deletions src/commands/listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

namespace xprofiler {
using Nan::False;
using Nan::FunctionCallbackInfo;
using Nan::ThrowTypeError;
using Nan::True;
using v8::Value;

static uv_thread_t uv_commands_listener_thread;

Expand Down
39 changes: 34 additions & 5 deletions src/environment_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@

#include "logbypass/log.h"
#include "process_data.h"
#include "util.h"
#include "util-inl.h"
#include "xpf_node.h"
#include "xpf_v8.h"

namespace xprofiler {
using v8::Context;
using v8::Isolate;
using v8::Local;
using v8::Number;
using v8::Object;

// static
EnvironmentData* EnvironmentData::GetCurrent() {
Expand Down Expand Up @@ -38,9 +42,6 @@ void EnvironmentData::Create(v8::Isolate* isolate) {
EnvironmentRegistry* registry = ProcessData::Get()->environment_registry();
EnvironmentRegistry::NoExitScope no_exit(registry);

// TODO(legendecas): context awareness support.
CHECK_EQ(registry->begin(), registry->end());

HandleScope scope(isolate);
uv_loop_t* loop = node::GetCurrentEventLoop(isolate);
CHECK_NOT_NULL(loop);
Expand All @@ -63,7 +64,20 @@ void EnvironmentData::AtExit(void* arg) {
Isolate* isolate = reinterpret_cast<Isolate*>(arg);
EnvironmentRegistry* registry = ProcessData::Get()->environment_registry();
EnvironmentRegistry::NoExitScope scope(registry);
registry->Unregister(isolate);
std::unique_ptr<EnvironmentData> env_data = registry->Unregister(isolate);
uv_close(reinterpret_cast<uv_handle_t*>(&env_data->interrupt_async_),
nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&env_data->statistics_async_),
CloseCallback);
env_data.release();
}

// static
void EnvironmentData::CloseCallback(uv_handle_t* handle) {
EnvironmentData* env_data =
ContainerOf(&EnvironmentData::statistics_async_,
reinterpret_cast<uv_async_t*>(handle));
delete env_data;
}

void EnvironmentData::SendCollectStatistics() {
Expand Down Expand Up @@ -116,4 +130,19 @@ void EnvironmentData::CollectStatistics(uv_async_t* handle) {
CollectLibuvHandleStatistics(env_data);
}

// javascript accessible
void JsSetupEnvironmentData(const Nan::FunctionCallbackInfo<v8::Value>& info) {
Isolate* isolate = info.GetIsolate();
EnvironmentData* env_data = EnvironmentData::GetCurrent(isolate);
HandleScope scope(isolate);
Local<Context> context = isolate->GetCurrentContext();

Local<Object> data = info[0].As<Object>();
Local<Number> thread_id =
data->Get(context, OneByteString(isolate, "threadId"))
.ToLocalChecked()
.As<Number>();
env_data->set_thread_id(thread_id->Value());
}

} // namespace xprofiler
10 changes: 10 additions & 0 deletions src/environment_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class EnvironmentData {

inline v8::Isolate* isolate() { return isolate_; }
inline uv_loop_t* loop() { return loop_; }
inline double thread_id() { return thread_id_; }
inline void set_thread_id(double thread_id) { thread_id_ = thread_id; }

inline GcStatistics* gc_statistics() { return &gc_statistics_; }
inline HttpStatistics* http_statistics() { return &http_statistics_; }
Expand All @@ -50,6 +52,7 @@ class EnvironmentData {

private:
static void AtExit(void* arg);
static void CloseCallback(uv_handle_t* handle);
static void InterruptBusyCallback(v8::Isolate* isolate, void* data);
static void InterruptIdleCallback(uv_async_t* handle);

Expand All @@ -59,6 +62,10 @@ class EnvironmentData {
v8::Isolate* isolate_;
uv_loop_t* loop_;
uv_async_t statistics_async_;
/* We don't have a native method to get the uint64_t thread id.
* Use the JavaScript number representation.
*/
double thread_id_ = -1;

Mutex interrupt_mutex_;
std::list<InterruptCallback> interrupt_requests_;
Expand All @@ -70,6 +77,9 @@ class EnvironmentData {
UvHandleStatistics uv_handle_statistics_;
};

// javascript accessible
void JsSetupEnvironmentData(const Nan::FunctionCallbackInfo<v8::Value>& info);

} // namespace xprofiler

#endif /* XPROFILER_SRC_ENVIRONMENT_DATA_H */
10 changes: 7 additions & 3 deletions src/environment_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ void EnvironmentRegistry::Register(v8::Isolate* isolate,
map_.emplace(isolate, std::move(env));
}

void EnvironmentRegistry::Unregister(v8::Isolate* isolate) {
std::unique_ptr<EnvironmentData> EnvironmentRegistry::Unregister(
v8::Isolate* isolate) {
CHECK(disallow_exit_);
CHECK_NE(map_.find(isolate), map_.end());
map_.erase(isolate);
auto it = map_.find(isolate);
CHECK_NE(it, map_.end());
std::unique_ptr<EnvironmentData> env_data = std::move(it->second);
map_.erase(it);
return env_data;
}

EnvironmentData* EnvironmentRegistry::Get(v8::Isolate* isolate) {
Expand Down
2 changes: 1 addition & 1 deletion src/environment_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class EnvironmentRegistry {
EnvironmentRegistry(const EnvironmentRegistry& other) = delete;

void Register(v8::Isolate* isolate, std::unique_ptr<EnvironmentData> env);
void Unregister(v8::Isolate* isolate);
std::unique_ptr<EnvironmentData> Unregister(v8::Isolate* isolate);
EnvironmentData* Get(v8::Isolate* isolate);

Iterator begin();
Expand Down
3 changes: 3 additions & 0 deletions src/hooks/set_hooks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "fatal_error.h"

namespace xprofiler {
using Nan::FunctionCallbackInfo;
using v8::Value;

void SetHooks(const FunctionCallbackInfo<Value>& info) {
// set fatal error hook
if (GetEnableFatalErrorHook()) {
Expand Down
3 changes: 3 additions & 0 deletions src/logbypass/http.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include "uv.h"

namespace xprofiler {
using Nan::FunctionCallbackInfo;
using v8::Value;

constexpr char module_type[] = "http";

void AddLiveRequest(const FunctionCallbackInfo<Value>& info) {
Expand Down
2 changes: 2 additions & 0 deletions src/logbypass/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

namespace xprofiler {
using Nan::False;
using Nan::FunctionCallbackInfo;
using Nan::ThrowTypeError;
using Nan::True;
using v8::Value;

void LogByPass::ThreadEntry(uv_loop_t* loop) {
CHECK_EQ(0, uv_timer_init(loop, &cpu_interval_));
Expand Down
2 changes: 2 additions & 0 deletions src/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "util.h"

namespace xprofiler {
using Nan::FunctionCallbackInfo;
using Nan::New;
using Nan::ThrowTypeError;
using Nan::To;
Expand All @@ -21,6 +22,7 @@ using std::string;
using std::to_string;
using v8::Local;
using v8::String;
using v8::Value;

#define LOG_WITH_LEVEL(level) \
va_list args; \
Expand Down
9 changes: 3 additions & 6 deletions src/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
#include "nan.h"

namespace xprofiler {
using Nan::FunctionCallbackInfo;
using v8::Value;

// xprofiler logger
enum LOG_LEVEL { LOG_INFO, LOG_ERROR, LOG_DEBUG };
enum LOG_TYPE { LOG_TO_FILE, LOG_TO_TTL };
Expand All @@ -20,9 +17,9 @@ void Error(const char* log_type, const char* format, ...);
void Debug(const char* log_type, const char* format, ...);

// javascript accessible
void JsInfo(const FunctionCallbackInfo<Value>& info);
void JsError(const FunctionCallbackInfo<Value>& info);
void JsDebug(const FunctionCallbackInfo<Value>& info);
void JsInfo(const Nan::FunctionCallbackInfo<v8::Value>& info);
void JsError(const Nan::FunctionCallbackInfo<v8::Value>& info);
void JsDebug(const Nan::FunctionCallbackInfo<v8::Value>& info);
} // namespace xprofiler

#endif /* XPROFILER_SRC_LOGGER_H */
5 changes: 3 additions & 2 deletions src/xprofiler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ NAN_MODULE_INIT(Initialize) {
CREATE_JS_BINDING(error, JsError);
CREATE_JS_BINDING(debug, JsDebug);

CREATE_JS_BINDING(setup, JsSetupEnvironmentData);

// performance log
CREATE_JS_BINDING(runLogBypass, RunLogBypass);

Expand All @@ -59,6 +61,5 @@ NAN_MODULE_INIT(Initialize) {
CREATE_JS_BINDING(addHttpStatusCode, AddHttpStatusCode);
}

// TODO(legendecas): declare context aware when ready.
NODE_MODULE(xprofiler, Initialize)
NODE_MODULE_CONTEXT_AWARE(xprofiler, Initialize)
} // namespace xprofiler
65 changes: 64 additions & 1 deletion test/fixtures/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const cp = require('child_process');
const fs = require('fs');
const path = require('path');
const pack = require('../../package.json');
Expand Down Expand Up @@ -70,4 +71,66 @@ exports.checkChildProcessExitInfo = function (expect, exitInfo) {
// One of the code | signal will always be non-null.
expect(code === 0).to.be.ok();
expect(signal === null).to.be.ok();
};
};

function createDeferred() {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve,
reject,
};
}
exports.createDeferred = createDeferred;

/** Node.js v8.x compat for events.once */
exports.once = function once(eventemitter, event) {
const deferred = createDeferred();
let uninstallListeners;
const listener = (...args) => {
deferred.resolve(args);
uninstallListeners();
};
const errorListener = (err) => {
deferred.reject(err);
uninstallListeners();
};
uninstallListeners = () => {
eventemitter.removeListener(event, listener);
eventemitter.removeListener('error', errorListener);
};
eventemitter.on(event, listener);
eventemitter.on('error', errorListener);
return deferred.promise;
};

exports.fork = function fork(filepath, options = {}) {
const proc = cp.fork(filepath, Object.assign({
stdio: ['ignore', 'pipe', 'pipe', 'ipc'],
}, options));
proc.stdout.setEncoding('utf8');
proc.stderr.setEncoding('utf8');

let stdout = '';
let stderr = '';
proc.stdout.on('data', chunk => {
stdout += chunk;
});
proc.stderr.on('data', chunk => {
stderr += chunk;
});

proc.on('exit', (code, signal) => {
if (code !== 0) {
console.log('process exited with non-zero code: pid(%d), code(%d), signal(%d)', proc.pid, code, signal);
console.log('stdout:\n', stdout);
console.log('');
console.log('stderr:\n', stderr);
}
});
return proc;
};
17 changes: 17 additions & 0 deletions test/fixtures/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

if (Number.parseInt(process.versions.node.split('.')[0], 10) <= 10) {
process.exit(0);
}

const workerThreads = require('worker_threads');
require('../../xprofiler');

if (workerThreads.isMainThread) {
const w = new workerThreads.Worker(__filename);
w.on('exit', code => {
console.log('worker exited', code);
});
} else {
setTimeout(() => {}, 1000);
}
16 changes: 16 additions & 0 deletions test/worker_threads.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict';

const assert = require('assert');
const path = require('path');
const { once, fork } = require('./fixtures/utils');

describe('worker_threads', () => {
describe('load', () => {
it('should load xprofiler and exit cleanly', async () => {
const proc = fork(path.join(__dirname, 'fixtures/worker.js'));
const [code, signal] = await once(proc, 'exit');
assert.strictEqual(code, 0);
assert.strictEqual(signal, null);
});
});
});
5 changes: 5 additions & 0 deletions xprofiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ const configure = require('./lib/configure');
const configList = require('./xprofiler.json');
const moment = require('moment');
const pkg = require('./package.json');
const workerThreads = require('./lib/worker_threads');

// xprofiler.node
const binary = require('@xprofiler/node-pre-gyp');
const bindingPath = binary.find(path.resolve(path.join(__dirname, './package.json')));
const xprofiler = require(bindingPath);
xprofiler.setup({
isMainThread: workerThreads.isMainThread,
threadId: workerThreads.threadId,
});

const runOnceStatus = {
bypassLogThreadStarted: false,
Expand Down

0 comments on commit 4a5ed5f

Please sign in to comment.