Skip to content

Commit

Permalink
Avoid close() cancels the underlying Web stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Feb 5, 2025
1 parent 86e9076 commit 51206d4
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 33 deletions.
19 changes: 3 additions & 16 deletions lib/WebStreamByobReader.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import type { ReadableStreamBYOBReader } from 'node:stream/web';
import { AbstractStreamReader } from "./AbstractStreamReader.js";

import { WebStreamReader } from './WebStreamReader.js';

/**
* Read from a WebStream using a BYOB reader
* Reference: https://nodejs.org/api/webstreams.html#class-readablestreambyobreader
*/
export class WebStreamByobReader extends AbstractStreamReader {

public constructor(private reader: ReadableStreamBYOBReader) {
super();
}
export class WebStreamByobReader extends WebStreamReader {

protected async readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number> {

Expand All @@ -26,13 +22,4 @@ export class WebStreamByobReader extends AbstractStreamReader {

return 0;
}

public abort(): Promise<void> {
return this.reader.cancel(); // Signals a loss of interest in the stream by a consumer
}

public async close(): Promise<void> {
await this.abort();
this.reader.releaseLock();
}
}
19 changes: 19 additions & 0 deletions lib/WebStreamReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { ReadableStreamBYOBReader, ReadableStreamDefaultReader } from 'node:stream/web';
import { AbstractStreamReader } from "./AbstractStreamReader.js";

export abstract class WebStreamReader extends AbstractStreamReader {

public constructor(protected reader: ReadableStreamDefaultReader | ReadableStreamBYOBReader) {
super();
}

protected abstract readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number>;

public async abort(): Promise<void> {
return this.close();
}

public async close(): Promise<void> {
this.reader.releaseLock();
}
}
42 changes: 25 additions & 17 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {Readable} from 'node:stream';
import { AbortError, EndOfStreamError, type IStreamReader, makeWebStreamReader, StreamReader } from '../lib/index.js';
import {SourceStream, stringToReadableStream} from './util.js';
import type { ReadStream } from 'node:fs';
import process from 'node:process';

use(chaiAsPromised);

Expand Down Expand Up @@ -224,20 +223,20 @@ describe('Matrix', () => {
describe('Handle delayed read', () => {

it('handle delay', async ()=> {
const fileReadStream = factory.fromString('123', 500);
const streamReader = factory.fromString('123', 500);
const res = new Uint8Array(3);
const promise = fileReadStream.read(res, 0, 3);
const promise = streamReader.read(res, 0, 3);
assert.strictEqual(await promise, 3);
});

it('abort async operation', async function () {
it('abort async operation', async function() {
if (process.versions.bun) {
this.skip();
this.skip(); // https://github.com/oven-sh/bun/issues/17008
}
const fileReadStream = factory.fromString('123', 500);
const res = new Uint8Array(3);
const promise = fileReadStream.read(res, 0, 3);
await fileReadStream.close();
await fileReadStream.abort();
await expect(promise).to.be.rejectedWith(Error)
});

Expand Down Expand Up @@ -414,28 +413,37 @@ describe('Node.js StreamReader', () => {

describe('abort() should release stream-lock', () => {

it('`BYOB WebStreamReader`', async () => {
it('BYOB WebStreamReader', async () => {

const readableStream = stringToReadableStream('abc', false);
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');
try {
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');

const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');
const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');

await webStreamReader.close();
assert.isFalse(readableStream.locked, 'stream is unlocked after closing tokenizer');
} finally {
await readableStream.cancel();
}

await webStreamReader.close();
assert.isFalse(readableStream.locked, 'stream is unlocked after closing tokenizer');
});

it('Default WebStreamReader', async () => {

const readableStream = stringToReadableStream('abc', true);
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');
try {
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');

const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');
const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');

await webStreamReader.close();
assert.isFalse(readableStream.locked, 'stream is unlocked after closing tokenizer');
await webStreamReader.close();
assert.isFalse(readableStream.locked, 'stream is unlocked after closing tokenizer');
} finally {
await readableStream.cancel();
}
});
});
});
Expand Down

0 comments on commit 51206d4

Please sign in to comment.