Skip to content

Commit 5099aba

Browse files
committed
feat: replay to first subscriber subject
1 parent 4983778 commit 5099aba

File tree

3 files changed

+220
-0
lines changed

3 files changed

+220
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import type * as rxjs from 'rxjs';
2+
import { ReplayFirstSubscriberOnlySubject } from '../replay-first-subscriber-only.subject';
3+
4+
describe('ReplayFirstSubscriberOnlySubject', () => {
5+
let subscriber1NextSpy: jest.Mock;
6+
let subscriber2NextSpy: jest.Mock;
7+
8+
let subscriber1ErrorSpy: jest.Mock;
9+
let subscriber2ErrorSpy: jest.Mock;
10+
11+
let subscriber1CompleteSpy: jest.Mock;
12+
let subscriber2CompleteSpy: jest.Mock;
13+
14+
let subject: rxjs.SubjectLike<string>;
15+
16+
beforeEach(() => {
17+
subject = new ReplayFirstSubscriberOnlySubject<string>();
18+
19+
subscriber1NextSpy = jest.fn();
20+
subscriber2NextSpy = jest.fn();
21+
22+
subscriber1ErrorSpy = jest.fn();
23+
subscriber2ErrorSpy = jest.fn();
24+
25+
subscriber1CompleteSpy = jest.fn();
26+
subscriber2CompleteSpy = jest.fn();
27+
});
28+
29+
afterEach(() => {
30+
jest.clearAllMocks();
31+
jest.clearAllTimers();
32+
});
33+
34+
it('should replay buffered events to the first subscriber only', () => {
35+
// buffer events for the first subscriber
36+
subject.next('A');
37+
subject.next('B');
38+
39+
// first subscriber
40+
subject.subscribe({
41+
next: subscriber1NextSpy,
42+
error: subscriber1ErrorSpy,
43+
complete: subscriber1CompleteSpy,
44+
});
45+
46+
// new event to all current subscribers
47+
subject.next('C');
48+
49+
// second subscriber
50+
subject.subscribe({
51+
next: subscriber2NextSpy,
52+
error: subscriber2ErrorSpy,
53+
complete: subscriber2CompleteSpy,
54+
});
55+
56+
// new event to all current subscribers
57+
subject.next('D');
58+
59+
// done
60+
subject.complete();
61+
62+
// First subscriber receives all buffered and new events.
63+
expect(subscriber1NextSpy).toHaveBeenCalledTimes(4);
64+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(1, 'A');
65+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(2, 'B');
66+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(3, 'C');
67+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(4, 'D');
68+
expect(subscriber1ErrorSpy).toHaveBeenCalledTimes(0);
69+
expect(subscriber1CompleteSpy).toHaveBeenCalledTimes(1);
70+
71+
// Subsequent subscribers only receive new events.
72+
expect(subscriber2NextSpy).toHaveBeenCalledTimes(1);
73+
expect(subscriber2NextSpy).toHaveBeenNthCalledWith(1, 'D');
74+
expect(subscriber2ErrorSpy).toHaveBeenCalledTimes(0);
75+
expect(subscriber2CompleteSpy).toHaveBeenCalledTimes(1);
76+
});
77+
78+
it('should emit error to the first subscriber only', () => {
79+
// buffer events for the first subscriber
80+
subject.next('A');
81+
subject.error(new Error('test error'));
82+
83+
// first subscriber
84+
subject.subscribe({
85+
next: subscriber1NextSpy,
86+
error: subscriber1ErrorSpy,
87+
complete: subscriber1CompleteSpy,
88+
});
89+
90+
// new event to all current subscribers
91+
subject.next('C');
92+
93+
// second subscriber
94+
subject.subscribe({
95+
next: subscriber2NextSpy,
96+
error: subscriber2ErrorSpy,
97+
complete: subscriber2CompleteSpy,
98+
});
99+
100+
// new event to all current subscribers
101+
subject.next('D');
102+
103+
// done
104+
subject.complete();
105+
106+
// First subscriber receives all buffered events then errors.
107+
expect(subscriber1NextSpy).toHaveBeenCalledTimes(1);
108+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(1, 'A');
109+
expect(subscriber1ErrorSpy).toHaveBeenCalledTimes(1);
110+
expect(subscriber1CompleteSpy).toHaveBeenCalledTimes(0);
111+
112+
// Subsequent subscribers only receive new events.
113+
expect(subscriber2NextSpy).toHaveBeenCalledTimes(1);
114+
expect(subscriber2NextSpy).toHaveBeenNthCalledWith(1, 'D');
115+
expect(subscriber2ErrorSpy).toHaveBeenCalledTimes(0);
116+
expect(subscriber2CompleteSpy).toHaveBeenCalledTimes(1);
117+
});
118+
119+
it('should emit error to all subscribers', () => {
120+
// buffer events for the first subscriber
121+
subject.next('A');
122+
subject.next('B');
123+
124+
// first subscriber
125+
subject.subscribe({
126+
next: subscriber1NextSpy,
127+
error: subscriber1ErrorSpy,
128+
complete: subscriber1CompleteSpy,
129+
});
130+
131+
// new event to all current subscribers
132+
subject.next('C');
133+
134+
// second subscriber
135+
subject.subscribe({
136+
next: subscriber2NextSpy,
137+
error: subscriber2ErrorSpy,
138+
complete: subscriber2CompleteSpy,
139+
});
140+
141+
// new event to all current subscribers
142+
subject.error(new Error('test error'));
143+
subject.next('D'); // no subscriber will receive this because of the error
144+
145+
// done
146+
subject.complete();
147+
148+
// First subscriber receives all buffered and new events.
149+
expect(subscriber1NextSpy).toHaveBeenCalledTimes(3);
150+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(1, 'A');
151+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(2, 'B');
152+
expect(subscriber1NextSpy).toHaveBeenNthCalledWith(3, 'C');
153+
expect(subscriber1ErrorSpy).toHaveBeenCalledTimes(1);
154+
expect(subscriber1CompleteSpy).toHaveBeenCalledTimes(0);
155+
156+
// Subsequent subscribers only receive new events.
157+
expect(subscriber2NextSpy).toHaveBeenCalledTimes(0);
158+
expect(subscriber2ErrorSpy).toHaveBeenCalledTimes(1);
159+
expect(subscriber2CompleteSpy).toHaveBeenCalledTimes(0);
160+
});
161+
});

