diff --git a/src/main.ts b/src/main.ts index 004152d..88fbfec 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,7 +3,7 @@ import {type Readable} from 'node:stream'; import {normalize as normalizePath} from 'node:path'; import {cwd as getCwd} from 'node:process'; import {computeEnv} from './env.js'; -import {readStreamAsString, combineStreams} from './stream.js'; +import {combineStreams} from './stream.js'; import readline from 'node:readline'; export interface Output { @@ -156,24 +156,24 @@ export class ExecProcess implements Result { async *[Symbol.asyncIterator](): AsyncIterator { const proc = this._process; + if (!proc) { return; } - if (this._thrownError) { - throw this._thrownError; - } + const streams: Readable[] = []; - const sources: Readable[] = []; - if (proc.stderr) { - sources.push(proc.stderr); + if (this._streamErr) { + streams.push(this._streamErr); } - if (proc.stdout) { - sources.push(proc.stdout); + if (this._streamOut) { + streams.push(this._streamOut); } - const combined = combineStreams(sources); + + const streamCombined = combineStreams(streams); + const rl = readline.createInterface({ - input: combined + input: streamCombined }); for await (const chunk of rl) { @@ -181,6 +181,12 @@ export class ExecProcess implements Result { } await this._processClosed; + + proc.removeAllListeners(); + + if (this._thrownError) { + throw this._thrownError; + } } protected async _waitForOutput(): Promise { @@ -194,6 +200,21 @@ export class ExecProcess implements Result { throw new Error('No process was started'); } + let stderr = ''; + let stdout = ''; + + if (this._streamErr) { + for await (const chunk of this._streamErr) { + stderr += chunk.toString(); + } + } + + if (this._streamOut) { + for await (const chunk of this._streamOut) { + stdout += chunk.toString(); + } + } + await this._processClosed; proc.removeAllListeners(); @@ -202,14 +223,9 @@ export class ExecProcess implements Result { throw this._thrownError; } - const [stderr, stdout] = await Promise.all([ - proc.stderr && readStreamAsString(proc.stderr), - proc.stdout && readStreamAsString(proc.stdout) - ]); - const result: Output = { - stderr: stderr ?? '', - stdout: stdout ?? '' + stderr, + stdout }; return result; @@ -222,6 +238,9 @@ export class ExecProcess implements Result { return this._waitForOutput().then(onfulfilled, onrejected); } + protected _streamOut?: Readable; + protected _streamErr?: Readable; + public spawn(): void { const cwd = getCwd(); const options = this._options; @@ -229,9 +248,10 @@ export class ExecProcess implements Result { ...defaultNodeOptions, ...options.nodeOptions }; - const signals: AbortSignal[] = []; + this._resetState(); + if (options.timeout !== undefined) { signals.push(AbortSignal.timeout(options.timeout)); } @@ -255,6 +275,13 @@ export class ExecProcess implements Result { const handle = spawn(normalisedCommand, normalisedArgs, nodeOptions); + if (handle.stderr) { + this._streamErr = handle.stderr; + } + if (handle.stdout) { + this._streamOut = handle.stdout; + } + this._process = handle; handle.once('error', this._onError); handle.once('close', this._onClose); @@ -267,6 +294,14 @@ export class ExecProcess implements Result { } } + protected _resetState(): void { + this._aborted = false; + this._processClosed = new Promise((resolve) => { + this._resolveClose = resolve; + }); + this._thrownError = undefined; + } + protected _onError = (err: Error): void => { if ( err.name === 'AbortError' && diff --git a/src/stream.ts b/src/stream.ts index 6b4573f..b3e5684 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,26 +1,6 @@ import {type EventEmitter} from 'node:events'; import {type Readable, PassThrough} from 'node:stream'; -export const readStreamAsString = (stream: Readable): Promise => { - return new Promise((resolve, reject) => { - let result = ''; - const onDataReceived = (chunk: Buffer | string): void => { - result += chunk.toString(); - }; - - stream.once('error', (err) => { - reject(err); - }); - - stream.on('data', onDataReceived); - - stream.once('end', () => { - stream.removeListener('data', onDataReceived); - resolve(result); - }); - }); -}; - export const waitForEvent = ( emitter: EventEmitter, name: string diff --git a/src/test/main_test.ts b/src/test/main_test.ts new file mode 100644 index 0000000..8b6a0d0 --- /dev/null +++ b/src/test/main_test.ts @@ -0,0 +1,98 @@ +import {x} from '../main.js'; +import * as assert from 'node:assert/strict'; +import {test} from 'node:test'; + +test('exec', async (t) => { + await t.test('resolves to stdout', async () => { + const result = await x('echo', ['foo']); + assert.equal(result.stdout, 'foo\n'); + assert.equal(result.stderr, ''); + }); + + await t.test('times out after defined timeout (ms)', async () => { + const proc = x('sleep', ['0.2s'], {timeout: 100}); + await assert.rejects(async () => { + await proc; + }); + assert.equal(proc.killed, true); + assert.equal(proc.process!.signalCode, 'SIGTERM'); + }); + + await t.test('throws spawn errors', async () => { + const proc = x('definitelyNonExistent'); + await assert.rejects(async () => { + await proc; + }, 'spawn definitelyNonExistent NOENT'); + }); + + await t.test('captures stderr', async () => { + const result = await x('cat', ['nonexistentforsure']); + assert.equal( + result.stderr, + 'cat: nonexistentforsure: No such file or directory\n' + ); + assert.equal(result.stdout, ''); + }); + + await t.test('pid is number', async () => { + const proc = x('echo', ['foo']); + await proc; + assert.ok(typeof proc.pid === 'number'); + }); + + await t.test('exitCode is set correctly', async () => { + const proc = x('echo', ['foo']); + assert.equal(proc.exitCode, undefined); + await proc; + assert.equal(proc.exitCode, 0); + }); + + await t.test('kill terminates the process', async () => { + const proc = x('sleep', ['5s']); + const result = proc.kill(); + assert.ok(result); + assert.ok(proc.killed); + assert.equal(proc.aborted, false); + }); + + await t.test('pipe correctly pipes output', async () => { + const echoProc = x('echo', ['foo\nbar']); + const grepProc = echoProc.pipe('grep', ['foo']); + const result = await grepProc; + + assert.equal(result.stderr, ''); + assert.equal(result.stdout, 'foo\n'); + assert.equal(echoProc.exitCode, 0); + assert.equal(grepProc.exitCode, 0); + }); + + await t.test('async iterator gets correct output', async () => { + const proc = x('echo', ['foo\nbar']); + const lines = []; + for await (const line of proc) { + lines.push(line); + } + + assert.deepEqual(lines, ['foo', 'bar']); + }); + + await t.test('async iterator receives errors', async () => { + const proc = x('nonexistentforsure'); + await assert.rejects(async () => { + for await (const line of proc) { + line; + } + }); + }); + + await t.test('signal can be used to abort execution', async () => { + const controller = new AbortController(); + const proc = x('sleep', ['4s'], {signal: controller.signal}); + controller.abort(); + const result = await proc; + assert.ok(proc.aborted); + assert.ok(proc.killed); + assert.equal(result.stderr, ''); + assert.equal(result.stdout, ''); + }); +}); diff --git a/src/test/stream_test.ts b/src/test/stream_test.ts index cc29ed5..fa7feaf 100644 --- a/src/test/stream_test.ts +++ b/src/test/stream_test.ts @@ -1,4 +1,4 @@ -import {combineStreams, waitForEvent, readStreamAsString} from '../stream.js'; +import {combineStreams, waitForEvent} from '../stream.js'; import * as assert from 'node:assert/strict'; import {test} from 'node:test'; import {EventEmitter} from 'node:events'; @@ -13,36 +13,6 @@ test('waitForEvent', async (t) => { }); }); -test('readStreamAsString', async (t) => { - await t.test('rejects on error', async () => { - const streamError = new Error('fudge'); - const stream = new Readable({ - read() { - this.destroy(streamError); - } - }); - await assert.rejects(readStreamAsString(stream), streamError); - }); - - await t.test('resolves to concatenated data', async () => { - const stream = Readable.from(['foo', 'bar']); - const result = await readStreamAsString(stream); - assert.equal(result, 'foobar'); - }); - - await t.test('handles buffer data', async () => { - const stream = new Readable({ - read() { - this.push(Buffer.from('foo')); - this.push(Buffer.from('bar')); - this.push(null); - } - }); - const result = await readStreamAsString(stream); - assert.equal(result, 'foobar'); - }); -}); - test('combineStreams', async (t) => { await t.test('works with a single stream', async () => { const stream = Readable.from(['foo', 'bar']);