diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 56652ba3ce..f3d46cefa9 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -17,11 +17,26 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot(' -a-b-c-d-e-f-g-h-i-|'); const b = hot(' -----B-----B-----B-|'); - const expected = '-----x-----y-----z-|'; + const expected = '-----x-----y-----z-(F|)'; const expectedValues = { x: ['a', 'b', 'c'], y: ['d', 'e', 'f'], z: ['g', 'h', 'i'], + F: [], + }; + expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); + }); + }); + + it('should not emit a final buffer if the closingNotifier is already complete', () => { + testScheduler.run(({ hot, expectObservable }) => { + const a = hot(' -a-b-c-d-e-f-g-h-i-|'); + const b = hot(' -----B-----B--|'); + const expected = '-----x-----y-------(F|)'; + const expectedValues = { + x: ['a', 'b', 'c'], + y: ['d', 'e', 'f'], + F: ['g', 'h', 'i'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); @@ -52,8 +67,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = EMPTY; const b = EMPTY; - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -61,8 +76,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = EMPTY; const b = hot('-----a-----'); - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -70,8 +85,10 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = EMPTY; - const expected = ' --------------------------------|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = ' --------------------------------(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { + F: ['3', '4', '5', '6', '7', '8', '9', '0'], + }); }); }); @@ -97,8 +114,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = EMPTY; const b = NEVER; - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -143,7 +160,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = hot('--------^--a-------b---cd---------e---f---|'); - const expected = ' ---a-------b---cd---------e---f-|'; + const expected = ' ---a-------b---cd---------e---f-(F|)'; const expectedValues = { a: ['3'], b: ['4', '5'], @@ -151,23 +168,24 @@ describe('Observable.prototype.buffer', () => { d: [] as string[], e: ['7', '8', '9'], f: ['0'], + F: [], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); }); - it(' work with selector completed', () => { - // Buffshoulder Boundaries onCompletedBoundaries (RxJS 4) + it('should work with selector completed', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const subs = ' ^-------------------------------!'; const b = hot('--------^--a-------b---cd| '); - const expected = ' ---a-------b---cd---------------|'; + const expected = ' ---a-------b---cd---------------(F|)'; const expectedValues = { a: ['3'], b: ['4', '5'], c: ['6'], d: [] as string[], + F: ['7', '8', '9', '0'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -294,7 +312,7 @@ describe('Observable.prototype.buffer', () => { const results: any[] = []; const subject = new Subject(); - const source = subject.pipe(buffer(subject)).subscribe({ + subject.pipe(buffer(subject)).subscribe({ next: (value) => results.push(value), complete: () => results.push('complete'), }); @@ -304,7 +322,7 @@ describe('Observable.prototype.buffer', () => { subject.next(2); expect(results).to.deep.equal([[1], [2]]); subject.complete(); - expect(results).to.deep.equal([[1], [2], 'complete']); + expect(results).to.deep.equal([[1], [2], [], 'complete']); }); describe('equivalence with the window operator', () => { diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index a9deb9ae81..daf5b62766 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -44,10 +44,23 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function buffer(closingNotifier: Observable): OperatorFunction { return operate((source, subscriber) => { + // The current buffered values. If this is null, it's because the + // closingNotifier has completed before the source. let currentBuffer: T[] = []; // Subscribe to our source. - source.subscribe(new OperatorSubscriber(subscriber, (value) => currentBuffer.push(value))); + source.subscribe( + new OperatorSubscriber( + subscriber, + (value) => currentBuffer.push(value), + // Pass all errors to the consumer. + undefined, + () => { + subscriber.next(currentBuffer); + subscriber.complete(); + } + ) + ); // Subscribe to the closing notifier. closingNotifier.subscribe( @@ -61,7 +74,6 @@ export function buffer(closingNotifier: Observable): OperatorFunction