This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathconnectableobservable.js
76 lines (65 loc) · 2.17 KB
/
connectableobservable.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
 * Observable that wraps a connectable source and tracks how many
 * subscriptions made through it are still alive.
 *
 * The source is connected when the live-subscription count goes from 0 to 1,
 * and the handle returned by that `connect()` call is disposed when the count
 * drops back to 0.
 */
var RefCountObservable = (function (__super__) {
  inherits(RefCountObservable, __super__);

  /**
   * Disposable handed back for each subscription made through a
   * RefCountObservable.
   *
   * @param {RefCountObservable} p Owner whose `_count` this handle decrements.
   * @param {Object} s Subscription returned by the wrapped source.
   */
  function RefCountDisposable(p, s) {
    this._p = p;
    this._s = s;
    this.isDisposed = false;
  }

  /**
   * Releases the wrapped subscription once; repeated calls do nothing.
   * When this was the last live subscription, the owner's stored connection
   * handle is disposed as well.
   */
  RefCountDisposable.prototype.dispose = function () {
    if (this.isDisposed) {
      return;
    }
    this.isDisposed = true;
    this._s.dispose();

    var owner = this._p;
    var remaining = --owner._count;
    if (remaining === 0) {
      owner._connectableSubscription.dispose();
    }
  };

  /**
   * @param {Object} source Object exposing `subscribe(observer)` and
   *   `connect()`; both are expected to return something with `dispose()`.
   */
  function RefCountObservable(source) {
    this.source = source;
    this._count = 0;
    this._connectableSubscription = null;
    __super__.call(this);
  }

  /**
   * Subscribes the observer to the source first, then bumps the count and
   * connects the source if this is the only live subscription.
   *
   * @param {Object} o Observer forwarded to `source.subscribe`.
   * @returns {RefCountDisposable} Handle that undoes this subscription.
   */
  RefCountObservable.prototype.subscribeCore = function (o) {
    var inner = this.source.subscribe(o);
    var live = ++this._count;
    if (live === 1) {
      this._connectableSubscription = this.source.connect();
    }
    return new RefCountDisposable(this, inner);
  };

  return RefCountObservable;
}(ObservableBase));
/**
 * Observable whose observers subscribe to a subject, while the subject itself
 * is only subscribed to the underlying source when `connect()` is called.
 */
var ConnectableObservable = Rx.ConnectableObservable = (function (__super__) {
  inherits(ConnectableObservable, __super__);

  /**
   * Handle for one active connection.
   *
   * @param {ConnectableObservable} parent Owner whose `_connection` is cleared on dispose.
   * @param {Object} subscription Subscription of the subject to the source.
   */
  function ConnectDisposable(parent, subscription) {
    this._p = parent;
    this._s = subscription;
  }

  /**
   * Disposes the source subscription and clears the owner's `_connection`.
   * Does nothing once the subscription has been released.
   */
  ConnectDisposable.prototype.dispose = function () {
    var active = this._s;
    if (!active) {
      return;
    }
    active.dispose();
    this._s = null;
    this._p._connection = null;
  };

  /**
   * @param {Object} source Upstream observable; `source.asObservable()` is
   *   what actually gets subscribed on connect.
   * @param {Object} subject Subject that receives the source's notifications
   *   and that observers of this observable subscribe to.
   */
  function ConnectableObservable(source, subject) {
    this.source = source;
    this._connection = null;
    this._source = source.asObservable();
    this._subject = subject;
    __super__.call(this);
  }

  /**
   * Subscribes the subject to the source unless a connection already exists.
   *
   * @returns {Object} The current connection handle, or `disposableEmpty`
   *   when there is no connection and the subject reports `isStopped`.
   */
  ConnectableObservable.prototype.connect = function () {
    if (this._connection) {
      return this._connection;
    }
    if (this._subject.isStopped) {
      return disposableEmpty;
    }
    var upstream = this._source.subscribe(this._subject);
    this._connection = new ConnectDisposable(this, upstream);
    return this._connection;
  };

  /**
   * Routes an observer to the subject rather than to the source.
   *
   * @param {Object} o Observer to subscribe.
   * @returns {*} Whatever `subject.subscribe` returns.
   */
  ConnectableObservable.prototype._subscribe = function (o) {
    var subject = this._subject;
    return subject.subscribe(o);
  };

  /**
   * @returns {RefCountObservable} A new RefCountObservable wrapping this instance.
   */
  ConnectableObservable.prototype.refCount = function () {
    var counted = new RefCountObservable(this);
    return counted;
  };

  return ConnectableObservable;
}(Observable));