Skip to content

Commit

Permalink
refactor(merge): support N-args with concurrency and scheduler (#5877)
Browse files Browse the repository at this point in the history
* refactor(merge): N-args with scheduler

Using ObservableInputTuple, etc.

* chore: update api_guardian
  • Loading branch information
cartant authored Nov 17, 2020
1 parent 064f738 commit 646d022
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 182 deletions.
32 changes: 4 additions & 28 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,34 +212,10 @@ export declare function isObservable<T>(obj: any): obj is Observable<T>;

export declare function lastValueFrom<T>(source: Observable<T>): Promise<T>;

export declare function merge<T>(v1: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;
export declare function merge<T>(v1: ObservableInput<T>, concurrent: number, scheduler: SchedulerLike): Observable<T>;
export declare function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler: SchedulerLike): Observable<T | T2>;
export declare function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent: number, scheduler: SchedulerLike): Observable<T | T2>;
export declare function merge<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler: SchedulerLike): Observable<T | T2 | T3>;
export declare function merge<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent: number, scheduler: SchedulerLike): Observable<T | T2 | T3>;
export declare function merge<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler: SchedulerLike): Observable<T | T2 | T3 | T4>;
export declare function merge<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent: number, scheduler: SchedulerLike): Observable<T | T2 | T3 | T4>;
export declare function merge<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler: SchedulerLike): Observable<T | T2 | T3 | T4 | T5>;
export declare function merge<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent: number, scheduler: SchedulerLike): Observable<T | T2 | T3 | T4 | T5>;
export declare function merge<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler: SchedulerLike): Observable<T | T2 | T3 | T4 | T5 | T6>;
export declare function merge<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent: number, scheduler: SchedulerLike): Observable<T | T2 | T3 | T4 | T5 | T6>;
export declare function merge<T>(v1: ObservableInput<T>): Observable<T>;
export declare function merge<T>(v1: ObservableInput<T>, concurrent: number): Observable<T>;
export declare function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>): Observable<T | T2>;
export declare function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent: number): Observable<T | T2>;
export declare function merge<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<T | T2 | T3>;
export declare function merge<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent: number): Observable<T | T2 | T3>;
export declare function merge<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<T | T2 | T3 | T4>;
export declare function merge<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent: number): Observable<T | T2 | T3 | T4>;
export declare function merge<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<T | T2 | T3 | T4 | T5>;
export declare function merge<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent: number): Observable<T | T2 | T3 | T4 | T5>;
export declare function merge<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<T | T2 | T3 | T4 | T5 | T6>;
export declare function merge<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent: number): Observable<T | T2 | T3 | T4 | T5 | T6>;
export declare function merge<T>(...observables: (ObservableInput<T> | number)[]): Observable<T>;
export declare function merge<T>(...observables: (ObservableInput<T> | SchedulerLike | number)[]): Observable<T>;
export declare function merge<T, R>(...observables: (ObservableInput<any> | number)[]): Observable<R>;
export declare function merge<T, R>(...observables: (ObservableInput<any> | SchedulerLike | number)[]): Observable<R>;
export declare function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>]): Observable<A[number]>;
export declare function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>, number?]): Observable<A[number]>;
export declare function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>, SchedulerLike?]): Observable<A[number]>;
export declare function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>, number?, SchedulerLike?]): Observable<A[number]>;

export interface MonoTypeOperatorFunction<T> extends OperatorFunction<T, T> {
}
Expand Down
165 changes: 11 additions & 154 deletions src/internal/observable/merge.ts
Original file line number Diff line number Diff line change
@@ -1,163 +1,20 @@
/** @prettier */
import { Observable } from '../Observable';
import { ObservableInput, SchedulerLike } from '../types';
import { ObservableInput, ObservableInputTuple, SchedulerLike } from '../types';
import { mergeAll } from '../operators/mergeAll';
import { internalFromArray } from './fromArray';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { innerFrom } from './from';
import { EMPTY } from './empty';
import { popNumber, popScheduler } from '../util/args';

