Skip to content

Commit

Permalink
async_hooks: add getActiveResources (prototype)
Browse files Browse the repository at this point in the history
Prototype of a new getActiveResources() API for async_hooks.

Returns an object map of { <id>: <resource> }, and differs slightly from getActiveHandles().

Currently, it works for all async_hooks resources except nextTick-s, which are probably not useful to expose in this way.
  • Loading branch information
Fishrock123 committed Jun 22, 2018
1 parent 680aeb3 commit 148ee61
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 95 deletions.
31 changes: 31 additions & 0 deletions lib/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
ERR_INVALID_ASYNC_ID
} = require('internal/errors').codes;
const internal_async_hooks = require('internal/async_hooks');
const internalTimers = require('internal/timers');

// Get functions
// For userland AsyncResources, make sure to emit a destroy event when the
Expand Down Expand Up @@ -123,6 +124,35 @@ function createHook(fns) {
}


function getActiveResources() {
const handles = process._getActiveHandles();
const reqs = process._getActiveRequests();

const timers = {};
for (const list of Object.values(internalTimers.timerLists)) {
var timer = list._idlePrev === list ? null : list._idlePrev;

while (timer !== null) {
timers[timer[internalTimers.async_id_symbol]] = timer;

timer = timer._idlePrev === list ? null : list._idlePrev;
}
}

const immediates = {};
const queue = internalTimers.outstandingQueue.head !== null ?
internalTimers.outstandingQueue : internalTimers.immediateQueue;
var immediate = queue.head;
while (immediate !== null) {
immediates[immediate[internalTimers.async_id_symbol]] = immediate;

immediate = immediate._idleNext;
}

return Object.assign({}, handles, reqs, timers, immediates);
}


// Embedder API //

const destroyedSymbol = Symbol('destroyed');
Expand Down Expand Up @@ -216,6 +246,7 @@ module.exports = {
createHook,
executionAsyncId,
triggerAsyncId,
getActiveResources,
// Embedder API
AsyncResource,
};
58 changes: 57 additions & 1 deletion lib/internal/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,59 @@ const TIMEOUT_MAX = 2 ** 31 - 1;

const kRefed = Symbol('refed');

// Object map containing linked lists of timers, keyed and sorted by their
// duration in milliseconds.
//
// - key = time in milliseconds
// - value = linked list
const timerLists = Object.create(null);

// A linked list for storing `setImmediate()` requests
function ImmediateList() {
this.head = null;
this.tail = null;
}

// Appends an item to the end of the linked list, adjusting the current tail's
// previous and next pointers where applicable
ImmediateList.prototype.append = function(item) {
if (this.tail !== null) {
this.tail._idleNext = item;
item._idlePrev = this.tail;
} else {
this.head = item;
}
this.tail = item;
};

// Removes an item from the linked list, adjusting the pointers of adjacent
// items and the linked list's head or tail pointers as necessary
ImmediateList.prototype.remove = function(item) {
if (item._idleNext !== null) {
item._idleNext._idlePrev = item._idlePrev;
}

if (item._idlePrev !== null) {
item._idlePrev._idleNext = item._idleNext;
}

if (item === this.head)
this.head = item._idleNext;
if (item === this.tail)
this.tail = item._idlePrev;

item._idleNext = null;
item._idlePrev = null;
};

// Create a single linked list instance only once at startup
const immediateQueue = new ImmediateList();

// If an uncaught exception was thrown during execution of immediateQueue,
// this queue will store all remaining Immediates that need to run upon
// resolution of all error handling (if process is still alive).
const outstandingQueue = new ImmediateList();

module.exports = {
TIMEOUT_MAX,
kTimeout: Symbol('timeout'), // For hiding Timeouts on other internals.
Expand All @@ -30,7 +83,10 @@ module.exports = {
kRefed,
initAsyncResource,
setUnrefTimeout,
validateTimerDuration
validateTimerDuration,
timerLists,
immediateQueue,
outstandingQueue
};

var timers;
Expand Down
60 changes: 4 additions & 56 deletions lib/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ const {
Timeout,
kRefed,
initAsyncResource,
validateTimerDuration
validateTimerDuration,
timerLists: lists,
immediateQueue,
outstandingQueue
} = require('internal/timers');
const internalUtil = require('internal/util');
const { createPromise, promiseResolve } = process.binding('util');
Expand Down Expand Up @@ -128,14 +131,6 @@ const [immediateInfo, toggleImmediateRef] =
// timers within (or creation of a new list). However, these operations combined
// have shown to be trivial in comparison to other timers architectures.


// Object map containing linked lists of timers, keyed and sorted by their
// duration in milliseconds.
//
// - key = time in milliseconds
// - value = linked list
const lists = Object.create(null);

// This is a priority queue with a custom sorting function that first compares
// the expiry times of two lists and if they're the same then compares their
// individual IDs to determine which list was created first.
Expand Down Expand Up @@ -557,53 +552,6 @@ Timeout.prototype.close = function() {
};


// A linked list for storing `setImmediate()` requests
function ImmediateList() {
this.head = null;
this.tail = null;
}

// Appends an item to the end of the linked list, adjusting the current tail's
// previous and next pointers where applicable
ImmediateList.prototype.append = function(item) {
if (this.tail !== null) {
this.tail._idleNext = item;
item._idlePrev = this.tail;
} else {
this.head = item;
}
this.tail = item;
};

// Removes an item from the linked list, adjusting the pointers of adjacent
// items and the linked list's head or tail pointers as necessary
ImmediateList.prototype.remove = function(item) {
if (item._idleNext !== null) {
item._idleNext._idlePrev = item._idlePrev;
}

if (item._idlePrev !== null) {
item._idlePrev._idleNext = item._idleNext;
}

if (item === this.head)
this.head = item._idleNext;
if (item === this.tail)
this.tail = item._idlePrev;

item._idleNext = null;
item._idlePrev = null;
};

// Create a single linked list instance only once at startup
const immediateQueue = new ImmediateList();

// If an uncaught exception was thrown during execution of immediateQueue,
// this queue will store all remaining Immediates that need to run upon
// resolution of all error handling (if process is still alive).
const outstandingQueue = new ImmediateList();


function processImmediate() {
const queue = outstandingQueue.head !== null ?
outstandingQueue : immediateQueue;
Expand Down
42 changes: 12 additions & 30 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1016,27 +1016,19 @@ static MaybeLocal<Value> ExecuteString(Environment* env,
static void GetActiveRequests(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

Local<Array> ary = Array::New(args.GetIsolate());
Local<Context> ctx = env->context();
Local<Function> fn = env->push_values_to_array_function();
Local<Value> argv[NODE_PUSH_VAL_TO_ARRAY_MAX];
size_t idx = 0;
Local<Object> return_obj = Object::New(args.GetIsolate());

for (auto w : *env->req_wrap_queue()) {
if (w->persistent().IsEmpty())
continue;
argv[idx] = w->object();
if (++idx >= arraysize(argv)) {
fn->Call(ctx, ary, idx, argv).ToLocalChecked();
idx = 0;
}
}
double async_id = w->get_async_id();
Local<Object> req_obj = w->object();

if (idx > 0) {
fn->Call(ctx, ary, idx, argv).ToLocalChecked();
return_obj->Set(ctx, Number::New(args.GetIsolate(), async_id), req_obj);
}

args.GetReturnValue().Set(ary);
args.GetReturnValue().Set(return_obj);
}


Expand All @@ -1045,32 +1037,22 @@ static void GetActiveRequests(const FunctionCallbackInfo<Value>& args) {
void GetActiveHandles(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

Local<Array> ary = Array::New(env->isolate());
Local<Context> ctx = env->context();
Local<Function> fn = env->push_values_to_array_function();
Local<Value> argv[NODE_PUSH_VAL_TO_ARRAY_MAX];
size_t idx = 0;
Local<Object> return_obj = Object::New(args.GetIsolate());

Local<String> owner_sym = env->owner_string();

for (auto w : *env->handle_wrap_queue()) {
if (w->persistent().IsEmpty() || !HandleWrap::HasRef(w))
continue;
Local<Object> object = w->object();
Local<Value> owner = object->Get(owner_sym);
if (owner->IsUndefined())
owner = object;
argv[idx] = owner;
if (++idx >= arraysize(argv)) {
fn->Call(ctx, ary, idx, argv).ToLocalChecked();
idx = 0;
}
}
if (idx > 0) {
fn->Call(ctx, ary, idx, argv).ToLocalChecked();
double async_id = w->get_async_id();
Local<Object> handle_object = w->object();
return_obj->Set(ctx, Number::New(args.GetIsolate(),
async_id),
handle_object);
}

args.GetReturnValue().Set(ary);
args.GetReturnValue().Set(return_obj);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
require('../common');
const assert = require('assert');
const fs = require('fs');
const { getActiveResources } = require('async_hooks');

for (let i = 0; i < 12; i++)
fs.open(__filename, 'r', () => {});

assert.strictEqual(12, process._getActiveRequests().length);
assert.strictEqual(12, Object.values(getActiveResources()).length);
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
require('../common');
const assert = require('assert');
const net = require('net');
const { getActiveResources } = require('async_hooks');

const NUM = 8;
const connections = [];
const clients = [];
Expand Down Expand Up @@ -30,18 +32,18 @@ function clientConnected(client) {


function checkAll() {
const handles = process._getActiveHandles();
const handles = Object.values(getActiveResources());

clients.forEach(function(item) {
assert.ok(handles.includes(item));
assert.ok(handles.includes(item._handle));
item.destroy();
});

connections.forEach(function(item) {
assert.ok(handles.includes(item));
assert.ok(handles.includes(item._handle));
item.end();
});

assert.ok(handles.includes(server));
assert.ok(handles.includes(server._handle));
server.close();
}
5 changes: 3 additions & 2 deletions test/parallel/test-handle-wrap-isrefed.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const common = require('../common');
const strictEqual = require('assert').strictEqual;
const { getActiveResources } = require('async_hooks');

// child_process
{
Expand Down Expand Up @@ -113,10 +114,10 @@ const dgram = require('dgram');
// timers
{
const { Timer } = process.binding('timer_wrap');
strictEqual(process._getActiveHandles().filter(
strictEqual(Object.values(getActiveResources()).filter(
(handle) => (handle instanceof Timer)).length, 0);
const timer = setTimeout(() => {}, 500);
const handles = process._getActiveHandles().filter(
const handles = Object.values(getActiveResources()).filter(
(handle) => (handle instanceof Timer));
strictEqual(handles.length, 1);
const handle = handles[0];
Expand Down
4 changes: 3 additions & 1 deletion test/pseudo-tty/ref_keeps_node_running.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require('../common');

const { TTY, isTTY } = process.binding('tty_wrap');
const strictEqual = require('assert').strictEqual;
const { getActiveResources } = require('async_hooks');

strictEqual(isTTY(0), true, 'fd 0 is not a TTY');

Expand All @@ -12,7 +13,8 @@ handle.readStart();
handle.onread = () => {};

function isHandleActive(handle) {
return process._getActiveHandles().some((active) => active === handle);
return Object.values(getActiveResources())
.some((active) => active === handle);
}

strictEqual(isHandleActive(handle), true, 'TTY handle not initially active');
Expand Down

0 comments on commit 148ee61

Please sign in to comment.