Skip to content

Commit

Permalink
observe takes time and pull in that time on synchronous pull
Browse files Browse the repository at this point in the history
  • Loading branch information
paldepind committed Sep 9, 2019
1 parent c4b0282 commit ab8f7af
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
20 changes: 14 additions & 6 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ export abstract class Reactive<A, C extends Child> implements Child {
subscribe(callback: (a: A) => void): PushOnlyObserver<A> {
return new PushOnlyObserver(callback, this);
}
observe(push: (a: A) => void, handlePulling: PullHandler): CbObserver<A> {
return new CbObserver(push, handlePulling, this);
observe(
push: (a: A) => void,
handlePulling: PullHandler,
t: Time = tick()
): CbObserver<A> {
return new CbObserver(push, handlePulling, t, this);
}
activate(t: number): void {
let newState = State.Done;
Expand Down Expand Up @@ -127,18 +131,22 @@ export class CbObserver<A> implements BListener, SListener<A> {
private endPulling: () => void;
node: Node<CbObserver<A>> = new Node(this);
constructor(
private callback: (a: A) => void,
private handlePulling: PullHandler,
private source: Parent<Child>
readonly callback: (a: A) => void,
readonly handlePulling: PullHandler,
private time: Time,
readonly source: Parent<Child>
) {
source.addListener(this.node, tick());
if (source.state === State.Pull) {
this.endPulling = handlePulling(this.pull.bind(this));
} else if (isBehavior(source) && source.state === State.Push) {
callback(source.last);
}
this.time = undefined;
}
pull(t: number = tick()): void {
pull(time: number): void {
const t =
time !== undefined ? time : this.time !== undefined ? this.time : tick();
if (isBehavior(this.source) && this.source.state === State.Pull) {
this.source.pull(t);
this.callback(this.source.last);
Expand Down
20 changes: 4 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
import { Now, MapNowTuple, InstantRun, instant, sample } from "./now";
import {
Behavior,
SinkBehavior,
MapBehaviorTuple,
AccumPair,
accum,
accumCombine,
stepper,
when,
toggle,
switcher,
freezeAt
} from "./behavior";
import { Stream, SinkStream, scan, shift } from "./stream";
import { Behavior, SinkBehavior, MapBehaviorTuple } from "./behavior";
import { Stream, SinkStream } from "./stream";
import { Future, MapFutureTuple } from "./future";
import { integrate } from "./time";

export * from "./common";
export * from "./now";
Expand All @@ -24,6 +11,7 @@ export * from "./future";
export * from "./time";
export * from "./placeholder";
export * from "./animation";
export * from "./clock";

/**
* Map a function over a behavior or stream. This means that if at some point in
Expand Down Expand Up @@ -65,7 +53,7 @@ export function push<A>(a: A, sink: SinkBehavior<A> | SinkStream<A>): void {
sink.push(a);
}

export function combine<A>(...streams: Future<A>[]): Future<A>;
export function combine<A>(...futures: Future<A>[]): Future<A>;
export function combine<A>(...streams: Stream<A>[]): Stream<A>;
export function combine<A>(
...values: Future<A>[] | Stream<A>[]
Expand Down
38 changes: 28 additions & 10 deletions test/behavior.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ describe("behavior", () => {
assert.deepEqual(spy.args, [[1], [2], [3]]);
});
});
describe("observe", () => {
it("samples in same timestamp", () => {
const f = jest.fn(() => 0);
const b = fromFunction(f);
let result = -1;
b.observe(
(n) => result,
(pull) => {
pull();
return () => {};
},
3
);
expect(f.mock.calls).toEqual([[3]]);
});
});
describe("fromFunction", () => {
it("pulls from time varying functions", () => {
let time = 0;
Expand Down Expand Up @@ -241,18 +257,20 @@ describe("behavior", () => {
const numE = H.fromFunction(() => n);
const applied = H.ap(fnB, numE);
const cb = spy();
applied.observe(cb, (pull) => {
pull();
push(add(2), fnB);
pull();
n = 4;
pull();
push(double, fnB);
pull();
n = 8;
pull();
let pull: () => void;
applied.observe(cb, (pull_) => {
pull = pull_;
return () => {};
});
pull();
push(add(2), fnB);
pull();
n = 4;
pull();
push(double, fnB);
pull();
n = 8;
pull();
assert.deepEqual(cb.args, [[6], [3], [6], [8], [16]]);
});
});
Expand Down

0 comments on commit ab8f7af

Please sign in to comment.