Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reloads plugins if modified #25865

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1660,4 +1660,33 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
.unwrap();

assert_eq!(val, json!([{"tag1": "tag1_value", "field1": 1}]));

// now update it to make sure that it reloads
let plugin_code = r#"
import json

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
return 200, {"Content-Type": "application/json"}, json.dumps({"status": "updated"})
"#;
// clear all bytes from the plugin file
plugin_file.reopen().unwrap().set_len(0).unwrap();
plugin_file
.reopen()
.unwrap()
.write_all(plugin_code.as_bytes())
.unwrap();

// send an HTTP request to the server
let response = client
.post(format!("{}/api/v3/engine/foo", server_addr))
.header("Content-Type", "application/json")
.query(&[("q1", "whatevs")])
.body(r#"{"hello": "world"}"#)
.send()
.await
.unwrap();

let body = response.text().await.unwrap();
let body = serde_json::from_str::<serde_json::Value>(&body).unwrap();
assert_eq!(body, json!({"status": "updated"}));
}
111 changes: 92 additions & 19 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
use observability_deps::tracing::{debug, error, warn};
use std::any::Any;
use std::sync::Arc;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{mpsc, oneshot, RwLock};

Expand Down Expand Up @@ -169,7 +171,7 @@ impl ProcessingEngineManagerImpl {
}
}

pub async fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
pub async fn read_plugin_code(&self, name: &str) -> Result<PluginCode, plugins::Error> {
// if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main
if name.starts_with("gh:") {
let plugin_path = name.strip_prefix("gh:").unwrap();
Expand All @@ -189,13 +191,76 @@ impl ProcessingEngineManagerImpl {
.text()
.await
.context("error reading plugin from github repo")?;
return Ok(resp_body);
return Ok(PluginCode::Github(Arc::from(resp_body)));
}

// otherwise we assume it is a local file
let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?;
let path = plugin_dir.join(name);
Ok(std::fs::read_to_string(path)?)
let plugin_path = plugin_dir.join(name);

// read it at least once to make sure it's there
let code = std::fs::read_to_string(plugin_path.clone())?;

// now we can return it
Ok(PluginCode::Local(LocalPlugin {
plugin_path,
last_read_and_code: Mutex::new((SystemTime::now(), Arc::from(code))),
}))
}
}

/// Where a plugin's source code was loaded from.
///
/// `Github` code is fetched once and cached immutably; `Local` code is backed
/// by a file on disk and may be re-read when the file is modified (see
/// `LocalPlugin::read_if_modified`).
#[derive(Debug)]
pub enum PluginCode {
    /// Code downloaded from the public influxdata/influxdb3_plugins GitHub
    /// repo; never refreshed after the initial fetch.
    Github(Arc<str>),
    /// Code read from a local file, reloadable when the file changes.
    Local(LocalPlugin),
}

impl PluginCode {
    /// Returns the current plugin source.
    ///
    /// GitHub-hosted code is returned directly (a cheap refcount clone),
    /// while locally-backed code is transparently refreshed from disk if the
    /// underlying file appears to have changed since the last read.
    #[cfg(feature = "system-py")]
    pub(crate) fn code(&self) -> Arc<str> {
        match self {
            Self::Local(plugin) => plugin.read_if_modified(),
            Self::Github(code) => code.clone(),
        }
    }
}

/// A plugin whose source code lives in a file on local disk.
///
/// Caches the file contents together with a timestamp watermark of the last
/// read, so the file is only re-read when it has been modified since.
#[allow(dead_code)] // fields are only read when the "system-py" feature is enabled
#[derive(Debug)]
pub struct LocalPlugin {
    // Path to the plugin file (resolved under the configured plugin dir).
    plugin_path: PathBuf,
    // (watermark, cached code): the code is re-read when the file's mtime
    // exceeds the watermark. Guarded by a Mutex so concurrent callers always
    // observe a consistent (timestamp, code) pair.
    last_read_and_code: Mutex<(SystemTime, Arc<str>)>,
}

impl LocalPlugin {
    /// Returns the plugin code, re-reading it from disk if the backing file
    /// has been modified since the last read.
    ///
    /// All failure modes fall back to the last known good code: if the file's
    /// metadata cannot be fetched, or the re-read itself fails (logged as an
    /// error), the cached code is returned unchanged.
    #[cfg(feature = "system-py")]
    fn read_if_modified(&self) -> Arc<str> {
        // Fetch metadata before taking the lock to keep the critical section short.
        let metadata = std::fs::metadata(&self.plugin_path);

        let mut last_read_and_code = self.last_read_and_code.lock().unwrap();
        let (last_read, code) = &mut *last_read_and_code;

        match metadata {
            Ok(metadata) => {
                // mtime of the file as observed *before* we (possibly) read it
                let modified = metadata.modified().ok();
                // if we can't get the modified time, assume it is modified
                let is_modified = modified.map_or(true, |m| m > *last_read);

                if is_modified {
                    // attempt to read the code; if it fails we return the last known code
                    if let Ok(new_code) = std::fs::read_to_string(&self.plugin_path) {
                        // Record the mtime observed before the read rather than
                        // `SystemTime::now()`: if the file is modified again in
                        // the window between the metadata fetch and the read,
                        // the next call still sees mtime > watermark and
                        // re-reads, instead of silently missing that update.
                        *last_read = modified.unwrap_or_else(SystemTime::now);
                        *code = Arc::from(new_code);
                    } else {
                        error!("error reading plugin file {:?}", self.plugin_path);
                    }
                }

                Arc::clone(code)
            }
            // Metadata unavailable (e.g. file temporarily missing or
            // unreadable): serve the cached code.
            Err(_) => Arc::clone(code),
        }
    }
}

