diff --git a/src/common.ts b/src/common.ts
index d2c197f..968c159 100644
--- a/src/common.ts
+++ b/src/common.ts
@@ -98,8 +98,12 @@ export abstract class Reactive implements Child {
subscribe(callback: (a: A) => void): PushOnlyObserver {
return new PushOnlyObserver(callback, this);
}
- observe(push: (a: A) => void, handlePulling: PullHandler): CbObserver {
- return new CbObserver(push, handlePulling, this);
+ observe(
+ push: (a: A) => void,
+ handlePulling: PullHandler,
+ t: Time = tick()
+ ): CbObserver {
+ return new CbObserver(push, handlePulling, t, this);
}
activate(t: number): void {
let newState = State.Done;
@@ -127,9 +131,10 @@ export class CbObserver implements BListener, SListener {
private endPulling: () => void;
node: Node> = new Node(this);
constructor(
- private callback: (a: A) => void,
- private handlePulling: PullHandler,
- private source: Parent
+ readonly callback: (a: A) => void,
+ readonly handlePulling: PullHandler,
+ private time: Time,
+ readonly source: Parent
) {
source.addListener(this.node, tick());
if (source.state === State.Pull) {
@@ -137,8 +142,11 @@ export class CbObserver implements BListener, SListener {
} else if (isBehavior(source) && source.state === State.Push) {
callback(source.last);
}
+ this.time = undefined;
}
- pull(t: number = tick()): void {
+ pull(time: number): void {
+ const t =
+ time !== undefined ? time : this.time !== undefined ? this.time : tick();
if (isBehavior(this.source) && this.source.state === State.Pull) {
this.source.pull(t);
this.callback(this.source.last);
diff --git a/src/index.ts b/src/index.ts
index 22d9ed2..eb27cd5 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,20 +1,7 @@
import { Now, MapNowTuple, InstantRun, instant, sample } from "./now";
-import {
- Behavior,
- SinkBehavior,
- MapBehaviorTuple,
- AccumPair,
- accum,
- accumCombine,
- stepper,
- when,
- toggle,
- switcher,
- freezeAt
-} from "./behavior";
-import { Stream, SinkStream, scan, shift } from "./stream";
+import { Behavior, SinkBehavior, MapBehaviorTuple } from "./behavior";
+import { Stream, SinkStream } from "./stream";
import { Future, MapFutureTuple } from "./future";
-import { integrate } from "./time";
export * from "./common";
export * from "./now";
@@ -24,6 +11,7 @@ export * from "./future";
export * from "./time";
export * from "./placeholder";
export * from "./animation";
+export * from "./clock";
/**
* Map a function over a behavior or stream. This means that if at some point in
@@ -65,7 +53,7 @@ export function push(a: A, sink: SinkBehavior | SinkStream): void {
sink.push(a);
}
-export function combine(...streams: Future[]): Future;
+export function combine(...futures: Future[]): Future;
export function combine(...streams: Stream[]): Stream;
export function combine(
...values: Future[] | Stream[]
diff --git a/test/behavior.ts b/test/behavior.ts
index 42043b8..e283cc6 100644
--- a/test/behavior.ts
+++ b/test/behavior.ts
@@ -119,6 +119,22 @@ describe("behavior", () => {
assert.deepEqual(spy.args, [[1], [2], [3]]);
});
});
+ describe("observe", () => {
+ it("samples in same timestamp", () => {
+ const f = jest.fn(() => 0);
+ const b = fromFunction(f);
+ let result = -1;
+ b.observe(
+ (n) => result,
+ (pull) => {
+ pull();
+ return () => {};
+ },
+ 3
+ );
+ expect(f.mock.calls).toEqual([[3]]);
+ });
+ });
describe("fromFunction", () => {
it("pulls from time varying functions", () => {
let time = 0;
@@ -241,18 +257,20 @@ describe("behavior", () => {
const numE = H.fromFunction(() => n);
const applied = H.ap(fnB, numE);
const cb = spy();
- applied.observe(cb, (pull) => {
- pull();
- push(add(2), fnB);
- pull();
- n = 4;
- pull();
- push(double, fnB);
- pull();
- n = 8;
- pull();
+ let pull: () => void;
+ applied.observe(cb, (pull_) => {
+ pull = pull_;
return () => {};
});
+ pull();
+ push(add(2), fnB);
+ pull();
+ n = 4;
+ pull();
+ push(double, fnB);
+ pull();
+ n = 8;
+ pull();
assert.deepEqual(cb.args, [[6], [3], [6], [8], [16]]);
});
});