Skip to content

Commit 3cf4685

Browse files
committed
Add AsyncQueue#conflateWithSeedFn
1 parent 78d7411 commit 3cf4685

File tree

3 files changed

+96
-65
lines changed

3 files changed

+96
-65
lines changed

flake.lock

+25-65
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/async_queue.test.ts

+16
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,22 @@ Deno.test("conflate() basic", async () => {
149149
assertEquals(results, [1, 4]);
150150
});
151151

152+
Deno.test("conflateWithSeedFn() basic", async () => {
153+
const queue = new AsyncQueue<number>(10);
154+
155+
await queue.enqueue(1);
156+
await queue.enqueue(2);
157+
await queue.enqueue(3);
158+
await queue.enqueue(4);
159+
160+
const conflatedQueue = queue.conflateWithSeedFn((v) => [v], (a, b) => [...a, b]);
161+
162+
queue.complete();
163+
164+
const results = await conflatedQueue.tap(() => delay(100)).collectArray();
165+
assertEquals(results, [[1], [2, 3, 4]]);
166+
});
167+
152168
Deno.test("debounce() basic", async () => {
153169
const queue = new AsyncQueue<number>(10);
154170
const debouncedQueue = queue.debounce(100);

src/async_queue.ts

+55
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,50 @@ export abstract class AsyncReadonlyQueue<T> {
475475
return conflatedQueue;
476476
}
477477

478+
conflateWithSeedFn<U>(
479+
seedFn: (next: T, signal: AbortSignal) => Promise<U> | U,
480+
reducer: (prior: U, next: T, signal: AbortSignal) => Promise<U> | U,
481+
): AsyncReadonlyQueue<U> {
482+
let accumulator: U | undefined;
483+
484+
const abortController = new AbortController();
485+
const signal = abortController.signal;
486+
const conflatedQueue = new AsyncKeepLastQueue<U>(
487+
"conflateWithSeedFn",
488+
this._maxBufferSize,
489+
() => abortController.abort(),
490+
undefined,
491+
() => {
492+
accumulator = undefined;
493+
},
494+
);
495+
496+
(async () => {
497+
for await (const item of this.items()) {
498+
if (conflatedQueue.isCompleted) return;
499+
try {
500+
if (accumulator === undefined) {
501+
accumulator = await seedFn(item, signal);
502+
} else {
503+
accumulator = await reducer(accumulator, item, signal);
504+
}
505+
} catch (e) {
506+
if (e.name === "AbortError") {
507+
return;
508+
}
509+
throw e;
510+
}
511+
512+
conflatedQueue.accumulate(accumulator);
513+
if (conflatedQueue.isCompleted) return;
514+
}
515+
516+
conflatedQueue.complete();
517+
})();
518+
519+
return conflatedQueue;
520+
}
521+
478522
debounce(durationMs: number): AsyncReadonlyQueue<T> {
479523
if (durationMs < 1) {
480524
throw new Error("debounce() durationMs must be greater than 0, got " + durationMs);
@@ -701,6 +745,16 @@ export class AsyncKeepLastQueue<T> extends AsyncReadonlyQueue<T> {
701745
protected _isItemPendingRead = false;
702746
protected _isPulling = false;
703747

748+
constructor(
749+
protected _name: string,
750+
protected _maxBufferSize: number,
751+
protected _onComplete?: () => void,
752+
protected _parents?: AsyncReadonlyQueue<unknown>[],
753+
protected _onEmit?: () => void,
754+
) {
755+
super(_name, _maxBufferSize, _onComplete, _parents);
756+
}
757+
704758
accumulate(item: T) {
705759
this._isItemPendingRead = true;
706760

@@ -722,6 +776,7 @@ export class AsyncKeepLastQueue<T> extends AsyncReadonlyQueue<T> {
722776

723777
this._isItemPendingRead = false;
724778

779+
this._onEmit?.();
725780
yield next;
726781
}
727782
}

0 commit comments

Comments
 (0)