Expand Down Expand Up @@ -555,13 +620,15 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let now = self.time_provider.now();

let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();

let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request)
.unwrap_or_else(|e| WalPluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
});
let res =
plugins::run_test_wal_plugin(now, catalog, query_executor, code_string, request)
.unwrap_or_else(|e| WalPluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
});

return Ok(res);
}
Expand All @@ -585,15 +652,21 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let now = self.time_provider.now();

let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();

let res =
plugins::run_test_schedule_plugin(now, catalog, query_executor, code, request)
.unwrap_or_else(|e| SchedulePluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
trigger_time: None,
});
let res = plugins::run_test_schedule_plugin(
now,
catalog,
query_executor,
code_string,
request,
)
.unwrap_or_else(|e| SchedulePluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
trigger_time: None,
});

return Ok(res);
}
Expand Down
31 changes: 16 additions & 15 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
#[cfg(feature = "system-py")]
use crate::PluginCode;
#[cfg(feature = "system-py")]
use crate::PluginEvent;
use data_types::NamespaceName;
use hashbrown::HashMap;
use influxdb3_catalog::catalog::Catalog;
#[cfg(feature = "system-py")]
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
#[cfg(feature = "system-py")]
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::Gen1Duration;
#[cfg(feature = "system-py")]
use influxdb3_wal::TriggerDefinition;
#[cfg(feature = "system-py")]
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_write::write_buffer;
use influxdb3_write::write_buffer::validator::WriteValidator;
use influxdb3_write::Precision;
#[cfg(feature = "system-py")]
use influxdb3_write::WriteBuffer;
#[cfg(feature = "system-py")]
use iox_time::TimeProvider;
use observability_deps::tracing::error;
use std::fmt::Debug;
#[cfg(feature = "system-py")]
Expand All @@ -21,14 +30,6 @@ use thiserror::Error;
#[cfg(feature = "system-py")]
use tokio::sync::mpsc;

use data_types::NamespaceName;
use hashbrown::HashMap;
use influxdb3_wal::Gen1Duration;
use influxdb3_write::write_buffer::validator::WriteValidator;
use influxdb3_write::Precision;
#[cfg(feature = "system-py")]
use iox_time::TimeProvider;

#[derive(Debug, Error)]
pub enum Error {
#[error("invalid database {0}")]
Expand Down Expand Up @@ -69,7 +70,7 @@ pub enum Error {
#[cfg(feature = "system-py")]
pub(crate) fn run_wal_contents_plugin(
db_name: String,
plugin_code: String,
plugin_code: PluginCode,
trigger_definition: TriggerDefinition,
context: PluginContext,
) {
Expand All @@ -91,7 +92,7 @@ pub(crate) fn run_wal_contents_plugin(
#[cfg(feature = "system-py")]
pub(crate) fn run_schedule_plugin(
db_name: String,
plugin_code: String,
plugin_code: PluginCode,
trigger_definition: TriggerDefinition,
time_provider: Arc<dyn TimeProvider>,
context: PluginContext,
Expand Down Expand Up @@ -119,7 +120,7 @@ pub(crate) fn run_schedule_plugin(
#[cfg(feature = "system-py")]
pub(crate) fn run_request_plugin(
db_name: String,
plugin_code: String,
plugin_code: PluginCode,
trigger_definition: TriggerDefinition,
context: PluginContext,
) {
Expand Down Expand Up @@ -152,7 +153,7 @@ pub(crate) struct PluginContext {
#[derive(Debug)]
struct TriggerPlugin {
trigger_definition: TriggerDefinition,
plugin_code: String,
plugin_code: PluginCode,
db_name: String,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
Expand Down Expand Up @@ -283,7 +284,7 @@ mod python_plugin {
return Err(Error::MissingDb);
};
let result = execute_request_trigger(
&self.plugin_code,
self.plugin_code.code().as_ref(),
Arc::clone(&schema),
Arc::clone(&self.query_executor),
&self.trigger_definition.trigger_arguments,
Expand Down Expand Up @@ -386,7 +387,7 @@ mod python_plugin {
};

let result = execute_python_with_batch(
&self.plugin_code,
self.plugin_code.code().as_ref(),
write_batch,
Arc::clone(&schema),
Arc::clone(&self.query_executor),
Expand Down Expand Up @@ -483,7 +484,7 @@ mod python_plugin {
};

let result = execute_schedule_trigger(
&plugin.plugin_code,
plugin.plugin_code.code().as_ref(),
trigger_time,
Arc::clone(&db_schema),
Arc::clone(&plugin.query_executor),
Expand Down
Loading