Skip to content

Commit

Permalink
feat: add worker threads support
Browse files Browse the repository at this point in the history
  • Loading branch information
RafaelGSS committed Dec 16, 2024
1 parent d65e8aa commit 831664a
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 26 deletions.
14 changes: 14 additions & 0 deletions examples/worker-threads/node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const { Suite } = require('../../lib');

const suite = new Suite({
useWorkers: true,
});

suite
.add('Using import without node: prefix', function () {
return import('fs');
})
.add('Using import with node: prefix', function () {
return import('node:fs');
})
.run();
22 changes: 12 additions & 10 deletions lib/clock.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const { debug, types } = require('node:util');
const { validateNumber } = require('./validators');
const { isMainThread } = require('child_process')

let debugBench = debug('benchmark', (fn) => {
debugBench = fn;
Expand Down Expand Up @@ -194,23 +195,22 @@ const AsyncFunction = async function () {
const SyncFunction = function () {
}.constructor;

function createRunner(bench, recommendedCount) {
const isAsync = types.isAsyncFunction(bench.fn);
const hasArg = bench.fn.length >= 1;

if (bench.fn.length > 1) {
process.emitWarning(`The benchmark "${ bench.name }" function should not have more than 1 argument.`);
}
function createFnString(bench) {
const { isAsync, hasArg } = bench;

const compiledFnStringFactory = hasArg ? createRunManagedBenchmark : createRunUnmanagedBenchmark;
const compiledFnString = compiledFnStringFactory(bench, isAsync ? 'await ' : '');
return compiledFnString;
}

function createRunner(bench, recommendedCount) {
const { isAsync, hasArg } = bench;
const compiledFnString = bench.fnStr;

const createFnPrototype = isAsync ? AsyncFunction : SyncFunction;
const compiledFn = createFnPrototype('bench', 'timer', 'count', 'kUnmanagedTimerResult', compiledFnString);

const selectedTimer = hasArg ? new ManagedTimer(recommendedCount) : timer;

const runner = compiledFn.bind(globalThis, bench, selectedTimer, recommendedCount, kUnmanagedTimerResult);

debugBench(`Compiled Code: ${ compiledFnString }`);
debugBench(`Created compiled benchmark, hasArg=${ hasArg }, isAsync=${ isAsync }, recommendedCount=${ recommendedCount }`);

Expand All @@ -224,6 +224,7 @@ async function clockBenchmark(bench, recommendedCount) {
result[0] = Math.max(MIN_RESOLUTION, result[0]);
for (const p of bench.plugins) {
if (typeof p.onCompleteBenchmark === 'function') {
// TODO: this won't work when useWorkers=true
p.onCompleteBenchmark(result, bench);
}
}
Expand All @@ -234,6 +235,7 @@ async function clockBenchmark(bench, recommendedCount) {

module.exports = {
clockBenchmark,
createFnString,
timer,
MIN_RESOLUTION,
debugBench,
Expand Down
85 changes: 75 additions & 10 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
const { Worker } = require('node:worker_threads');
const { types } = require('node:util');
const path = require('node:path');

const { textReport, chartReport, htmlReport } = require('./report');
const { getInitialIterations, runBenchmark, runWarmup } = require('./lifecycle');
const { debugBench, timer } = require('./clock');
const { debugBench, timer, createFnString } = require('./clock');
const {
validatePlugins,
V8NeverOptimizePlugin,
Expand All @@ -15,6 +19,11 @@ const {
validateArray,
} = require('./validators');

const getFunctionBody = (string) => string.substring(
string.indexOf("{") + 1,
string.lastIndexOf("}")
);

class Benchmark {
name = 'Benchmark';
fn;
Expand All @@ -30,6 +39,22 @@ class Benchmark {
this.maxTime = maxTime;
this.plugins = plugins;
this.repeatSuite = repeatSuite;

this.hasArg = this.fn.length >= 1;
if (this.fn.length > 1) {
process.emitWarning(`The benchmark "${ this.name }" function should not have more than 1 argument.`);
}

this.isAsync = types.isAsyncFunction(this.fn);

this.fnStr = createFnString(this);
}

serializeBenchmark() {
// Regular functions can't be passed to worker.postMessage
// So we pass the string and deserialize fnStr into a new Function
// on worker
this.fn = getFunctionBody(this.fn.toString());
}
}

Expand All @@ -52,6 +77,7 @@ class Suite {
#benchmarks;
#reporter;
#plugins;
#useWorkers;

constructor(options = {}) {
this.#benchmarks = [];
Expand All @@ -65,6 +91,7 @@ class Suite {
this.#reporter = textReport;
}

this.#useWorkers = options.useWorkers || false;
if (options?.plugins) {
validateArray(options.plugins, 'plugin');
validatePlugins(options.plugins);
Expand Down Expand Up @@ -105,29 +132,67 @@ class Suite {
throwIfNoNativesSyntax();
const results = new Array(this.#benchmarks.length);

// This is required to avoid variance on first benchmark run
for (let i = 0; i < this.#benchmarks.length; ++i) {
const benchmark = this.#benchmarks[i];
debugBench(`Warmup ${ benchmark.name } with minTime=${ benchmark.minTime }, maxTime=${ benchmark.maxTime }`);
const initialIteration = await getInitialIterations(benchmark);
await runWarmup(benchmark, initialIteration, { minTime: 0.005, maxTime: 0.05 });
// It doesn't make sense to warmup a fresh new instance of Worker.
// TODO: support warmup directly in the Worker.
if (!this.#useWorkers) {
// This is required to avoid variance on first benchmark run
for (let i = 0; i < this.#benchmarks.length; ++i) {
const benchmark = this.#benchmarks[i];
debugBench(`Warmup ${ benchmark.name } with minTime=${ benchmark.minTime }, maxTime=${ benchmark.maxTime }`);
const initialIteration = await getInitialIterations(benchmark);
await runWarmup(benchmark, initialIteration, { minTime: 0.005, maxTime: 0.05 });
}
}

for (let i = 0; i < this.#benchmarks.length; ++i) {
const benchmark = this.#benchmarks[i];
// Warmup is calculated to reduce noise/bias on the results
const initialIteration = await getInitialIterations(benchmark);
const initialIterations = await getInitialIterations(benchmark);
debugBench(`Starting ${ benchmark.name } with minTime=${ benchmark.minTime }, maxTime=${ benchmark.maxTime }, repeatSuite=${ benchmark.repeatSuite }`);
const result = await runBenchmark(benchmark, initialIteration, benchmark.repeatSuite);

let result;
if (this.#useWorkers) {
result = await this.runWorkerBenchmark(benchmark, initialIterations);
} else {
result = await runBenchmark(benchmark, initialIterations, benchmark.repeatSuite);
}
results[i] = result;
}

if (this.#reporter) {
this.#reporter(results);
}

return results;
}

runWorkerBenchmark(benchmark, initialIterations) {
benchmark.serializeBenchmark();
const worker = new Worker(path.join(__dirname, './worker-runner.js'));

worker.postMessage({
benchmark,
initialIterations,
repeatSuite: benchmark.repeatSuite,
});
return new Promise((resolve, reject) => {
worker.on('message', (result) => {
resolve(result);
// TODO: await?
worker.terminate();
});

worker.on('error', (err) => {
reject(err);
worker.terminate();
});

worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
}

module.exports = {
Expand Down
10 changes: 7 additions & 3 deletions lib/lifecycle.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const { clockBenchmark, debugBench, MIN_RESOLUTION, timer } = require('./clock');
const { StatisticalHistogram } = require('./histogram');
const { isMainThread } = require('child_process')

/**
* @param {number} durationPerOp The amount of time each operation takes
Expand Down Expand Up @@ -67,7 +68,6 @@ async function runWarmup(bench, initialIterations, { minTime, maxTime }) {
async function runBenchmarkOnce(bench, histogram, { initialIterations, maxDuration, minSamples }) {
let iterations = 0;
let timeSpent = 0;

while (timeSpent < maxDuration || histogram.samples.length <= minSamples) {
const { 0: duration, 1: realIterations } = await clockBenchmark(bench, initialIterations);
timeSpent += duration
Expand Down Expand Up @@ -102,7 +102,6 @@ async function runBenchmark(bench, initialIterations, repeatSuite) {
histogram,
{ initialIterations, maxDuration, minSamples }
);

totalTimeSpent += timeSpent;
totalIterations += iterations;
}
Expand All @@ -113,7 +112,12 @@ async function runBenchmark(bench, initialIterations, repeatSuite) {
return {
opsSec,
iterations: totalIterations,
histogram,
// StatisticalHistogram is not a serializable object
histogram: {
samples: histogram.samples.length,
min: histogram.min,
max: histogram.max,
},
name: bench.name,
plugins: parsePluginsResult(bench.plugins, bench.name),
};
Expand Down
2 changes: 1 addition & 1 deletion lib/reporter/text.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function textReport(results) {
process.stdout.write(styleText(['cyan', 'bold'], `${ formatter.format(opsSecReported) } ops/sec`));
// TODO: produce confidence on stddev
// process.stdout.write(result.histogram.stddev.toString());
process.stdout.write(` (${ result.histogram.samples.length } runs sampled) `);
process.stdout.write(` (${ result.histogram.samples } runs sampled) `);

for (const p of result.plugins) {
if (p.report) {
Expand Down
15 changes: 15 additions & 0 deletions lib/worker-runner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const { parentPort } = require('worker_threads');
const { runBenchmark } = require('./lifecycle');

// Deserialize the benchmark function
function deserializeBenchmark(benchmark) {
benchmark.fn = new Function(benchmark.fn);
}

parentPort.on('message', async ({ benchmark, initialIterations, repeatSuite }) => {
deserializeBenchmark(benchmark);
const result = await runBenchmark(benchmark, initialIterations, repeatSuite);
parentPort.postMessage(result);
});
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@
"bugs": {
"url": "https://github.com/RafaelGSS/bench-node/issues"
},
"homepage": "https://github.com/RafaelGSS/bench-node#readme"
"homepage": "https://github.com/RafaelGSS/bench-node#readme",
"dependencies": {
"piscina": "^4.8.0"
}
}
24 changes: 24 additions & 0 deletions test/env.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const { describe, it, before } = require('node:test');
const assert = require('node:assert');
const { Suite } = require('../lib');
const copyBench = require('./fixtures/copy');
const { managedBench, managedOptBench } = require('./fixtures/opt-managed');

Expand Down Expand Up @@ -69,3 +70,26 @@ describe('Managed can be V8 optimized', () => {
// assertBenchmarkDifference(results, 50, 30);
// });
});

describe('Workers should have parallel context', () => {
let results;
before(async () => {
const bench = new Suite({
reporter: () => {},
useWorkers: true,
});

bench
.add('Import with node: prefix', () => {
return import('node:fs');
})
.add('Import without node: prefix', () => {
return import('fs');
});
results = await bench.run();
});

it('should have a similar result as they will not share import.meta.cache', () => {
assertMaxBenchmarkDifference(results, { percentageLimit: 10, ciPercentageLimit: 30 });
});
});
2 changes: 1 addition & 1 deletion test/plugin-api-doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe("plugin API", async () => {
"getReport(string)",
"getResult(string)",
"isSupported()",
"onCompleteBenchmark([number, number, object], {fn, maxTime, minTime, name, plugins, repeatSuite})",
"onCompleteBenchmark([number, number, object], {fn, fnStr, hasArg, isAsync, maxTime, minTime, name, plugins, repeatSuite})",
"toJSON(string)",
"toString()",
]);
Expand Down
35 changes: 35 additions & 0 deletions test/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const workerThreads = require('node:worker_threads');
const { describe, it, before, after, mock } = require('node:test');
const assert = require('node:assert');

function noop() {}

describe('Using worker_threads', () => {
before(async () => {
mock.method(workerThreads, 'Worker');

const { Suite } = require('../lib/index');

const bench = new Suite({
reporter: noop,
useWorkers: true
});

bench
.add('Import with node: prefix', () => {
return import('node:fs');
})
.add('Import without node: prefix', () => {
return import('fs');
});
await bench.run();
});

after(() => {
mock.restoreAll();
})

it('should create a new Worker 2 times', () => {
assert.strictEqual(workerThreads.Worker.mock.calls.length, 2);
});
});

0 comments on commit 831664a

Please sign in to comment.