Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use streams rather than a buffer #10

Merged
merged 1 commit into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 54 additions & 19 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {type Readable} from 'node:stream';
import {normalize as normalizePath} from 'node:path';
import {cwd as getCwd} from 'node:process';
import {computeEnv} from './env.js';
import {readStreamAsString, combineStreams} from './stream.js';
import {combineStreams} from './stream.js';
import readline from 'node:readline';

export interface Output {
Expand Down Expand Up @@ -156,31 +156,37 @@ export class ExecProcess implements Result {

async *[Symbol.asyncIterator](): AsyncIterator<string> {
const proc = this._process;

if (!proc) {
return;
}

if (this._thrownError) {
throw this._thrownError;
}
const streams: Readable[] = [];

const sources: Readable[] = [];
if (proc.stderr) {
sources.push(proc.stderr);
if (this._streamErr) {
streams.push(this._streamErr);
}
if (proc.stdout) {
sources.push(proc.stdout);
if (this._streamOut) {
streams.push(this._streamOut);
}
const combined = combineStreams(sources);

const streamCombined = combineStreams(streams);

const rl = readline.createInterface({
input: combined
input: streamCombined
});

for await (const chunk of rl) {
yield chunk.toString();
}

await this._processClosed;

proc.removeAllListeners();

if (this._thrownError) {
throw this._thrownError;
}
}

protected async _waitForOutput(): Promise<Output> {
Expand All @@ -194,6 +200,21 @@ export class ExecProcess implements Result {
throw new Error('No process was started');
}

let stderr = '';
let stdout = '';

if (this._streamErr) {
for await (const chunk of this._streamErr) {
stderr += chunk.toString();
}
}

if (this._streamOut) {
for await (const chunk of this._streamOut) {
stdout += chunk.toString();
}
}

await this._processClosed;

proc.removeAllListeners();
Expand All @@ -202,14 +223,9 @@ export class ExecProcess implements Result {
throw this._thrownError;
}

const [stderr, stdout] = await Promise.all([
proc.stderr && readStreamAsString(proc.stderr),
proc.stdout && readStreamAsString(proc.stdout)
]);

const result: Output = {
stderr: stderr ?? '',
stdout: stdout ?? ''
stderr,
stdout
};

return result;
Expand All @@ -222,16 +238,20 @@ export class ExecProcess implements Result {
return this._waitForOutput().then(onfulfilled, onrejected);
}

protected _streamOut?: Readable;
protected _streamErr?: Readable;

public spawn(): void {
const cwd = getCwd();
const options = this._options;
const nodeOptions = {
...defaultNodeOptions,
...options.nodeOptions
};

const signals: AbortSignal[] = [];

this._resetState();

if (options.timeout !== undefined) {
signals.push(AbortSignal.timeout(options.timeout));
}
Expand All @@ -255,6 +275,13 @@ export class ExecProcess implements Result {

const handle = spawn(normalisedCommand, normalisedArgs, nodeOptions);

if (handle.stderr) {
this._streamErr = handle.stderr;
}
if (handle.stdout) {
this._streamOut = handle.stdout;
}

this._process = handle;
handle.once('error', this._onError);
handle.once('close', this._onClose);
Expand All @@ -267,6 +294,14 @@ export class ExecProcess implements Result {
}
}

protected _resetState(): void {
this._aborted = false;
this._processClosed = new Promise<void>((resolve) => {
this._resolveClose = resolve;
});
this._thrownError = undefined;
}

protected _onError = (err: Error): void => {
if (
err.name === 'AbortError' &&
Expand Down
20 changes: 0 additions & 20 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,6 @@
import {type EventEmitter} from 'node:events';
import {type Readable, PassThrough} from 'node:stream';

export const readStreamAsString = (stream: Readable): Promise<string> => {
return new Promise<string>((resolve, reject) => {
let result = '';
const onDataReceived = (chunk: Buffer | string): void => {
result += chunk.toString();
};

stream.once('error', (err) => {
reject(err);
});

stream.on('data', onDataReceived);

stream.once('end', () => {
stream.removeListener('data', onDataReceived);
resolve(result);
});
});
};

export const waitForEvent = (
emitter: EventEmitter,
name: string
Expand Down
98 changes: 98 additions & 0 deletions src/test/main_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import {x} from '../main.js';
import * as assert from 'node:assert/strict';
import {test} from 'node:test';

test('exec', async (t) => {
await t.test('resolves to stdout', async () => {
const result = await x('echo', ['foo']);
assert.equal(result.stdout, 'foo\n');
assert.equal(result.stderr, '');
});

await t.test('times out after defined timeout (ms)', async () => {
const proc = x('sleep', ['0.2s'], {timeout: 100});
await assert.rejects(async () => {
await proc;
});
assert.equal(proc.killed, true);
assert.equal(proc.process!.signalCode, 'SIGTERM');
});

await t.test('throws spawn errors', async () => {
const proc = x('definitelyNonExistent');
await assert.rejects(async () => {
await proc;
}, 'spawn definitelyNonExistent NOENT');
});

await t.test('captures stderr', async () => {
const result = await x('cat', ['nonexistentforsure']);
assert.equal(
result.stderr,
'cat: nonexistentforsure: No such file or directory\n'
);
assert.equal(result.stdout, '');
});

await t.test('pid is number', async () => {
const proc = x('echo', ['foo']);
await proc;
assert.ok(typeof proc.pid === 'number');
});

await t.test('exitCode is set correctly', async () => {
const proc = x('echo', ['foo']);
assert.equal(proc.exitCode, undefined);
await proc;
assert.equal(proc.exitCode, 0);
});

await t.test('kill terminates the process', async () => {
const proc = x('sleep', ['5s']);
const result = proc.kill();
assert.ok(result);
assert.ok(proc.killed);
assert.equal(proc.aborted, false);
});

await t.test('pipe correctly pipes output', async () => {
const echoProc = x('echo', ['foo\nbar']);
const grepProc = echoProc.pipe('grep', ['foo']);
const result = await grepProc;

assert.equal(result.stderr, '');
assert.equal(result.stdout, 'foo\n');
assert.equal(echoProc.exitCode, 0);
assert.equal(grepProc.exitCode, 0);
});

await t.test('async iterator gets correct output', async () => {
const proc = x('echo', ['foo\nbar']);
const lines = [];
for await (const line of proc) {
lines.push(line);
}

assert.deepEqual(lines, ['foo', 'bar']);
});

await t.test('async iterator receives errors', async () => {
const proc = x('nonexistentforsure');
await assert.rejects(async () => {
for await (const line of proc) {
line;
}
});
});

await t.test('signal can be used to abort execution', async () => {
const controller = new AbortController();
const proc = x('sleep', ['4s'], {signal: controller.signal});
controller.abort();
const result = await proc;
assert.ok(proc.aborted);
assert.ok(proc.killed);
assert.equal(result.stderr, '');
assert.equal(result.stdout, '');
});
});
32 changes: 1 addition & 31 deletions src/test/stream_test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {combineStreams, waitForEvent, readStreamAsString} from '../stream.js';
import {combineStreams, waitForEvent} from '../stream.js';
import * as assert from 'node:assert/strict';
import {test} from 'node:test';
import {EventEmitter} from 'node:events';
Expand All @@ -13,36 +13,6 @@ test('waitForEvent', async (t) => {
});
});

test('readStreamAsString', async (t) => {
await t.test('rejects on error', async () => {
const streamError = new Error('fudge');
const stream = new Readable({
read() {
this.destroy(streamError);
}
});
await assert.rejects(readStreamAsString(stream), streamError);
});

await t.test('resolves to concatenated data', async () => {
const stream = Readable.from(['foo', 'bar']);
const result = await readStreamAsString(stream);
assert.equal(result, 'foobar');
});

await t.test('handles buffer data', async () => {
const stream = new Readable({
read() {
this.push(Buffer.from('foo'));
this.push(Buffer.from('bar'));
this.push(null);
}
});
const result = await readStreamAsString(stream);
assert.equal(result, 'foobar');
});
});

test('combineStreams', async (t) => {
await t.test('works with a single stream', async () => {
const stream = Readable.from(['foo', 'bar']);
Expand Down