Skip to content

Commit

Permalink
fix: No longer allow invalid "Subscribable" type as valid observable …
Browse files Browse the repository at this point in the history
…source in `from` and others.

- Deprecates `SubscribableOrPromise` type and removes its usage throughout the library
- Updates `ObservableInput` to be a more direct list and include `Observable` itself.
- Updates `switchAll` to have proper typing (it broke after the refactor of `ObservableInput`), removing weird legacy type
- Updates related tests

Resolves #4532
  • Loading branch information
benlesh committed Oct 15, 2020
1 parent 85eb7e3 commit 258dddd
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 41 deletions.
20 changes: 9 additions & 11 deletions spec/operators/expand-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import { expand, mergeMap, map, take, toArray } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { Subscribable, EMPTY, Observable, of, Observer, asapScheduler, asyncScheduler } from 'rxjs';
import { Subscribable, EMPTY, Observable, of, Observer, asapScheduler, asyncScheduler, InteropObservable } from 'rxjs';

declare const rxTestScheduler: TestScheduler;

Expand Down Expand Up @@ -359,22 +359,20 @@ describe('expand', () => {

it('should recursively flatten lowercase-o observables', (done) => {
const expected = [1, 2, 4, 8, 16];
const project = (x: number, index: number): Subscribable<number> => {
const project = (x: number): InteropObservable<number> => {
if (x === 16) {
return <any>EMPTY;
return EMPTY as any;
}

const ish: any = {
subscribe: (observer: Observer<number>) => {
return {
subscribe(observer: Observer<number>) {
observer.next(x + x);
observer.complete();
},
[Symbol.observable] () {
return this;
}
};

ish[Symbol.observable] = function () {
return this;
};
return <Subscribable<number>> ish;
} as any;
};

of(1).pipe(
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/forkJoin.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/** @prettier */
import { Observable } from '../Observable';
import { ObservableInput, ObservedValueUnionFromArray, ObservedValueOf, SubscribableOrPromise } from '../types';
import { ObservableInput, ObservedValueUnionFromArray, ObservedValueOf } from '../types';
import { map } from '../operators/map';
import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
import { innerFrom } from './from';
import { popResultSelector } from '../util/args';

// forkJoin(a$, b$, c$)
/** @deprecated Use the version that takes an array of Observables instead */
export function forkJoin<T>(v1: SubscribableOrPromise<T>): Observable<[T]>;
export function forkJoin<T>(v1: ObservableInput<T>): Observable<[T]>;
/** @deprecated Use the version that takes an array of Observables instead */
export function forkJoin<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>): Observable<[T, T2]>;
/** @deprecated Use the version that takes an array of Observables instead */
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/from.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): O
// TODO: Use this throughout the library, rather than the `from` above, to avoid
// the unnecessary scheduling check and reduce bundled sizes of operators that use `from`.
// TODO: Eventually, this just becomes `from`, as we don't have the deprecated scheduled path anymore.
export function innerFrom<T>(input: ObservableInput<T>) {
export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (input instanceof Observable) {
return input;
}
Expand Down
6 changes: 3 additions & 3 deletions src/internal/observable/iif.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Observable } from '../Observable';
import { defer } from './defer';
import { EMPTY } from './empty';
import { SubscribableOrPromise } from '../types';
import { ObservableInput } from '../types';

/**
* Decides at subscription time which Observable will actually be subscribed.
Expand Down Expand Up @@ -90,8 +90,8 @@ import { SubscribableOrPromise } from '../types';
*/
export function iif<T = never, F = never>(
condition: () => boolean,
trueResult: SubscribableOrPromise<T> = EMPTY,
falseResult: SubscribableOrPromise<F> = EMPTY
trueResult: ObservableInput<T> = EMPTY,
falseResult: ObservableInput<F> = EMPTY
): Observable<T|F> {
return defer(() => condition() ? trueResult : falseResult);
}
8 changes: 4 additions & 4 deletions src/internal/operators/audit.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { Subscriber } from '../Subscriber';
import { MonoTypeOperatorFunction, SubscribableOrPromise } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';

import { operate } from '../util/lift';
import { innerFrom } from '../observable/from';
Expand Down Expand Up @@ -44,13 +44,13 @@ import { OperatorSubscriber } from './OperatorSubscriber';
* @see {@link sample}
* @see {@link throttle}
*
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
* @param durationSelector A function
* that receives a value from the source Observable, for computing the silencing
* duration, returned as an Observable or a Promise.
* @return {Observable<T>} An Observable that performs rate-limiting of
* @return An Observable that performs rate-limiting of
* emissions from the source Observable.
*/
export function audit<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): MonoTypeOperatorFunction<T> {
export function audit<T>(durationSelector: (value: T) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let hasValue = false;
let lastValue: T | null = null;
Expand Down
12 changes: 6 additions & 6 deletions src/internal/operators/bufferToggle.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { Subscription } from '../Subscription';
import { OperatorFunction, SubscribableOrPromise } from '../types';
import { OperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { innerFrom } from '../observable/from';
import { OperatorSubscriber } from './OperatorSubscriber';
Expand Down Expand Up @@ -43,17 +43,17 @@ import { arrRemove } from '../util/arrRemove';
* @see {@link bufferWhen}
* @see {@link windowToggle}
*
* @param {SubscribableOrPromise<O>} openings A Subscribable or Promise of notifications to start new
* @param openings A Subscribable or Promise of notifications to start new
* buffers.
* @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes
* @param closingSelector A function that takes
* the value emitted by the `openings` observable and returns a Subscribable or Promise,
* which, when it emits, signals that the associated buffer should be emitted
* and cleared.
* @return {Observable<T[]>} An observable of arrays of buffered values.
* @return An observable of arrays of buffered values.
*/
export function bufferToggle<T, O>(
openings: SubscribableOrPromise<O>,
closingSelector: (value: O) => SubscribableOrPromise<any>
openings: ObservableInput<O>,
closingSelector: (value: O) => ObservableInput<any>
): OperatorFunction<T, T[]> {
return operate((source, subscriber) => {
const buffers: T[][] = [];
Expand Down
8 changes: 4 additions & 4 deletions src/internal/operators/debounce.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { Subscriber } from '../Subscriber';
import { MonoTypeOperatorFunction, SubscribableOrPromise } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';

import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
Expand Down Expand Up @@ -56,14 +56,14 @@ import { innerFrom } from '../observable/from';
* @see {@link throttle}
* @see {@link throttleTime}
*
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
* @param durationSelector A function
* that receives a value from the source Observable, for computing the timeout
* duration for each source value, returned as an Observable or a Promise.
* @return {Observable} An Observable that delays the emissions of the source
* @return An Observable that delays the emissions of the source
* Observable by the specified duration Observable returned by
* `durationSelector`, and may drop some values if they occur too frequently.
*/
export function debounce<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): MonoTypeOperatorFunction<T> {
export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let hasValue = false;
let lastValue: T | null = null;
Expand Down
5 changes: 2 additions & 3 deletions src/internal/operators/switchAll.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {OperatorFunction, ObservableInput} from '../types';
import {OperatorFunction, ObservableInput, ObservedValueOf} from '../types';
import { switchMap } from './switchMap';
import { identity } from '../util/identity';

export function switchAll<T>(): OperatorFunction<ObservableInput<T>, T>;
export function switchAll<R>(): OperatorFunction<any, R>;
export function switchAll<O extends ObservableInput<any>>(): OperatorFunction<O, ObservedValueOf<O>>;

/**
* Converts a higher-order Observable into a first-order Observable
Expand Down
10 changes: 5 additions & 5 deletions src/internal/operators/throttle.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/** @prettier */
import { Subscription } from '../Subscription';

import { MonoTypeOperatorFunction, SubscribableOrPromise } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/from';
Expand Down Expand Up @@ -52,16 +52,16 @@ export const defaultThrottleConfig: ThrottleConfig = {
* @see {@link sample}
* @see {@link throttleTime}
*
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
* @param durationSelector A function
* that receives a value from the source Observable, for computing the silencing
* duration for each source value, returned as an Observable or a Promise.
* @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults
* @param config a configuration object to define `leading` and `trailing` behavior. Defaults
* to `{ leading: true, trailing: false }`.
* @return {Observable<T>} An Observable that performs the throttle operation to
* @return An Observable that performs the throttle operation to
* limit the rate of emissions from the source.
*/
export function throttle<T>(
durationSelector: (value: T) => SubscribableOrPromise<any>,
durationSelector: (value: T) => ObservableInput<any>,
{ leading, trailing }: ThrottleConfig = defaultThrottleConfig
): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
Expand Down
9 changes: 8 additions & 1 deletion src/internal/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export interface SubscriptionLike extends Unsubscribable {
readonly closed: boolean;
}

/** @deprecated To be removed in v8. Do not use. Most likely you want to use `ObservableInput` */
export type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;

/** OBSERVABLE INTERFACES */
Expand All @@ -80,11 +81,17 @@ export interface Subscribable<T> {
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable;
}

export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T> | AsyncIterableIterator<T>;
/**
* Valid types that can be converted to observables.
*/
export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;

/** @deprecated use {@link InteropObservable } */
export type ObservableLike<T> = InteropObservable<T>;

/**
* An object that implements the `Symbol.observable` interface.
*/
export interface InteropObservable<T> {
[Symbol.observable]: () => Subscribable<T>;
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal/util/argsArgArrayOrObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export function argsArgArrayOrObject<T, O extends Record<string, T>>(args: T[] |
if (isPOJO(first)) {
const keys = getKeys(first);
return {
args: keys.map((key) => (first as O)[key]),
args: keys.map((key) => first[key]),
keys,
};
}
Expand Down

0 comments on commit 258dddd

Please sign in to comment.