Skip to content

Commit 5851d4c

Browse files
committed
Ensure connection between AsyncKeepLastQueue and its parent
1 parent 3cf4685 commit 5851d4c

File tree

1 file changed

+17
-5
lines changed

1 file changed

+17
-5
lines changed

src/async_queue.ts

+17-5
Original file line numberDiff line numberDiff line change
@@ -416,18 +416,24 @@ export abstract class AsyncReadonlyQueue<T> {
416416
});
417417
}
418418

419-
conflate<U>(reducer: (prior: T, next: T) => Promise<T> | T): AsyncReadonlyQueue<T> {
419+
conflate<U>(reducer: (prior: T, next: T, signal: AbortSignal) => Promise<T> | T): AsyncReadonlyQueue<T> {
420420
let accumulator: T | undefined = undefined;
421421

422-
const conflatedQueue = new AsyncKeepLastQueue<T>("conflate", this._maxBufferSize);
422+
const abortController = new AbortController();
423+
const conflatedQueue = new AsyncKeepLastQueue<T>(
424+
"conflate",
425+
this._maxBufferSize,
426+
() => abortController.abort(),
427+
[this],
428+
);
423429

424430
(async () => {
425431
for await (const item of this.items()) {
426432
if (conflatedQueue.isCompleted) return;
427433
if (accumulator === undefined) {
428434
accumulator = item;
429435
} else {
430-
accumulator = await reducer(accumulator, item);
436+
accumulator = await reducer(accumulator, item, abortController.signal);
431437
}
432438

433439
conflatedQueue.accumulate(accumulator);
@@ -451,6 +457,7 @@ export abstract class AsyncReadonlyQueue<T> {
451457
"conflateWithSeed",
452458
this._maxBufferSize,
453459
() => abortController.abort(),
460+
[this],
454461
);
455462

456463
(async () => {
@@ -487,7 +494,7 @@ export abstract class AsyncReadonlyQueue<T> {
487494
"conflateWithSeedFn",
488495
this._maxBufferSize,
489496
() => abortController.abort(),
490-
undefined,
497+
[this],
491498
() => {
492499
accumulator = undefined;
493500
},
@@ -527,7 +534,12 @@ export abstract class AsyncReadonlyQueue<T> {
527534
let lastItem: T | undefined = undefined;
528535
let timer: number | undefined = undefined;
529536

530-
const debouncedQueue = new AsyncKeepLastQueue<T>("debounce", this._maxBufferSize);
537+
const debouncedQueue = new AsyncKeepLastQueue<T>(
538+
"debounce",
539+
this._maxBufferSize,
540+
undefined,
541+
[this],
542+
);
531543

532544
(async () => {
533545
for await (const item of this.items()) {

0 commit comments

Comments
 (0)