From d5fd69c123d2232335563eea95c69c07576d079d Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 24 Sep 2020 17:21:40 -0500 Subject: [PATCH] fix(throttle): trailing values will now emit after source completes --- spec/operators/throttle-spec.ts | 13 +++++++++++++ src/internal/operators/throttle.ts | 26 +++++++++++++++++++------- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index 3991021067..140ce6c9c1 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -394,6 +394,19 @@ describe('throttle operator', () => { expectSubscriptions(s1.subscriptions).toBe(s1Subs); expectSubscriptions(n1.subscriptions).toBe(n1Subs); }); + + it('should wait for trailing throttle to complete before completing, even if source completes', () => { + const source = hot( '-^--x--------y---------|'); + const sourceSubs = '^ !'; + const duration = cold( '------------------------|'); + const durationSubs = ' ^ !'; + const exp = '---x-----------------------(y|)'; + + const result = source.pipe(throttle(() => duration, { leading: true, trailing: true })); + expectObservable(result).toBe(exp); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectSubscriptions(duration.subscriptions).toBe(durationSubs); + }) }); // TODO: fix firehose unsubscription diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 05c60f40bd..6ccceb70a6 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -69,11 +69,15 @@ export function throttle( let hasValue = false; let sendValue: T | null = null; let throttled: Subscription | null = null; + let isComplete = false; const throttlingDone = () => { throttled?.unsubscribe(); throttled = null; - trailing && send(); + if (trailing) { + send(); + isComplete && subscriber.complete(); + } }; const throttle = (value: T) => @@ -82,18 +86,26 @@ export function throttle( const send = () => { if (hasValue) { subscriber.next(sendValue!); - throttle(sendValue!); + !isComplete && throttle(sendValue!); } hasValue = false; sendValue = null; }; source.subscribe( - new OperatorSubscriber(subscriber, (value) => { - hasValue = true; - sendValue = value; - !throttled && (leading ? send() : throttle(value)); - }) + new OperatorSubscriber( + subscriber, + (value) => { + hasValue = true; + sendValue = value; + !throttled && (leading ? send() : throttle(value)); + }, + undefined, + () => { + isComplete = true; + !(trailing && hasValue && throttled) && subscriber.complete(); + } + ) ); }); }