Skip to content

Commit

Permalink
fix(throttle): trailing values will now emit after source completes
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Sep 25, 2020
1 parent 207b704 commit d5fd69c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
13 changes: 13 additions & 0 deletions spec/operators/throttle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,19 @@ describe('throttle operator', () => {
expectSubscriptions(s1.subscriptions).toBe(s1Subs);
expectSubscriptions(n1.subscriptions).toBe(n1Subs);
});

it('should wait for trailing throttle to complete before completing, even if source completes', () => {
const source = hot( '-^--x--------y---------|');
const sourceSubs = '^ !';
const duration = cold( '------------------------|');
const durationSubs = ' ^ !';
const exp = '---x-----------------------(y|)';

const result = source.pipe(throttle(() => duration, { leading: true, trailing: true }));
expectObservable(result).toBe(exp);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(duration.subscriptions).toBe(durationSubs);
})
});

// TODO: fix firehose unsubscription
Expand Down
26 changes: 19 additions & 7 deletions src/internal/operators/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@ export function throttle<T>(
let hasValue = false;
let sendValue: T | null = null;
let throttled: Subscription | null = null;
let isComplete = false;

const throttlingDone = () => {
throttled?.unsubscribe();
throttled = null;
trailing && send();
if (trailing) {
send();
isComplete && subscriber.complete();
}
};

const throttle = (value: T) =>
Expand All @@ -82,18 +86,26 @@ export function throttle<T>(
const send = () => {
if (hasValue) {
subscriber.next(sendValue!);
throttle(sendValue!);
!isComplete && throttle(sendValue!);
}
hasValue = false;
sendValue = null;
};

source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
hasValue = true;
sendValue = value;
!throttled && (leading ? send() : throttle(value));
})
new OperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
sendValue = value;
!throttled && (leading ? send() : throttle(value));
},
undefined,
() => {
isComplete = true;
!(trailing && hasValue && throttled) && subscriber.complete();
}
)
);
});
}

0 comments on commit d5fd69c

Please sign in to comment.