Skip to content

Commit

Permalink
Switch from fuel to epoch interruptions.
Browse files Browse the repository at this point in the history
Co-authored-by: Jamey Sharp <[email protected]>
  • Loading branch information
itsrainy and jameysharp committed May 31, 2023
1 parent d3e23d4 commit f98db2b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 41 deletions.
97 changes: 57 additions & 40 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Guest code execution.
use std::net::Ipv4Addr;

use {
crate::{
body::Body,
Expand All @@ -18,11 +16,12 @@ use {
hyper::{Request, Response},
std::{
collections::HashSet,
net::IpAddr,
net::{IpAddr, Ipv4Addr},
path::{Path, PathBuf},
sync::atomic::AtomicU64,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::Arc,
time::Instant,
thread::{self, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::oneshot::{self, Sender},
tracing::{event, info, info_span, Instrument, Level},
Expand Down Expand Up @@ -61,6 +60,9 @@ pub struct ExecuteCtx {
object_store: Arc<ObjectStores>,
/// The secret stores for this execution.
secret_stores: Arc<SecretStores>,
// `Arc` for the two fields below because this struct must be `Clone`.
epoch_increment_thread: Option<Arc<JoinHandle<()>>>,
epoch_increment_stop: Arc<AtomicBool>,
}

impl ExecuteCtx {
Expand All @@ -77,6 +79,18 @@ impl ExecuteCtx {
let module = Module::from_file(&engine, module_path)?;
let instance_pre = linker.instantiate_pre(&module)?;

// Create the epoch-increment thread.
let epoch_interruption_period = Duration::from_micros(100);
let epoch_increment_stop = Arc::new(AtomicBool::new(false));
let engine_clone = engine.clone();
let epoch_increment_stop_clone = epoch_increment_stop.clone();
let epoch_increment_thread = Some(Arc::new(thread::spawn(move || {
while !epoch_increment_stop_clone.load(Ordering::Relaxed) {
thread::sleep(epoch_interruption_period);
engine_clone.increment_epoch();
}
})));

Ok(Self {
engine,
instance_pre: Arc::new(instance_pre),
Expand All @@ -90,6 +104,8 @@ impl ExecuteCtx {
next_req_id: Arc::new(AtomicU64::new(0)),
object_store: Arc::new(ObjectStores::new()),
secret_stores: Arc::new(SecretStores::new()),
epoch_increment_thread,
epoch_increment_stop,
})
}

Expand All @@ -104,11 +120,9 @@ impl ExecuteCtx {
}

/// Set the backends for this execution context.
pub fn with_backends(self, backends: Backends) -> Self {
Self {
backends: Arc::new(backends),
..self
}
pub fn with_backends(mut self, backends: Backends) -> Self {
self.backends = Arc::new(backends);
self
}

/// Get the geolocation mappings for this execution context.
Expand All @@ -117,11 +131,9 @@ impl ExecuteCtx {
}

/// Set the geolocation mappings for this execution context.
pub fn with_geolocation(self, geolocation: Geolocation) -> Self {
Self {
geolocation: Arc::new(geolocation),
..self
}
pub fn with_geolocation(mut self, geolocation: Geolocation) -> Self {
self.geolocation = Arc::new(geolocation);
self
}

/// Get the dictionaries for this execution context.
Expand All @@ -130,35 +142,27 @@ impl ExecuteCtx {
}

/// Set the dictionaries for this execution context.
pub fn with_dictionaries(self, dictionaries: Dictionaries) -> Self {
Self {
dictionaries: Arc::new(dictionaries),
..self
}
pub fn with_dictionaries(mut self, dictionaries: Dictionaries) -> Self {
self.dictionaries = Arc::new(dictionaries);
self
}

/// Set the object store for this execution context.
pub fn with_object_stores(self, object_store: ObjectStores) -> Self {
Self {
object_store: Arc::new(object_store),
..self
}
pub fn with_object_stores(mut self, object_store: ObjectStores) -> Self {
self.object_store = Arc::new(object_store);
self
}

/// Set the secret stores for this execution context.
pub fn with_secret_stores(self, secret_stores: SecretStores) -> Self {
Self {
secret_stores: Arc::new(secret_stores),
..self
}
pub fn with_secret_stores(mut self, secret_stores: SecretStores) -> Self {
self.secret_stores = Arc::new(secret_stores);
self
}

/// Set the path to the config for this execution context.
pub fn with_config_path(self, config_path: PathBuf) -> Self {
Self {
config_path: Arc::new(Some(config_path)),
..self
}
pub fn with_config_path(mut self, config_path: PathBuf) -> Self {
self.config_path = Arc::new(Some(config_path));
self
}

/// Whether to treat stdout as a logging endpoint.
Expand All @@ -167,8 +171,9 @@ impl ExecuteCtx {
}

/// Set the stdout logging policy for this execution context.
pub fn with_log_stdout(self, log_stdout: bool) -> Self {
Self { log_stdout, ..self }
pub fn with_log_stdout(mut self, log_stdout: bool) -> Self {
self.log_stdout = log_stdout;
self
}

/// Whether to treat stderr as a logging endpoint.
Expand All @@ -177,8 +182,9 @@ impl ExecuteCtx {
}

/// Set the stderr logging policy for this execution context.
pub fn with_log_stderr(self, log_stderr: bool) -> Self {
Self { log_stderr, ..self }
pub fn with_log_stderr(mut self, log_stderr: bool) -> Self {
self.log_stderr = log_stderr;
self
}

/// Gets the TLS configuration
Expand Down Expand Up @@ -410,6 +416,17 @@ impl ExecuteCtx {
}
}

impl Drop for ExecuteCtx {
fn drop(&mut self) {
if let Some(arc) = self.epoch_increment_thread.take() {
if let Ok(join_handle) = Arc::try_unwrap(arc) {
self.epoch_increment_stop.store(true, Ordering::Relaxed);
join_handle.join().unwrap();
}
}
}
}

fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config {
use wasmtime::{
Config, InstanceAllocationStrategy, PoolingAllocationConfig, WasmBacktraceDetails,
Expand All @@ -419,7 +436,7 @@ fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config
config.debug_info(false); // Keep this disabled - wasmtime will hang if enabled
config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
config.async_support(true);
config.consume_fuel(true);
config.epoch_interruption(true);
config.profiler(profiling_strategy);

const MB: usize = 1 << 20;
Expand Down
3 changes: 2 additions & 1 deletion lib/src/linking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ pub(crate) fn create_store(
session,
};
let mut store = Store::new(ctx.engine(), wasm_ctx);
store.out_of_fuel_async_yield(u64::MAX, 10000);
store.set_epoch_deadline(1);
store.epoch_deadline_async_yield_and_update(1);
Ok(store)
}

Expand Down

0 comments on commit f98db2b

Please sign in to comment.