/* tslint:disable:max-line-length */
export function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>]): Observable<A[number]>;
export function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>, number?]): Observable<A[number]>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T>(v1: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;
export function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>, SchedulerLike?]): Observable<A[number]>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T>(v1: ObservableInput<T>, concurrent: number, scheduler: SchedulerLike): Observable<T>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler: SchedulerLike): Observable<T | T2>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
concurrent: number,
scheduler: SchedulerLike
): Observable<T | T2>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
scheduler: SchedulerLike
): Observable<T | T2 | T3>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
concurrent: number,
scheduler: SchedulerLike
): Observable<T | T2 | T3>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3, T4>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
scheduler: SchedulerLike
): Observable<T | T2 | T3 | T4>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3, T4>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
concurrent: number,
scheduler: SchedulerLike
): Observable<T | T2 | T3 | T4>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3, T4, T5>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
scheduler: SchedulerLike
): Observable<T | T2 | T3 | T4 | T5>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3, T4, T5>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
concurrent: number,
scheduler: SchedulerLike
): Observable<T | T2 | T3 | T4 | T5>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3, T4, T5, T6>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
v6: ObservableInput<T6>,
scheduler: SchedulerLike
): Observable<T | T2 | T3 | T4 | T5 | T6>;
/** @deprecated The scheduler argument is deprecated, use scheduled and mergeAll. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function merge<T, T2, T3, T4, T5, T6>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
v6: ObservableInput<T6>,
concurrent: number,
scheduler: SchedulerLike
): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function merge<A extends readonly unknown[]>(...args: [...ObservableInputTuple<A>, number?, SchedulerLike?]): Observable<A[number]>;

export function merge<T>(v1: ObservableInput<T>): Observable<T>;
export function merge<T>(v1: ObservableInput<T>, concurrent: number): Observable<T>;
export function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>): Observable<T | T2>;
export function merge<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent: number): Observable<T | T2>;
export function merge<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<T | T2 | T3>;
export function merge<T, T2, T3>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
concurrent: number
): Observable<T | T2 | T3>;
export function merge<T, T2, T3, T4>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>
): Observable<T | T2 | T3 | T4>;
export function merge<T, T2, T3, T4>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
concurrent: number
): Observable<T | T2 | T3 | T4>;
export function merge<T, T2, T3, T4, T5>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>
): Observable<T | T2 | T3 | T4 | T5>;
export function merge<T, T2, T3, T4, T5>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
concurrent: number
): Observable<T | T2 | T3 | T4 | T5>;
export function merge<T, T2, T3, T4, T5, T6>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
v6: ObservableInput<T6>
): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function merge<T, T2, T3, T4, T5, T6>(
v1: ObservableInput<T>,
v2: ObservableInput<T2>,
v3: ObservableInput<T3>,
v4: ObservableInput<T4>,
v5: ObservableInput<T5>,
v6: ObservableInput<T6>,
concurrent: number
): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function merge<T>(...observables: (ObservableInput<T> | number)[]): Observable<T>;
/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduler).pipe(mergeAll())*/
export function merge<T>(...observables: (ObservableInput<T> | SchedulerLike | number)[]): Observable<T>;
export function merge<T, R>(...observables: (ObservableInput<any> | number)[]): Observable<R>;
/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduler).pipe(mergeAll())*/
export function merge<T, R>(...observables: (ObservableInput<any> | SchedulerLike | number)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which concurrently emits all values from every
* given input Observable.
Expand Down Expand Up @@ -225,16 +82,16 @@ export function merge<T, R>(...observables: (ObservableInput<any> | SchedulerLik
* @return {Observable} an Observable that emits items that are the result of
* every input Observable.
*/
export function merge(...args: (ObservableInput<any> | SchedulerLike | number)[]): Observable<unknown> {
export function merge(...args: (ObservableInput<unknown> | number | SchedulerLike)[]): Observable<unknown> {
const scheduler = popScheduler(args);
const concurrent = popNumber(args, Infinity);
args = argsOrArgArray(args);
return !args.length
const sources = argsOrArgArray(args) as ObservableInput<unknown>[];
return !sources.length
? // No source provided
EMPTY
: args.length === 1
: sources.length === 1
? // One source? Just return it.
innerFrom(args[0] as ObservableInput<any>)
innerFrom(sources[0])
: // Merge all sources
mergeAll(concurrent)(internalFromArray(args as ObservableInput<any>[], scheduler));
mergeAll(concurrent)(internalFromArray(sources, scheduler));
}

0 comments on commit 646d022

Please sign in to comment.