Skip to content

Commit

Permalink
refactor: directly use Subscriber to create safe subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Feb 21, 2022
1 parent 4e5d34d commit bdab737
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 37 deletions.
12 changes: 2 additions & 10 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Operator } from './Operator';
import { createSafeSubscriber, Subscriber } from './Subscriber';
import { isSubscription, Subscription } from './Subscription';
import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, Subscribable, Observer } from './types';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { isFunction } from './util/isFunction';
import { isSubscriber } from './util/isSubscriber';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
Expand Down Expand Up @@ -394,11 +394,3 @@ export class Observable<T> implements Subscribable<T> {
return pipeFromArray(operations)(this);
}
}

function isObserver<T>(value: any): value is Observer<T> {
return value && isFunction(value.next) && isFunction(value.error) && isFunction(value.complete);
}

function isSubscriber<T>(value: any): value is Subscriber<T> {
return (value && value instanceof Subscriber) || (isObserver(value) && isSubscription(value));
}
37 changes: 10 additions & 27 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { reportUnhandledError } from './util/reportUnhandledError';
import { noop } from './util/noop';
import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories';
import { timeoutProvider } from './scheduler/timeoutProvider';
import { isSubscriber } from './util/isSubscriber';

/**
* Implements the {@link Observer} interface and extends the
Expand Down Expand Up @@ -40,23 +41,20 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
protected isStopped: boolean = false;
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
protected destination: Subscriber<any> | Observer<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
protected destination: Subscriber<T> | Observer<T>; // this `any` is the escape hatch to erase extra type param (e.g. R)

/**
* @deprecated Internal implementation detail, do not use directly. Will be made internal in v8.
* There is no reason to directly create an instance of Subscriber. This type is exported for typings reasons.
*/
constructor(destination?: Subscriber<any> | Observer<any>) {
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null) {
super();
if (destination) {
this.destination = destination;
// Automatically chain subscriptions together here.
// if destination is a Subscription, then it is a Subscriber.
if (isSubscription(destination)) {
destination.add(this);
}
} else {
this.destination = EMPTY_OBSERVER;
this.destination = isSubscriber(destination) ? destination : createSafeObserver(destination);

// Automatically chain subscriptions together here.
// if destination is a Subscription, then it is a Subscriber.
if (isSubscription(destination)) {
destination.add(this);
}
}

Expand Down Expand Up @@ -135,21 +133,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}
}

/**
* This bind is captured here because we want to be able to have
* compatibility with monoid libraries that tend to use a method named
* `bind`. In particular, a library called Monio requires this.
*/
const _bind = Function.prototype.bind;

function bind<Fn extends (...args: any[]) => any>(fn: Fn, thisArg: any): Fn {
return _bind.call(fn, thisArg);
}

/**
* Internal optimization only, DO NOT EXPOSE.
* @internal
*/
class ConsumerObserver<T> implements Observer<T> {
constructor(private partialObserver: Partial<Observer<T>>) {}

Expand Down Expand Up @@ -194,7 +177,7 @@ function createSafeObserver<T>(observerOrNext?: Partial<Observer<T>> | ((value:
}

export function createSafeSubscriber<T>(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null) {
return new SafeSubscriber(observerOrNext);
return new Subscriber(observerOrNext);
}

class SafeSubscriber<T> extends Subscriber<T> {
Expand Down
6 changes: 6 additions & 0 deletions src/internal/util/isObserver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Observer } from '../types';
import { isFunction } from './isFunction';

export function isObserver<T>(value: any): value is Observer<T> {
return value && isFunction(value.next) && isFunction(value.error) && isFunction(value.complete);
}
7 changes: 7 additions & 0 deletions src/internal/util/isSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Subscriber } from '../Subscriber';
import { isSubscription } from '../Subscription';
import { isObserver } from './isObserver';

export function isSubscriber<T>(value: any): value is Subscriber<T> {
return (value && value instanceof Subscriber) || (isObserver(value) && isSubscription(value));
}

0 comments on commit bdab737

Please sign in to comment.