Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⛽ -> ⏲️ Switch from fuel to epoch interruptions. #273

Merged
merged 5 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 55 additions & 46 deletions cli/tests/integration/async_io.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::common::Test;
use crate::common::TestResult;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::common::{Test, TestResult};
use hyper::{body::HttpBody, Body, Request, Response, StatusCode};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::Barrier;

// On Windows, streaming body backpressure doesn't seem to work as expected, either
Expand All @@ -22,37 +19,32 @@ async fn async_io_methods() -> TestResult {
let req_count_1 = request_count.clone();
let req_count_2 = request_count.clone();
let req_count_3 = request_count.clone();
let req_count_4 = request_count.clone();

let barrier = Arc::new(Barrier::new(3));
let barrier_1 = barrier.clone();
let barrier_2 = barrier.clone();
let sync_barrier = Arc::new(Barrier::new(2));
let sync_barrier_1 = sync_barrier.clone();

// We set up 4 async backends below, configured to test different
// combinations of async behavior from the guest. The first three backends
// are things we are actually testing, and the fourth ("Semaphore") is just
// used as a synchronization mechanism. Each backend will receive 4 requests
// total and will behave differently depending on which request # it is
// processing.
let test = Test::using_fixture("async_io.wasm")
.async_backend("Simple", "/", None, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "simple.org");
let req_count_1 = req_count_1.clone();
let barrier_1 = barrier_1.clone();
Box::new(async move {
match req_count_1.load(Ordering::Relaxed) {
0 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
3 => {
0 | 2 | 3 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
Expand All @@ -69,21 +61,11 @@ async fn async_io_methods() -> TestResult {
let req_count_2 = req_count_2.clone();
Box::new(async move {
match req_count_2.load(Ordering::Relaxed) {
0 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
1 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
3 => Response::builder()
0 | 1 | 3 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
Expand All @@ -97,36 +79,63 @@ async fn async_io_methods() -> TestResult {
assert_eq!(req.headers()["Host"], "writebody.org");
let req_count_3 = req_count_3.clone();
let barrier_2 = barrier_2.clone();
let sync_barrier = sync_barrier.clone();
Box::new(async move {
match req_count_3.load(Ordering::Relaxed) {
0 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => {
barrier_2.wait().await;
3 => {
// Read at least 4MB and one 8K chunk from the request
// to relieve back-pressure for the guest. These numbers
// come from the amount of data that the guest writes to
// the request body in test-fixtures/src/bin/async_io.rs
let mut bod = req.into_body();
let mut bytes_read = 0;
while bytes_read < (4 * 1024 * 1024) + (8 * 1024) {
if let Some(Ok(bytes)) = bod.data().await {
bytes_read += bytes.len();
}
}

// The guest will have another outstanding request to
// the Semaphore backend below. Awaiting on the barrier
// here will cause that request to return indicating to
// the guest that we have read from the request body
// and the write handle should be ready again.
sync_barrier.wait().await;
let _body = hyper::body::to_bytes(bod);
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
2 => {
0 | 1 | 2 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
_ => unreachable!(),
}
})
})
.await
.async_backend("Semaphore", "/", None, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "writebody.org");
let req_count_4 = req_count_4.clone();
let sync_barrier_1 = sync_barrier_1.clone();
Box::new(async move {
match req_count_4.load(Ordering::Relaxed) {
3 => {
let _body = hyper::body::to_bytes(req.into_body()).await;
sync_barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
0 | 1 | 2 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
_ => unreachable!(),
}
})
Expand Down
97 changes: 57 additions & 40 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Guest code execution.

use std::net::Ipv4Addr;

use {
crate::{
body::Body,
Expand All @@ -18,11 +16,12 @@ use {
hyper::{Request, Response},
std::{
collections::HashSet,
net::IpAddr,
net::{IpAddr, Ipv4Addr},
path::{Path, PathBuf},
sync::atomic::AtomicU64,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::Arc,
time::Instant,
thread::{self, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::oneshot::{self, Sender},
tracing::{event, info, info_span, Instrument, Level},
Expand Down Expand Up @@ -61,6 +60,9 @@ pub struct ExecuteCtx {
object_store: Arc<ObjectStores>,
/// The secret stores for this execution.
secret_stores: Arc<SecretStores>,
// `Arc` for the two fields below because this struct must be `Clone`.
epoch_increment_thread: Option<Arc<JoinHandle<()>>>,
epoch_increment_stop: Arc<AtomicBool>,
}

impl ExecuteCtx {
Expand All @@ -77,6 +79,18 @@ impl ExecuteCtx {
let module = Module::from_file(&engine, module_path)?;
let instance_pre = linker.instantiate_pre(&module)?;

// Create the epoch-increment thread.
let epoch_interruption_period = Duration::from_micros(50);
let epoch_increment_stop = Arc::new(AtomicBool::new(false));
let engine_clone = engine.clone();
let epoch_increment_stop_clone = epoch_increment_stop.clone();
let epoch_increment_thread = Some(Arc::new(thread::spawn(move || {
while !epoch_increment_stop_clone.load(Ordering::Relaxed) {
thread::sleep(epoch_interruption_period);
engine_clone.increment_epoch();
}
})));

Ok(Self {
engine,
instance_pre: Arc::new(instance_pre),
Expand All @@ -90,6 +104,8 @@ impl ExecuteCtx {
next_req_id: Arc::new(AtomicU64::new(0)),
object_store: Arc::new(ObjectStores::new()),
secret_stores: Arc::new(SecretStores::new()),
epoch_increment_thread,
epoch_increment_stop,
})
}

Expand All @@ -104,11 +120,9 @@ impl ExecuteCtx {
}

/// Set the backends for this execution context.
pub fn with_backends(self, backends: Backends) -> Self {
Self {
backends: Arc::new(backends),
..self
}
pub fn with_backends(mut self, backends: Backends) -> Self {
self.backends = Arc::new(backends);
self
}

/// Get the geolocation mappings for this execution context.
Expand All @@ -117,11 +131,9 @@ impl ExecuteCtx {
}

/// Set the geolocation mappings for this execution context.
pub fn with_geolocation(self, geolocation: Geolocation) -> Self {
Self {
geolocation: Arc::new(geolocation),
..self
}
pub fn with_geolocation(mut self, geolocation: Geolocation) -> Self {
self.geolocation = Arc::new(geolocation);
self
}

/// Get the dictionaries for this execution context.
Expand All @@ -130,35 +142,27 @@ impl ExecuteCtx {
}

/// Set the dictionaries for this execution context.
pub fn with_dictionaries(self, dictionaries: Dictionaries) -> Self {
Self {
dictionaries: Arc::new(dictionaries),
..self
}
pub fn with_dictionaries(mut self, dictionaries: Dictionaries) -> Self {
self.dictionaries = Arc::new(dictionaries);
self
}

/// Set the object store for this execution context.
pub fn with_object_stores(self, object_store: ObjectStores) -> Self {
Self {
object_store: Arc::new(object_store),
..self
}
pub fn with_object_stores(mut self, object_store: ObjectStores) -> Self {
self.object_store = Arc::new(object_store);
self
}

/// Set the secret stores for this execution context.
pub fn with_secret_stores(self, secret_stores: SecretStores) -> Self {
Self {
secret_stores: Arc::new(secret_stores),
..self
}
pub fn with_secret_stores(mut self, secret_stores: SecretStores) -> Self {
self.secret_stores = Arc::new(secret_stores);
self
}

/// Set the path to the config for this execution context.
pub fn with_config_path(self, config_path: PathBuf) -> Self {
Self {
config_path: Arc::new(Some(config_path)),
..self
}
pub fn with_config_path(mut self, config_path: PathBuf) -> Self {
self.config_path = Arc::new(Some(config_path));
self
}

/// Whether to treat stdout as a logging endpoint.
Expand All @@ -167,8 +171,9 @@ impl ExecuteCtx {
}

/// Set the stdout logging policy for this execution context.
pub fn with_log_stdout(self, log_stdout: bool) -> Self {
Self { log_stdout, ..self }
pub fn with_log_stdout(mut self, log_stdout: bool) -> Self {
self.log_stdout = log_stdout;
self
}

/// Whether to treat stderr as a logging endpoint.
Expand All @@ -177,8 +182,9 @@ impl ExecuteCtx {
}

/// Set the stderr logging policy for this execution context.
pub fn with_log_stderr(self, log_stderr: bool) -> Self {
Self { log_stderr, ..self }
pub fn with_log_stderr(mut self, log_stderr: bool) -> Self {
self.log_stderr = log_stderr;
self
}

/// Gets the TLS configuration
Expand Down Expand Up @@ -410,6 +416,17 @@ impl ExecuteCtx {
}
}

impl Drop for ExecuteCtx {
fn drop(&mut self) {
if let Some(arc) = self.epoch_increment_thread.take() {
if let Ok(join_handle) = Arc::try_unwrap(arc) {
self.epoch_increment_stop.store(true, Ordering::Relaxed);
join_handle.join().unwrap();
}
}
}
}

fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config {
use wasmtime::{
Config, InstanceAllocationStrategy, PoolingAllocationConfig, WasmBacktraceDetails,
Expand All @@ -419,7 +436,7 @@ fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config
config.debug_info(false); // Keep this disabled - wasmtime will hang if enabled
config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
config.async_support(true);
config.consume_fuel(true);
config.epoch_interruption(true);
config.profiler(profiling_strategy);

const MB: usize = 1 << 20;
Expand Down
3 changes: 2 additions & 1 deletion lib/src/linking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ pub(crate) fn create_store(
session,
};
let mut store = Store::new(ctx.engine(), wasm_ctx);
store.out_of_fuel_async_yield(u64::MAX, 10000);
store.set_epoch_deadline(1);
store.epoch_deadline_async_yield_and_update(1);
Ok(store)
}

Expand Down
Loading