Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworking Multicasting: share, connect, and makeConnectable #5634

Merged
merged 8 commits into from
Dec 14, 2020
11 changes: 5 additions & 6 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ export declare const config: {
useDeprecatedNextContext: boolean;
};

export declare function connectable<T>(source: ObservableInput<T>, connector?: Subject<T>): ConnectableObservableLike<T>;

export declare class ConnectableObservable<T> extends Observable<T> {
protected _connection: Subscription | null;
protected _refCount: number;
Expand Down Expand Up @@ -330,7 +332,6 @@ export declare type ObservedValueTupleFromArray<X> = {
export declare type ObservedValueUnionFromArray<X> = X extends Array<ObservableInput<infer T>> ? T : never;

export interface Observer<T> {
closed?: boolean;
complete: () => void;
error: (err: any) => void;
next: (value: T) => void;
Expand Down Expand Up @@ -438,12 +439,10 @@ export declare class Subject<T> extends Observable<T> implements SubscriptionLik
static create: (...args: any[]) => any;
}

export declare type SubjectLike<T> = Observer<T> & Subscribable<T>;

export interface Subscribable<T> {
subscribe(observer?: PartialObserver<T>): Unsubscribable;
subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable;
subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Unsubscribable;
subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Unsubscribable;
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable;
subscribe(observer: Observer<T>): Unsubscribable;
}

export declare type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;
Expand Down
20 changes: 13 additions & 7 deletions api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ export declare function concatMapTo<T, R, O extends ObservableInput<any>>(observ

export declare function concatWith<T, A extends readonly unknown[]>(...otherSources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;

export declare function connect<T, R>({ connector, setup, }: {
connector?: () => SubjectLike<T>;
setup: (shared: Observable<T>) => ObservableInput<R>;
}): OperatorFunction<T, R>;

export declare function count<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, number>;

export declare function debounce<T>(durationSelector: (value: T) => ObservableInput<any>): MonoTypeOperatorFunction<T>;
Expand Down Expand Up @@ -182,9 +187,9 @@ export declare function mergeWith<T, A extends readonly unknown[]>(...otherSourc
export declare function min<T>(comparer?: (x: T, y: T) => number): MonoTypeOperatorFunction<T>;

export declare function multicast<T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): UnaryFunction<Observable<T>, ConnectableObservable<ObservedValueOf<O>>>;
export declare function multicast<T>(subjectFactory: (this: Observable<T>) => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function multicast<T, O extends ObservableInput<any>>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
export declare function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
export declare function multicast<T>(subjectFactory: () => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function multicast<T, O extends ObservableInput<any>>(subjectFactory: () => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;

export declare function observeOn<T>(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction<T>;

Expand All @@ -206,14 +211,14 @@ export declare function pluck<T>(...properties: string[]): OperatorFunction<T, u

export declare function publish<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function publish<T, O extends ObservableInput<any>>(selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
export declare function publish<T>(selector: MonoTypeOperatorFunction<T>): MonoTypeOperatorFunction<T>;

export declare function publishBehavior<T>(value: T): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function publishBehavior<T>(initialValue: T): UnaryFunction<Observable<T>, ConnectableObservable<T>>;

export declare function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>>;

export declare function publishReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export declare function publishReplay<T, O extends ObservableInput<any>>(bufferSize?: number, windowTime?: number, selector?: (shared: Observable<T>) => O, scheduler?: SchedulerLike): OperatorFunction<T, ObservedValueOf<O>>;
export declare function publishReplay<T>(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider): MonoTypeOperatorFunction<T>;
export declare function publishReplay<T, O extends ObservableInput<any>>(bufferSize: number | undefined, windowTime: number | undefined, selector: (shared: Observable<T>) => O, timestampProvider?: TimestampProvider): OperatorFunction<T, ObservedValueOf<O>>;
export declare function publishReplay<T, O extends ObservableInput<any>>(bufferSize: number | undefined, windowTime: number | undefined, selector: undefined, timestampProvider: TimestampProvider): OperatorFunction<T, ObservedValueOf<O>>;

export declare function race<T>(observables: Array<Observable<T>>): MonoTypeOperatorFunction<T>;
export declare function race<T, R>(observables: Array<Observable<T>>): OperatorFunction<T, R>;
Expand Down Expand Up @@ -248,6 +253,7 @@ export declare function scan<V, A, S>(accumulator: (acc: A | S, value: V, index:
export declare function sequenceEqual<T>(compareTo: Observable<T>, comparator?: (a: T, b: T) => boolean): OperatorFunction<T, boolean>;

export declare function share<T>(): MonoTypeOperatorFunction<T>;
export declare function share<T, R = T>(options: ShareOptions<T, R>): OperatorFunction<T, R>;

export declare function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
export declare function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
Expand Down
11 changes: 6 additions & 5 deletions api_guard/dist/types/testing/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ export declare class TestScheduler extends VirtualTimeScheduler {
[marble: string]: T;
}, error?: any): HotObservable<T>;
createTime(marbles: string): number;
expectObservable(observable: Observable<any>, subscriptionMarbles?: string | null): ({
toBe: observableToBeFn;
});
expectSubscriptions(actualSubscriptionLogs: SubscriptionLog[]): ({
expectObservable<T>(observable: Observable<T>, subscriptionMarbles?: string | null): {
toBe(marbles: string, values?: any, errorValue?: any): void;
toEqual: (other: Observable<T>) => void;
};
expectSubscriptions(actualSubscriptionLogs: SubscriptionLog[]): {
toBe: subscriptionLogsToBeFn;
});
};
flush(): void;
run<T>(callback: (helpers: RunHelpers) => T): T;
static frameTimeFactor: number;
Expand Down
148 changes: 148 additions & 0 deletions spec/deprecation-equivalents/multicasting-deprecations-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/** @prettier */
import { Observable, ConnectableObservable, connectable, of, AsyncSubject, BehaviorSubject, ReplaySubject, Subject, merge } from 'rxjs';
import { connect, share, multicast, publish, publishReplay, publishBehavior, publishLast, refCount, repeat, retry } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

describe('multicasting equivalent tests', () => {
let rxTest: TestScheduler;

beforeEach(() => {
rxTest = new TestScheduler(observableMatcher);
});

testEquivalents(
'multicast(() => new Subject()), refCount() and share()',
(source) =>
source.pipe(
multicast(() => new Subject<string>()),
refCount()
),
(source) => source.pipe(share())
);

testEquivalents(
'multicast(new Subject()), refCount() and share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })',
(source) => source.pipe(multicast(new Subject()), refCount()),
(source) => source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))
);

testEquivalents(
'publish(), refCount() and share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })',
(source) => source.pipe(publish(), refCount()),
(source) => source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))
);

testEquivalents(
'publishLast(), refCount() and share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })',
(source) => source.pipe(publishLast(), refCount()),
(source) =>
source.pipe(share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))
);

testEquivalents(
'publishBehavior("X"), refCount() and share({ connector: () => new BehaviorSubject("X"), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })',
(source) => source.pipe(publishBehavior('X'), refCount()),
(source) =>
source.pipe(
share({ connector: () => new BehaviorSubject('X'), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })
)
);

testEquivalents(
'publishReplay(3, 10), refCount() and share({ connector: () => new ReplaySubject(3, 10), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })',
(source) => source.pipe(publishReplay(3, 10), refCount()),
(source) =>
source.pipe(
share({ connector: () => new ReplaySubject(3, 10), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })
)
);

const fn = (source: Observable<any>) => merge(source, source);

testEquivalents(
'publish(fn) and connect({ setup: fn })',
(source) => source.pipe(publish(fn)),
(source) =>
source.pipe(
connect({
setup: fn,
})
)
);

testEquivalents(
'publishReplay(3, 10, fn) and `subject = new ReplaySubject(3, 10), connect({ connector: () => subject , setup: fn })`',
(source) => source.pipe(publishReplay(3, 10, fn)),
(source) => {
const subject = new ReplaySubject(3, 10);
return source.pipe(connect({ connector: () => subject, setup: fn }));
}
);

/**
* Used to test a variety of scenarios with multicast operators that should be equivalent.
* @param name The name to add to the test output
* @param oldExpression The old expression we're saying matches the updated expression
* @param updatedExpression The updated expression we're telling people to use instead.
*/
function testEquivalents(
name: string,
oldExpression: (source: Observable<string>) => Observable<string>,
updatedExpression: (source: Observable<string>) => Observable<string>
) {
it(`should be equivalent for ${name} for async sources`, () => {
rxTest.run(({ cold, expectObservable }) => {
const source = cold('----a---b---c----d---e----|');
const old = oldExpression(source);
const updated = updatedExpression(source);
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that repeat`, () => {
rxTest.run(({ cold, expectObservable }) => {
const source = cold('----a---b---c----d---e----|');
const old = oldExpression(source).pipe(repeat(3));
const updated = updatedExpression(source).pipe(repeat(3));
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that retry`, () => {
rxTest.run(({ cold, expectObservable }) => {
const source = cold('----a---b---c----d---e----#');
const old = oldExpression(source).pipe(retry(3));
const updated = updatedExpression(source).pipe(retry(3));
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources`, () => {
rxTest.run(({ expectObservable }) => {
const source = of('a', 'b', 'c');
const old = oldExpression(source);
const updated = updatedExpression(source);
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that repeat`, () => {
rxTest.run(({ expectObservable }) => {
const source = of('a', 'b', 'c');
const old = oldExpression(source).pipe(repeat(3));
const updated = updatedExpression(source).pipe(repeat(3));
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that retry`, () => {
rxTest.run(({ expectObservable }) => {
const source = of('a', 'b', 'c');
const old = oldExpression(source).pipe(retry(3));
const updated = updatedExpression(source).pipe(retry(3));
expectObservable(updated).toEqual(old);
});
});
}
});
50 changes: 50 additions & 0 deletions spec/operators/connect-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/** @prettier */
import { BehaviorSubject, merge } from 'rxjs';
import { connect, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

describe('connect', () => {
let rxTest: TestScheduler;

beforeEach(() => {
rxTest = new TestScheduler(observableMatcher);
});

it('should connect a source through a setup function', () => {
rxTest.run(({ cold, time, expectObservable }) => {
const source = cold('---a----b-----c---|');
const d = time(' ---|');
const expected = ' ---a--a-b--b--c--c|';

const result = source.pipe(
connect({
setup: (shared) => {
return merge(shared.pipe(delay(d)), shared);
},
})
);

expectObservable(result).toBe(expected);
});
});

it('should connect a source through a setup function and use the provided connector', () => {
rxTest.run(({ cold, time, expectObservable }) => {
const source = cold('--------a---------b---------c-----|');
const d = time(' ---|');
const expected = ' S--S----a--a------b--b------c--c--|';

const result = source.pipe(
connect({
connector: () => new BehaviorSubject('S'),
setup: (shared) => {
return merge(shared.pipe(delay(d)), shared);
},
})
);

expectObservable(result).toBe(expected);
});
});
});
2 changes: 1 addition & 1 deletion spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ describe('publishReplay operator', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should emit an error when the selector throws an exception', () => {
it('should EMIT an error when the selector throws an exception', () => {
const error = "It's broken";
const selector = () => {
throw error;
Expand Down
Loading