⛽ -> ⏲️ Switch from fuel to epoch interruptions. (#273)
* Switch from fuel to epoch interruptions.

Co-authored-by: Jamey Sharp <[email protected]>

* Add explicit synchronization to fix the flaky async_io_methods test

* Make async_io test a bit easier to understand

* Need to also read at least one 8k chunk for the back-pressure to be relieved

* Add a comment giving context to the amount of data read in the async_io fixture

---------

Co-authored-by: Jamey Sharp <[email protected]>
itsrainy and jameysharp authored Jun 2, 2023
1 parent d3e23d4 commit 362bba8
Showing 4 changed files with 127 additions and 94 deletions.
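For context before the diffs: the change swaps wasmtime's fuel metering for its epoch-interruption mechanism. A minimal sketch of the pattern (assuming wasmtime's `Config::epoch_interruption`, `Engine::increment_epoch`, and the store-side deadline methods shown below; this is illustrative, not Viceroy code):

```rust
use std::{thread, time::Duration};
use wasmtime::{Config, Engine};

fn engine_with_epoch_interruption() -> Engine {
    let mut config = Config::new();
    config.async_support(true);
    // Replaces config.consume_fuel(true): guests are now interrupted by
    // epoch ticks rather than by running out of injected fuel.
    config.epoch_interruption(true);
    Engine::new(&config).expect("engine")
}

fn spawn_epoch_ticker(engine: Engine) -> thread::JoinHandle<()> {
    // Each tick advances the engine's epoch; any store whose deadline has
    // passed will yield (or trap, depending on its configuration) at the
    // next epoch check the compiled code performs.
    thread::spawn(move || loop {
        thread::sleep(Duration::from_micros(50));
        engine.increment_epoch();
    })
}
```

The actual change below also threads an `AtomicBool` stop flag through the ticker loop so the thread can be shut down; see the `Drop` impl in lib/src/execute.rs.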
101 changes: 55 additions & 46 deletions cli/tests/integration/async_io.rs
@@ -1,12 +1,9 @@
-use crate::common::Test;
-use crate::common::TestResult;
-use hyper::Body;
-use hyper::Request;
-use hyper::Response;
-use hyper::StatusCode;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
+use crate::common::{Test, TestResult};
+use hyper::{body::HttpBody, Body, Request, Response, StatusCode};
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+};
 use tokio::sync::Barrier;
 
 // On Windows, streaming body backpressure doesn't seem to work as expected, either
@@ -22,37 +19,32 @@ async fn async_io_methods() -> TestResult {
     let req_count_1 = request_count.clone();
     let req_count_2 = request_count.clone();
     let req_count_3 = request_count.clone();
+    let req_count_4 = request_count.clone();
 
     let barrier = Arc::new(Barrier::new(3));
     let barrier_1 = barrier.clone();
     let barrier_2 = barrier.clone();
+    let sync_barrier = Arc::new(Barrier::new(2));
+    let sync_barrier_1 = sync_barrier.clone();
 
+    // We set up 4 async backends below, configured to test different
+    // combinations of async behavior from the guest. The first three backends
+    // are things we are actually testing, and the fourth ("Semaphore") is just
+    // used as a synchronization mechanism. Each backend will receive 4 requests
+    // total and will behave differently depending on which request # it is
+    // processing.
     let test = Test::using_fixture("async_io.wasm")
         .async_backend("Simple", "/", None, move |req: Request<Body>| {
            assert_eq!(req.headers()["Host"], "simple.org");
            let req_count_1 = req_count_1.clone();
            let barrier_1 = barrier_1.clone();
            Box::new(async move {
                match req_count_1.load(Ordering::Relaxed) {
-                    0 => {
-                        barrier_1.wait().await;
-                        Response::builder()
-                            .status(StatusCode::OK)
-                            .body(Body::empty())
-                            .unwrap()
-                    }
-                    1 => Response::builder()
-                        .status(StatusCode::OK)
-                        .body(Body::empty())
-                        .unwrap(),
-                    2 => {
-                        barrier_1.wait().await;
-                        Response::builder()
-                            .status(StatusCode::OK)
-                            .body(Body::empty())
-                            .unwrap()
-                    }
-                    3 => {
+                    0 | 2 | 3 => {
                         barrier_1.wait().await;
                         Response::builder()
                             .status(StatusCode::OK)
@@ -69,21 +61,11 @@ async fn async_io_methods() -> TestResult {
            let req_count_2 = req_count_2.clone();
            Box::new(async move {
                match req_count_2.load(Ordering::Relaxed) {
-                    0 => Response::builder()
-                        .header("Transfer-Encoding", "chunked")
-                        .status(StatusCode::OK)
-                        .body(Body::empty())
-                        .unwrap(),
-                    1 => Response::builder()
-                        .header("Transfer-Encoding", "chunked")
-                        .status(StatusCode::OK)
-                        .body(Body::empty())
-                        .unwrap(),
-                    2 => Response::builder()
-                        .status(StatusCode::OK)
-                        .body(Body::empty())
-                        .unwrap(),
-                    3 => Response::builder()
+                    0 | 1 | 3 => Response::builder()
                         .header("Transfer-Encoding", "chunked")
                         .status(StatusCode::OK)
                         .body(Body::empty())
@@ -97,36 +79,63 @@ async fn async_io_methods() -> TestResult {
            assert_eq!(req.headers()["Host"], "writebody.org");
            let req_count_3 = req_count_3.clone();
            let barrier_2 = barrier_2.clone();
+            let sync_barrier = sync_barrier.clone();
            Box::new(async move {
                match req_count_3.load(Ordering::Relaxed) {
-                    0 => {
-                        barrier_2.wait().await;
-                        Response::builder()
-                            .status(StatusCode::OK)
-                            .body(Body::empty())
-                            .unwrap()
-                    }
-                    1 => {
-                        barrier_2.wait().await;
+                    3 => {
+                        // Read at least 4MB and one 8K chunk from the request
+                        // to relieve back-pressure for the guest. These numbers
+                        // come from the amount of data that the guest writes to
+                        // the request body in test-fixtures/src/bin/async_io.rs
+                        let mut bod = req.into_body();
+                        let mut bytes_read = 0;
+                        while bytes_read < (4 * 1024 * 1024) + (8 * 1024) {
+                            if let Some(Ok(bytes)) = bod.data().await {
+                                bytes_read += bytes.len();
+                            }
+                        }
+
+                        // The guest will have another outstanding request to
+                        // the Semaphore backend below. Awaiting on the barrier
+                        // here will cause that request to return indicating to
+                        // the guest that we have read from the request body
+                        // and the write handle should be ready again.
+                        sync_barrier.wait().await;
+                        let _body = hyper::body::to_bytes(bod);
                         Response::builder()
                             .status(StatusCode::OK)
                             .body(Body::empty())
                             .unwrap()
                     }
-                    2 => {
+                    0 | 1 | 2 => {
                         barrier_2.wait().await;
                         Response::builder()
                             .status(StatusCode::OK)
                             .body(Body::empty())
                             .unwrap()
                     }
                     _ => unreachable!(),
                 }
             })
         })
         .await
+        .async_backend("Semaphore", "/", None, move |req: Request<Body>| {
+            assert_eq!(req.headers()["Host"], "writebody.org");
+            let req_count_4 = req_count_4.clone();
+            let sync_barrier_1 = sync_barrier_1.clone();
+            Box::new(async move {
+                match req_count_4.load(Ordering::Relaxed) {
+                    3 => {
+                        let _body = hyper::body::to_bytes(req.into_body()).await;
+                        sync_barrier_1.wait().await;
+                        Response::builder()
+                            .status(StatusCode::OK)
+                            .body(Body::empty())
+                            .unwrap()
+                    }
+                    0 | 1 | 2 => Response::builder()
+                        .status(StatusCode::OK)
+                        .body(Body::empty())
+                        .unwrap(),
+                    _ => unreachable!(),
+                }
+            })
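The threshold in the `3 =>` arm above is `4 * 1024 * 1024 + 8 * 1024` bytes, i.e. 4 MiB plus one 8 KiB chunk, matching what the guest fixture writes. The Semaphore backend exists only so the guest can learn when that read has happened. A minimal standalone sketch of the same two-party `tokio::sync::Barrier` handshake (hypothetical example, not part of the test suite):

```rust
use std::sync::Arc;
use tokio::sync::Barrier;

#[tokio::main]
async fn main() {
    // Two parties: the task draining the body, and the task that must not
    // respond until the drain has happened.
    let barrier = Arc::new(Barrier::new(2));
    let drain_side = barrier.clone();

    let drainer = tokio::spawn(async move {
        // ... read the request body here ...
        drain_side.wait().await; // signal: back-pressure relieved
    });

    barrier.wait().await; // parked until the drainer arrives
    // ... now safe to respond: the body has been consumed ...
    drainer.await.unwrap();
}
```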
97 changes: 57 additions & 40 deletions lib/src/execute.rs
@@ -1,7 +1,5 @@
 //! Guest code execution.
-use std::net::Ipv4Addr;
-
 use {
     crate::{
         body::Body,
@@ -18,11 +16,12 @@ use {
     hyper::{Request, Response},
     std::{
         collections::HashSet,
-        net::IpAddr,
+        net::{IpAddr, Ipv4Addr},
         path::{Path, PathBuf},
-        sync::atomic::AtomicU64,
+        sync::atomic::{AtomicBool, AtomicU64, Ordering},
         sync::Arc,
-        time::Instant,
+        thread::{self, JoinHandle},
+        time::{Duration, Instant},
     },
     tokio::sync::oneshot::{self, Sender},
     tracing::{event, info, info_span, Instrument, Level},
@@ -61,6 +60,9 @@ pub struct ExecuteCtx {
     object_store: Arc<ObjectStores>,
     /// The secret stores for this execution.
     secret_stores: Arc<SecretStores>,
+    // `Arc` for the two fields below because this struct must be `Clone`.
+    epoch_increment_thread: Option<Arc<JoinHandle<()>>>,
+    epoch_increment_stop: Arc<AtomicBool>,
 }
 
 impl ExecuteCtx {
@@ -77,6 +79,18 @@ impl ExecuteCtx {
         let module = Module::from_file(&engine, module_path)?;
         let instance_pre = linker.instantiate_pre(&module)?;
 
+        // Create the epoch-increment thread.
+        let epoch_interruption_period = Duration::from_micros(50);
+        let epoch_increment_stop = Arc::new(AtomicBool::new(false));
+        let engine_clone = engine.clone();
+        let epoch_increment_stop_clone = epoch_increment_stop.clone();
+        let epoch_increment_thread = Some(Arc::new(thread::spawn(move || {
+            while !epoch_increment_stop_clone.load(Ordering::Relaxed) {
+                thread::sleep(epoch_interruption_period);
+                engine_clone.increment_epoch();
+            }
+        })));
+
         Ok(Self {
             engine,
             instance_pre: Arc::new(instance_pre),
@@ -90,6 +104,8 @@ impl ExecuteCtx {
             next_req_id: Arc::new(AtomicU64::new(0)),
             object_store: Arc::new(ObjectStores::new()),
             secret_stores: Arc::new(SecretStores::new()),
+            epoch_increment_thread,
+            epoch_increment_stop,
         })
     }
 
@@ -104,11 +120,9 @@ impl ExecuteCtx {
     }
 
     /// Set the backends for this execution context.
-    pub fn with_backends(self, backends: Backends) -> Self {
-        Self {
-            backends: Arc::new(backends),
-            ..self
-        }
+    pub fn with_backends(mut self, backends: Backends) -> Self {
+        self.backends = Arc::new(backends);
+        self
     }
 
     /// Get the geolocation mappings for this execution context.
@@ -117,11 +131,9 @@ impl ExecuteCtx {
     }
 
     /// Set the geolocation mappings for this execution context.
-    pub fn with_geolocation(self, geolocation: Geolocation) -> Self {
-        Self {
-            geolocation: Arc::new(geolocation),
-            ..self
-        }
+    pub fn with_geolocation(mut self, geolocation: Geolocation) -> Self {
+        self.geolocation = Arc::new(geolocation);
+        self
     }
 
     /// Get the dictionaries for this execution context.
@@ -130,35 +142,27 @@ impl ExecuteCtx {
     }
 
     /// Set the dictionaries for this execution context.
-    pub fn with_dictionaries(self, dictionaries: Dictionaries) -> Self {
-        Self {
-            dictionaries: Arc::new(dictionaries),
-            ..self
-        }
+    pub fn with_dictionaries(mut self, dictionaries: Dictionaries) -> Self {
+        self.dictionaries = Arc::new(dictionaries);
+        self
     }
 
     /// Set the object store for this execution context.
-    pub fn with_object_stores(self, object_store: ObjectStores) -> Self {
-        Self {
-            object_store: Arc::new(object_store),
-            ..self
-        }
+    pub fn with_object_stores(mut self, object_store: ObjectStores) -> Self {
+        self.object_store = Arc::new(object_store);
+        self
     }
 
     /// Set the secret stores for this execution context.
-    pub fn with_secret_stores(self, secret_stores: SecretStores) -> Self {
-        Self {
-            secret_stores: Arc::new(secret_stores),
-            ..self
-        }
+    pub fn with_secret_stores(mut self, secret_stores: SecretStores) -> Self {
+        self.secret_stores = Arc::new(secret_stores);
+        self
     }
 
     /// Set the path to the config for this execution context.
-    pub fn with_config_path(self, config_path: PathBuf) -> Self {
-        Self {
-            config_path: Arc::new(Some(config_path)),
-            ..self
-        }
+    pub fn with_config_path(mut self, config_path: PathBuf) -> Self {
+        self.config_path = Arc::new(Some(config_path));
+        self
     }
 
     /// Whether to treat stdout as a logging endpoint.
@@ -167,8 +171,9 @@ impl ExecuteCtx {
     }
 
     /// Set the stdout logging policy for this execution context.
-    pub fn with_log_stdout(self, log_stdout: bool) -> Self {
-        Self { log_stdout, ..self }
+    pub fn with_log_stdout(mut self, log_stdout: bool) -> Self {
+        self.log_stdout = log_stdout;
+        self
     }
 
     /// Whether to treat stderr as a logging endpoint.
@@ -177,8 +182,9 @@ impl ExecuteCtx {
     }
 
     /// Set the stderr logging policy for this execution context.
-    pub fn with_log_stderr(self, log_stderr: bool) -> Self {
-        Self { log_stderr, ..self }
+    pub fn with_log_stderr(mut self, log_stderr: bool) -> Self {
+        self.log_stderr = log_stderr;
+        self
     }
 
     /// Gets the TLS configuration
@@ -410,6 +416,17 @@ impl ExecuteCtx {
     }
 }
 
+impl Drop for ExecuteCtx {
+    fn drop(&mut self) {
+        if let Some(arc) = self.epoch_increment_thread.take() {
+            if let Ok(join_handle) = Arc::try_unwrap(arc) {
+                self.epoch_increment_stop.store(true, Ordering::Relaxed);
+                join_handle.join().unwrap();
+            }
+        }
+    }
+}
+
 fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config {
     use wasmtime::{
         Config, InstanceAllocationStrategy, PoolingAllocationConfig, WasmBacktraceDetails,
@@ -419,7 +436,7 @@ fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config
     config.debug_info(false); // Keep this disabled - wasmtime will hang if enabled
     config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
     config.async_support(true);
-    config.consume_fuel(true);
+    config.epoch_interruption(true);
     config.profiler(profiling_strategy);
 
     const MB: usize = 1 << 20;
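Because `ExecuteCtx` is `Clone`, every clone shares the ticker thread through the `Arc<JoinHandle<()>>` field, and `Arc::try_unwrap` succeeds only in the drop of the last clone, so the stop flag is set and the thread joined exactly once. A standalone sketch of that shutdown pattern (hypothetical `Ctx` type, not Viceroy's):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread::JoinHandle;

#[derive(Clone)]
struct Ctx {
    // The ticker thread's handle is shared by every clone of Ctx.
    ticker: Option<Arc<JoinHandle<()>>>,
    stop: Arc<AtomicBool>,
}

impl Drop for Ctx {
    fn drop(&mut self) {
        if let Some(arc) = self.ticker.take() {
            // Succeeds only when no other clone still holds the handle,
            // i.e. this is the last Ctx going away.
            if let Ok(handle) = Arc::try_unwrap(arc) {
                self.stop.store(true, Ordering::Relaxed);
                handle.join().unwrap(); // ticker loop exits on its next check
            }
        }
    }
}
```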
3 changes: 2 additions & 1 deletion lib/src/linking.rs
@@ -55,7 +55,8 @@ pub(crate) fn create_store(
         session,
     };
     let mut store = Store::new(ctx.engine(), wasm_ctx);
-    store.out_of_fuel_async_yield(u64::MAX, 10000);
+    store.set_epoch_deadline(1);
+    store.epoch_deadline_async_yield_and_update(1);
     Ok(store)
 }
 
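With the 50µs ticker above, these two calls make each guest future yield to the tokio executor at every epoch tick instead of after every 10,000 units of injected fuel: `set_epoch_deadline(1)` arms the first deadline one tick ahead, and `epoch_deadline_async_yield_and_update(1)` re-arms it by one tick after each yield. A minimal sketch (assuming an engine built with `async_support` and `epoch_interruption` enabled):

```rust
use wasmtime::{Engine, Store};

// The store's data type is irrelevant to deadline behavior; () keeps it minimal.
fn cooperative_store(engine: &Engine) -> Store<()> {
    let mut store = Store::new(engine, ());
    // First interruption: the next epoch tick.
    store.set_epoch_deadline(1);
    // On each interruption: yield the guest's future back to the executor,
    // then set the next deadline one tick further out.
    store.epoch_deadline_async_yield_and_update(1);
    store
}
```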