Skip to content

Commit

Permalink
feat: Initial scaffolding for environment managers (pip, pipx, uv).
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse committed Jan 23, 2025
1 parent 1ed6c68 commit c8da105
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_processing_engine = {path = "../influxdb3_processing_engine"}
influxdb3_server = { path = "../influxdb3_server" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
Expand Down
33 changes: 25 additions & 8 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use influxdb3_cache::{
last_cache::{self, LastCacheProvider},
parquet_cache::create_cached_obj_store_and_oracle,
};
use influxdb3_clap_blocks::plugins::{PackageManager, ProcessingEngineConfig};
use influxdb3_clap_blocks::{
datafusion::IoxQueryDatafusionConfig,
memory_size::MemorySize,
Expand All @@ -17,6 +18,10 @@ use influxdb3_clap_blocks::{
use influxdb3_process::{
build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID,
};
use influxdb3_processing_engine::environment::{
PipManager, PipxManager, PythonEnvironmentManager, UVManager,
};
use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
use influxdb3_server::{
auth::AllOrNothingAuthorizer,
builder::ServerBuilder,
Expand All @@ -41,10 +46,7 @@ use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use std::{
path::{Path, PathBuf},
str::FromStr,
};
use std::{path::Path, str::FromStr};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::time::Instant;
Expand Down Expand Up @@ -312,9 +314,9 @@ pub struct Config {
)]
pub distinct_cache_eviction_interval: humantime::Duration,

/// The local directory that has python plugins and their test files.
#[clap(long = "plugin-dir", env = "INFLUXDB3_PLUGIN_DIR", action)]
pub plugin_dir: Option<PathBuf>,
/// The processing engine config.
#[clap(flatten)]
pub processing_engine_config: ProcessingEngineConfig,

/// Threshold for internal buffer, can be either percentage or absolute value.
/// eg: 70% or 100000
Expand Down Expand Up @@ -580,7 +582,7 @@ pub async fn command(config: Config) -> Result<()> {
trace_exporter,
trace_header_parser,
Arc::clone(&telemetry_store),
config.plugin_dir,
setup_processing_engine_env_manager(&config.processing_engine_config),
)?;

let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
Expand Down Expand Up @@ -619,6 +621,21 @@ pub async fn command(config: Config) -> Result<()> {
Ok(())
}

fn setup_processing_engine_env_manager(
config: &ProcessingEngineConfig,
) -> ProcessingEngineEnvironmentManager {
let package_manager: Arc<dyn PythonEnvironmentManager> = match config.package_manager {
PackageManager::Pip => Arc::new(PipManager),
PackageManager::Pipx => Arc::new(PipxManager),
PackageManager::UV => Arc::new(UVManager),
};
ProcessingEngineEnvironmentManager {
plugin_dir: config.plugin_dir.clone(),
virtual_env_location: config.virtual_env_location.clone(),
package_manager,
}
}

async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
Expand Down
1 change: 1 addition & 0 deletions influxdb3_clap_blocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
pub mod datafusion;
pub mod memory_size;
pub mod object_store;
pub mod plugins;
pub mod socket_addr;
pub mod tokio;
23 changes: 23 additions & 0 deletions influxdb3_clap_blocks/src/plugins.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::path::PathBuf;

/// Specifies the behavior of the Processing Engine.
/// Currently used to determine the plugin directory and which tooling to use to initialize python,
/// but will expand for other settings, such as error behavior.
#[derive(Debug, clap::Parser, Clone)]
pub struct ProcessingEngineConfig {
#[clap(long = "plugin-dir")]
pub plugin_dir: Option<PathBuf>,
#[clap(long = "virtual-env-location", env = "VIRTUAL_ENV")]
pub virtual_env_location: Option<PathBuf>,

#[clap(long = "package-manager", default_value = "pip")]
pub package_manager: PackageManager,
}

#[derive(Debug, Clone, Copy, Default, clap::ValueEnum)]
pub enum PackageManager {
#[default]
Pip,
Pipx,
UV,
}
95 changes: 95 additions & 0 deletions influxdb3_processing_engine/src/environment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::environment::PluginEnvironmentError::PluginEnvironmentDisabled;
use std::fmt::Debug;
use std::process::Command;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum PluginEnvironmentError {
#[error("Package manager not available: {0}")]
PackageManagerNotFound(String),
#[error("External call failed: {0}")]
InstallationFailed(#[from] std::io::Error),
#[error("Plugin environment management is disabled")]
PluginEnvironmentDisabled,
}

pub trait PythonEnvironmentManager: Debug + Send + Sync + 'static {
fn install_package(&self, package_name: String) -> Result<(), PluginEnvironmentError>;

fn install_requirements(&self, requirements_path: String)
-> Result<(), PluginEnvironmentError>;
}

#[derive(Debug, Copy, Clone)]
pub struct UVManager;
#[derive(Debug, Copy, Clone)]
pub struct PipManager;
#[derive(Debug, Copy, Clone)]
pub struct PipxManager;

#[derive(Debug, Copy, Clone)]
pub struct DisabledManager;

impl PythonEnvironmentManager for UVManager {
fn install_package(&self, package: String) -> Result<(), PluginEnvironmentError> {
Command::new("uv")
.args(["pip", "install", &package])
.output()?;
Ok(())
}

fn install_requirements(
&self,
requirements_path: String,
) -> Result<(), PluginEnvironmentError> {
Command::new("uv")
.args(["pip", "install", "-r", &requirements_path])
.output()?;
Ok(())
}
}

impl PythonEnvironmentManager for PipManager {
fn install_package(&self, package: String) -> Result<(), PluginEnvironmentError> {
Command::new("pip").args(["install", &package]).output()?;
Ok(())
}
fn install_requirements(
&self,
requirements_path: String,
) -> Result<(), PluginEnvironmentError> {
Command::new("pip")
.args(["install", "-r", &requirements_path])
.output()?;
Ok(())
}
}

impl PythonEnvironmentManager for PipxManager {
fn install_package(&self, package: String) -> Result<(), PluginEnvironmentError> {
Command::new("pipx").args(["install", &package]).output()?;
Ok(())
}
fn install_requirements(
&self,
requirements_path: String,
) -> Result<(), PluginEnvironmentError> {
Command::new("pipx")
.args(["install", "-r", &requirements_path])
.output()?;
Ok(())
}
}

impl PythonEnvironmentManager for DisabledManager {
fn install_package(&self, _package: String) -> Result<(), PluginEnvironmentError> {
Err(PluginEnvironmentDisabled)
}

fn install_requirements(
&self,
_requirements_path: String,
) -> Result<(), PluginEnvironmentError> {
Err(PluginEnvironmentDisabled)
}
}
32 changes: 24 additions & 8 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::manager::{ProcessingEngineError, ProcessingEngineManager};
use crate::plugins::Error;
#[cfg(feature = "system-py")]
use crate::plugins::PluginContext;
use crate::plugins::{Error, ProcessingEngineEnvironmentManager};
use anyhow::Context;
use bytes::Bytes;
use hashbrown::HashMap;
Expand Down Expand Up @@ -29,8 +29,11 @@ use std::time::SystemTime;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{mpsc, oneshot, RwLock};

pub mod environment;
pub mod manager;
pub mod plugins;

#[cfg(feature = "system-py")]
pub(crate) mod virtualenv;

#[derive(Debug)]
Expand Down Expand Up @@ -154,20 +157,20 @@ impl PluginChannels {

impl ProcessingEngineManagerImpl {
pub fn new(
plugin_dir: Option<std::path::PathBuf>,
environment: ProcessingEngineEnvironmentManager,
catalog: Arc<Catalog>,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>,
wal: Arc<dyn Wal>,
) -> Self {
// if given a plugin dir, try to initialize the virtualenv.
if let Some(ref plugin_dir) = plugin_dir {
let venv_path = plugin_dir.join(".venv");
virtualenv::try_init_venv(&venv_path)
if environment.plugin_dir.is_some() {
#[cfg(feature = "system-py")]
virtualenv::init_pyo3(&environment.virtual_env_location);
}
Self {
plugin_dir,
plugin_dir: environment.plugin_dir,
catalog,
write_buffer,
query_executor,
Expand Down Expand Up @@ -756,7 +759,9 @@ pub(crate) struct Request {

#[cfg(test)]
mod tests {
use crate::environment::DisabledManager;
use crate::manager::{ProcessingEngineError, ProcessingEngineManager};
use crate::plugins::ProcessingEngineEnvironmentManager;
use crate::ProcessingEngineManagerImpl;
use data_types::NamespaceName;
use datafusion_util::config::register_iox_object_store;
Expand Down Expand Up @@ -1230,10 +1235,21 @@ def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("done")
"#;
writeln!(file, "{}", code).unwrap();
let plugin_dir = Some(file.path().parent().unwrap().to_path_buf());
let environment_manager = ProcessingEngineEnvironmentManager {
plugin_dir: Some(file.path().parent().unwrap().to_path_buf()),
virtual_env_location: None,
package_manager: Arc::new(DisabledManager),
};

(
ProcessingEngineManagerImpl::new(plugin_dir, catalog, wbuf, qe, time_provider, wal),
ProcessingEngineManagerImpl::new(
environment_manager,
catalog,
wbuf,
qe,
time_provider,
wal,
),
file,
)
}
Expand Down
9 changes: 9 additions & 0 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::environment::PythonEnvironmentManager;
#[cfg(feature = "system-py")]
use crate::PluginCode;
#[cfg(feature = "system-py")]
Expand All @@ -23,6 +24,7 @@ use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
use observability_deps::tracing::error;
use std::fmt::Debug;
use std::path::PathBuf;
#[cfg(feature = "system-py")]
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -89,6 +91,13 @@ pub(crate) fn run_wal_contents_plugin(
});
}

#[derive(Debug, Clone)]
pub struct ProcessingEngineEnvironmentManager {
pub plugin_dir: Option<PathBuf>,
pub virtual_env_location: Option<PathBuf>,
pub package_manager: Arc<dyn PythonEnvironmentManager>,
}

#[cfg(feature = "system-py")]
pub(crate) fn run_schedule_plugin(
db_name: String,
Expand Down
Loading

0 comments on commit c8da105

Please sign in to comment.