-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathObservable.ts
133 lines (127 loc) · 3.85 KB
/
Observable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import {
SubscriberFunction,
Subscription,
SubscriptionLike
} from "./Subscription.ts";
import { Observer } from "./Observer.ts";
import {
isAsyncIterable,
isIterable,
isObservableLike
} from "./symbol.ts";
import { isObserver, assertIsObserver } from "./utils.ts";
/**
* Interface that aligns with signatures to an object that is an Observable
*/
export interface ObservableLike<Value = unknown> {
[Symbol.observable](): Observable<Value>;
}
/**
* An Observable represents a sequence of values which may be observed.
*/
export class Observable<Value = unknown> implements ObservableLike<Value> {
private _subscriber: SubscriberFunction<Value>;
constructor(subscriber: SubscriberFunction<Value>) {
if (!(this instanceof Observable)) {
throw new TypeError("Observable cannot be called as a function");
}
if (typeof subscriber !== "function") {
throw new TypeError("Observable initializer must be a function");
}
this._subscriber = subscriber;
}
/**
* Subscribes to the sequence with an observer
* @param observer the observer to be subscribed
*/
subscribe(observer: Observer<Value>): SubscriptionLike;
/**
* Subscribes to the sequence with callbacks
* @param onNext callback to be invoked on the next value of the sequence
* @param onError callback to be invoked on the next error of the sequence
* @param onComplete callback to be invoked when the sequence is completed
*/
subscribe(
onNext: (value: Value) => void,
onError?: (error: unknown) => void,
onComplete?: () => void,
): SubscriptionLike;
subscribe(
observerOrOnNext: Observer<Value> | ((value: Value) => void),
onError?: (error: unknown) => void,
onComplete?: () => void,
): SubscriptionLike {
const observer = isObserver(observerOrOnNext) ? observerOrOnNext : {
next: observerOrOnNext,
error: onError,
complete: onComplete,
};
assertIsObserver(observer);
return new Subscription(observer, {subscriber: this._subscriber});
}
/** Returns itself */
[Symbol.observable](): Observable<Value> {
return this;
}
/**
* Converts arguments to an Observable
* @param items an iterablet
*/
static of<Value = unknown>(...items: Value[]): Observable<Value> {
return new Observable<Value>((observer) => {
for (const iterator of items) observer.next(iterator);
observer.complete();
});
}
/**
* Converts an observable Observable
* @param observable the observableLike to be converted
*/
static from<Value = unknown>(
observable: ObservableLike<Value>,
): Observable<Value>;
/**
* Converts an iterable to an Observable
* @param iterable the iterable to be converted
*/
static from<Value = unknown>(
iterable: Iterable<Value>,
): Observable<Value>;
/**
* Converts an async iterable to an Observable
* @param iterable the iterable to be converted
*/
static from<Value = unknown>(
iterable: AsyncIterable<Value>,
): Observable<Value>;
static from<Value = unknown>(
argument:
| Iterable<Value>
| AsyncIterable<Value>
| ObservableLike<Value>,
): Observable<Value> {
if (isObservableLike(argument)) {
const observable = argument[Symbol.observable]();
return new Observable<Value>((observer) =>
observable.subscribe(observer)
);
}
if (isIterable(argument)) {
return new Observable<Value>((observer) => {
for (const value of argument) observer.next(value);
observer.complete();
});
}
if (isAsyncIterable(argument)) {
return new Observable<Value>((observe) => {
(async () => {
for await (const value of argument) observe.next(value);
})();
observe.complete();
});
}
throw new TypeError(
"Observable.from expects to receive an ObservableLike, Iterable or AsyncIterable",
);
}
}