From c8da10511422dbd6d850a9872efbe3b610a15c5d Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Thu, 23 Jan 2025 11:18:28 -0800 Subject: [PATCH] feat: Initial scaffolding for environment managers (pip, pipx, uv). --- Cargo.lock | 1 + influxdb3/Cargo.toml | 1 + influxdb3/src/commands/serve.rs | 33 +++++-- influxdb3_clap_blocks/src/lib.rs | 1 + influxdb3_clap_blocks/src/plugins.rs | 23 +++++ .../src/environment.rs | 95 +++++++++++++++++++ influxdb3_processing_engine/src/lib.rs | 32 +++++-- influxdb3_processing_engine/src/plugins.rs | 9 ++ influxdb3_processing_engine/src/virtualenv.rs | 40 ++++---- influxdb3_server/src/builder.rs | 2 +- influxdb3_server/src/lib.rs | 16 +++- 11 files changed, 211 insertions(+), 42 deletions(-) create mode 100644 influxdb3_clap_blocks/src/plugins.rs create mode 100644 influxdb3_processing_engine/src/environment.rs diff --git a/Cargo.lock b/Cargo.lock index e1637a3a13a..8a55bb4e24d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2775,6 +2775,7 @@ dependencies = [ "influxdb3_clap_blocks", "influxdb3_client", "influxdb3_process", + "influxdb3_processing_engine", "influxdb3_server", "influxdb3_sys_events", "influxdb3_telemetry", diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index e4d797b2ba9..315bd972093 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -28,6 +28,7 @@ influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" } influxdb3_process = { path = "../influxdb3_process", default-features = false } +influxdb3_processing_engine = {path = "../influxdb3_processing_engine"} influxdb3_server = { path = "../influxdb3_server" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_write = { path = "../influxdb3_write" } diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index f528cd133c1..de8c1636915 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -7,6 +7,7 @@ use influxdb3_cache::{ last_cache::{self, LastCacheProvider}, parquet_cache::create_cached_obj_store_and_oracle, }; +use influxdb3_clap_blocks::plugins::{PackageManager, ProcessingEngineConfig}; use influxdb3_clap_blocks::{ datafusion::IoxQueryDatafusionConfig, memory_size::MemorySize, @@ -17,6 +18,10 @@ use influxdb3_clap_blocks::{ use influxdb3_process::{ build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID, }; +use influxdb3_processing_engine::environment::{ + PipManager, PipxManager, PythonEnvironmentManager, UVManager, +}; +use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; use influxdb3_server::{ auth::AllOrNothingAuthorizer, builder::ServerBuilder, @@ -41,10 +46,7 @@ use observability_deps::tracing::*; use panic_logging::SendPanicsToTracing; use parquet_file::storage::{ParquetStorage, StorageId}; use std::{num::NonZeroUsize, sync::Arc, time::Duration}; -use std::{ - path::{Path, PathBuf}, - str::FromStr, -}; +use std::{path::Path, str::FromStr}; use thiserror::Error; use tokio::net::TcpListener; use tokio::time::Instant; @@ -312,9 +314,9 @@ pub struct Config { )] pub distinct_cache_eviction_interval: humantime::Duration, - /// The local directory that has python plugins and their test files. - #[clap(long = "plugin-dir", env = "INFLUXDB3_PLUGIN_DIR", action)] - pub plugin_dir: Option, + /// The processing engine config. + #[clap(flatten)] + pub processing_engine_config: ProcessingEngineConfig, /// Threshold for internal buffer, can be either percentage or absolute value. /// eg: 70% or 100000 @@ -580,7 +582,7 @@ pub async fn command(config: Config) -> Result<()> { trace_exporter, trace_header_parser, Arc::clone(&telemetry_store), - config.plugin_dir, + setup_processing_engine_env_manager(&config.processing_engine_config), )?; let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs { @@ -619,6 +621,21 @@ pub async fn command(config: Config) -> Result<()> { Ok(()) } +fn setup_processing_engine_env_manager( + config: &ProcessingEngineConfig, +) -> ProcessingEngineEnvironmentManager { + let package_manager: Arc = match config.package_manager { + PackageManager::Pip => Arc::new(PipManager), + PackageManager::Pipx => Arc::new(PipxManager), + PackageManager::UV => Arc::new(UVManager), + }; + ProcessingEngineEnvironmentManager { + plugin_dir: config.plugin_dir.clone(), + virtual_env_location: config.virtual_env_location.clone(), + package_manager, + } +} + async fn setup_telemetry_store( object_store_config: &ObjectStoreConfig, instance_id: Arc, diff --git a/influxdb3_clap_blocks/src/lib.rs b/influxdb3_clap_blocks/src/lib.rs index e5880e50c7d..7db9f1b788c 100644 --- a/influxdb3_clap_blocks/src/lib.rs +++ b/influxdb3_clap_blocks/src/lib.rs @@ -3,5 +3,6 @@ pub mod datafusion; pub mod memory_size; pub mod object_store; +pub mod plugins; pub mod socket_addr; pub mod tokio; diff --git a/influxdb3_clap_blocks/src/plugins.rs b/influxdb3_clap_blocks/src/plugins.rs new file mode 100644 index 00000000000..f2d5cd25ddf --- /dev/null +++ b/influxdb3_clap_blocks/src/plugins.rs @@ -0,0 +1,23 @@ +use std::path::PathBuf; + +/// Specifies the behavior of the Processing Engine. +/// Currently used to determine the plugin directory and which tooling to use to initialize python, +/// but will expand for other settings, such as error behavior. +#[derive(Debug, clap::Parser, Clone)] +pub struct ProcessingEngineConfig { + #[clap(long = "plugin-dir")] + pub plugin_dir: Option, + #[clap(long = "virtual-env-location", env = "VIRTUAL_ENV")] + pub virtual_env_location: Option, + + #[clap(long = "package-manager", default_value = "pip")] + pub package_manager: PackageManager, +} + +#[derive(Debug, Clone, Copy, Default, clap::ValueEnum)] +pub enum PackageManager { + #[default] + Pip, + Pipx, + UV, +} diff --git a/influxdb3_processing_engine/src/environment.rs b/influxdb3_processing_engine/src/environment.rs new file mode 100644 index 00000000000..41853d85173 --- /dev/null +++ b/influxdb3_processing_engine/src/environment.rs @@ -0,0 +1,95 @@ +use crate::environment::PluginEnvironmentError::PluginEnvironmentDisabled; +use std::fmt::Debug; +use std::process::Command; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum PluginEnvironmentError { + #[error("Package manager not available: {0}")] + PackageManagerNotFound(String), + #[error("External call failed: {0}")] + InstallationFailed(#[from] std::io::Error), + #[error("Plugin environment management is disabled")] + PluginEnvironmentDisabled, +} + +pub trait PythonEnvironmentManager: Debug + Send + Sync + 'static { + fn install_package(&self, package_name: String) -> Result<(), PluginEnvironmentError>; + + fn install_requirements(&self, requirements_path: String) + -> Result<(), PluginEnvironmentError>; +} + +#[derive(Debug, Copy, Clone)] +pub struct UVManager; +#[derive(Debug, Copy, Clone)] +pub struct PipManager; +#[derive(Debug, Copy, Clone)] +pub struct PipxManager; + +#[derive(Debug, Copy, Clone)] +pub struct DisabledManager; + +impl PythonEnvironmentManager for UVManager { + fn install_package(&self, package: String) -> Result<(), PluginEnvironmentError> { + Command::new("uv") + .args(["pip", "install", &package]) + .output()?; + Ok(()) + } + + fn install_requirements( + &self, + requirements_path: String, + ) -> Result<(), PluginEnvironmentError> { + Command::new("uv") + .args(["pip", "install", "-r", &requirements_path]) + .output()?; + Ok(()) + } +} + +impl PythonEnvironmentManager for PipManager { + fn install_package(&self, package: String) -> Result<(), PluginEnvironmentError> { + Command::new("pip").args(["install", &package]).output()?; + Ok(()) + } + fn install_requirements( + &self, + requirements_path: String, + ) -> Result<(), PluginEnvironmentError> { + Command::new("pip") + .args(["install", "-r", &requirements_path]) + .output()?; + Ok(()) + } +} + +impl PythonEnvironmentManager for PipxManager { + fn install_package(&self, package: String) -> Result<(), PluginEnvironmentError> { + Command::new("pipx").args(["install", &package]).output()?; + Ok(()) + } + fn install_requirements( + &self, + requirements_path: String, + ) -> Result<(), PluginEnvironmentError> { + Command::new("pipx") + .args(["install", "-r", &requirements_path]) + .output()?; + Ok(()) + } +} + +impl PythonEnvironmentManager for DisabledManager { + fn install_package(&self, _package: String) -> Result<(), PluginEnvironmentError> { + Err(PluginEnvironmentDisabled) + } + + fn install_requirements( + &self, + _requirements_path: String, + ) -> Result<(), PluginEnvironmentError> { + Err(PluginEnvironmentDisabled) + } +} diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 931d3e2d070..6a8fd04fb81 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -1,7 +1,7 @@ use crate::manager::{ProcessingEngineError, ProcessingEngineManager}; -use crate::plugins::Error; #[cfg(feature = "system-py")] use crate::plugins::PluginContext; +use crate::plugins::{Error, ProcessingEngineEnvironmentManager}; use anyhow::Context; use bytes::Bytes; use hashbrown::HashMap; @@ -29,8 +29,11 @@ use std::time::SystemTime; use tokio::sync::oneshot::Receiver; use tokio::sync::{mpsc, oneshot, RwLock}; +pub mod environment; pub mod manager; pub mod plugins; + +#[cfg(feature = "system-py")] pub(crate) mod virtualenv; #[derive(Debug)] @@ -154,7 +157,7 @@ impl PluginChannels { impl ProcessingEngineManagerImpl { pub fn new( - plugin_dir: Option, + environment: ProcessingEngineEnvironmentManager, catalog: Arc, write_buffer: Arc, query_executor: Arc, @@ -162,12 +165,12 @@ impl ProcessingEngineManagerImpl { wal: Arc, ) -> Self { // if given a plugin dir, try to initialize the virtualenv. - if let Some(ref plugin_dir) = plugin_dir { - let venv_path = plugin_dir.join(".venv"); - virtualenv::try_init_venv(&venv_path) + if environment.plugin_dir.is_some() { + #[cfg(feature = "system-py")] + virtualenv::init_pyo3(&environment.virtual_env_location); } Self { - plugin_dir, + plugin_dir: environment.plugin_dir, catalog, write_buffer, query_executor, @@ -756,7 +759,9 @@ pub(crate) struct Request { #[cfg(test)] mod tests { + use crate::environment::DisabledManager; use crate::manager::{ProcessingEngineError, ProcessingEngineManager}; + use crate::plugins::ProcessingEngineEnvironmentManager; use crate::ProcessingEngineManagerImpl; use data_types::NamespaceName; use datafusion_util::config::register_iox_object_store; @@ -1230,10 +1235,21 @@ def process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.info("done") "#; writeln!(file, "{}", code).unwrap(); - let plugin_dir = Some(file.path().parent().unwrap().to_path_buf()); + let environment_manager = ProcessingEngineEnvironmentManager { + plugin_dir: Some(file.path().parent().unwrap().to_path_buf()), + virtual_env_location: None, + package_manager: Arc::new(DisabledManager), + }; ( - ProcessingEngineManagerImpl::new(plugin_dir, catalog, wbuf, qe, time_provider, wal), + ProcessingEngineManagerImpl::new( + environment_manager, + catalog, + wbuf, + qe, + time_provider, + wal, + ), file, ) } diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs index e9f91796662..a8dfb296acb 100644 --- a/influxdb3_processing_engine/src/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -1,3 +1,4 @@ +use crate::environment::PythonEnvironmentManager; #[cfg(feature = "system-py")] use crate::PluginCode; #[cfg(feature = "system-py")] @@ -23,6 +24,7 @@ use influxdb3_write::WriteBuffer; use iox_time::TimeProvider; use observability_deps::tracing::error; use std::fmt::Debug; +use std::path::PathBuf; #[cfg(feature = "system-py")] use std::str::FromStr; use std::sync::Arc; @@ -89,6 +91,13 @@ pub(crate) fn run_wal_contents_plugin( }); } +#[derive(Debug, Clone)] +pub struct ProcessingEngineEnvironmentManager { + pub plugin_dir: Option, + pub virtual_env_location: Option, + pub package_manager: Arc, +} + #[cfg(feature = "system-py")] pub(crate) fn run_schedule_plugin( db_name: String, diff --git a/influxdb3_processing_engine/src/virtualenv.rs b/influxdb3_processing_engine/src/virtualenv.rs index f4f9e2ffaf4..ca76b5f98e5 100644 --- a/influxdb3_processing_engine/src/virtualenv.rs +++ b/influxdb3_processing_engine/src/virtualenv.rs @@ -1,5 +1,5 @@ -use observability_deps::tracing::{info, warn}; -use std::path::Path; +use observability_deps::tracing::{debug, warn}; +use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::Once; use thiserror::Error; @@ -16,7 +16,10 @@ pub(crate) enum VenvError { fn get_python_version() -> Result<(u8, u8), std::io::Error> { let output = Command::new("python3") - .args(["-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')"]) + .args([ + "-c", + "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')", + ]) .output()?; let version = String::from_utf8_lossy(&output.stdout).trim().to_string(); @@ -27,7 +30,6 @@ fn get_python_version() -> Result<(u8, u8), std::io::Error> { Ok((major, minor)) } -#[cfg(unix)] fn set_pythonpath(venv_dir: &Path) -> Result<(), std::io::Error> { let (major, minor) = get_python_version()?; let site_packages = venv_dir @@ -35,28 +37,26 @@ fn set_pythonpath(venv_dir: &Path) -> Result<(), std::io::Error> { .join(format!("python{}.{}", major, minor)) .join("site-packages"); - info!("site packages is {}", site_packages.to_string_lossy()); - - if site_packages.exists() { - std::env::set_var("PYTHONPATH", site_packages); - } + debug!("Setting PYTHONPATH to: {}", site_packages.to_string_lossy()); + std::env::set_var("PYTHONPATH", &site_packages); Ok(()) } -pub(crate) fn try_init_venv(venv_path: &Path) { +pub(crate) fn init_pyo3(venv_path: &Option) { PYTHON_INIT.call_once(|| { - if let Err(err) = initialize_venv(venv_path) { - warn!( - "Failed to initialize virtualenv at {}: {}", - venv_path.to_string_lossy(), - err - ); + if let Some(venv_path) = venv_path { + if let Err(err) = initialize_venv(venv_path) { + warn!( + "Failed to initialize virtualenv at {}: {}", + venv_path.to_string_lossy(), + err + ); + } } pyo3::prepare_freethreaded_python(); }) } - #[cfg(unix)] pub(crate) fn initialize_venv(venv_path: &Path) -> Result<(), VenvError> { use std::process::Command; @@ -68,6 +68,7 @@ pub(crate) fn initialize_venv(venv_path: &Path) -> Result<(), VenvError> { activate_script ))); } + set_pythonpath(venv_path)?; let output = Command::new("bash") .arg("-c") @@ -82,7 +83,6 @@ pub(crate) fn initialize_venv(venv_path: &Path) -> Result<(), VenvError> { String::from_utf8_lossy(&output.stderr).to_string(), )); } - set_pythonpath(venv_path)?; // Apply environment changes String::from_utf8_lossy(&output.stdout) @@ -90,8 +90,8 @@ pub(crate) fn initialize_venv(venv_path: &Path) -> Result<(), VenvError> { .filter_map(|line| line.split_once('=')) .for_each(|(key, value)| { println!("{}={}", key, value); - std::env::set_var(key, value)}); - + std::env::set_var(key, value) + }); Ok(()) } diff --git a/influxdb3_server/src/builder.rs b/influxdb3_server/src/builder.rs index 40ab6d35fae..94847323c39 100644 --- a/influxdb3_server/src/builder.rs +++ b/influxdb3_server/src/builder.rs @@ -154,7 +154,7 @@ impl let persister = Arc::clone(&self.persister.0); let authorizer = Arc::clone(&self.authorizer); let processing_engine = Arc::new(ProcessingEngineManagerImpl::new( - self.common_state.plugin_dir.clone(), + self.common_state.processing_engine_environment.clone(), self.write_buffer.0.catalog(), Arc::clone(&self.write_buffer.0), Arc::clone(&self.query_executor.0), diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 301fbef40a9..16f698eb46f 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -27,6 +27,7 @@ use authz::Authorizer; use hyper::server::conn::AddrIncoming; use hyper::server::conn::Http; use hyper::service::service_fn; +use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_write::persister::Persister; use iox_time::TimeProvider; @@ -35,7 +36,6 @@ use observability_deps::tracing::info; use service::hybrid; use std::convert::Infallible; use std::fmt::Debug; -use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; use tokio::net::TcpListener; @@ -79,7 +79,7 @@ pub struct CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, - plugin_dir: Option, + processing_engine_environment: ProcessingEngineEnvironmentManager, } impl CommonServerState { @@ -88,14 +88,14 @@ impl CommonServerState { trace_exporter: Option>, trace_header_parser: TraceHeaderParser, telemetry_store: Arc, - plugin_dir: Option, + processing_engine_environment: ProcessingEngineEnvironmentManager, ) -> Result { Ok(Self { metrics, trace_exporter, trace_header_parser, telemetry_store, - plugin_dir, + processing_engine_environment, }) } @@ -237,6 +237,8 @@ mod tests { use std::sync::Arc; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; + use influxdb3_processing_engine::environment::DisabledManager; + use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_and_query() { @@ -796,7 +798,11 @@ mod tests { None, trace_header_parser, Arc::clone(&sample_telem_store), - None, + ProcessingEngineEnvironmentManager{ + plugin_dir: None, + virtual_env_location: None, + package_manager: Arc::new(DisabledManager), + }, ) .unwrap(); let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {