diff --git a/Cargo.lock b/Cargo.lock index c9e54438..01007b5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,9 +69,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anstyle-parse" @@ -350,9 +350,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.13" +version = "4.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fbb260a053428790f3de475e304ff84cdbc4face759ea7a3e64c1edd938a7fc" +checksum = "11d8838454fda655dafd3accb2b6e2bea645b9e4078abe84a22ceb947235c5cc" dependencies = [ "clap_builder", "clap_derive", @@ -370,9 +370,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.13" +version = "4.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64b17d7ea74e9f833c7dbf2cbe4fb12ff26783eda4782a8975b72f895c9b4d99" +checksum = "216aec2b177652e3846684cbfe25c9964d18ec45234f0f5da5157b207ed1aab6" dependencies = [ "anstream", "anstyle", @@ -815,6 +815,21 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -822,6 +837,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -830,6 +846,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -848,10 +892,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1029,6 +1079,7 @@ name = "hipcheck" version = "3.5.0" dependencies = [ "anyhow", + "async-stream", "base64 0.22.1", "chrono", "clap", @@ -1043,6 +1094,7 @@ dependencies = [ "env_logger", "finl_unicode", "fs_extra", + "futures", "git2", "graphql_client", "hipcheck-macros", @@ -1055,6 +1107,7 @@ dependencies = [ "maplit", "nom", "num-traits", + "num_enum", "once_cell", "ordered-float", "packageurl", @@ -1300,7 +1353,7 @@ dependencies = [ "proc-macro2", "quote", "strum 0.26.3", - "syn 2.0.74", + "syn 2.0.75", "thiserror", ] @@ -1602,6 +1655,27 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_enum" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -1849,6 +1923,15 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit 0.21.1", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2322,9 +2405,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.122" +version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ "itoa", "memchr", @@ -2725,7 +2808,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit", + "toml_edit 0.22.20", ] [[package]] @@ -2737,6 +2820,17 @@ dependencies = [ "serde", ] +[[package]] +name = "toml_edit" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap 2.2.6", + "toml_datetime", + "winnow 0.5.40", +] + [[package]] name = "toml_edit" version = "0.22.20" @@ -2747,7 +2841,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.6.18", ] [[package]] @@ -3265,6 +3359,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "0.6.18" diff --git a/hipcheck/Cargo.toml b/hipcheck/Cargo.toml index 2100f485..0ab04509 100644 --- a/hipcheck/Cargo.toml +++ b/hipcheck/Cargo.toml @@ -89,8 +89,11 @@ fs_extra = "1.3.0" tonic = "0.12.1" prost = "0.13.1" rand = "0.8.5" -tokio = { version = "1.39.3", features = ["time"] } kdl = "4.6.0" +tokio = { version = "1.39.2", features = ["rt", "sync", "time"] } +futures = "0.3.30" +async-stream = "0.3.5" +num_enum = "0.7.3" # Exactly matching the version of rustls used by ureq # Get rid of default features since we don't use the AWS backed crypto provider (we use ring). diff --git a/hipcheck/src/plugin/manager.rs b/hipcheck/src/plugin/manager.rs index 2e699932..07d32bf1 100644 --- a/hipcheck/src/plugin/manager.rs +++ b/hipcheck/src/plugin/manager.rs @@ -1,19 +1,21 @@ use crate::hipcheck::plugin_client::PluginClient; use crate::plugin::{HcPluginClient, Plugin, PluginContext}; use crate::{hc_error, Result, F64}; +use futures::future::join_all; +use futures::Future; use rand::Rng; use std::collections::HashSet; use std::ops::Range; use std::process::Command; use tokio::time::{sleep_until, Duration, Instant}; +#[derive(Clone, Debug)] pub struct PluginExecutor { max_spawn_attempts: usize, max_conn_attempts: usize, port_range: Range, backoff_interval: Duration, jitter_percent: u8, - est_ports: HashSet, } impl PluginExecutor { pub fn new( @@ -36,21 +38,23 @@ impl PluginExecutor { port_range, backoff_interval, jitter_percent, - est_ports: HashSet::new(), }) } - fn get_available_port(&mut self) -> Result { + fn get_available_port(&self) -> Result { for i in self.port_range.start..self.port_range.end { - if !self.est_ports.contains(&i) - && std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok() - { + if std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok() { return Ok(i); } } Err(hc_error!("Failed to find available port")) } - pub async fn start_plugin(&mut self, plugin: &Plugin) -> Result { - let mut rng = rand::thread_rng(); + pub async fn start_plugins(&self, plugins: Vec) -> Result> { + join_all(plugins.into_iter().map(|p| self.start_plugin(p))) + .await + .into_iter() + .collect() + } + pub async fn start_plugin(&self, plugin: Plugin) -> Result { // Plugin startup design has inherent TOCTOU flaws since we tell the plugin // which port we expect it to bind to. We can try to ensure the port we pass // on the cmdline is not already in use, but it is still possible for that @@ -76,7 +80,7 @@ impl PluginExecutor { let mut opt_grpc: Option = None; while conn_attempts < self.max_conn_attempts { // Jitter could be positive or negative, so mult by 2 to cover both sides - let jitter: i32 = rng.gen_range(0..(2 * self.jitter_percent)) as i32; + let jitter: i32 = rand::thread_rng().gen_range(0..(2 * self.jitter_percent)) as i32; // Then subtract by self.jitter_percent to center around 0, and add to 100% let jitter_percent = 1.0 + ((jitter - (self.jitter_percent as i32)) as f64 / 100.0); // Once we are confident this math works, we can remove this @@ -107,14 +111,12 @@ impl PluginExecutor { spawn_attempts += 1; continue; }; - self.est_ports.insert(port); // We now have an open gRPC connection to our plugin process return Ok(PluginContext { plugin: plugin.clone(), port, grpc, proc, - channel: None, }); } Err(hc_error!( diff --git a/hipcheck/src/plugin/mod.rs b/hipcheck/src/plugin/mod.rs index 1a65780c..007b3d7b 100644 --- a/hipcheck/src/plugin/mod.rs +++ b/hipcheck/src/plugin/mod.rs @@ -2,8 +2,14 @@ mod download_manifest; mod manager; mod types; +use crate::hipcheck::Query; use crate::plugin::manager::*; pub use crate::plugin::types::*; +use crate::Result; +use futures::future::join_all; +use serde_json::Value; +use std::collections::HashMap; +use tokio::sync::mpsc; pub fn dummy() { let plugin = Plugin { @@ -18,3 +24,59 @@ pub fn dummy() { /* jitter_percent */ 10, ); } + +pub async fn initialize_plugins( + plugins: Vec, +) -> Result> { + let mut set = tokio::task::JoinSet::new(); + for (p, c) in plugins + .into_iter() + .map(Into::<(PluginContext, Value)>::into) + { + set.spawn(p.initialize(c)); + } + let mut out: Vec = vec![]; + while let Some(res) = set.join_next().await { + out.push(res??); + } + Ok(out) +} + +struct HcPluginCore { + executor: PluginExecutor, + plugins: HashMap, +} +impl HcPluginCore { + // When this object is returned, the plugins are all connected but the + // initialization protocol over the gRPC still needs to be completed + pub async fn new(executor: PluginExecutor, plugins: Vec<(PluginWithConfig)>) -> Result { + // Separate plugins and configs so we can start plugins async + let mut conf_map = HashMap::::new(); + let plugins = plugins + .into_iter() + .map(|pc| { + let (p, c) = pc.into(); + conf_map.insert(p.name.clone(), c); + p + }) + .collect(); + let ctxs = executor.start_plugins(plugins).await?; + // Rejoin plugin ctx with its config + let mapped_ctxs: Vec = ctxs + .into_iter() + .map(|c| { + let conf = conf_map.remove(&c.plugin.name).unwrap(); + PluginContextWithConfig(c, conf) + }) + .collect(); + // Use configs to initialize corresponding plugin + let plugins = HashMap::::from_iter( + initialize_plugins(mapped_ctxs) + .await? + .into_iter() + .map(|p| (p.name().to_owned(), p)), + ); + // Now we have a set of started and initialized plugins to interact with + Ok(HcPluginCore { executor, plugins }) + } +} diff --git a/hipcheck/src/plugin/types.rs b/hipcheck/src/plugin/types.rs index d1cda2fd..a14e79c6 100644 --- a/hipcheck/src/plugin/types.rs +++ b/hipcheck/src/plugin/types.rs @@ -1,13 +1,15 @@ use crate::hipcheck::plugin_client::PluginClient; use crate::hipcheck::{ Configuration, ConfigurationResult as PluginConfigResult, ConfigurationStatus, Empty, - Schema as PluginSchema, + Query as PluginQuery, QueryState, Schema as PluginSchema, }; -use crate::{hc_error, Result}; +use crate::{hc_error, Error, Result, StdResult}; use serde_json::Value; use std::collections::HashMap; +use std::convert::TryFrom; use std::ops::Not; use std::process::Child; +use tonic::codec::Streaming; use tonic::transport::Channel; pub type HcPluginClient = PluginClient; @@ -56,11 +58,14 @@ impl TryFrom for ConfigurationResult { // is stuffed into a custom error type enum that equals the protoc-generated one // minus the success variant. impl ConfigurationResult { - pub fn as_result(&self) -> std::result::Result<(), ConfigError> { + pub fn as_result(&self) -> Result<()> { let Ok(error) = self.status.try_into() else { return Ok(()); }; - Err(ConfigError::new(error, self.message.clone())) + Err(hc_error!( + "{}", + ConfigError::new(error, self.message.clone()).to_string() + )) } } pub enum ConfigErrorType { @@ -94,6 +99,22 @@ impl ConfigError { ConfigError { error, message } } } +impl std::fmt::Display for ConfigError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> StdResult<(), std::fmt::Error> { + use ConfigErrorType::*; + let msg = match &self.message { + Some(s) => format!(": {s}"), + None => "".to_owned(), + }; + let err = match self.error { + Unknown => "unknown configuration error occurred", + MissingRequiredConfig => "configuration is missing requried fields", + UnrecognizedConfig => "configuration contains unrecognized fields", + InvalidConfigValue => "configuration contains invalid values", + }; + write!(f, "{}{}", msg, err) + } +} // State for managing an actively running plugin process pub struct PluginContext { @@ -101,7 +122,6 @@ pub struct PluginContext { pub port: u16, pub grpc: HcPluginClient, pub proc: Child, - pub channel: Option, } // Redefinition of `grpc` field's functions with more useful types, additional // error & sanity checking @@ -139,6 +159,44 @@ impl PluginContext { let mut res = self.grpc.get_default_policy_expression(Empty {}).await?; Ok(res.get_ref().policy_expression.to_owned()) } + pub async fn initiate_query_protocol( + &mut self, + mut rx: tokio::sync::mpsc::Receiver, + ) -> Result> { + let stream = async_stream::stream! { + while let Some(item) = rx.recv().await { + yield item; + } + }; + match self.grpc.initiate_query_protocol(stream).await { + Ok(resp) => Ok(resp.into_inner()), + Err(e) => Err(hc_error!( + "query protocol initiation failed with tonic status code {}", + e + )), + } + } + pub async fn initialize(mut self, config: Value) -> Result { + let schemas = HashMap::::from_iter( + self.get_query_schemas() + .await? + .into_iter() + .map(|s| (s.query_name.clone(), s)), + ); + self.set_configuration(&config).await?.as_result()?; + let default_policy_expr = self.get_default_policy_expression().await?; + let (tx, mut out_rx) = tokio::sync::mpsc::channel::(10); + let rx = self.initiate_query_protocol(out_rx).await?; + Ok(PluginTransport { + schemas, + default_policy_expr, + ctx: self, + tx, + rx, + // active_query: None, + // last_id: 0, + }) + } } impl Drop for PluginContext { fn drop(&mut self) { @@ -147,3 +205,138 @@ impl Drop for PluginContext { } } } + +pub enum HcQueryResult { + Ok(Value), + Needs(String, String, String, Value), + Err(Error), +} + +struct Query { + id: usize, + // if false, response + request: bool, + publisher: String, + plugin: String, + query: String, + key: Value, + output: Value, +} +impl TryFrom for Query { + type Error = Error; + fn try_from(value: PluginQuery) -> Result { + use QueryState::*; + let request = match TryInto::::try_into(value.state)? { + QueryUnspecified => return Err(hc_error!("unspecified error from plugin")), + QueryReplyInProgress => { + return Err(hc_error!( + "invalid state QueryReplyInProgress for conversion to Query" + )) + } + QueryReplyComplete => false, + QuerySubmit => true, + }; + let key: Value = serde_json::from_str(value.key.as_str())?; + let output: Value = serde_json::from_str(value.output.as_str())?; + Ok(Query { + id: value.id as usize, + request, + publisher: value.publisher_name, + plugin: value.plugin_name, + query: value.query_name, + key, + output, + }) + } +} +impl TryFrom for PluginQuery { + type Error = crate::error::Error; + fn try_from(value: Query) -> Result { + let state_enum = match value.request { + true => QueryState::QuerySubmit, + false => QueryState::QueryReplyComplete, + }; + let key = serde_json::to_string(&value.key)?; + let output = serde_json::to_string(&value.output)?; + Ok(PluginQuery { + id: value.id as i32, + state: state_enum as i32, + publisher_name: value.publisher, + plugin_name: value.plugin, + query_name: value.query, + key, + output, + }) + } +} + +// Encapsulate an "initialized" state of a Plugin with interfaces that abstract +// query chunking to produce whole messages for the Hipcheck engine +pub struct PluginTransport { + pub schemas: HashMap, + pub default_policy_expr: String, // TODO - update with policy_expr type + ctx: PluginContext, + tx: tokio::sync::mpsc::Sender, + rx: Streaming, +} +impl PluginTransport { + pub fn name(&self) -> &str { + &self.ctx.plugin.name + } + async fn send(&mut self, query: Query) -> Result<()> { + let query: PluginQuery = query.try_into()?; + self.tx + .send(query) + .await + .map_err(|e| hc_error!("sending query failed: {}", e)) + } + async fn recv(&mut self) -> Result> { + use QueryState::*; + let Some(mut raw) = self.rx.message().await? else { + // gRPC channel was closed + return Ok(None); + }; + let mut state: QueryState = raw.state.try_into()?; + // As long as we expect successive chunks, keep receiving + if matches!(state, QueryReplyInProgress) { + while matches!(state, QueryReplyInProgress) { + println!("Retrieving next response"); + let Some(next) = self.rx.message().await? else { + return Err(hc_error!( + "plugin gRPC channel closed while sending chunked message" + )); + }; + // Assert that the ids are consistent + if next.id != raw.id { + return Err(hc_error!("msg ids from plugin do not match")); + } + state = next.state.try_into()?; + match state { + QueryUnspecified => return Err(hc_error!("unspecified error from plugin")), + QuerySubmit => { + return Err(hc_error!( + "plugin sent QuerySubmit state when reply chunk expected" + )) + } + QueryReplyInProgress | QueryReplyComplete => { + raw.output.push_str(next.output.as_str()); + } + }; + } + } + raw.try_into().map(Some) + } +} + +pub struct PluginWithConfig(pub Plugin, pub Value); +impl From for (Plugin, Value) { + fn from(value: PluginWithConfig) -> Self { + (value.0, value.1) + } +} +pub struct PluginContextWithConfig(pub PluginContext, pub Value); +impl From for (PluginContext, Value) { + fn from(value: PluginContextWithConfig) -> Self { + (value.0, value.1) + } +}