From 52b895673d727a64ad2ddca6e825b7d45c07443f Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sun, 26 Jan 2025 17:51:16 -0500 Subject: [PATCH] feat: Update trigger creation to validate plugin file present This updates trigger creation to load the plugin file before creating the trigger. Another small change is to make Github references use filenames and paths identical to what they would be in the plugin-dir. This makes it a little easier to have the plugins repo local and develop against it and then be able to reference the same file later with gh: once it's up on the repo. --- influxdb3/src/commands/create.rs | 13 ++++-- influxdb3/tests/server/cli.rs | 38 +++++++++++++++- influxdb3_processing_engine/src/lib.rs | 31 ++++++++------ influxdb3_processing_engine/src/manager.rs | 6 +-- influxdb3_processing_engine/src/plugins.rs | 50 +++++++++++++--------- influxdb3_server/src/http.rs | 2 +- 6 files changed, 98 insertions(+), 42 deletions(-) diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index d9b97cc7d68..31876a980ce 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -358,7 +358,8 @@ pub async fn command(config: Config) -> Result<(), Box> { .collect::>() }); - client + //println!("does this work?"); + match client .api_v3_configure_processing_engine_trigger_create( database_name, &trigger_name, @@ -367,8 +368,14 @@ pub async fn command(config: Config) -> Result<(), Box> { trigger_arguments, disabled, ) - .await?; - println!("Trigger {} created successfully", trigger_name); + .await + { + Err(e) => { + eprintln!("Failed to create trigger: {}", e); + return Err(e.into()); + } + Ok(_) => println!("Trigger {} created successfully", trigger_name), + } } } Ok(()) diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index f772c191caa..df59ab1d25b 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -1374,7 +1374,7 @@ async fn test_load_wal_plugin_from_gh() { let db_name = "foo"; // this will pull from https://github.com/influxdata/influxdb3_plugins/blob/main/examples/wal_plugin/wal_plugin.py - let plugin_name = "gh:examples/wal_plugin"; + let plugin_name = "gh:examples/wal_plugin/wal_plugin.py"; // Run the test to make sure it'll load from GH let result = run_with_confirmation(&[ @@ -1530,6 +1530,42 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_ assert_eq!(body, json!({"status": "updated"})); } +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_trigger_create_validates_file_present() { + let plugin_dir = TempDir::new().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir.path().to_str().unwrap()) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "foo"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "foo"; + // creating the trigger should return a 404 error from github + let result = run_with_confirmation_and_err(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + "gh:not_a_file.py", + "--trigger-spec", + "request:foo", + "--trigger-arguments", + "test_arg=hello", + trigger_path, + ]); + debug!(result = ?result, "create trigger"); + assert_contains!(&result, "error reading file from Github: 404 Not Found"); +} + #[test_log::test(tokio::test)] async fn write_and_query_via_stdin() { let server = TestServer::spawn().await; diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 29ded6f2005..54fd786c9a8 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -1,7 +1,7 @@ use crate::manager::{ProcessingEngineError, ProcessingEngineManager}; -use crate::plugins::Error; #[cfg(feature = "system-py")] use crate::plugins::PluginContext; +use crate::plugins::PluginError; use anyhow::Context; use bytes::Bytes; use hashbrown::HashMap; @@ -219,22 +219,23 @@ impl ProcessingEngineManagerImpl { } } - pub async fn read_plugin_code(&self, name: &str) -> Result { + pub async fn read_plugin_code(&self, name: &str) -> Result { // if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main if name.starts_with("gh:") { let plugin_path = name.strip_prefix("gh:").unwrap(); - // the filename should be the last part of the name after the last / - let plugin_name = plugin_path - .split('/') - .last() - .context("plugin name for github plugins must be /")?; let url = format!( - "https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}/{}.py", - plugin_path, plugin_name + "https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}", + plugin_path ); let resp = reqwest::get(&url) .await .context("error getting plugin from github repo")?; + + // verify the response is a success + if !resp.status().is_success() { + return Err(PluginError::FetchingFromGithub(resp.status(), url)); + } + let resp_body = resp .text() .await @@ -326,6 +327,10 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else { return Err(ProcessingEngineError::DatabaseNotFound(db_name.to_string())); }; + + // validate that we can actually read the plugin file + self.read_plugin_code(&plugin_filename).await?; + let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition { trigger_name, plugin_filename, @@ -597,7 +602,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { &self, request: WalPluginTestRequest, query_executor: Arc, - ) -> Result { + ) -> Result { #[cfg(feature = "system-py")] { // create a copy of the catalog so we don't modify the original @@ -619,7 +624,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { } #[cfg(not(feature = "system-py"))] - Err(plugins::Error::AnyhowError(anyhow::anyhow!( + Err(plugins::PluginError::AnyhowError(anyhow::anyhow!( "system-py feature not enabled" ))) } @@ -629,7 +634,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { &self, request: SchedulePluginTestRequest, query_executor: Arc, - ) -> Result { + ) -> Result { #[cfg(feature = "system-py")] { // create a copy of the catalog so we don't modify the original @@ -657,7 +662,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { } #[cfg(not(feature = "system-py"))] - Err(plugins::Error::AnyhowError(anyhow::anyhow!( + Err(plugins::PluginError::AnyhowError(anyhow::anyhow!( "system-py feature not enabled" ))) } diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs index a7a0b8cc094..251aae4fd20 100644 --- a/influxdb3_processing_engine/src/manager.rs +++ b/influxdb3_processing_engine/src/manager.rs @@ -33,7 +33,7 @@ pub enum ProcessingEngineError { PluginNotFound(String), #[error("plugin error: {0}")] - PluginError(#[from] crate::plugins::Error), + PluginError(#[from] crate::plugins::PluginError), #[error("failed to shutdown trigger {trigger_name} in database {database}")] TriggerShutdownError { @@ -99,13 +99,13 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { &self, request: WalPluginTestRequest, query_executor: Arc, - ) -> Result; + ) -> Result; async fn test_schedule_plugin( &self, request: SchedulePluginTestRequest, query_executor: Arc, - ) -> Result; + ) -> Result; async fn request_trigger( &self, diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs index 44076cdd9ef..a47e0aeffed 100644 --- a/influxdb3_processing_engine/src/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -31,7 +31,7 @@ use thiserror::Error; use tokio::sync::mpsc; #[derive(Debug, Error)] -pub enum Error { +pub enum PluginError { #[error("invalid database {0}")] InvalidDatabase(String), @@ -68,6 +68,9 @@ pub enum Error { #[error("non-schedule plugin with schedule trigger: {0}")] NonSchedulePluginWithScheduleTrigger(String), + + #[error("error reading file from Github: {0} {1}")] + FetchingFromGithub(reqwest::StatusCode, String), } #[cfg(feature = "system-py")] @@ -101,11 +104,11 @@ pub(crate) fn run_schedule_plugin( time_provider: Arc, context: PluginContext, plugin_receiver: mpsc::Receiver, -) -> Result<(), Error> { +) -> Result<(), PluginError> { // Ensure that the plugin is a schedule plugin let plugin_type = trigger_definition.trigger.plugin_type(); if !matches!(plugin_type, influxdb3_wal::PluginType::Schedule) { - return Err(Error::NonSchedulePluginWithScheduleTrigger(format!( + return Err(PluginError::NonSchedulePluginWithScheduleTrigger(format!( "{:?}", trigger_definition ))); @@ -201,7 +204,7 @@ mod python_plugin { pub(crate) async fn run_wal_contents_plugin( &self, mut receiver: Receiver, - ) -> Result<(), Error> { + ) -> Result<(), PluginError> { info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting wal contents plugin"); loop { @@ -220,7 +223,7 @@ mod python_plugin { } } WalEvent::Shutdown(sender) => { - sender.send(()).map_err(|_| Error::FailedToShutdown)?; + sender.send(()).map_err(|_| PluginError::FailedToShutdown)?; break; } } @@ -234,7 +237,7 @@ mod python_plugin { mut receiver: Receiver, mut runner: ScheduleTriggerRunner, time_provider: Arc, - ) -> Result<(), Error> { + ) -> Result<(), PluginError> { loop { let Some(next_run_instant) = runner.next_run_time() else { break; @@ -243,7 +246,7 @@ mod python_plugin { tokio::select! { _ = time_provider.sleep_until(next_run_instant) => { let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else { - return Err(Error::MissingDb); + return Err(PluginError::MissingDb); }; runner.run_at_time(self, schema).await?; } @@ -254,7 +257,7 @@ mod python_plugin { break; } Some(ScheduleEvent::Shutdown(sender)) => { - sender.send(()).map_err(|_| Error::FailedToShutdown)?; + sender.send(()).map_err(|_| PluginError::FailedToShutdown)?; break; } } @@ -268,7 +271,7 @@ mod python_plugin { pub(crate) async fn run_request_plugin( &self, mut receiver: Receiver, - ) -> Result<(), Error> { + ) -> Result<(), PluginError> { info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting request plugin"); loop { @@ -282,7 +285,7 @@ mod python_plugin { self.write_buffer.catalog().db_schema(self.db_name.as_str()) else { error!(?self.trigger_definition, "missing db schema"); - return Err(Error::MissingDb); + return Err(PluginError::MissingDb); }; let result = execute_request_trigger( self.plugin_code.code().as_ref(), @@ -340,7 +343,7 @@ mod python_plugin { } } Some(RequestEvent::Shutdown(sender)) => { - sender.send(()).map_err(|_| Error::FailedToShutdown)?; + sender.send(()).map_err(|_| PluginError::FailedToShutdown)?; break; } } @@ -349,9 +352,12 @@ mod python_plugin { Ok(()) } - async fn process_wal_contents(&self, wal_contents: Arc) -> Result<(), Error> { + async fn process_wal_contents( + &self, + wal_contents: Arc, + ) -> Result<(), PluginError> { let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else { - return Err(Error::MissingDb); + return Err(PluginError::MissingDb); }; for wal_op in &wal_contents.ops { @@ -481,7 +487,7 @@ mod python_plugin { pub(crate) fn try_new( trigger_spec: &TriggerSpecificationDefinition, time_provider: Arc, - ) -> Result { + ) -> Result { match trigger_spec { TriggerSpecificationDefinition::AllTablesWalWrite | TriggerSpecificationDefinition::SingleTableWalWrite { .. } => { @@ -538,7 +544,7 @@ mod python_plugin { &mut self, plugin: &TriggerPlugin, db_schema: Arc, - ) -> Result<(), Error> { + ) -> Result<(), PluginError> { let Some(trigger_time) = self.next_trigger_time else { return Err(anyhow!("running a cron trigger that is finished.").into()); }; @@ -584,7 +590,7 @@ pub(crate) fn run_test_wal_plugin( query_executor: Arc, code: String, request: WalPluginTestRequest, -) -> Result { +) -> Result { use data_types::NamespaceName; use influxdb3_wal::Gen1Duration; use influxdb3_write::write_buffer::validator::WriteValidator; @@ -592,7 +598,7 @@ pub(crate) fn run_test_wal_plugin( let database = request.database; let namespace = NamespaceName::new(database.clone()) - .map_err(|_e| Error::InvalidDatabase(database.clone()))?; + .map_err(|_e| PluginError::InvalidDatabase(database.clone()))?; // parse the lp into a write batch let validator = WriteValidator::initialize( namespace.clone(), @@ -606,7 +612,7 @@ pub(crate) fn run_test_wal_plugin( Precision::Nanosecond, )?; let data = data.convert_lines_to_buffer(Gen1Duration::new_1m()); - let db = catalog.db_schema(&database).ok_or(Error::MissingDb)?; + let db = catalog.db_schema(&database).ok_or(PluginError::MissingDb)?; let plugin_return_state = influxdb3_py_api::system_py::execute_python_with_batch( &code, @@ -722,15 +728,17 @@ pub(crate) fn run_test_schedule_plugin( query_executor: Arc, code: String, request: influxdb3_client::plugin_development::SchedulePluginTestRequest, -) -> Result { +) -> Result { let database = request.database; - let db = catalog.db_schema(&database).ok_or(Error::MissingDb)?; + let db = catalog.db_schema(&database).ok_or(PluginError::MissingDb)?; let cron_schedule = request.schedule.as_deref().unwrap_or("* * * * * *"); let schedule = cron::Schedule::from_str(cron_schedule)?; let Some(schedule_time) = schedule.after(&now_time.date_time()).next() else { - return Err(Error::CronScheduleNeverTriggers(cron_schedule.to_string())); + return Err(PluginError::CronScheduleNeverTriggers( + cron_schedule.to_string(), + )); }; let plugin_return_state = influxdb3_py_api::system_py::execute_schedule_trigger( diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 730ac992d5d..e1299aa03f4 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -220,7 +220,7 @@ pub enum Error { PythonPluginsNotEnabled, #[error("Plugin error: {0}")] - Plugin(#[from] influxdb3_processing_engine::plugins::Error), + Plugin(#[from] influxdb3_processing_engine::plugins::PluginError), #[error("Processing engine error: {0}")] ProcessingEngine(#[from] influxdb3_processing_engine::manager::ProcessingEngineError),