From 75cf9a302967d883b2276ad1156680cd4f3f3fb5 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Mon, 13 Nov 2023 22:38:36 -0500 Subject: [PATCH 1/3] fix: make configuration run without -c (all defaults) Includes: - also, add starter fns for default .pem/settings initialization --- homestar-runtime/src/cli.rs | 4 +- homestar-runtime/src/settings.rs | 110 +++++++++++++++++++++++-------- 2 files changed, 85 insertions(+), 29 deletions(-) diff --git a/homestar-runtime/src/cli.rs b/homestar-runtime/src/cli.rs index 9032b374..30353fa9 100644 --- a/homestar-runtime/src/cli.rs +++ b/homestar-runtime/src/cli.rs @@ -19,7 +19,6 @@ pub use error::Error; pub(crate) mod show; pub(crate) use show::ConsoleTable; -const DEFAULT_SETTINGS_FILE: &str = "./config/settings.toml"; const DEFAULT_DB_PATH: &str = "homestar.db"; const TMP_DIR: &str = "/tmp"; const HELP_TEMPLATE: &str = "{name} {version} @@ -87,13 +86,12 @@ pub enum Command { help = "Database path (SQLite) [optional]" )] database_url: Option, - /// Runtime configuration file (.toml), defaults to ./config/settings.toml. + /// Runtime configuration file (.toml). #[arg( short = 'c', long = "config", value_hint = clap::ValueHint::FilePath, value_name = "CONFIG", - default_value = DEFAULT_SETTINGS_FILE, help = "Runtime configuration file (.toml) [optional]" )] runtime_config: Option, diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index 483f3fe6..2111a585 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -7,6 +7,7 @@ use serde_with::{serde_as, DisplayFromStr, DurationMilliSeconds, DurationSeconds #[cfg(feature = "ipfs")] use std::net::Ipv4Addr; use std::{ + env, net::{IpAddr, Ipv6Addr}, path::PathBuf, time::Duration, @@ -15,11 +16,17 @@ use std::{ mod pubkey_config; pub(crate) use pubkey_config::PubkeyConfig; +#[cfg(target_os = "windows")] +const HOME_VAR: &str = "USERPROFILE"; +#[cfg(not(target_os = "windows"))] +const HOME_VAR: &str = "HOME"; + /// Application settings. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct Settings { #[serde(default)] pub(crate) monitoring: Monitoring, + #[serde(default)] pub(crate) node: Node, } @@ -38,6 +45,7 @@ impl Settings { /// Monitoring settings. #[serde_as] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(default)] pub struct Monitoring { /// Tokio console port. pub console_subscriber_port: u16, @@ -49,7 +57,8 @@ pub struct Monitoring { /// Server settings. #[serde_as] -#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(default)] pub struct Node { /// Network settings. #[serde(default)] @@ -59,11 +68,9 @@ pub struct Node { pub(crate) db: Database, /// Garbage collection interval. #[serde_as(as = "DurationSeconds")] - #[serde(default = "default_gc_interval")] pub(crate) gc_interval: Duration, /// Shutdown timeout. #[serde_as(as = "DurationSeconds")] - #[serde(default = "default_shutdown_timeout")] pub(crate) shutdown_timeout: Duration, } @@ -246,6 +253,17 @@ impl Default for Database { } } +impl Default for Node { + fn default() -> Self { + Self { + gc_interval: Duration::from_secs(1800), + shutdown_timeout: Duration::from_secs(20), + network: Default::default(), + db: Default::default(), + } + } +} + impl Default for Network { fn default() -> Self { Self { @@ -314,14 +332,6 @@ impl Network { } } -fn default_shutdown_timeout() -> Duration { - Duration::new(20, 0) -} - -fn default_gc_interval() -> Duration { - Duration::new(1800, 0) -} - impl Settings { /// Load settings. /// @@ -331,34 +341,52 @@ impl Settings { /// Use two underscores as defined by the separator below pub fn load() -> Result { #[cfg(test)] - let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("config/settings.toml"); + { + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("config/settings.toml"); + Self::build(Some(path)) + } #[cfg(not(test))] - let path = PathBuf::from("./config/settings.toml"); - - Self::build(path) + Self::build(None) } /// Load settings from file string that must conform to a [PathBuf]. pub fn load_from_file(file: PathBuf) -> Result { - Self::build(file) + Self::build(Some(file)) } - fn build(path: PathBuf) -> Result { - let s = Config::builder() - .add_source(File::with_name( - &path - .canonicalize() + fn build(path: Option) -> Result { + let builder = if let Some(p) = path { + Config::builder().add_source(File::with_name( + &p.canonicalize() .map_err(|e| ConfigError::NotFound(e.to_string()))? .as_path() .display() .to_string(), )) + } else { + Config::builder() + }; + + let s = builder .add_source(Environment::with_prefix("HOMESTAR").separator("__")) .build()?; s.try_deserialize() } } +#[allow(dead_code)] +fn config_dir() -> PathBuf { + let config_dir = + env::var("XDG_CONFIG_HOME").map_or_else(|_| home_dir().join(".config"), PathBuf::from); + config_dir.join("homestar") +} + +#[allow(dead_code)] +fn home_dir() -> PathBuf { + let home = env::var(HOME_VAR).unwrap_or_else(|_| panic!("{} not found", HOME_VAR)); + PathBuf::from(home) +} + #[cfg(test)] mod test { use super::*; @@ -380,7 +408,7 @@ mod test { #[test] fn defaults_with_modification() { - let settings = Settings::build("fixtures/settings.toml".into()).unwrap(); + let settings = Settings::build(Some("fixtures/settings.toml".into())).unwrap(); let mut default_modded_settings = Node::default(); default_modded_settings.network.events_buffer_len = 1000; @@ -396,14 +424,14 @@ mod test { fn overriding_env() { std::env::set_var("HOMESTAR__NODE__NETWORK__RPC_PORT", "2046"); std::env::set_var("HOMESTAR__NODE__DB__MAX_POOL_SIZE", "1"); - let settings = Settings::build("fixtures/settings.toml".into()).unwrap(); + let settings = Settings::build(Some("fixtures/settings.toml".into())).unwrap(); assert_eq!(settings.node.network.rpc_port, 2046); assert_eq!(settings.node.db.max_pool_size, 1); } #[test] fn import_existing_key() { - let settings = Settings::build("fixtures/settings-import-ed25519.toml".into()) + let settings = Settings::build(Some("fixtures/settings-import-ed25519.toml".into())) .expect("setting file in test fixtures"); let msg = b"foo bar"; @@ -424,7 +452,7 @@ mod test { #[test] fn import_secp256k1_key() { - let settings = Settings::build("fixtures/settings-import-secp256k1.toml".into()) + let settings = Settings::build(Some("fixtures/settings-import-secp256k1.toml".into())) .expect("setting file in test fixtures"); settings @@ -437,7 +465,7 @@ mod test { #[test] fn seeded_secp256k1_key() { - let settings = Settings::build("fixtures/settings-random-secp256k1.toml".into()) + let settings = Settings::build(Some("fixtures/settings-random-secp256k1.toml".into())) .expect("setting file in test fixtures"); settings @@ -447,4 +475,34 @@ mod test { .keypair() .expect("generate a seeded secp256k1 key"); } + + #[test] + fn test_config_dir_xdg() { + env::remove_var("HOME"); + env::set_var("XDG_CONFIG_HOME", "/home/user/custom_config"); + assert_eq!( + config_dir(), + PathBuf::from("/home/user/custom_config/homestar") + ); + env::remove_var("XDG_CONFIG_HOME"); + } + + #[cfg(not(target_os = "windows"))] + #[test] + fn test_config_dir() { + env::set_var("HOME", "/home/user"); + env::remove_var("XDG_CONFIG_HOME"); + assert_eq!(config_dir(), PathBuf::from("/home/user/.config/homestar")); + env::remove_var("HOME"); + } + + #[cfg(target_os = "windows")] + #[test] + fn test_config_dir() { + env::remove_var("XDG_CONFIG_HOME"); + assert_eq!( + config_dir(), + PathBuf::from(format!(r"{}\.config\homestar", env!("USERPROFILE"))) + ); + } } From 433b46a15ed1483c60dd1489d2fe039bd5934ccc Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Tue, 14 Nov 2023 12:09:07 -0500 Subject: [PATCH 2/3] chore: error handling around retry, wasms --- Cross.toml | 10 ++++- homestar-runtime/src/scheduler.rs | 1 + homestar-runtime/src/settings.rs | 2 +- homestar-runtime/src/tasks/fetch.rs | 51 ++++++++++++++++------- homestar-runtime/src/worker.rs | 21 +++++++--- homestar-runtime/src/workflow/settings.rs | 2 +- homestar-wasm/src/wasmtime/error.rs | 2 +- homestar-wasm/src/wasmtime/world.rs | 2 +- 8 files changed, 66 insertions(+), 25 deletions(-) diff --git a/Cross.toml b/Cross.toml index 688d1ac7..ff92d71f 100644 --- a/Cross.toml +++ b/Cross.toml @@ -6,11 +6,19 @@ passthrough = [ "RUSTFLAGS", ] +# When running `cross` with nix, do this within `nix-shell -p gcc rustup`. +# +# Then, run +# +# `cross build -p homestar-runtime --target x86_64-unknown-linux-musl` +# or +# `cross build -p homestar-runtime --target aarch64-unknown-linux-musl` + [target.x86_64-unknown-linux-musl] image = "burntsushi/cross:x86_64-unknown-linux-musl" [target.aarch64-unknown-linux-musl] -image = "burntsushi/cross:aarch64-unknown-linux-gnu" +image = "burntsushi/cross:aarch64-unknown-linux-musl" [target.x86_64-apple-darwin] image = "freeznet/x86_64-apple-darwin-cross:11.3" diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs index df3f48bf..0dcc2f1e 100644 --- a/homestar-runtime/src/scheduler.rs +++ b/homestar-runtime/src/scheduler.rs @@ -171,6 +171,7 @@ impl<'a> TaskScheduler<'a> { .into_iter() .map(|(_, rsc)| rsc.to_owned()) .collect(); + let fetched = fetch_fn(resources_to_fetch).await?; match resume { diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index 2111a585..d29f8704 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -294,7 +294,7 @@ impl Default for Network { rpc_max_connections: 10, rpc_port: 3030, rpc_server_timeout: Duration::new(120, 0), - transport_connection_timeout: Duration::new(20, 0), + transport_connection_timeout: Duration::new(60, 0), webserver_host: Uri::from_static("127.0.0.1"), webserver_port: 1337, webserver_timeout: Duration::new(120, 0), diff --git a/homestar-runtime/src/tasks/fetch.rs b/homestar-runtime/src/tasks/fetch.rs index bf99f0be..79319e99 100644 --- a/homestar-runtime/src/tasks/fetch.rs +++ b/homestar-runtime/src/tasks/fetch.rs @@ -10,6 +10,7 @@ use anyhow::Result; use fnv::FnvHashSet; use indexmap::IndexMap; use std::sync::Arc; +use tracing::{info, warn}; pub(crate) struct Fetch; @@ -29,27 +30,49 @@ impl Fetch { ) -> Result>> { use futures::{stream::FuturesUnordered, TryStreamExt}; let settings = settings.as_ref(); + let retries = settings.retries; let tasks = FuturesUnordered::new(); for rsc in resources.iter() { - tracing::info!(rsc = rsc.to_string(), "Fetching resource"); - let task = tryhard::retry_fn(|| async { Self::fetch(rsc.clone(), ipfs.clone()).await }) - .with_config( - tryhard::RetryFutureConfig::new(settings.retries) - .exponential_backoff(settings.retry_initial_delay) - .max_delay(settings.retry_max_delay), + let task = tryhard::retry_fn(|| async { + info!( + rsc = rsc.to_string(), + "attempting to fetch resource from IPFS" ); + Self::fetch(rsc.clone(), ipfs.clone()).await + }) + .retries(retries) + .exponential_backoff(settings.retry_initial_delay) + .max_delay(settings.retry_max_delay) + .on_retry(|attempts, next_delay, error| { + let err = error.to_string(); + async move { + if attempts < retries { + warn!( + err = err, + attempts = attempts, + "retrying fetch after error @ {}ms", + next_delay.map(|d| d.as_millis()).unwrap_or(0) + ); + } else { + warn!(err = err, attempts = attempts, "maxed out # of retries"); + } + } + }); tasks.push(task); } - tasks.try_collect::>().await?.into_iter().try_fold( - IndexMap::default(), - |mut acc, res| { - let answer = res.1?; - acc.insert(res.0, answer); + info!("fetching necessary resources from IPFS"); + if let Ok(vec) = tasks.try_collect::>().await { + vec.into_iter() + .try_fold(IndexMap::default(), |mut acc, res| { + let answer = res.1?; + acc.insert(res.0, answer); - Ok::<_, anyhow::Error>(acc) - }, - ) + Ok::<_, anyhow::Error>(acc) + }) + } else { + Err(anyhow::anyhow!("Failed to fetch resources from IPFS")) + } } /// Gather resources via URLs, leveraging an exponential backoff. diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 0f4b26e7..15a73e1a 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -147,14 +147,19 @@ where where F: FnOnce(FnvHashSet) -> BoxFuture<'a, Result>>>, { - let scheduler_ctx = TaskScheduler::init( + match TaskScheduler::init( self.graph.clone(), // Arc'ed &mut self.db.conn()?, fetch_fn, ) - .await?; - - self.run_queue(scheduler_ctx.scheduler, running_tasks).await + .await + { + Ok(ctx) => self.run_queue(ctx.scheduler, running_tasks).await, + Err(err) => { + error!(err=?err, "error initializing scheduler"); + Err(anyhow!("error initializing scheduler")) + } + } } #[allow(unused_mut)] @@ -185,7 +190,7 @@ where info!( workflow_cid = workflow_cid.to_string(), cid = cid.to_string(), - "resolving cid" + "attempting to resolve cid in workflow" ); if let Some(result) = linkmap.read().await.get(&cid) { @@ -369,7 +374,11 @@ where receipt_meta, additional_meta, )), - Err(e) => Err(anyhow!("cannot execute wasm module: {e}")), + Err(err) => Err( + anyhow!("cannot execute wasm module: {err}")) + .with_context(|| { + format!("not able to run fn {fun} for promised cid: {instruction_ptr}, in workflow {workflow_cid}") + }), } }); handles.push(handle); diff --git a/homestar-runtime/src/workflow/settings.rs b/homestar-runtime/src/workflow/settings.rs index cba18188..b2dbd834 100644 --- a/homestar-runtime/src/workflow/settings.rs +++ b/homestar-runtime/src/workflow/settings.rs @@ -18,7 +18,7 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Self { - retries: 10, + retries: 3, retry_max_delay: Duration::new(60, 0), retry_initial_delay: Duration::from_millis(500), p2p_timeout: Duration::new(5, 0), diff --git a/homestar-wasm/src/wasmtime/error.rs b/homestar-wasm/src/wasmtime/error.rs index 7e327579..a11147d8 100644 --- a/homestar-wasm/src/wasmtime/error.rs +++ b/homestar-wasm/src/wasmtime/error.rs @@ -40,7 +40,7 @@ pub enum Error { #[error(transparent)] WasmRuntime(#[from] anyhow::Error), /// Failure to find Wasm function for execution. - #[error("Wasm function {0} not found")] + #[error("Wasm function {0} not found in given Wasm component/resource")] WasmFunctionNotFound(String), /// [Wat] as Wasm component error. /// diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index 18f95e41..6338ff12 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -121,7 +121,7 @@ impl Env { }, Input::Deferred(await_promise) => { bail!(Error::ResolvePromise(ResolveError::UnresolvedCid(format!( - "deferred task not yet resolved for {}: {}", + "deferred task/instruction not yet resolved or exists for promise: {}: {}", await_promise.result(), await_promise.instruction_cid() )))) From 2429bfeef79a5f5a846734b1e1e28d44d03ed87c Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Tue, 14 Nov 2023 12:44:24 -0500 Subject: [PATCH 3/3] chore: logs --- homestar-runtime/src/tasks/fetch.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/homestar-runtime/src/tasks/fetch.rs b/homestar-runtime/src/tasks/fetch.rs index 79319e99..87922afd 100644 --- a/homestar-runtime/src/tasks/fetch.rs +++ b/homestar-runtime/src/tasks/fetch.rs @@ -10,7 +10,6 @@ use anyhow::Result; use fnv::FnvHashSet; use indexmap::IndexMap; use std::sync::Arc; -use tracing::{info, warn}; pub(crate) struct Fetch; @@ -34,7 +33,7 @@ impl Fetch { let tasks = FuturesUnordered::new(); for rsc in resources.iter() { let task = tryhard::retry_fn(|| async { - info!( + tracing::info!( rsc = rsc.to_string(), "attempting to fetch resource from IPFS" ); @@ -47,21 +46,21 @@ impl Fetch { let err = error.to_string(); async move { if attempts < retries { - warn!( + tracing::warn!( err = err, attempts = attempts, "retrying fetch after error @ {}ms", next_delay.map(|d| d.as_millis()).unwrap_or(0) ); } else { - warn!(err = err, attempts = attempts, "maxed out # of retries"); + tracing::warn!(err = err, attempts = attempts, "maxed out # of retries"); } } }); tasks.push(task); } - info!("fetching necessary resources from IPFS"); + tracing::info!("fetching necessary resources from IPFS"); if let Ok(vec) = tasks.try_collect::>().await { vec.into_iter() .try_fold(IndexMap::default(), |mut acc, res| {