From 425aca5be1da43862c0651d43d5a267469033a31 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Wed, 14 Dec 2016 18:27:33 +0100 Subject: [PATCH] fix(buffer): subscribe to source and closingNotifier in proper order In buffer operator subscribe to source observable first, so that when closingNotifier emits value, all source values emited before land in buffer Closes #1610 BREAKING CHANGE: When source and closingNotifier fire at the same time, it is expected that value emitted by source will first land in buffer and then closingNotifier will close it. Because of reversed subscription order, closingNotifier emitted first, so source was not able to put value in buffer before it was closed. Now source is subscribed before closingNotifier, so if they fire at the same time, source value is put into buffer and then closingNotifer closes it. --- spec/operators/buffer-spec.ts | 13 +++++++++++++ src/operator/buffer.ts | 13 ++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 6912aaf7b2..b6d3bfebe0 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -219,4 +219,17 @@ describe('Observable.prototype.buffer', () => { expectObservable(a.buffer(b).take(1)).toBe(expected, expectedValues); expectSubscriptions(b.subscriptions).toBe(bsubs); }); + + it('should work with filtered source as closingNotifier', () => { + const values = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8}; + + const source = hot('-0-1-2-3-4-5-6-7-8-|', values); + const expected = '-a---b---c---d---e-|'; + + const expectedValues = {a: [0], b: [1, 2], c: [3, 4], d: [5, 6], e: [7, 8]}; + const filteredSource = source.filter(x => x % 2 === 0); + + const result = source.buffer(filteredSource); + expectObservable(result).toBe(expected, expectedValues); + }); }); diff --git a/src/operator/buffer.ts b/src/operator/buffer.ts index ca43bec9f2..e90aeb2dc1 100644 --- a/src/operator/buffer.ts +++ b/src/operator/buffer.ts @@ -1,5 +1,6 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { TeardownLogic } from '../Subscription'; import { Observable } from '../Observable'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -47,8 +48,11 @@ class BufferOperator implements Operator { constructor(private closingNotifier: Observable) { } - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier)); + call(subscriber: Subscriber, source: any): TeardownLogic { + const bufferSubscriber = new BufferSubscriber(subscriber); + const subscription = source.subscribe(bufferSubscriber); + bufferSubscriber.subscribeToClosingNotifier(this.closingNotifier); + return subscription; } } @@ -60,8 +64,11 @@ class BufferOperator implements Operator { class BufferSubscriber extends OuterSubscriber { private buffer: T[] = []; - constructor(destination: Subscriber, closingNotifier: Observable) { + constructor(destination: Subscriber) { super(destination); + } + + subscribeToClosingNotifier(closingNotifier: Observable) { this.add(subscribeToResult(this, closingNotifier)); }