diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs
index fb0e96577e3..38bcde22c14 100644
--- a/influxdb3/tests/server/cli.rs
+++ b/influxdb3/tests/server/cli.rs
@@ -1660,4 +1660,33 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
         .unwrap();
     assert_eq!(val, json!([{"tag1": "tag1_value", "field1": 1}]));
+
+    // now update it to make sure that it reloads
+    let plugin_code = r#"
+import json
+
+def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
+    return 200, {"Content-Type": "application/json"}, json.dumps({"status": "updated"})
+"#;
+    // clear all bytes from the plugin file
+    plugin_file.reopen().unwrap().set_len(0).unwrap();
+    plugin_file
+        .reopen()
+        .unwrap()
+        .write_all(plugin_code.as_bytes())
+        .unwrap();
+
+    // send an HTTP request to the server
+    let response = client
+        .post(format!("{}/api/v3/engine/foo", server_addr))
+        .header("Content-Type", "application/json")
+        .query(&[("q1", "whatevs")])
+        .body(r#"{"hello": "world"}"#)
+        .send()
+        .await
+        .unwrap();
+
+    let body = response.text().await.unwrap();
+    let body = serde_json::from_str::<serde_json::Value>(&body).unwrap();
+    assert_eq!(body, json!({"status": "updated"}));
 }
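Note on the test above: it rewrites the plugin file in place through `tempfile::NamedTempFile::reopen()`, which hands back a fresh `File` on the same inode, so the path the server loaded the plugin from keeps pointing at the new bytes. A minimal sketch of that rewrite pattern in isolation, assuming the `tempfile` crate (`rewrite_in_place` is a hypothetical helper, not part of this patch):

use std::io::Write;

fn rewrite_in_place(file: &tempfile::NamedTempFile, new_contents: &str) -> std::io::Result<()> {
    // truncate first so a shorter replacement script leaves no stale trailing code
    file.reopen()?.set_len(0)?;
    // write the replacement source through another fresh handle onto the same inode
    file.reopen()?.write_all(new_contents.as_bytes())
}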
diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs
index a0739782102..a0ac0f8ee8b 100644
--- a/influxdb3_processing_engine/src/lib.rs
+++ b/influxdb3_processing_engine/src/lib.rs
@@ -23,7 +23,9 @@ use influxdb3_write::WriteBuffer;
 use iox_time::TimeProvider;
 use observability_deps::tracing::{debug, error, warn};
 use std::any::Any;
-use std::sync::Arc;
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
+use std::time::SystemTime;
 use tokio::sync::oneshot::Receiver;
 use tokio::sync::{mpsc, oneshot, RwLock};
@@ -169,7 +171,7 @@ impl ProcessingEngineManagerImpl {
         }
     }

-    pub async fn read_plugin_code(&self, name: &str) -> Result<String> {
+    pub async fn read_plugin_code(&self, name: &str) -> Result<PluginCode> {
         // if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main
         if name.starts_with("gh:") {
             let plugin_path = name.strip_prefix("gh:").unwrap();
@@ -189,13 +191,76 @@ impl ProcessingEngineManagerImpl {
                 .text()
                 .await
                 .context("error reading plugin from github repo")?;
-            return Ok(resp_body);
+            return Ok(PluginCode::Github(Arc::from(resp_body)));
         }

         // otherwise we assume it is a local file
         let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?;
-        let path = plugin_dir.join(name);
-        Ok(std::fs::read_to_string(path)?)
+        let plugin_path = plugin_dir.join(name);
+
+        // read it at least once to make sure it's there
+        let code = std::fs::read_to_string(plugin_path.clone())?;
+
+        // now we can return it
+        Ok(PluginCode::Local(LocalPlugin {
+            plugin_path,
+            last_read_and_code: Mutex::new((SystemTime::now(), Arc::from(code))),
+        }))
+    }
+}
+
+#[derive(Debug)]
+pub enum PluginCode {
+    Github(Arc<str>),
+    Local(LocalPlugin),
+}
+
+impl PluginCode {
+    #[cfg(feature = "system-py")]
+    pub(crate) fn code(&self) -> Arc<str> {
+        match self {
+            PluginCode::Github(code) => Arc::clone(code),
+            PluginCode::Local(plugin) => plugin.read_if_modified(),
+        }
+    }
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct LocalPlugin {
+    plugin_path: PathBuf,
+    last_read_and_code: Mutex<(SystemTime, Arc<str>)>,
+}
+
+impl LocalPlugin {
+    #[cfg(feature = "system-py")]
+    fn read_if_modified(&self) -> Arc<str> {
+        let metadata = std::fs::metadata(&self.plugin_path);
+
+        let mut last_read_and_code = self.last_read_and_code.lock().unwrap();
+        let (last_read, code) = &mut *last_read_and_code;
+
+        match metadata {
+            Ok(metadata) => {
+                let is_modified = match metadata.modified() {
+                    Ok(modified) => modified > *last_read,
+                    Err(_) => true, // if we can't get the modified time, assume it is modified
+                };
+
+                if is_modified {
+                    // attempt to read the code, if it fails we will return the last known code
+                    if let Ok(new_code) = std::fs::read_to_string(&self.plugin_path) {
+                        *last_read = SystemTime::now();
+                        *code = Arc::from(new_code);
+                    } else {
+                        error!("error reading plugin file {:?}", self.plugin_path);
+                    }
+                }
+
+                Arc::clone(code)
+            }
+            Err(_) => Arc::clone(code),
+        }
     }
 }
@@ -555,13 +620,15 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
         let now = self.time_provider.now();
         let code = self.read_plugin_code(&request.filename).await?;
+        let code_string = code.code().to_string();

-        let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request)
-            .unwrap_or_else(|e| WalPluginTestResponse {
-                log_lines: vec![],
-                database_writes: Default::default(),
-                errors: vec![e.to_string()],
-            });
+        let res =
+            plugins::run_test_wal_plugin(now, catalog, query_executor, code_string, request)
+                .unwrap_or_else(|e| WalPluginTestResponse {
+                    log_lines: vec![],
+                    database_writes: Default::default(),
+                    errors: vec![e.to_string()],
+                });

         return Ok(res);
     }
@@ -585,15 +652,21 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
         let now = self.time_provider.now();
         let code = self.read_plugin_code(&request.filename).await?;
+        let code_string = code.code().to_string();

-        let res =
-            plugins::run_test_schedule_plugin(now, catalog, query_executor, code, request)
-                .unwrap_or_else(|e| SchedulePluginTestResponse {
-                    log_lines: vec![],
-                    database_writes: Default::default(),
-                    errors: vec![e.to_string()],
-                    trigger_time: None,
-                });
+        let res = plugins::run_test_schedule_plugin(
+            now,
+            catalog,
+            query_executor,
+            code_string,
+            request,
+        )
+        .unwrap_or_else(|e| SchedulePluginTestResponse {
+            log_lines: vec![],
+            database_writes: Default::default(),
+            errors: vec![e.to_string()],
+            trigger_time: None,
+        });

         return Ok(res);
     }
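The reload logic added to lib.rs reduces to an mtime gate: re-read the file only when the filesystem reports a modification newer than the last successful read, and serve the cached copy on any metadata or read error. A self-contained sketch of that check, std only, with illustrative names rather than the patch's API:

use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

fn read_if_newer(path: &Path, cache: &Mutex<(SystemTime, Arc<str>)>) -> Arc<str> {
    let mut guard = cache.lock().unwrap();
    let (last_read, code) = &mut *guard;
    if let Ok(meta) = std::fs::metadata(path) {
        // an unreadable mtime counts as modified, so we err on the side of re-reading
        let modified = meta.modified().map_or(true, |m| m > *last_read);
        if modified {
            if let Ok(new_code) = std::fs::read_to_string(path) {
                *last_read = SystemTime::now();
                *code = Arc::from(new_code);
            }
        }
    }
    Arc::clone(code)
}

One consequence of comparing the mtime against the wall-clock time of the last read: on filesystems with coarse mtime granularity, a write that lands in the same tick as the previous read may go unnoticed until the file is touched again.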
diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs
index 9a390559c46..e9f91796662 100644
--- a/influxdb3_processing_engine/src/plugins.rs
+++ b/influxdb3_processing_engine/src/plugins.rs
@@ -1,17 +1,26 @@
 #[cfg(feature = "system-py")]
+use crate::PluginCode;
+#[cfg(feature = "system-py")]
 use crate::PluginEvent;
+use data_types::NamespaceName;
+use hashbrown::HashMap;
 use influxdb3_catalog::catalog::Catalog;
 #[cfg(feature = "system-py")]
 use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
 #[cfg(feature = "system-py")]
 use influxdb3_internal_api::query_executor::QueryExecutor;
+use influxdb3_wal::Gen1Duration;
 #[cfg(feature = "system-py")]
 use influxdb3_wal::TriggerDefinition;
 #[cfg(feature = "system-py")]
 use influxdb3_wal::TriggerSpecificationDefinition;
 use influxdb3_write::write_buffer;
+use influxdb3_write::write_buffer::validator::WriteValidator;
+use influxdb3_write::Precision;
 #[cfg(feature = "system-py")]
 use influxdb3_write::WriteBuffer;
+#[cfg(feature = "system-py")]
+use iox_time::TimeProvider;
 use observability_deps::tracing::error;
 use std::fmt::Debug;
 #[cfg(feature = "system-py")]
@@ -21,14 +30,6 @@ use thiserror::Error;
 #[cfg(feature = "system-py")]
 use tokio::sync::mpsc;

-use data_types::NamespaceName;
-use hashbrown::HashMap;
-use influxdb3_wal::Gen1Duration;
-use influxdb3_write::write_buffer::validator::WriteValidator;
-use influxdb3_write::Precision;
-#[cfg(feature = "system-py")]
-use iox_time::TimeProvider;
-
 #[derive(Debug, Error)]
 pub enum Error {
     #[error("invalid database {0}")]
@@ -69,7 +70,7 @@ pub enum Error {
 #[cfg(feature = "system-py")]
 pub(crate) fn run_wal_contents_plugin(
     db_name: String,
-    plugin_code: String,
+    plugin_code: PluginCode,
     trigger_definition: TriggerDefinition,
     context: PluginContext,
 ) {
@@ -91,7 +92,7 @@ pub(crate) fn run_wal_contents_plugin(
 #[cfg(feature = "system-py")]
 pub(crate) fn run_schedule_plugin(
     db_name: String,
-    plugin_code: String,
+    plugin_code: PluginCode,
     trigger_definition: TriggerDefinition,
     time_provider: Arc<dyn TimeProvider>,
     context: PluginContext,
@@ -119,7 +120,7 @@ pub(crate) fn run_schedule_plugin(
 #[cfg(feature = "system-py")]
 pub(crate) fn run_request_plugin(
     db_name: String,
-    plugin_code: String,
+    plugin_code: PluginCode,
     trigger_definition: TriggerDefinition,
     context: PluginContext,
 ) {
@@ -152,7 +153,7 @@ pub(crate) struct PluginContext {
 #[derive(Debug)]
 struct TriggerPlugin {
     trigger_definition: TriggerDefinition,
-    plugin_code: String,
+    plugin_code: PluginCode,
     db_name: String,
     write_buffer: Arc<dyn WriteBuffer>,
     query_executor: Arc<dyn QueryExecutor>,
@@ -283,7 +284,7 @@ mod python_plugin {
                 return Err(Error::MissingDb);
             };
             let result = execute_request_trigger(
-                &self.plugin_code,
+                self.plugin_code.code().as_ref(),
                 Arc::clone(&schema),
                 Arc::clone(&self.query_executor),
                 &self.trigger_definition.trigger_arguments,
@@ -386,7 +387,7 @@ mod python_plugin {
             };

             let result = execute_python_with_batch(
-                &self.plugin_code,
+                self.plugin_code.code().as_ref(),
                 write_batch,
                 Arc::clone(&schema),
                 Arc::clone(&self.query_executor),
@@ -483,7 +484,7 @@ mod python_plugin {
             };

             let result = execute_schedule_trigger(
-                &plugin.plugin_code,
+                plugin.plugin_code.code().as_ref(),
                 trigger_time,
                 Arc::clone(&db_schema),
                 Arc::clone(&plugin.query_executor),
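The plugins.rs side of the change is mechanical but load-bearing: every call site now takes a per-invocation snapshot via `code()` instead of borrowing a `String` for the plugin's lifetime, which is what lets an updated local file take effect on the next trigger run while GitHub-sourced code stays fixed. A reduced sketch of that dispatch shape (stub names; the `Local` arm is simplified to a plain cache, where the real variant re-checks the file as shown in lib.rs):

use std::sync::Arc;

enum PluginCode {
    Github(Arc<str>), // fetched once over HTTP, never re-read
    Local(Arc<str>),  // simplified stand-in for the real LocalPlugin
}

impl PluginCode {
    fn code(&self) -> Arc<str> {
        match self {
            PluginCode::Github(code) => Arc::clone(code),
            PluginCode::Local(code) => Arc::clone(code), // real impl calls read_if_modified()
        }
    }
}

// same shape as the execute_* helpers: borrow the source for one call only
fn execute(source: &str) {
    println!("running {} bytes of plugin code", source.len());
}

fn main() {
    let plugin = PluginCode::Github(Arc::from("def process(): ..."));
    // mirrors `self.plugin_code.code().as_ref()` at the trigger call sites
    execute(plugin.code().as_ref());
}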