diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 305d279b1f..37fca228e4 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -118,6 +118,8 @@ export declare const config: { useDeprecatedNextContext: boolean; }; +export declare function connectable(source: ObservableInput, connector?: Subject): ConnectableObservableLike; + export declare class ConnectableObservable extends Observable { protected _connection: Subscription | null; protected _refCount: number; @@ -298,7 +300,7 @@ export declare class Observable implements Subscribable { pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction): Observable; pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction): Observable; pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction, ...operations: OperatorFunction[]): Observable; - subscribe(observer?: PartialObserver): Subscription; + subscribe(observer?: Partial>): Subscription; subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription; subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Subscription; subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Subscription; @@ -330,7 +332,6 @@ export declare type ObservedValueTupleFromArray = { export declare type ObservedValueUnionFromArray = X extends Array> ? T : never; export interface Observer { - closed?: boolean; complete: () => void; error: (err: any) => void; next: (value: T) => void; @@ -438,12 +439,11 @@ export declare class Subject extends Observable implements SubscriptionLik static create: (...args: any[]) => any; } +export interface SubjectLike extends Observer, Subscribable { +} + export interface Subscribable { - subscribe(observer?: PartialObserver): Unsubscribable; - subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable; - subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Unsubscribable; - subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Unsubscribable; - subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable; + subscribe(observer: Partial>): Unsubscribable; } export declare type SubscribableOrPromise = Subscribable | Subscribable | PromiseLike | InteropObservable; diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index 8e66c7decf..3a85ca4d31 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -60,6 +60,8 @@ export declare function concatMapTo>(observ export declare function concatWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; +export declare function connect(selector: (shared: Observable) => ObservableInput, config?: ConnectConfig): OperatorFunction; + export declare function count(predicate?: (value: T, index: number) => boolean): OperatorFunction; export declare function debounce(durationSelector: (value: T) => ObservableInput): MonoTypeOperatorFunction; @@ -182,9 +184,9 @@ export declare function mergeWith(...otherSourc export declare function min(comparer?: (x: T, y: T) => number): MonoTypeOperatorFunction; export declare function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; -export declare function multicast>(subject: Subject, selector: (shared: Observable) => O): UnaryFunction, ConnectableObservable>>; -export declare function multicast(subjectFactory: (this: Observable) => Subject): UnaryFunction, ConnectableObservable>; -export declare function multicast>(SubjectFactory: (this: Observable) => Subject, selector: (shared: Observable) => O): OperatorFunction>; +export declare function multicast>(subject: Subject, selector: (shared: Observable) => O): OperatorFunction>; +export declare function multicast(subjectFactory: () => Subject): UnaryFunction, ConnectableObservable>; +export declare function multicast>(subjectFactory: () => Subject, selector: (shared: Observable) => O): OperatorFunction>; export declare function observeOn(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction; @@ -206,14 +208,14 @@ export declare function pluck(...properties: string[]): OperatorFunction(): UnaryFunction, ConnectableObservable>; export declare function publish>(selector: (shared: Observable) => O): OperatorFunction>; -export declare function publish(selector: MonoTypeOperatorFunction): MonoTypeOperatorFunction; -export declare function publishBehavior(value: T): UnaryFunction, ConnectableObservable>; +export declare function publishBehavior(initialValue: T): UnaryFunction, ConnectableObservable>; export declare function publishLast(): UnaryFunction, ConnectableObservable>; -export declare function publishReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; -export declare function publishReplay>(bufferSize?: number, windowTime?: number, selector?: (shared: Observable) => O, scheduler?: SchedulerLike): OperatorFunction>; +export declare function publishReplay(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider): MonoTypeOperatorFunction; +export declare function publishReplay>(bufferSize: number | undefined, windowTime: number | undefined, selector: (shared: Observable) => O, timestampProvider?: TimestampProvider): OperatorFunction>; +export declare function publishReplay>(bufferSize: number | undefined, windowTime: number | undefined, selector: undefined, timestampProvider: TimestampProvider): OperatorFunction>; export declare function race(observables: Array>): MonoTypeOperatorFunction; export declare function race(observables: Array>): OperatorFunction; @@ -248,6 +250,7 @@ export declare function scan(accumulator: (acc: A | S, value: V, index: export declare function sequenceEqual(compareTo: Observable, comparator?: (a: T, b: T) => boolean): OperatorFunction; export declare function share(): MonoTypeOperatorFunction; +export declare function share(options: ShareConfig): MonoTypeOperatorFunction; export declare function shareReplay(config: ShareReplayConfig): MonoTypeOperatorFunction; export declare function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; diff --git a/api_guard/dist/types/testing/index.d.ts b/api_guard/dist/types/testing/index.d.ts index 3573895cf4..4b1d2badcc 100644 --- a/api_guard/dist/types/testing/index.d.ts +++ b/api_guard/dist/types/testing/index.d.ts @@ -20,12 +20,13 @@ export declare class TestScheduler extends VirtualTimeScheduler { [marble: string]: T; }, error?: any): HotObservable; createTime(marbles: string): number; - expectObservable(observable: Observable, subscriptionMarbles?: string | null): ({ - toBe: observableToBeFn; - }); - expectSubscriptions(actualSubscriptionLogs: SubscriptionLog[]): ({ + expectObservable(observable: Observable, subscriptionMarbles?: string | null): { + toBe(marbles: string, values?: any, errorValue?: any): void; + toEqual: (other: Observable) => void; + }; + expectSubscriptions(actualSubscriptionLogs: SubscriptionLog[]): { toBe: subscriptionLogsToBeFn; - }); + }; flush(): void; run(callback: (helpers: RunHelpers) => T): T; static frameTimeFactor: number; diff --git a/spec/deprecation-equivalents/multicasting-deprecations-spec.ts b/spec/deprecation-equivalents/multicasting-deprecations-spec.ts new file mode 100644 index 0000000000..d9024c44fc --- /dev/null +++ b/spec/deprecation-equivalents/multicasting-deprecations-spec.ts @@ -0,0 +1,143 @@ +/** @prettier */ +import { Observable, ConnectableObservable, connectable, of, AsyncSubject, BehaviorSubject, ReplaySubject, Subject, merge } from 'rxjs'; +import { connect, share, multicast, publish, publishReplay, publishBehavior, publishLast, refCount, repeat, retry } from 'rxjs/operators'; +import { TestScheduler } from 'rxjs/testing'; +import { observableMatcher } from '../helpers/observableMatcher'; + +describe('multicasting equivalent tests', () => { + let rxTest: TestScheduler; + + beforeEach(() => { + rxTest = new TestScheduler(observableMatcher); + }); + + testEquivalents( + 'multicast(() => new Subject()), refCount() and share()', + (source) => + source.pipe( + multicast(() => new Subject()), + refCount() + ), + (source) => source.pipe(share()) + ); + + testEquivalents( + 'multicast(new Subject()), refCount() and share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })', + (source) => source.pipe(multicast(new Subject()), refCount()), + (source) => source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })) + ); + + testEquivalents( + 'publish(), refCount() and share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })', + (source) => source.pipe(publish(), refCount()), + (source) => source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })) + ); + + testEquivalents( + 'publishLast(), refCount() and share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })', + (source) => source.pipe(publishLast(), refCount()), + (source) => + source.pipe(share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })) + ); + + testEquivalents( + 'publishBehavior("X"), refCount() and share({ connector: () => new BehaviorSubject("X"), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })', + (source) => source.pipe(publishBehavior('X'), refCount()), + (source) => + source.pipe( + share({ connector: () => new BehaviorSubject('X'), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }) + ) + ); + + testEquivalents( + 'publishReplay(3, 10), refCount() and share({ connector: () => new ReplaySubject(3, 10), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })', + (source) => source.pipe(publishReplay(3, 10), refCount()), + (source) => + source.pipe( + share({ connector: () => new ReplaySubject(3, 10), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }) + ) + ); + + const fn = (source: Observable) => merge(source, source); + + testEquivalents( + 'publish(fn) and connect({ setup: fn })', + (source) => source.pipe(publish(fn)), + (source) => source.pipe(connect(fn)) + ); + + testEquivalents( + 'publishReplay(3, 10, fn) and `subject = new ReplaySubject(3, 10), connect({ connector: () => subject , setup: fn })`', + (source) => source.pipe(publishReplay(3, 10, fn)), + (source) => { + const subject = new ReplaySubject(3, 10); + return source.pipe(connect(fn, { connector: () => subject })); + } + ); + + /** + * Used to test a variety of scenarios with multicast operators that should be equivalent. + * @param name The name to add to the test output + * @param oldExpression The old expression we're saying matches the updated expression + * @param updatedExpression The updated expression we're telling people to use instead. + */ + function testEquivalents( + name: string, + oldExpression: (source: Observable) => Observable, + updatedExpression: (source: Observable) => Observable + ) { + it(`should be equivalent for ${name} for async sources`, () => { + rxTest.run(({ cold, expectObservable }) => { + const source = cold('----a---b---c----d---e----|'); + const old = oldExpression(source); + const updated = updatedExpression(source); + expectObservable(updated).toEqual(old); + }); + }); + + it(`should be equivalent for ${name} for async sources that repeat`, () => { + rxTest.run(({ cold, expectObservable }) => { + const source = cold('----a---b---c----d---e----|'); + const old = oldExpression(source).pipe(repeat(3)); + const updated = updatedExpression(source).pipe(repeat(3)); + expectObservable(updated).toEqual(old); + }); + }); + + it(`should be equivalent for ${name} for async sources that retry`, () => { + rxTest.run(({ cold, expectObservable }) => { + const source = cold('----a---b---c----d---e----#'); + const old = oldExpression(source).pipe(retry(3)); + const updated = updatedExpression(source).pipe(retry(3)); + expectObservable(updated).toEqual(old); + }); + }); + + it(`should be equivalent for ${name} for async sources`, () => { + rxTest.run(({ expectObservable }) => { + const source = of('a', 'b', 'c'); + const old = oldExpression(source); + const updated = updatedExpression(source); + expectObservable(updated).toEqual(old); + }); + }); + + it(`should be equivalent for ${name} for async sources that repeat`, () => { + rxTest.run(({ expectObservable }) => { + const source = of('a', 'b', 'c'); + const old = oldExpression(source).pipe(repeat(3)); + const updated = updatedExpression(source).pipe(repeat(3)); + expectObservable(updated).toEqual(old); + }); + }); + + it(`should be equivalent for ${name} for async sources that retry`, () => { + rxTest.run(({ expectObservable }) => { + const source = of('a', 'b', 'c'); + const old = oldExpression(source).pipe(retry(3)); + const updated = updatedExpression(source).pipe(retry(3)); + expectObservable(updated).toEqual(old); + }); + }); + } +}); diff --git a/spec/operators/connect-spec.ts b/spec/operators/connect-spec.ts new file mode 100644 index 0000000000..23761d54e9 --- /dev/null +++ b/spec/operators/connect-spec.ts @@ -0,0 +1,46 @@ +/** @prettier */ +import { BehaviorSubject, merge } from 'rxjs'; +import { connect, delay } from 'rxjs/operators'; +import { TestScheduler } from 'rxjs/testing'; +import { observableMatcher } from '../helpers/observableMatcher'; + +describe('connect', () => { + let rxTest: TestScheduler; + + beforeEach(() => { + rxTest = new TestScheduler(observableMatcher); + }); + + it('should connect a source through a selector function', () => { + rxTest.run(({ cold, time, expectObservable }) => { + const source = cold('---a----b-----c---|'); + const d = time(' ---|'); + const expected = ' ---a--a-b--b--c--c|'; + + const result = source.pipe(connect((shared) => merge(shared.pipe(delay(d)), shared))); + + expectObservable(result).toBe(expected); + }); + }); + + it('should connect a source through a selector function and use the provided connector', () => { + rxTest.run(({ cold, time, expectObservable }) => { + const source = cold('--------a---------b---------c-----|'); + const d = time(' ---|'); + const expected = ' S--S----a--a------b--b------c--c--|'; + + const result = source.pipe( + connect( + (shared) => { + return merge(shared.pipe(delay(d)), shared); + }, + { + connector: () => new BehaviorSubject('S'), + } + ) + ); + + expectObservable(result).toBe(expected); + }); + }); +}); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index 4614684b13..fef94068bd 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -430,7 +430,7 @@ describe('publishReplay operator', () => { expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); - it('should emit an error when the selector throws an exception', () => { + it('should EMIT an error when the selector throws an exception', () => { const error = "It's broken"; const selector = () => { throw error; diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index cbb7ace6c1..2cb5f9c881 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -1,331 +1,466 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { share, retry, mergeMapTo, mergeMap, tap, repeat, take } from 'rxjs/operators'; -import { Observable, EMPTY, NEVER, of } from 'rxjs'; +import { share, retry, mergeMapTo, mergeMap, tap, repeat, take, takeUntil, takeWhile, materialize } from 'rxjs/operators'; +import { Observable, EMPTY, NEVER, of, Subject, Observer, from } from 'rxjs'; +import { TestScheduler } from 'rxjs/testing'; +import { observableMatcher } from '../helpers/observableMatcher'; +import sinon = require('sinon'); /** @test {share} */ -describe('share operator', () => { - it('should mirror a simple source Observable', () => { - const source = cold('--1-2---3-4--5-|'); - const sourceSubs = '^ !'; - const expected = '--1-2---3-4--5-|'; +describe('share', () => { + describe('share()', () => { + it('should mirror a simple source Observable', () => { + const source = cold('--1-2---3-4--5-|'); + const sourceSubs = '^ !'; + const expected = '--1-2---3-4--5-|'; - const shared = source.pipe(share()); + const shared = source.pipe(share()); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); - - it('should share a single subscription', () => { - let subscriptionCount = 0; - const obs = new Observable(observer => { - subscriptionCount++; + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); - const source = obs.pipe(share()); + it('should share a single subscription', () => { + let subscriptionCount = 0; + const obs = new Observable(observer => { + subscriptionCount++; + }); - expect(subscriptionCount).to.equal(0); + const source = obs.pipe(share()); - source.subscribe(); - source.subscribe(); + expect(subscriptionCount).to.equal(0); - expect(subscriptionCount).to.equal(1); - }); + source.subscribe(); + source.subscribe(); - it('should not change the output of the observable when error', () => { - const e1 = hot('---a--^--b--c--d--e--#'); - const e1subs = '^ !'; - const expected = '---b--c--d--e--#'; + expect(subscriptionCount).to.equal(1); + }); - expectObservable(e1.pipe(share())).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); + it('should not change the output of the observable when error', () => { + const e1 = hot('---a--^--b--c--d--e--#'); + const e1subs = '^ !'; + const expected = '---b--c--d--e--#'; - it('should not change the output of the observable when successful with cold observable', () => { - const e1 = cold('---a--b--c--d--e--|'); - const e1subs = '^ !'; - const expected = '---a--b--c--d--e--|'; + expectObservable(e1.pipe(share())).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); - expectObservable(e1.pipe(share())).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); + it('should not change the output of the observable when successful with cold observable', () => { + const e1 = cold('---a--b--c--d--e--|'); + const e1subs = '^ !'; + const expected = '---a--b--c--d--e--|'; - it('should not change the output of the observable when error with cold observable', () => { - const e1 = cold('---a--b--c--d--e--#'); - const e1subs = '^ !'; - const expected = '---a--b--c--d--e--#'; + expectObservable(e1.pipe(share())).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); - expectObservable(e1.pipe(share())).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); + it('should not change the output of the observable when error with cold observable', () => { + const e1 = cold('---a--b--c--d--e--#'); + const e1subs = '^ !'; + const expected = '---a--b--c--d--e--#'; - it('should retry just fine', () => { - const e1 = cold('---a--b--c--d--e--#'); - const e1subs = ['^ ! ', - ' ^ !']; - const expected = '---a--b--c--d--e-----a--b--c--d--e--#'; + expectObservable(e1.pipe(share())).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); - expectObservable(e1.pipe(share(), retry(1))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); + it('should retry just fine', () => { + const e1 = cold('---a--b--c--d--e--#'); + const e1subs = ['^ ! ', + ' ^ !']; + const expected = '---a--b--c--d--e-----a--b--c--d--e--#'; - it('should share the same values to multiple observers', () => { - const source = cold('-1-2-3----4-|'); - const sourceSubs = '^ !'; - const shared = source.pipe(share()); - const subscriber1 = hot('a| ').pipe(mergeMapTo(shared)); - const expected1 = '-1-2-3----4-|'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); - const expected2 = ' -3----4-|'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); - const expected3 = ' --4-|'; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + expectObservable(e1.pipe(share(), retry(1))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); - it('should share an error from the source to multiple observers', () => { - const source = cold('-1-2-3----4-#'); - const sourceSubs = '^ !'; - const shared = source.pipe(share()); - const subscriber1 = hot('a| ').pipe(mergeMapTo(shared)); - const expected1 = '-1-2-3----4-#'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); - const expected2 = ' -3----4-#'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); - const expected3 = ' --4-#'; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should share the same values to multiple observers', () => { + const source = cold('-1-2-3----4-|'); + const sourceSubs = '^ !'; + const shared = source.pipe(share()); + const subscriber1 = hot('a| ').pipe(mergeMapTo(shared)); + const expected1 = '-1-2-3----4-|'; + const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); + const expected2 = ' -3----4-|'; + const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); + const expected3 = ' --4-|'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should share the same values to multiple observers, ' + - 'but is unsubscribed explicitly and early', () => { - const source = cold('-1-2-3----4-|'); - const sourceSubs = '^ ! '; - const shared = source.pipe(share()); - const unsub = ' ! '; - const subscriber1 = hot('a| ').pipe(mergeMapTo(shared)); - const expected1 = '-1-2-3---- '; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); - const expected2 = ' -3---- '; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); - const expected3 = ' -- '; - - expectObservable(subscriber1, unsub).toBe(expected1); - expectObservable(subscriber2, unsub).toBe(expected2); - expectObservable(subscriber3, unsub).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should share an error from the source to multiple observers', () => { + const source = cold('-1-2-3----4-#'); + const sourceSubs = '^ !'; + const shared = source.pipe(share()); + const subscriber1 = hot('a| ').pipe(mergeMapTo(shared)); + const expected1 = '-1-2-3----4-#'; + const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); + const expected2 = ' -3----4-#'; + const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); + const expected3 = ' --4-#'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should share an empty source', () => { - const source = cold('|'); - const sourceSubs = '(^!)'; - const shared = source.pipe(share()); - const expected = '|'; + it('should share the same values to multiple observers, ' + + 'but is unsubscribed explicitly and early', () => { + const source = cold('-1-2-3----4-|'); + const sourceSubs = '^ ! '; + const shared = source.pipe(share()); + const unsub = ' ! '; + const subscriber1 = hot('a| ').pipe(mergeMapTo(shared)); + const expected1 = '-1-2-3---- '; + const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); + const expected2 = ' -3---- '; + const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); + const expected3 = ' -- '; + + expectObservable(subscriber1, unsub).toBe(expected1); + expectObservable(subscriber2, unsub).toBe(expected2); + expectObservable(subscriber3, unsub).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should share an empty source', () => { + const source = cold('|'); + const sourceSubs = '(^!)'; + const shared = source.pipe(share()); + const expected = '|'; - it('should share a never source', () => { - const source = cold('-'); - const sourceSubs = '^'; - const shared = source.pipe(share()); - const expected = '-'; + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should share a never source', () => { + const source = cold('-'); + const sourceSubs = '^'; + const shared = source.pipe(share()); + const expected = '-'; - it('should share a throw source', () => { - const source = cold('#'); - const sourceSubs = '(^!)'; - const shared = source.pipe(share()); - const expected = '#'; + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should share a throw source', () => { + const source = cold('#'); + const sourceSubs = '(^!)'; + const shared = source.pipe(share()); + const expected = '#'; - it('should connect when first subscriber subscribes', () => { - const source = cold( '-1-2-3----4-|'); - const sourceSubs = ' ^ !'; - const shared = source.pipe(share()); - const subscriber1 = hot(' a| ').pipe(mergeMapTo(shared)); - const expected1 = ' -1-2-3----4-|'; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); - const expected2 = ' -3----4-|'; - const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); - const expected3 = ' --4-|'; - - expectObservable(subscriber1).toBe(expected1); - expectObservable(subscriber2).toBe(expected2); - expectObservable(subscriber3).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should disconnect when last subscriber unsubscribes', () => { - const source = cold( '-1-2-3----4-|'); - const sourceSubs = ' ^ ! '; - const shared = source.pipe(share()); - const subscriber1 = hot(' a| ').pipe(mergeMapTo(shared)); - const unsub1 = ' ! '; - const expected1 = ' -1-2-3-- '; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); - const unsub2 = ' ! '; - const expected2 = ' -3---- '; - - expectObservable(subscriber1, unsub1).toBe(expected1); - expectObservable(subscriber2, unsub2).toBe(expected2); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should connect when first subscriber subscribes', () => { + const source = cold( '-1-2-3----4-|'); + const sourceSubs = ' ^ !'; + const shared = source.pipe(share()); + const subscriber1 = hot(' a| ').pipe(mergeMapTo(shared)); + const expected1 = ' -1-2-3----4-|'; + const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); + const expected2 = ' -3----4-|'; + const subscriber3 = hot(' c| ').pipe(mergeMapTo(shared)); + const expected3 = ' --4-|'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should not break unsubscription chain when last subscriber unsubscribes', () => { - const source = cold( '-1-2-3----4-|'); - const sourceSubs = ' ^ ! '; - const shared = source.pipe( - mergeMap((x: string) => of(x)), - share(), - mergeMap((x: string) => of(x)) - ); - const subscriber1 = hot(' a| ').pipe(mergeMapTo(shared)); - const unsub1 = ' ! '; - const expected1 = ' -1-2-3-- '; - const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); - const unsub2 = ' ! '; - const expected2 = ' -3---- '; - - expectObservable(subscriber1, unsub1).toBe(expected1); - expectObservable(subscriber2, unsub2).toBe(expected2); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should disconnect when last subscriber unsubscribes', () => { + const source = cold( '-1-2-3----4-|'); + const sourceSubs = ' ^ ! '; + const shared = source.pipe(share()); + const subscriber1 = hot(' a| ').pipe(mergeMapTo(shared)); + const unsub1 = ' ! '; + const expected1 = ' -1-2-3-- '; + const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); + const unsub2 = ' ! '; + const expected2 = ' -3---- '; + + expectObservable(subscriber1, unsub1).toBe(expected1); + expectObservable(subscriber2, unsub2).toBe(expected2); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should be retryable when cold source is synchronous', () => { - const source = cold('(123#)'); - const shared = source.pipe(share()); - const subscribe1 = 's '; - const expected1 = '(123123#) '; - const subscribe2 = ' s '; - const expected2 = ' (123123#)'; - const sourceSubs = ['(^!)', - '(^!)', - ' (^!)', - ' (^!)']; - - expectObservable(hot(subscribe1).pipe(tap(() => { - expectObservable(shared.pipe(retry(1))).toBe(expected1); - }))).toBe(subscribe1); - - expectObservable(hot(subscribe2).pipe(tap(() => { - expectObservable(shared.pipe(retry(1))).toBe(expected2); - }))).toBe(subscribe2); - - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should not break unsubscription chain when last subscriber unsubscribes', () => { + const source = cold( '-1-2-3----4-|'); + const sourceSubs = ' ^ ! '; + const shared = source.pipe( + mergeMap((x: string) => of(x)), + share(), + mergeMap((x: string) => of(x)) + ); + const subscriber1 = hot(' a| ').pipe(mergeMapTo(shared)); + const unsub1 = ' ! '; + const expected1 = ' -1-2-3-- '; + const subscriber2 = hot(' b| ').pipe(mergeMapTo(shared)); + const unsub2 = ' ! '; + const expected2 = ' -3---- '; + + expectObservable(subscriber1, unsub1).toBe(expected1); + expectObservable(subscriber2, unsub2).toBe(expected2); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should be repeatable when cold source is synchronous', () => { - const source = cold('(123|)'); - const shared = source.pipe(share()); - const subscribe1 = 's '; - const expected1 = '(123123|) '; - const subscribe2 = ' s '; - const expected2 = ' (123123|)'; - const sourceSubs = ['(^!)', - '(^!)', - ' (^!)', - ' (^!)']; - - expectObservable(hot(subscribe1).pipe(tap(() => { - expectObservable(shared.pipe(repeat(2))).toBe(expected1); - }))).toBe(subscribe1); - - expectObservable(hot(subscribe2).pipe(tap(() => { - expectObservable(shared.pipe(repeat(2))).toBe(expected2); - }))).toBe(subscribe2); - - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should be retryable when cold source is synchronous', () => { + const source = cold('(123#)'); + const shared = source.pipe(share()); + const subscribe1 = 's '; + const expected1 = '(123123#) '; + const subscribe2 = ' s '; + const expected2 = ' (123123#)'; + const sourceSubs = ['(^!)', + '(^!)', + ' (^!)', + ' (^!)']; + + expectObservable(hot(subscribe1).pipe(tap(() => { + expectObservable(shared.pipe(retry(1))).toBe(expected1); + }))).toBe(subscribe1); + + expectObservable(hot(subscribe2).pipe(tap(() => { + expectObservable(shared.pipe(retry(1))).toBe(expected2); + }))).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should be retryable', () => { - const source = cold('-1-2-3----4-# '); - const sourceSubs = ['^ ! ', - ' ^ ! ', - ' ^ !']; - const shared = source.pipe(share()); - const subscribe1 = 's '; - const expected1 = '-1-2-3----4--1-2-3----4--1-2-3----4-#'; - const subscribe2 = ' s '; - const expected2 = ' -3----4--1-2-3----4--1-2-3----4-#'; - - expectObservable(hot(subscribe1).pipe(tap(() => { - expectObservable(shared.pipe(retry(2))).toBe(expected1); - }))).toBe(subscribe1); - - expectObservable(hot(subscribe2).pipe(tap(() => { - expectObservable(shared.pipe(retry(2))).toBe(expected2); - }))).toBe(subscribe2); - - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should be repeatable when cold source is synchronous', () => { + const source = cold('(123|)'); + const shared = source.pipe(share()); + const subscribe1 = 's '; + const expected1 = '(123123|) '; + const subscribe2 = ' s '; + const expected2 = ' (123123|)'; + const sourceSubs = ['(^!)', + '(^!)', + ' (^!)', + ' (^!)']; + + expectObservable(hot(subscribe1).pipe(tap(() => { + expectObservable(shared.pipe(repeat(2))).toBe(expected1); + }))).toBe(subscribe1); + + expectObservable(hot(subscribe2).pipe(tap(() => { + expectObservable(shared.pipe(repeat(2))).toBe(expected2); + }))).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should be repeatable', () => { - const source = cold('-1-2-3----4-| '); - const sourceSubs = ['^ ! ', - ' ^ ! ', - ' ^ !']; - const shared = source.pipe(share()); - const subscribe1 = 's '; - const expected1 = '-1-2-3----4--1-2-3----4--1-2-3----4-|'; - const subscribe2 = ' s '; - const expected2 = ' -3----4--1-2-3----4--1-2-3----4-|'; - - expectObservable(hot(subscribe1).pipe(tap(() => { - expectObservable(shared.pipe(repeat(3))).toBe(expected1); - }))).toBe(subscribe1); - - expectObservable(hot(subscribe2).pipe(tap(() => { - expectObservable(shared.pipe(repeat(3))).toBe(expected2); - }))).toBe(subscribe2); - - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); + it('should be retryable', () => { + const source = cold('-1-2-3----4-# '); + const sourceSubs = ['^ ! ', + ' ^ ! ', + ' ^ !']; + const shared = source.pipe(share()); + const subscribe1 = 's '; + const expected1 = '-1-2-3----4--1-2-3----4--1-2-3----4-#'; + const subscribe2 = ' s '; + const expected2 = ' -3----4--1-2-3----4--1-2-3----4-#'; + + expectObservable(hot(subscribe1).pipe(tap(() => { + expectObservable(shared.pipe(retry(2))).toBe(expected1); + }))).toBe(subscribe1); + + expectObservable(hot(subscribe2).pipe(tap(() => { + expectObservable(shared.pipe(retry(2))).toBe(expected2); + }))).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - it('should not change the output of the observable when never', () => { - const e1 = NEVER; - const expected = '-'; + it('should be repeatable', () => { + const source = cold('-1-2-3----4-| '); + const sourceSubs = ['^ ! ', + ' ^ ! ', + ' ^ !']; + const shared = source.pipe(share()); + const subscribe1 = 's '; + const expected1 = '-1-2-3----4--1-2-3----4--1-2-3----4-|'; + const subscribe2 = ' s '; + const expected2 = ' -3----4--1-2-3----4--1-2-3----4-|'; + + expectObservable(hot(subscribe1).pipe(tap(() => { + expectObservable(shared.pipe(repeat(3))).toBe(expected1); + }))).toBe(subscribe1); + + expectObservable(hot(subscribe2).pipe(tap(() => { + expectObservable(shared.pipe(repeat(3))).toBe(expected2); + }))).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); - expectObservable(e1.pipe(share())).toBe(expected); - }); + it('should not change the output of the observable when never', () => { + const e1 = NEVER; + const expected = '-'; + + expectObservable(e1.pipe(share())).toBe(expected); + }); + + it('should not change the output of the observable when empty', () => { + const e1 = EMPTY; + const expected = '|'; - it('should not change the output of the observable when empty', () => { - const e1 = EMPTY; - const expected = '|'; + expectObservable(e1.pipe(share())).toBe(expected); + }); - expectObservable(e1.pipe(share())).toBe(expected); + // TODO: fix firehose unsubscription + it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable(subscriber => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe( + share(), + take(3), + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); + }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { - const sideEffects: number[] = []; - const synchronousObservable = new Observable(subscriber => { - // This will check to see if the subscriber was closed on each loop - // when the unsubscribe hits (from the `take`), it should be closed - for (let i = 0; !subscriber.closed && i < 10; i++) { - sideEffects.push(i); - subscriber.next(i); - } + describe('share(config)', () => { + let rxTest: TestScheduler; + + beforeEach(() => { + rxTest = new TestScheduler(observableMatcher); + }); + + it('should not reset on error if configured to do so', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const source = hot('---a---b---c---d---e---f----#'); + const expected = ' ---a---b---c---d---e---f----#'; + const sourceSubs = [ + ' ^----------!', + ' -----------^-----------!', + ' -----------------------^----!' + ]; + const result = source.pipe( + // takes a, b, c... then repeat causes it to take d, e, f + take(3), + share({ + resetOnError: false + }), + repeat() + ); + + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - synchronousObservable.pipe( - share(), - take(3), - ).subscribe(() => { /* noop */ }); + it('should not reset on complete if configured to do so', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('---a---b---c---#'); + const expected = ' ---a---b---c------a---b---c------a---b---|'; + const sourceSubs = [ + ' ^--------------!', + ' ---------------^--------------!', + ' ------------------------------^----------!' + ]; + + // Used to trigger the source to complete at a given moment. + const triggerComplete = new Subject(); + + // just used to count how many values have made it through the share. + let count = 0; + + const result = source.pipe( + takeUntil(triggerComplete), + share({ + resetOnComplete: false + }), + // Retry on any error. + retry(), + tap(() => { + if (++count === 9) { + // If we see the ninth value, complete the source this time. + triggerComplete.next(); + } + }) + ); + + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); + + it('should not reset on refCount 0 if configured to do so', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const source = hot(' ---v---v---v---E--v---v---v---C---v----v------v---'); + const expected = ' ---v---v---v------v---v---v-------v----v----'; + const subscription = '^-------------------------------------------!'; + const sourceSubs = [ + ' ^--------------!', + ' ---------------^--------------!', + // Note this last subscription never ends, because refCount hitting zero isn't going to reset. + ' ------------------------------^--------------' + ]; + + const result = source.pipe( + tap(value => { + if (value === 'E') { + throw new Error('E'); + } + }), + takeWhile(value => value !== 'C'), + share({ + resetOnRefCountZero: false + }), + retry(), + repeat(), + ); + + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - expect(sideEffects).to.deep.equal([0, 1, 2]); + it('should use the connector function provided', () => { + const connector = sinon.spy(() => new Subject()); + + rxTest.run(({ hot, expectObservable }) => { + const source = hot(' ---v---v---v---E--v---v---v---C---v----v--------v----v---'); + const subs1 = ' ^-------------------------------------------!'; + const expResult1 = ' ---v---v---v------v---v---v-------v----v-----'; + const subs2 = ' ----------------------------------------------^---------!'; + const expResult2 = ' ------------------------------------------------v----v---'; + + + const result = source.pipe( + tap(value => { + if (value === 'E') { + throw new Error('E'); + } + }), + takeWhile(value => value !== 'C'), + share({ + connector + }), + retry(), + repeat(), + ); + + expectObservable(result, subs1).toBe(expResult1); + expectObservable(result, subs2).toBe(expResult2); + }); + + expect(connector).to.have.callCount(4); + }) }); -}); +}); \ No newline at end of file diff --git a/spec/schedulers/TestScheduler-spec.ts b/spec/schedulers/TestScheduler-spec.ts index 4873dfb12e..e58bf6cf5a 100644 --- a/spec/schedulers/TestScheduler-spec.ts +++ b/spec/schedulers/TestScheduler-spec.ts @@ -387,14 +387,18 @@ describe('TestScheduler', () => { expect(expectObservable).to.be.a('function'); expect(expectSubscriptions).to.be.a('function'); - const obs1 = cold('-a-c-e|'); + const obs1 = cold('-a-c-e|'); const obs2 = hot(' ^-b-d-f|'); const output = merge(obs1, obs2); const expected = ' -abcdef|'; expectObservable(output).toBe(expected); - expectSubscriptions(obs1.subscriptions).toBe('^-----!'); - expectSubscriptions(obs2.subscriptions).toBe('^------!'); + expectObservable(output).toEqual(cold(expected)); + // There are two subscriptions to each of these, because we merged + // them together, then we subscribed to the merged result once + // to check `toBe` and another time to check `toEqual`. + expectSubscriptions(obs1.subscriptions).toBe(['^-----!', '^-----!']); + expectSubscriptions(obs2.subscriptions).toBe(['^------!', '^------!']); }); }); diff --git a/src/index.ts b/src/index.ts index cc725f1e1c..6afbc59fb7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -51,6 +51,7 @@ export { bindCallback } from './internal/observable/bindCallback'; export { bindNodeCallback } from './internal/observable/bindNodeCallback'; export { combineLatest } from './internal/observable/combineLatest'; export { concat } from './internal/observable/concat'; +export { connectable } from './internal/observable/connectable'; export { defer } from './internal/observable/defer'; export { empty } from './internal/observable/empty'; export { forkJoin } from './internal/observable/forkJoin'; diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index ea64715de8..c78bd9e90c 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -4,7 +4,7 @@ import { Operator } from './Operator'; import { SafeSubscriber, Subscriber } from './Subscriber'; import { isSubscription, Subscription } from './Subscription'; -import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable, Observer } from './types'; +import { TeardownLogic, OperatorFunction, Subscribable, Observer } from './types'; import { observable as Symbol_observable } from './symbol/observable'; import { pipeFromArray } from './util/pipe'; import { config } from './config'; @@ -68,7 +68,7 @@ export class Observable implements Subscribable { return observable; } - subscribe(observer?: PartialObserver): Subscription; + subscribe(observer?: Partial>): Subscription; /** @deprecated Use an observer instead of a complete callback */ subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription; /** @deprecated Use an observer instead of an error callback */ @@ -202,7 +202,7 @@ export class Observable implements Subscribable { * @method subscribe */ subscribe( - observerOrNext?: PartialObserver | ((value: T) => void) | null, + observerOrNext?: Partial> | ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null ): Subscription { diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index 637b1e58ce..f5a4555c40 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -1,6 +1,6 @@ /** @prettier */ import { isFunction } from './util/isFunction'; -import { Observer, PartialObserver, ObservableNotification } from './types'; +import { Observer, ObservableNotification } from './types'; import { isSubscription, Subscription } from './Subscription'; import { config } from './config'; import { reportUnhandledError } from './util/reportUnhandledError'; @@ -126,7 +126,7 @@ export class Subscriber extends Subscription implements Observer { export class SafeSubscriber extends Subscriber { constructor( - observerOrNext?: PartialObserver | ((value: T) => void) | null, + observerOrNext?: Partial> | ((value: T) => void) | null, error?: ((e?: any) => void) | null, complete?: (() => void) | null ) { @@ -204,7 +204,7 @@ function handleStoppedNotification(notification: ObservableNotification, su * pass any arguments to `subscribe`. Comes with the default error handling * behavior. */ -export const EMPTY_OBSERVER: Readonly> = { +export const EMPTY_OBSERVER: Readonly> & { closed: true } = { closed: true, next: noop, error: defaultErrorHandler, diff --git a/src/internal/observable/ConnectableObservable.ts b/src/internal/observable/ConnectableObservable.ts index 0650cfcdf5..61374e219b 100644 --- a/src/internal/observable/ConnectableObservable.ts +++ b/src/internal/observable/ConnectableObservable.ts @@ -8,12 +8,25 @@ import { OperatorSubscriber } from '../operators/OperatorSubscriber'; /** * @class ConnectableObservable + * @deprecated To be removed in version 8. Please use {@link connectable} to create a connectable observable. + * If you are using the `refCount` method of `ConnectableObservable` you can use the updated {@link share} operator + * instead, which is now highly configurable. */ export class ConnectableObservable extends Observable { protected _subject: Subject | null = null; protected _refCount: number = 0; protected _connection: Subscription | null = null; + /** + * @param source The source observable + * @param subjectFactory The factory that creates the subject used internally. + * @deprecated To be removed in version 8. Please use {@link connectable} to create a connectable observable. + * If you are using the `refCount` method of `ConnectableObservable` you can use the {@link share} operator + * instead, which is now highly configurable. `new ConnectableObservable(source, fn)` is equivalent + * to `connectable(source, fn)`. With the exception of when the `refCount()` method is needed, in which + * case, the new {@link share} operator should be used: `new ConnectableObservable(source, fn).refCount()` + * is equivalent to `source.pipe(share({ connector: fn }))`. + */ constructor(public source: Observable, protected subjectFactory: () => Subject) { super(); } @@ -68,6 +81,10 @@ export class ConnectableObservable extends Observable { return connection; } + /** + * @deprecated The {@link ConnectableObservable} class is scheduled for removal in version 8. + * Please use the {@link share} operator, which is now highly configurable. + */ refCount(): Observable { return higherOrderRefCount()(this) as Observable; } diff --git a/src/internal/observable/connectable.ts b/src/internal/observable/connectable.ts new file mode 100644 index 0000000000..df7aa81a7c --- /dev/null +++ b/src/internal/observable/connectable.ts @@ -0,0 +1,51 @@ +/** @prettier */ + +import { ObservableInput } from '../types'; +import { Subject } from '../Subject'; +import { Subscription } from '../Subscription'; +import { Observable } from '../Observable'; +import { defer } from './defer'; + +/** + * An observable with a `connect` method that is used to create a subscription + * to an underlying source, connecting it with all consumers via a multicast. + */ +export interface ConnectableObservableLike extends Observable { + /** + * (Idempotent) Calling this method will connect the underlying source observable to all subscribed consumers + * through an underlying {@link Subject}. + * @returns A subscription, that when unsubscribed, will "disconnect" the source from the connector subject, + * severing notifications to all consumers. + */ + connect(): Subscription; +} + +/** + * Creates an observable that multicasts once `connect()` is called on it. + * + * @param source The observable source to make connectable. + * @param connector The subject to used to multicast the source observable to all subscribers. + * Defaults to a new {@link Subject}. + * @returns A "connectable" observable, that has a `connect()` method, that you must call to + * connect the source to all consumers through the subject provided as the connector. + */ +export function connectable(source: ObservableInput, connector: Subject = new Subject()): ConnectableObservableLike { + // The subscription representing the connection. + let connection: Subscription | null = null; + + const result: any = new Observable((subscriber) => { + return connector.subscribe(subscriber); + }); + + // Define the `connect` function. This is what users must call + // in order to "connect" the source to the subject that is + // multicasting it. + result.connect = () => { + if (!connection) { + connection = defer(() => source).subscribe(connector); + } + return connection; + }; + + return result; +} diff --git a/src/internal/observable/fromSubscribable.ts b/src/internal/observable/fromSubscribable.ts new file mode 100644 index 0000000000..4c145d092d --- /dev/null +++ b/src/internal/observable/fromSubscribable.ts @@ -0,0 +1,18 @@ +/** @prettier */ +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { Subscribable } from '../types'; + +/** + * Used to convert a subscribable to an observable. + * + * Currently, this is only used within internals. + * + * TODO: Discuss ObservableInput supporting "Subscribable". + * https://github.com/ReactiveX/rxjs/issues/5909 + * + * @param subscribable A subscribable + */ +export function fromSubscribable(subscribable: Subscribable) { + return new Observable((subscriber: Subscriber) => subscribable.subscribe(subscriber)); +} diff --git a/src/internal/operators/connect.ts b/src/internal/operators/connect.ts new file mode 100644 index 0000000000..7e7357fbdc --- /dev/null +++ b/src/internal/operators/connect.ts @@ -0,0 +1,108 @@ +/** @prettier */ +import { OperatorFunction, ObservableInput, SubjectLike } from '../types'; +import { Observable } from '../Observable'; +import { Subject } from '../Subject'; +import { from } from '../observable/from'; +import { operate } from '../util/lift'; +import { fromSubscribable } from '../observable/fromSubscribable'; + +export interface ConnectConfig { + /** + * A factory function used to create the Subject through which the source + * is multicast. By default this creates a {@link Subject}. + */ + connector: () => SubjectLike; +} + +/** + * The default configuration for `connect`. + */ +const DEFAULT_CONFIG: ConnectConfig = { + connector: () => new Subject(), +}; + +/** + * Creates an observable by multicasting the source within a function that + * allows the developer to define the usage of the multicast prior to connection. + * + * This is particularly useful if the observable source you wish to multicast could + * be synchronous or asynchronous. This sets it apart from {@link share}, which, in the + * case of totally synchronous sources will fail to share a single subscription with + * multiple consumers, as by the time the subscription to the result of {@link share} + * has returned, if the source is synchronous its internal reference count will jump from + * 0 to 1 back to 0 and reset. + * + * To use `connect`, you provide a `selector` function that will give you + * a multicast observable that is not yet connected. You then use that multicast observable + * to create a resulting observable that, when subscribed, will set up your multicast. This is + * generally, but not always, accomplished with {@link merge}. + * + * Note that using a {@link takeUntil} inside of `connect`'s `selector` _might_ mean you were looking + * to use the {@link takeWhile} operator instead. + * + * When you subscribe to the result of `connect`, the `selector` function will be called. After + * the `selector` function returns, the observable it returns will be subscribed to, _then_ the + * multicast will be connected to the source. + * + * ### Example + * + * Sharing a totally synchronous observable + * + * ```ts + * import { defer, of } from 'rxjs'; + * import { tap, connect } from 'rxjs/operators'; + * + * const source$ = defer(() => { + * console.log('subscription started'); + * return of(1, 2, 3, 4, 5).pipe( + * tap(n => console.log(`source emitted ${n}`)) + * ); + * }); + * + * source$.pipe( + * // Notice in here we're merging 3 subscriptions to `shared$`. + * connect((shared$) => merge( + * shared$.pipe(map(n => `all ${n}`)), + * shared$.pipe(filter(n => n % 2 === 0), map(n => `even ${n}`)), + * shared$.pipe(filter(n => n % 2 === 1), map(n => `odd ${n}`)), + * )) + * ) + * .subscribe(console.log); + * + * // Expected output: (notice only one subscription) + * "subscription started" + * "source emitted 1" + * "all 1" + * "odd 1" + * "source emitted 2" + * "all 2" + * "even 2" + * "source emitted 3" + * "all 3" + * "odd 3" + * "source emitted 4" + * "all 4" + * "even 4" + * "source emitted 5" + * "all 5" + * "odd 5" + * ``` + * + * @param selector A function used to set up the multicast. Gives you a multicast observable + * that is not yet connected. With that, you're expected to create and return + * and Observable, that when subscribed to, will utilize the multicast observable. + * After this function is executed -- and its return value subscribed to -- the + * the operator will subscribe to the source, and the connection will be made. + * @param param0 The configuration object for `connect`. + */ +export function connect( + selector: (shared: Observable) => ObservableInput, + config: ConnectConfig = DEFAULT_CONFIG +): OperatorFunction { + const { connector } = config; + return operate((source, subscriber) => { + const subject = connector(); + from(selector(fromSubscribable(subject))).subscribe(subscriber); + subscriber.add(source.subscribe(subject)); + }); +} diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 3f9ae7285b..1f5ec936d0 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -3,39 +3,70 @@ import { Subject } from '../Subject'; import { Observable } from '../Observable'; import { ConnectableObservable } from '../observable/ConnectableObservable'; import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types'; -import { hasLift, operate } from '../util/lift'; +import { hasLift } from '../util/lift'; import { isFunction } from '../util/isFunction'; +import { connect } from './connect'; -/* tslint:disable:max-line-length */ +/** + * An operator that creates a {@link ConnectableObservable}, that when connected, + * with the `connect` method, will use the provided subject to multicast the values + * from the source to all consumers. + * + * @param subject The subject to multicast through. + * @deprecated This will be removed in version 8. Please use the {@link connectable} creation + * function, which creates a connectable observable. If you were using the {@link refCount} operator + * on the result of the `multicast` operator, then use the {@link share} operator, which is now + * highly configurable. `multicast(subject), refCount()` is equivalent to + * `share({ connector: () => subject, resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })`. + */ export function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; + +/** + * Because this is deprecated in favor of the {@link connect} operator, and was otherwise poorly documented, + * rather than duplicate the effort of documenting the same behavior, please see documentation for the + * {@link connect} operator. + * + * @param subject The subject used to multicast. + * @param selector A setup function to setup the multicast + * @deprecated To be removed in version 8. Please use the new {@link connect} operator. + * `multicast(subject, fn)` is equivalent to `connect({ connector: () => subject, setup: fn })`. + */ export function multicast>( subject: Subject, selector: (shared: Observable) => O -): UnaryFunction, ConnectableObservable>>; -export function multicast(subjectFactory: (this: Observable) => Subject): UnaryFunction, ConnectableObservable>; -export function multicast>( - SubjectFactory: (this: Observable) => Subject, - selector: (shared: Observable) => O ): OperatorFunction>; -/* tslint:enable:max-line-length */ /** - * Returns an Observable that emits the results of invoking a specified selector on items - * emitted by a ConnectableObservable that shares a single subscription to the underlying stream. + * An operator that creates a {@link ConnectableObservable}, that when connected, + * with the `connect` method, will use the provided subject to multicast the values + * from the source to all consumers. * - * ![](multicast.png) + * @param subjectFactory A factory that will be called to create the subject. Passing a function here + * will cause the underlying subject to be "reset" on error, completion, or refCounted unsubscription of + * the source. + * @deprecated This will be removed in version 8. Please use the {@link connectable} creation + * function, which creates a connectable observable. If you were using the {@link refCount} operator + * on the result of the `multicast` operator, then use the {@link share} operator, which is now + * highly configurable. `multicast(() => new BehaviorSubject('test'))), refCount()` is equivalent to + * `share({ connector: () => new BehaviorSubject('test') })`. + */ +export function multicast(subjectFactory: () => Subject): UnaryFunction, ConnectableObservable>; + +/** + * Because this is deprecated in favor of the {@link connect} operator, and was otherwise poorly documented, + * rather than duplicate the effort of documenting the same behavior, please see documentation for the + * {@link connect} operator. * - * @param {Function|Subject} subjectOrSubjectFactory - Factory function to create an intermediate subject through - * which the source sequence's elements will be multicasted to the selector function - * or Subject to push source elements into. - * @param {Function} [selector] - Optional selector function that can use the multicasted source stream - * as many times as needed, without causing multiple subscriptions to the source stream. - * Subscribers to the given source will receive all notifications of the source from the - * time of the subscription forward. - * @return {Observable} An Observable that emits the results of invoking the selector - * on the items emitted by a `ConnectableObservable` that shares a single subscription to - * the underlying stream. + * @param subjectFactory A factory that creates the subject used to multicast. + * @param selector A function to setup the multicast and select the output. + * @deprecated To be removed in version 8. Please use the new {@link connect} operator. + * `multicast(subjectFactor, selector)` is equivalent to `connect(selector, { connector: subjectFactory })`. */ +export function multicast>( + subjectFactory: () => Subject, + selector: (shared: Observable) => O +): OperatorFunction>; + export function multicast( subjectOrSubjectFactory: Subject | (() => Subject), selector?: (source: Observable) => Observable @@ -43,14 +74,11 @@ export function multicast( const subjectFactory = isFunction(subjectOrSubjectFactory) ? subjectOrSubjectFactory : () => subjectOrSubjectFactory; if (isFunction(selector)) { - return operate((source, subscriber) => { - const subject = subjectFactory(); - // Intentionally terse code: Subscribe to the result of the selector, - // then immediately connect the source through the subject, adding - // that to the resulting subscription. The act of subscribing with `this`, - // the primary destination subscriber, will automatically add the subscription - // to the result. - selector(subject).subscribe(subscriber).add(source.subscribe(subject)); + // If a selector function is provided, then we're a "normal" operator that isn't + // going to return a ConnectableObservable. We can use `connect` to do what we + // need to do. + return connect(selector, { + connector: subjectFactory, }); } diff --git a/src/internal/operators/publish.ts b/src/internal/operators/publish.ts index d76accfbc4..2dfa9ebfb2 100644 --- a/src/internal/operators/publish.ts +++ b/src/internal/operators/publish.ts @@ -1,14 +1,36 @@ +/** @prettier */ import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { multicast } from './multicast'; import { ConnectableObservable } from '../observable/ConnectableObservable'; import { MonoTypeOperatorFunction, OperatorFunction, UnaryFunction, ObservableInput, ObservedValueOf } from '../types'; +import { connect } from './connect'; -/* tslint:disable:max-line-length */ +/** + * Returns a connectable observable that, when connected, will multicast + * all values through a single underlying {@link Subject} instance. + * + * @deprecated To be removed in version 8. If you're using `publish()` to get a connectable observable, + * please use the new {@link connectable} creation function. `source.pipe(publish())` is + * equivalent to `connectable(source, () => new Subject())`. If you're calling {@link refCount} on the result + * of `publish`, please use the updated {@link share} operator which is highly configurable. + * `source.pipe(publish(), refCount())` is equivalent to + * `source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))`. + */ export function publish(): UnaryFunction, ConnectableObservable>; + +/** + * Returns an observable, that when subscribed to, creates an underlying {@link Subject}, + * provides an observable view of it to a `selector` function, takes the observable result of + * that selector function and subscribes to it, sending its values to the consumer, _then_ connects + * the subject to the original source. + * + * @param selector A function used to setup multicasting prior to automatic connection. + * + * @deprecated To be removed in version 8. Use the new {@link connect} operator. + * If you're using `publish(fn)`, it is equivalent to `connect(fn)`. + */ export function publish>(selector: (shared: Observable) => O): OperatorFunction>; -export function publish(selector: MonoTypeOperatorFunction): MonoTypeOperatorFunction; -/* tslint:enable:max-line-length */ /** * Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called @@ -57,7 +79,5 @@ export function publish(selector: MonoTypeOperatorFunction): MonoTypeOpera * @return A ConnectableObservable that upon connection causes the source Observable to emit items to its Observers. */ export function publish(selector?: OperatorFunction): MonoTypeOperatorFunction | OperatorFunction { - return selector ? - multicast(() => new Subject(), selector) : - multicast(new Subject()); + return selector ? connect(selector) : multicast(new Subject()); } diff --git a/src/internal/operators/publishBehavior.ts b/src/internal/operators/publishBehavior.ts index d8b6be7ccf..7aa1dd4a7c 100644 --- a/src/internal/operators/publishBehavior.ts +++ b/src/internal/operators/publishBehavior.ts @@ -1,13 +1,22 @@ import { Observable } from '../Observable'; import { BehaviorSubject } from '../BehaviorSubject'; -import { multicast } from './multicast'; import { ConnectableObservable } from '../observable/ConnectableObservable'; import { UnaryFunction } from '../types'; /** - * @param value + * Creates a {@link ConnectableObservable} that utilizes a {@link BehaviorSubject}. + * + * @param initialValue The initial value passed to the {@link BehaviorSubject}. * @return {ConnectableObservable} + * @deprecated to be removed in version 8. If you want to get a connectable observable that uses a + * {@link BehaviorSubject} under the hood, please use {@link connectable}. `source.pipe(publishBehavior(initValue))` + * is equivalent to: `connectable(source, () => new BehaviorSubject(initValue))`. + * If you're using {@link refCount} after the call to `publishBehavior`, use the {@link share} operator, which is now + * highly configurable. `source.pipe(publishBehavior(initValue), refCount())` is equivalent to: + * `source.pipe(share({ connector: () => new BehaviorSubject(initValue), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))`. */ -export function publishBehavior(value: T): UnaryFunction, ConnectableObservable> { - return (source: Observable) => multicast(new BehaviorSubject(value))(source) as ConnectableObservable; +export function publishBehavior(initialValue: T): UnaryFunction, ConnectableObservable> { + const subject = new BehaviorSubject(initialValue); + // Note that this has *never* supported the selector function. + return (source) => new ConnectableObservable(source, () => subject); } diff --git a/src/internal/operators/publishLast.ts b/src/internal/operators/publishLast.ts index 0c1c665208..d279e1c888 100644 --- a/src/internal/operators/publishLast.ts +++ b/src/internal/operators/publishLast.ts @@ -1,6 +1,5 @@ import { Observable } from '../Observable'; import { AsyncSubject } from '../AsyncSubject'; -import { multicast } from './multicast'; import { ConnectableObservable } from '../observable/ConnectableObservable'; import { UnaryFunction } from '../types'; @@ -51,14 +50,17 @@ import { UnaryFunction } from '../types'; * // "Sub. B Complete" * ``` * - * @see {@link ConnectableObservable} - * @see {@link publish} - * @see {@link publishReplay} - * @see {@link publishBehavior} - * - * @return {ConnectableObservable} An observable sequence that contains the elements of a + * @return A connectable observable sequence that contains the elements of a * sequence produced by multicasting the source sequence. + * @deprecated To be removed in version 8. If you're trying to create a connectable observable + * with an {@link AsyncSubject} under the hood, please use the new {@link connectable} creation function. + * `source.pipe(publishLast())` is equivalent to `connectable(source, () => new AsyncSubject())`. + * If you're using {@link refCount} on the result of `publishLast`, you can use the updated {@link share} + * operator, which is now highly configurable. `source.pipe(publishLast(), refCount())` + * is equivalent to `source.pipe(share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))`. */ export function publishLast(): UnaryFunction, ConnectableObservable> { - return (source: Observable) => multicast(new AsyncSubject())(source); + const subject = new AsyncSubject(); + // Note that this has *never* supported a selector function like `publish` and `publishReplay`. + return (source) => new ConnectableObservable(source, () => subject); } diff --git a/src/internal/operators/publishReplay.ts b/src/internal/operators/publishReplay.ts index 3233ecd530..6e2aa5c33f 100644 --- a/src/internal/operators/publishReplay.ts +++ b/src/internal/operators/publishReplay.ts @@ -2,32 +2,88 @@ import { Observable } from '../Observable'; import { ReplaySubject } from '../ReplaySubject'; import { multicast } from './multicast'; -import { ConnectableObservable } from '../observable/ConnectableObservable'; -import { UnaryFunction, MonoTypeOperatorFunction, OperatorFunction, SchedulerLike, ObservableInput, ObservedValueOf } from '../types'; +import { MonoTypeOperatorFunction, OperatorFunction, TimestampProvider, ObservableInput, ObservedValueOf } from '../types'; import { isFunction } from '../util/isFunction'; -/* tslint:disable:max-line-length */ -export function publishReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; -export function publishReplay>( +/** + * Creates a {@link ConnectableObservable} that uses a {@link ReplaySubject} + * internally. + * + * @param bufferSize The buffer size for the underlying {@link ReplaySubject}. + * @param windowTime The window time for the underlying {@link ReplaySubject}. + * @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}. + * @deprecated To be removed in version 8. Use the new {@link connectable} create method to create + * a connectable observable. `source.pipe(publishReplay(size, time, scheduler))` is equivalent to + * `connectable(source, () => new ReplaySubject(size, time, scheduler))`. + * If you're using this with {@link refCount}, then use the new {@link share} operator, + * which is now highly configurable. `publishReplay(size, time, scheduler), refCount()` + * is equivalent to `share({ connector: () => new ReplaySubject(size, time, scheduler) })`. + */ +export function publishReplay( bufferSize?: number, windowTime?: number, - selector?: (shared: Observable) => O, - scheduler?: SchedulerLike + timestampProvider?: TimestampProvider +): MonoTypeOperatorFunction; + +/** + * Creates an observable, that when subscribed to, will create a {@link ReplaySubject}, + * and pass an observable from it (using {@link asObservable}) to the `selector` + * function, which then returns an observable that is subscribed to before "connecting" + * the source to the internal `ReplaySubject`. + * + * Since this is deprecated, for additional details see the documentation for {@link connect}. + * + * @param bufferSize The buffer size for the underlying {@link ReplaySubject}. + * @param windowTime The window time for the underlying {@link ReplaySubject}. + * @param selector A function used to setup the multicast. + * @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}. + * @deprecated To be removed in version 8. Use the new {@link connect} operator. + * `source.pipe(publishReplay(size, window, fn, scheduler))` is equivalent to + * `const subject = new ReplaySubject(size, window, scheduler), source.pipe(connect(fn, { connector: () => subject }))`. + */ +export function publishReplay>( + bufferSize: number | undefined, + windowTime: number | undefined, + selector: (shared: Observable) => O, + timestampProvider?: TimestampProvider +): OperatorFunction>; + +/** + * Creates a {@link ConnectableObservable} that uses a {@link ReplaySubject} + * internally. + * + * @param bufferSize The buffer size for the underlying {@link ReplaySubject}. + * @param windowTime The window time for the underlying {@link ReplaySubject}. + * @param selector Passing `undefined` here determines that this operator will return a {@link ConnectableObservable}. + * @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}. + * @deprecated To be removed in version 8. Use the new {@link connectable} create method to create + * a connectable observable. `source.pipe(publishReplay(size, time, scheduler))` is equivalent to + * `connectable(source, () => new ReplaySubject(size, time, scheduler))`. + * If you're using this with {@link refCount}, then use the new {@link share} operator, + * which is now highly configurable. `publishReplay(size, time, scheduler), refCount()` + * is equivalent to `share({ connector: () => new ReplaySubject(size, time, scheduler) })`. + */ +export function publishReplay>( + bufferSize: number | undefined, + windowTime: number | undefined, + selector: undefined, + timestampProvider: TimestampProvider ): OperatorFunction>; -/* tslint:enable:max-line-length */ export function publishReplay( bufferSize?: number, windowTime?: number, - selectorOrScheduler?: SchedulerLike | OperatorFunction, - scheduler?: SchedulerLike -): UnaryFunction, ConnectableObservable> { + selectorOrScheduler?: TimestampProvider | OperatorFunction, + timestampProvider?: TimestampProvider +) { if (selectorOrScheduler && !isFunction(selectorOrScheduler)) { - scheduler = selectorOrScheduler; + timestampProvider = selectorOrScheduler; } const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined; - const subject = new ReplaySubject(bufferSize, windowTime, scheduler); + const subject = new ReplaySubject(bufferSize, windowTime, timestampProvider); - return (source: Observable) => multicast(() => subject, selector!)(source) as ConnectableObservable; + // Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument + // but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross). + return (source: Observable) => multicast(subject, selector!)(source); } diff --git a/src/internal/operators/refCount.ts b/src/internal/operators/refCount.ts index 292cd45aa3..fe7e616139 100644 --- a/src/internal/operators/refCount.ts +++ b/src/internal/operators/refCount.ts @@ -1,5 +1,5 @@ -import { ConnectableObservable } from '../observable/ConnectableObservable'; /** @prettier */ +import { ConnectableObservable } from '../observable/ConnectableObservable'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction } from '../types'; import { operate } from '../util/lift'; @@ -15,7 +15,7 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * refCount has only a single subscription independently of the number of subscribers to the target * observable. * - * Note that using the {@link share} operator is exactly the same as using the `multicast(() => new Subject())` operator + * Note that using the {@link share} operator is exactly the same as using the `multicast(() => new Subject())` operator * (making the observable hot) and the *refCount* operator in a sequence. * * ![](refCount.png) @@ -54,9 +54,11 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * // Nothing happens until you call .connect() on the observable. * ``` * - * @see {@link ConnectableObservable} - * @see {@link share} - * @see {@link publish} + * @deprecated to be removed in version 8. Use the updated {@link share} operator, + * which now is highly configurable. How `share` is used will depend on the connectable + * observable you created just prior to the `refCount` operator. For examples on how + * to replace this, see documentation in {@link multicast}, {@link publish}, {@link publishReplay}, + * {@link publishBehavior}, {@link publishLast} or {@link ConnectableObservable}. */ export function refCount(): MonoTypeOperatorFunction { return operate((source, subscriber) => { diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index baa412a60c..23711b11b1 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -1,14 +1,50 @@ -import { Observable } from '../Observable'; -import { multicast } from './multicast'; -import { refCount } from './refCount'; +/** @prettier */ + import { Subject } from '../Subject'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types'; +import { Subscription } from '../Subscription'; +import { from } from '../observable/from'; +import { operate } from '../util/lift'; -function shareSubjectFactory() { - return new Subject(); +export interface ShareConfig { + /** + * The factory used to create the subject that will connect the source observable to + * multicast consumers. + */ + connector?: () => SubjectLike; + /** + * If true, the resulting observable will reset internal state on error from source and return to a "cold" state. This + * allows the resulting observable to be "retried" in the event of an error. + * If false, when an error comes from the source it will push the error into the connecting subject, and the subject + * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries + * or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however + * {@link ReplaySubject} will also push its buffered values before pushing the error. + */ + resetOnError?: boolean; + /** + * If true, the resulting observable will reset internal state on completion from source and return to a "cold" state. This + * allows the resulting observable to be "repeated" after it is done. + * If false, when the source completes, it will push the completion through the connecting subject, and the subject + * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats + * or resubscriptions will resubscribe to that same subject. + */ + resetOnComplete?: boolean; + /** + * If true, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the + * internal state will be reset and the resulting observable will return to a "cold" state. This means that the next + * time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to + * again. + * If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject + * will remain connected to the source, and new subscriptions to the result will be connected through that same subject. + */ + resetOnRefCountZero?: boolean; } +export function share(): MonoTypeOperatorFunction; + +export function share(options: ShareConfig): MonoTypeOperatorFunction; + /** * Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one * Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will @@ -19,7 +55,7 @@ function shareSubjectFactory() { * * ## Example * Generate new multicast Observable from the source Observable value - * ```typescript + * ```ts * import { interval } from 'rxjs'; * import { share, map } from 'rxjs/operators'; * @@ -50,12 +86,60 @@ function shareSubjectFactory() { * // subscription 1: 9 * // ... and so on * ``` - * - * @see {@link api/index/function/interval} - * @see {@link map} - * - * @return {Observable} An Observable that upon connection causes the source Observable to emit items to its Observers. */ -export function share(): MonoTypeOperatorFunction { - return (source: Observable) => refCount()(multicast(shareSubjectFactory)(source)) as Observable; +export function share(options?: ShareConfig): OperatorFunction { + options = options || {}; + const { connector = () => new Subject(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options; + + let connection: Subscription | null = null; + let subject: SubjectLike | null = null; + let refCount = 0; + let hasCompleted = false; + let hasErrored = false; + + const reset = () => { + connection = subject = null; + hasCompleted = hasErrored = false; + }; + + return operate((source, subscriber) => { + refCount++; + if (!subject) { + subject = connector!(); + } + + const castSubscription = subject.subscribe(subscriber); + + if (!connection) { + connection = from(source).subscribe({ + next: (value) => subject!.next(value), + error: (err) => { + hasErrored = true; + const dest = subject!; + if (resetOnError) { + reset(); + } + dest.error(err); + }, + complete: () => { + hasCompleted = true; + const dest = subject!; + if (resetOnComplete) { + reset(); + } + dest.complete(); + }, + }); + } + + return () => { + refCount--; + castSubscription.unsubscribe(); + if (!refCount && resetOnRefCountZero && !hasErrored && !hasCompleted) { + const conn = connection; + reset(); + conn?.unsubscribe(); + } + }; + }); } diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index db941567ee..66178b91d9 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -1,9 +1,6 @@ -import { Observable } from '../Observable'; import { ReplaySubject } from '../ReplaySubject'; -import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; -import { Subscriber } from '../Subscriber'; -import { operate } from '../util/lift'; +import { share } from './share'; export interface ShareReplayConfig { bufferSize?: number; @@ -126,67 +123,20 @@ export function shareReplay( windowTime?: number, scheduler?: SchedulerLike ): MonoTypeOperatorFunction { - let config: ShareReplayConfig; + let bufferSize: number; + let refCount = false; if (configOrBufferSize && typeof configOrBufferSize === 'object') { - config = configOrBufferSize as ShareReplayConfig; + bufferSize = configOrBufferSize.bufferSize ?? Infinity; + windowTime = configOrBufferSize.windowTime ?? Infinity; + refCount = !!configOrBufferSize.refCount; + scheduler = configOrBufferSize.scheduler; } else { - config = { - bufferSize: configOrBufferSize as number | undefined, - windowTime, - refCount: false, - scheduler - }; + bufferSize = configOrBufferSize ?? Infinity; } - return operate(shareReplayOperator(config)); -} - -function shareReplayOperator({ - bufferSize = Infinity, - windowTime = Infinity, - refCount: useRefCount, - scheduler -}: ShareReplayConfig) { - let subject: ReplaySubject | undefined; - let refCount = 0; - let subscription: Subscription | undefined; - - return (source: Observable, subscriber: Subscriber) => { - refCount++; - let innerSub: Subscription; - if (!subject) { - subject = new ReplaySubject(bufferSize, windowTime, scheduler); - innerSub = subject.subscribe(subscriber); - subscription = source.subscribe({ - next(value) { subject!.next(value); }, - error(err) { - const dest = subject; - subscription = undefined; - subject = undefined; - dest!.error(err); - }, - complete() { - subscription = undefined; - subject!.complete(); - }, - }); - // The following condition is needed because source can complete synchronously - // upon subscription. When that happens `subscription` is first set to `undefined` - // and right after is set to the "closed subscription" returned by `subscribe` - if (subscription.closed) { - subscription = undefined; - } - } else { - innerSub = subject.subscribe(subscriber); - } - - subscriber.add(() => { - refCount--; - innerSub.unsubscribe(); - if (useRefCount && refCount === 0 && subscription) { - subscription.unsubscribe(); - subscription = undefined; - subject = undefined; - } - }); - }; + return share({ + connector: () => new ReplaySubject(bufferSize, windowTime, scheduler), + resetOnError: true, + resetOnComplete: false, + resetOnRefCountZero: refCount + }); } diff --git a/src/internal/testing/TestScheduler.ts b/src/internal/testing/TestScheduler.ts index 254d36f051..d8d1de9515 100644 --- a/src/internal/testing/TestScheduler.ts +++ b/src/internal/testing/TestScheduler.ts @@ -1,3 +1,4 @@ +/** @prettier */ import { Observable } from '../Observable'; import { ColdObservable } from './ColdObservable'; import { HotObservable } from './HotObservable'; @@ -114,42 +115,44 @@ export class TestScheduler extends VirtualTimeScheduler { return subject; } - private materializeInnerObservable(observable: Observable, - outerFrame: number): TestMessage[] { + private materializeInnerObservable(observable: Observable, outerFrame: number): TestMessage[] { const messages: TestMessage[] = []; - observable.subscribe((value) => { - messages.push({ frame: this.frame - outerFrame, notification: nextNotification(value) }); - }, (error) => { - messages.push({ frame: this.frame - outerFrame, notification: errorNotification(error) }); - }, () => { - messages.push({ frame: this.frame - outerFrame, notification: COMPLETE_NOTIFICATION }); - }); + observable.subscribe( + (value) => { + messages.push({ frame: this.frame - outerFrame, notification: nextNotification(value) }); + }, + (error) => { + messages.push({ frame: this.frame - outerFrame, notification: errorNotification(error) }); + }, + () => { + messages.push({ frame: this.frame - outerFrame, notification: COMPLETE_NOTIFICATION }); + } + ); return messages; } - expectObservable(observable: Observable, - subscriptionMarbles: string | null = null): ({ toBe: observableToBeFn }) { + expectObservable(observable: Observable, subscriptionMarbles: string | null = null) { const actual: TestMessage[] = []; const flushTest: FlushableTest = { actual, ready: false }; const subscriptionParsed = TestScheduler.parseMarblesAsSubscriptions(subscriptionMarbles, this.runMode); - const subscriptionFrame = subscriptionParsed.subscribedFrame === Infinity ? - 0 : subscriptionParsed.subscribedFrame; + const subscriptionFrame = subscriptionParsed.subscribedFrame === Infinity ? 0 : subscriptionParsed.subscribedFrame; const unsubscriptionFrame = subscriptionParsed.unsubscribedFrame; let subscription: Subscription; this.schedule(() => { - subscription = observable.subscribe(x => { - let value = x; - // Support Observable-of-Observables - if (x instanceof Observable) { - value = this.materializeInnerObservable(value, this.frame); + subscription = observable.subscribe( + (x) => { + // Support Observable-of-Observables + const value = x instanceof Observable ? this.materializeInnerObservable(x, this.frame) : x; + actual.push({ frame: this.frame, notification: nextNotification(value) }); + }, + (error) => { + actual.push({ frame: this.frame, notification: errorNotification(error) }); + }, + () => { + actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION }); } - actual.push({ frame: this.frame, notification: nextNotification(value) }); - }, (error) => { - actual.push({ frame: this.frame, notification: errorNotification(error) }); - }, () => { - actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION }); - }); + ); }, subscriptionFrame); if (unsubscriptionFrame !== Infinity) { @@ -163,22 +166,41 @@ export class TestScheduler extends VirtualTimeScheduler { toBe(marbles: string, values?: any, errorValue?: any) { flushTest.ready = true; flushTest.expected = TestScheduler.parseMarbles(marbles, values, errorValue, true, runMode); - } + }, + toEqual: (other: Observable) => { + flushTest.ready = true; + flushTest.expected = []; + this.schedule(() => { + subscription = other.subscribe( + (x) => { + // Support Observable-of-Observables + const value = x instanceof Observable ? this.materializeInnerObservable(x, this.frame) : x; + flushTest.expected!.push({ frame: this.frame, notification: nextNotification(value) }); + }, + (error) => { + flushTest.expected!.push({ frame: this.frame, notification: errorNotification(error) }); + }, + () => { + flushTest.expected!.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION }); + } + ); + }, subscriptionFrame); + }, }; } - expectSubscriptions(actualSubscriptionLogs: SubscriptionLog[]): ({ toBe: subscriptionLogsToBeFn }) { + expectSubscriptions(actualSubscriptionLogs: SubscriptionLog[]): { toBe: subscriptionLogsToBeFn } { const flushTest: FlushableTest = { actual: actualSubscriptionLogs, ready: false }; this.flushTests.push(flushTest); const { runMode } = this; return { toBe(marblesOrMarblesArray: string | string[]) { - const marblesArray: string[] = (typeof marblesOrMarblesArray === 'string') ? [marblesOrMarblesArray] : marblesOrMarblesArray; + const marblesArray: string[] = typeof marblesOrMarblesArray === 'string' ? [marblesOrMarblesArray] : marblesOrMarblesArray; flushTest.ready = true; - flushTest.expected = marblesArray.map(marbles => - TestScheduler.parseMarblesAsSubscriptions(marbles, runMode) - ).filter(marbles => marbles.subscribedFrame !== Infinity); - } + flushTest.expected = marblesArray + .map((marbles) => TestScheduler.parseMarblesAsSubscriptions(marbles, runMode)) + .filter((marbles) => marbles.subscribedFrame !== Infinity); + }, }; } @@ -190,7 +212,7 @@ export class TestScheduler extends VirtualTimeScheduler { super.flush(); - this.flushTests = this.flushTests.filter(test => { + this.flushTests = this.flushTests.filter((test) => { if (test.ready) { this.assertDeepEqual(test.actual, test.expected); return false; @@ -236,16 +258,14 @@ export class TestScheduler extends VirtualTimeScheduler { break; case '^': if (subscriptionFrame !== Infinity) { - throw new Error('found a second subscription point \'^\' in a ' + - 'subscription marble diagram. There can only be one.'); + throw new Error("found a second subscription point '^' in a " + 'subscription marble diagram. There can only be one.'); } subscriptionFrame = groupStart > -1 ? groupStart : frame; advanceFrameBy(1); break; case '!': if (unsubscriptionFrame !== Infinity) { - throw new Error('found a second unsubscription point \'!\' in a ' + - 'subscription marble diagram. There can only be one.'); + throw new Error("found a second unsubscription point '!' in a " + 'subscription marble diagram. There can only be one.'); } unsubscriptionFrame = groupStart > -1 ? groupStart : frame; break; @@ -283,8 +303,7 @@ export class TestScheduler extends VirtualTimeScheduler { } } - throw new Error('there can only be \'^\' and \'!\' markers in a ' + - 'subscription marble diagram. Found instead \'' + c + '\'.'); + throw new Error("there can only be '^' and '!' markers in a " + "subscription marble diagram. Found instead '" + c + "'."); } frame = nextFrame; @@ -298,28 +317,30 @@ export class TestScheduler extends VirtualTimeScheduler { } /** @nocollapse */ - static parseMarbles(marbles: string, - values?: any, - errorValue?: any, - materializeInnerObservables: boolean = false, - runMode = false): TestMessage[] { + static parseMarbles( + marbles: string, + values?: any, + errorValue?: any, + materializeInnerObservables: boolean = false, + runMode = false + ): TestMessage[] { if (marbles.indexOf('!') !== -1) { - throw new Error('conventional marble diagrams cannot have the ' + - 'unsubscription marker "!"'); + throw new Error('conventional marble diagrams cannot have the ' + 'unsubscription marker "!"'); } const len = marbles.length; const testMessages: TestMessage[] = []; const subIndex = runMode ? marbles.replace(/^[ ]+/, '').indexOf('^') : marbles.indexOf('^'); - let frame = subIndex === -1 ? 0 : (subIndex * -this.frameTimeFactor); - const getValue = typeof values !== 'object' ? - (x: any) => x : - (x: any) => { - // Support Observable-of-Observables - if (materializeInnerObservables && values[x] instanceof ColdObservable) { - return values[x].messages; - } - return values[x]; - }; + let frame = subIndex === -1 ? 0 : subIndex * -this.frameTimeFactor; + const getValue = + typeof values !== 'object' + ? (x: any) => x + : (x: any) => { + // Support Observable-of-Observables + if (materializeInnerObservables && values[x] instanceof ColdObservable) { + return values[x].messages; + } + return values[x]; + }; let groupStart = -1; for (let i = 0; i < len; i++) { @@ -427,7 +448,7 @@ export class TestScheduler extends VirtualTimeScheduler { const delegate = { requestAnimationFrame(callback: FrameRequestCallback) { if (!map) { - throw new Error("animate() was not called within run()"); + throw new Error('animate() was not called within run()'); } const handle = ++lastHandle; map.set(handle, callback); @@ -435,10 +456,10 @@ export class TestScheduler extends VirtualTimeScheduler { }, cancelAnimationFrame(handle: number) { if (!map) { - throw new Error("animate() was not called within run()"); + throw new Error('animate() was not called within run()'); } map.delete(handle); - } + }, }; const animate = (marbles: string) => { @@ -446,7 +467,7 @@ export class TestScheduler extends VirtualTimeScheduler { throw new Error('animate() must not be called more than once within run()'); } if (/[|#]/.test(marbles)) { - throw new Error('animate() must not complete or error') + throw new Error('animate() must not complete or error'); } map = new Map(); const messages = TestScheduler.parseMarbles(marbles, undefined, undefined, undefined, true); @@ -483,14 +504,17 @@ export class TestScheduler extends VirtualTimeScheduler { // animate run helper. let lastHandle = 0; - const scheduleLookup = new Map void; - subscription: Subscription; - type: 'immediate' | 'interval' | 'timeout'; - }>(); + const scheduleLookup = new Map< + number, + { + due: number; + duration: number; + handle: number; + handler: () => void; + subscription: Subscription; + type: 'immediate' | 'interval' | 'timeout'; + } + >(); const run = () => { // Whenever a scheduled run is executed, it must run a single immediate @@ -559,7 +583,7 @@ export class TestScheduler extends VirtualTimeScheduler { value.subscription.unsubscribe(); scheduleLookup.delete(handle); } - } + }, }; const interval = { @@ -581,7 +605,7 @@ export class TestScheduler extends VirtualTimeScheduler { value.subscription.unsubscribe(); scheduleLookup.delete(handle); } - } + }, }; const timeout = { @@ -603,7 +627,7 @@ export class TestScheduler extends VirtualTimeScheduler { value.subscription.unsubscribe(); scheduleLookup.delete(handle); } - } + }, }; return { immediate, interval, timeout }; diff --git a/src/internal/types.ts b/src/internal/types.ts index 810c72dcfe..42b651ad77 100644 --- a/src/internal/types.ts +++ b/src/internal/types.ts @@ -87,14 +87,7 @@ export type SubscribableOrPromise = Subscribable | Subscribable | P /** OBSERVABLE INTERFACES */ export interface Subscribable { - subscribe(observer?: PartialObserver): Unsubscribable; - /** @deprecated Use an observer instead of a complete callback */ - subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable; - /** @deprecated Use an observer instead of an error callback */ - subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Unsubscribable; - /** @deprecated Use an observer instead of a complete callback */ - subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Unsubscribable; - subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable; + subscribe(observer: Partial>): Unsubscribable; } /** @@ -174,12 +167,13 @@ export interface CompletionObserver { export type PartialObserver = NextObserver | ErrorObserver | CompletionObserver; export interface Observer { - closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; } +export interface SubjectLike extends Observer, Subscribable {} + /** SCHEDULER INTERFACES */ export interface SchedulerLike extends TimestampProvider { diff --git a/src/operators/index.ts b/src/operators/index.ts index d25206ee8b..70a2812fbf 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -14,6 +14,7 @@ export { concatAll } from '../internal/operators/concatAll'; export { concatMap } from '../internal/operators/concatMap'; export { concatMapTo } from '../internal/operators/concatMapTo'; export { concat, concatWith } from '../internal/operators/concatWith'; +export { connect } from '../internal/operators/connect'; export { count } from '../internal/operators/count'; export { debounce } from '../internal/operators/debounce'; export { debounceTime } from '../internal/operators/debounceTime';