Skip to content

Commit

Permalink
feat: Add remember operator (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
jvanbruegge authored Apr 28, 2022
1 parent 1bebe5c commit 6899403
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .prettierrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"singleQuote": true,
"tabWidth": 2,
"arrowParens": "avoid"
}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Currently, the following set of operators is implemented, others might follow. N
- [`sampleWith`](./src/sample.ts)
- [`sampleCombine`](./src/sample.ts)
- [`multicast`](./src/multicast.ts)
- [`remember`](./src/remember.ts)

### Sinks

Expand Down
5 changes: 0 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,5 @@
},
"publishConfig": {
"access": "public"
},
"prettier": {
"singleQuote": true,
"tabWidth": 2,
"arrowParens": "avoid"
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export { flatten } from './flatten';
export { sample, sampleWith, sampleCombine } from './sample';
export { uponStart, uponEnd } from './hooks';
export { multicast } from './multicast';
export { remember } from './remember';

// Sinks
export { subscribe } from './subscribe';
Expand Down
2 changes: 1 addition & 1 deletion src/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function multicast<T>(source: Producer<T>): Producer<T> {
sinks[sinks.indexOf(sink)] = void 0;

// Allow others to subscribe in the same iteration of the JS event loop
Promise.resolve().then(() => {
queueMicrotask(() => {
if (sinks.every(x => x === undefined)) {
sinks = [];
talkback(2);
Expand Down
70 changes: 70 additions & 0 deletions src/remember.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Operator, StrictSink } from './types';

export function remember<T>(
numRemembered: number = 1,
shouldUnsubscribe: boolean = false
): Operator<T, T> {
return source => {
const marker = {};
let sinks: Array<StrictSink<T> | undefined> = [];
let talkback: any;
let lasts: T[] = Array(numRemembered).fill(marker);
let curr = -1;
let started = false;

const mkTalkback = (sink: StrictSink<T>) => () => {
sinks[sinks.indexOf(sink)] = void 0;

if (shouldUnsubscribe) {
// Allow others to subscribe in the same iteration of the JS event loop
queueMicrotask(() => {
if (sinks.every(x => x === undefined)) {
sinks = [];
talkback(2);
}
});
}
};

return (_, sink) => {
sinks.push(sink);

if (!started) {
source(0, (t, d) => {
if (t === 0) {
started = true;
talkback = d;
for (let i = 0; i < sinks.length; i++) {
const sink = sinks[i]!;
sink(0, mkTalkback(sink));
}
} else {
if (t === 1) {
curr = (curr + 1) % numRemembered;
lasts[curr] = d;
}

let hasDeleted = false;
for (let i = 0; i < sinks.length; i++) {
const sink = sinks[i];
if (sink) sink(t, d);
else hasDeleted = true;
}

if (hasDeleted) {
sinks = sinks.filter(x => x !== undefined);
}
}
});
} else {
sink(0, mkTalkback(sink));
for (let i = 0; i < lasts.length; i++) {
let idx = (curr + 1 + i) % numRemembered;
if (lasts[idx] !== marker) {
sink(1, lasts[idx]);
}
}
}
};
};
}
184 changes: 184 additions & 0 deletions test/remember.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import * as assert from 'assert';

import {
pipe,
subscribe,
of,
uponStart,
fromArray,
remember,
uponEnd,
merge,
makeSubject,
never,
} from '../src/index';

describe('remember tests', () => {
it('should call hook only once with remember', done => {
let numHook = 0;
const of$ = pipe(
of('some value'),
uponStart(() => {
numHook++;
}),
remember()
);

let listener1Called = false;
let listener2Called = false;

pipe(
of$,
subscribe(
x => {
assert.strictEqual(x, 'some value');
listener1Called = true;
},
err => {
if (err) done(err);
}
)
);

pipe(
of$,
subscribe(
x => {
assert.strictEqual(x, 'some value');
listener2Called = true;
},
err => {
if (err) done(err);
}
)
);

setTimeout(() => {
assert.strictEqual(listener1Called, true, 'listener 1 called');
assert.strictEqual(listener2Called, true, 'listener 2 called');
assert.strictEqual(numHook, 1);
done();
}, 20);
});

it('should call hook only once with remember when shouldUnsubscribe == true', done => {
let numHook = 0;
let completed = 0;
const of$ = pipe(
merge(of('some value'), never()),
uponStart(() => {
numHook++;
}),
uponEnd(() => {
completed++;
}),
remember(1, true)
);

let listener1Called = false;
let listener2Called = false;

const unsubscribe1 = pipe(
of$,
subscribe(
x => {
assert.strictEqual(x, 'some value');
listener1Called = true;
},
err => {
if (err) done(err);
}
)
);

const unsubscribe2 = pipe(
of$,
subscribe(
x => {
assert.strictEqual(x, 'some value');
listener2Called = true;
},
err => {
if (err) done(err);
}
)
);

unsubscribe1();

setTimeout(() => unsubscribe2(), 10);
setTimeout(() => {
assert.strictEqual(listener1Called, true, 'listener 1 called');
assert.strictEqual(listener2Called, true, 'listener 2 called');
assert.strictEqual(numHook, 1);
assert.strictEqual(completed, 1);
done();
}, 20);
});

it('should synchronously provide last value even after emission', done => {
const data$ = pipe(fromArray([1, 2, 3]), remember(2));

let numCalled = [0, 0];
let expected1 = [1, 2, 3];
pipe(
data$,
subscribe(x => {
assert.strictEqual(x, expected1.shift());
numCalled[0]++;
})
);

setTimeout(() => {
let expected2 = [2, 3];
pipe(
data$,
subscribe(x => {
assert.strictEqual(x, expected2.shift());
numCalled[1]++;
})
);

// No setTimeout, emission should be synchronous
assert.deepStrictEqual(numCalled, [3, 2]);
done();
}, 10);
});

it('should not unsubscribe from source if all sinks unsubscribe', done => {
let numCalled = [0, 0];
const subject = makeSubject<number>();

const data$ = pipe(
merge(of(1), subject),
uponEnd(() => assert.fail('should not unsubscribe source')),
remember(2)
);

const unsubscribe1 = pipe(
data$,
subscribe(x => {
assert.strictEqual(x, 1);
numCalled[0]++;
})
);

let expected = [1, 2];
const unsubscribe2 = pipe(
data$,
subscribe(x => {
assert.strictEqual(x, expected.shift());
numCalled[1]++;
})
);

setTimeout(() => {
unsubscribe1();
subject(1, 2);
unsubscribe2();

assert.deepStrictEqual(numCalled, [1, 2]);
done();
}, 10);
});
});

0 comments on commit 6899403

Please sign in to comment.