Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

monotonic clock: introduce duration type, split subscribe #7358

Merged
merged 10 commits into from
Oct 30, 2023
4 changes: 2 additions & 2 deletions crates/test-programs/src/bin/preview1_poll_oneoff_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ unsafe fn test_timeout() {
);
assert!(
after - before >= timeout,
"poll_oneoff should sleep for the specified interval"
"poll_oneoff should sleep for the specified interval of {timeout}. before: {before}, after: {after}"
);
}

Expand Down Expand Up @@ -122,7 +122,7 @@ unsafe fn test_sleep() {
);
assert!(
after - before >= timeout,
"poll_oneoff should sleep for the specified interval"
"poll_oneoff should sleep for the specified interval of {timeout}. before: {before}, after: {after}"
);
}

Expand Down
2 changes: 1 addition & 1 deletion crates/test-programs/src/bin/preview2_ip_name_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn main() {
// the resolution and allows errors.
let addresses = ip_name_lookup::resolve_addresses(&network, "github.com", None, false).unwrap();
let lookup = addresses.subscribe();
let timeout = monotonic_clock::subscribe(1_000_000_000, false);
let timeout = monotonic_clock::subscribe_duration(1_000_000_000);
let ready = poll::poll_list(&[&lookup, &timeout]);
assert!(ready.len() > 0);
match ready[0] {
Expand Down
10 changes: 5 additions & 5 deletions crates/test-programs/src/bin/preview2_sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ fn main() {

fn sleep_10ms() {
let dur = 10_000_000;
let p = monotonic_clock::subscribe(monotonic_clock::now() + dur, true);
let p = monotonic_clock::subscribe_instant(monotonic_clock::now() + dur);
poll::poll_one(&p);
let p = monotonic_clock::subscribe(dur, false);
let p = monotonic_clock::subscribe_duration(dur);
poll::poll_one(&p);
}

fn sleep_0ms() {
let p = monotonic_clock::subscribe(monotonic_clock::now(), true);
let p = monotonic_clock::subscribe_instant(monotonic_clock::now());
poll::poll_one(&p);
let p = monotonic_clock::subscribe(0, false);
let p = monotonic_clock::subscribe_duration(0);
poll::poll_one(&p);
}

fn sleep_backwards_in_time() {
let p = monotonic_clock::subscribe(monotonic_clock::now() - 1, true);
let p = monotonic_clock::subscribe_instant(monotonic_clock::now() - 1);
poll::poll_one(&p);
}
4 changes: 2 additions & 2 deletions crates/test-programs/src/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl OutgoingDatagramStream {
}

pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);

while !datagrams.is_empty() {
let permit = self.blocking_check_send(&timeout)?;
Expand All @@ -180,7 +180,7 @@ impl OutgoingDatagramStream {

impl IncomingDatagramStream {
pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<IncomingDatagram>, ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);
let pollable = self.subscribe();
let mut datagrams = vec![];

Expand Down
26 changes: 19 additions & 7 deletions crates/wasi-http/wit/deps/clocks/monotonic-clock.wit
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,34 @@
interface monotonic-clock {
use wasi:io/[email protected].{pollable};

/// A timestamp in nanoseconds.
/// An instant in time, in nanoseconds. An instant is relative to an
/// unspecified initial value, and can only be compared to instances from
/// the same monotonic-clock.
type instant = u64;

/// A duration of time, in nanoseconds.
type duration = u64;

/// Read the current value of the clock.
///
/// The clock is monotonic, therefore calling this function repeatedly will
/// produce a sequence of non-decreasing values.
now: func() -> instant;

/// Query the resolution of the clock.
resolution: func() -> instant;
/// Query the resolution of the clock. Returns the duration of time
/// corresponding to a clock tick.
resolution: func() -> duration;

/// Create a `pollable` which will resolve once the specified time has been
/// reached.
subscribe: func(
/// Create a `pollable` which will resolve once the specified instant
/// occured.
subscribe-instant: func(
when: instant,
absolute: bool
) -> pollable;

/// Create a `pollable` which will resolve once the given duration has
/// elapsed, starting at the time at which this function was called.
/// occured.
subscribe-duration: func(
when: duration,
) -> pollable;
}
9 changes: 6 additions & 3 deletions crates/wasi-preview1-component-adapter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1748,12 +1748,15 @@ pub unsafe extern "C" fn poll_oneoff(
clock.timeout
};

monotonic_clock::subscribe(timeout, false)
monotonic_clock::subscribe_duration(timeout)
}

CLOCKID_MONOTONIC => {
let s = monotonic_clock::subscribe(clock.timeout, absolute);
s
if absolute {
monotonic_clock::subscribe_instant(clock.timeout)
} else {
monotonic_clock::subscribe_duration(clock.timeout)
}
}

_ => return Err(ERRNO_INVAL),
Expand Down
52 changes: 36 additions & 16 deletions crates/wasi/src/preview2/host/clocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(unused_variables)]

use crate::preview2::bindings::{
clocks::monotonic_clock::{self, Instant},
clocks::monotonic_clock::{self, Duration as WasiDuration, Instant},
clocks::timezone::{self, TimezoneDisplay},
clocks::wall_clock::{self, Datetime},
};
Expand Down Expand Up @@ -43,6 +43,23 @@ impl<T: WasiView> wall_clock::Host for T {
}
}

fn subscribe_to_duration(
table: &mut crate::preview2::Table,
duration: tokio::time::Duration,
) -> anyhow::Result<Resource<Pollable>> {
let sleep = if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) {
// NB: this resource created here is not actually exposed to wasm, it's
// only an internal implementation detail used to match the signature
// expected by `subscribe`.
table.push(Deadline::Instant(deadline))?
} else {
// If the user specifies a time so far in the future we can't
// represent it, wait forever rather than trap.
table.push(Deadline::Never)?
};
subscribe(table, sleep)
}

impl<T: WasiView> monotonic_clock::Host for T {
fn now(&mut self) -> anyhow::Result<Instant> {
Ok(self.ctx().monotonic_clock.now())
Expand All @@ -52,30 +69,33 @@ impl<T: WasiView> monotonic_clock::Host for T {
Ok(self.ctx().monotonic_clock.resolution())
}

fn subscribe(&mut self, when: Instant, absolute: bool) -> anyhow::Result<Resource<Pollable>> {
fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result<Resource<Pollable>> {
let clock_now = self.ctx().monotonic_clock.now();
let duration = if absolute {
Duration::from_nanos(when.saturating_sub(clock_now))
let duration = if when > clock_now {
Duration::from_nanos(when - clock_now)
} else {
Duration::from_nanos(when)
Duration::from_nanos(0)
};
let deadline = tokio::time::Instant::now()
.checked_add(duration)
.ok_or_else(|| anyhow::anyhow!("time overflow: duration {duration:?}"))?;
// NB: this resource created here is not actually exposed to wasm, it's
// only an internal implementation detail used to match the signature
// expected by `subscribe`.
let sleep = self.table_mut().push(Sleep(deadline))?;
subscribe(self.table_mut(), sleep)
subscribe_to_duration(&mut self.table_mut(), duration)
}
pchickey marked this conversation as resolved.
Show resolved Hide resolved

fn subscribe_duration(&mut self, duration: WasiDuration) -> anyhow::Result<Resource<Pollable>> {
subscribe_to_duration(&mut self.table_mut(), Duration::from_nanos(duration))
}
}

struct Sleep(tokio::time::Instant);
enum Deadline {
Instant(tokio::time::Instant),
Never,
}

#[async_trait::async_trait]
impl Subscribe for Sleep {
impl Subscribe for Deadline {
async fn ready(&mut self) {
tokio::time::sleep_until(self.0).await;
match self {
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions crates/wasi/src/preview2/preview1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2072,9 +2072,15 @@ impl<
}
_ => return Err(types::Errno::Inval.into()),
};
monotonic_clock::Host::subscribe(self, timeout, absolute)
.context("failed to call `monotonic_clock::subscribe`")
.map_err(types::Error::trap)?
if absolute {
monotonic_clock::Host::subscribe_instant(self, timeout)
.context("failed to call `monotonic_clock::subscribe_instant`")
.map_err(types::Error::trap)?
} else {
monotonic_clock::Host::subscribe_duration(self, timeout)
.context("failed to call `monotonic_clock::subscribe_duration`")
.map_err(types::Error::trap)?
}
}
types::SubscriptionU::FdRead(types::SubscriptionFdReadwrite {
file_descriptor,
Expand Down
26 changes: 19 additions & 7 deletions crates/wasi/wit/deps/clocks/monotonic-clock.wit
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,34 @@
interface monotonic-clock {
use wasi:io/[email protected].{pollable};

/// A timestamp in nanoseconds.
/// An instant in time, in nanoseconds. An instant is relative to an
/// unspecified initial value, and can only be compared to instances from
/// the same monotonic-clock.
type instant = u64;

/// A duration of time, in nanoseconds.
type duration = u64;

/// Read the current value of the clock.
///
/// The clock is monotonic, therefore calling this function repeatedly will
/// produce a sequence of non-decreasing values.
now: func() -> instant;

/// Query the resolution of the clock.
resolution: func() -> instant;
/// Query the resolution of the clock. Returns the duration of time
/// corresponding to a clock tick.
resolution: func() -> duration;

/// Create a `pollable` which will resolve once the specified time has been
/// reached.
subscribe: func(
/// Create a `pollable` which will resolve once the specified instant
/// occured.
subscribe-instant: func(
when: instant,
absolute: bool
) -> pollable;

/// Create a `pollable` which will resolve once the given duration has
/// elapsed, starting at the time at which this function was called.
/// occured.
subscribe-duration: func(
when: duration,
) -> pollable;
}