electron/common/observable/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './replay-first-subscriber-only.subject';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import * as rxjs from 'rxjs';
2+
3+
/**
4+
* A custom `ReplaySubject` that only replays the buffered events
5+
* to the first subscriber. All subsequent subscribers will only
6+
* receive new events, as if subscribed to a `Subject`.
7+
* https://stackoverflow.com/a/69390202/470818
8+
*/
9+
export class ReplayFirstSubscriberOnlySubject<T>
10+
implements rxjs.SubjectLike<T>
11+
{
12+
/**
13+
* Buffers events and replays them to the first subscriber.
14+
* Continues to emit new events.
15+
*/
16+
private replaySubject: rxjs.Subject<T>;
17+
18+
/**
19+
* Subject used for all subsequent subscribers.
20+
* They only receive new emits, not buffered events before subscribing.
21+
*/
22+
private subject?: rxjs.Subject<T>;
23+
24+
constructor() {
25+
this.replaySubject = new rxjs.ReplaySubject<T>();
26+
}
27+
28+
public next(value: T): void {
29+
// Notify the first subscriber of the value.
30+
this.replaySubject.next(value);
31+
// Notify all subsequent subscribers of the value.
32+
this.subject?.next(value);
33+
}
34+
35+
public error(error: Error): void {
36+
// Notify the first subscriber of the error.
37+
this.replaySubject.error(error);
38+
// Notify all subsequent subscribers of the error.
39+
this.subject?.error(error);
40+
}
41+
42+
public complete(): void {
43+
// Notify the first subscriber of the completion.
44+
this.replaySubject.complete();
45+
// Notify all subsequent subscribers of the completion.
46+
this.subject?.complete();
47+
}
48+
49+
public subscribe(observer: Partial<rxjs.Observer<T>>): rxjs.Unsubscribable {
50+
const subscription = (this.subject ?? this.replaySubject).subscribe(
51+
observer
52+
);
53+
if (!this.subject) {
54+
this.subject = new rxjs.Subject<T>();
55+
}
56+
return subscription;
57+
}
58+
}

0 commit comments

Comments
 (0)