Skip to content

Commit

Permalink
feat: Update trigger creation to validate plugin file present
Browse files Browse the repository at this point in the history
This updates trigger creation to load the plugin file before creating the trigger.

Another small change is to make Github references use filenames and paths identical to what they would be in the plugin-dir. This makes it a little easier to have the plugins repo local and develop against it and then be able to reference the same file later with gh: once it's up on the repo.
  • Loading branch information
pauldix committed Jan 27, 2025
1 parent d49276a commit 52b8956
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 42 deletions.
13 changes: 10 additions & 3 deletions influxdb3/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
.collect::<HashMap<String, String>>()
});

client
//println!("does this work?");
match client
.api_v3_configure_processing_engine_trigger_create(
database_name,
&trigger_name,
Expand All @@ -367,8 +368,14 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_arguments,
disabled,
)
.await?;
println!("Trigger {} created successfully", trigger_name);
.await
{
Err(e) => {
eprintln!("Failed to create trigger: {}", e);
return Err(e.into());
}
Ok(_) => println!("Trigger {} created successfully", trigger_name),
}
}
}
Ok(())
Expand Down
38 changes: 37 additions & 1 deletion influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ async fn test_load_wal_plugin_from_gh() {
let db_name = "foo";

// this will pull from https://github.com/influxdata/influxdb3_plugins/blob/main/examples/wal_plugin/wal_plugin.py
let plugin_name = "gh:examples/wal_plugin";
let plugin_name = "gh:examples/wal_plugin/wal_plugin.py";

// Run the test to make sure it'll load from GH
let result = run_with_confirmation(&[
Expand Down Expand Up @@ -1530,6 +1530,42 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
assert_eq!(body, json!({"status": "updated"}));
}

#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_trigger_create_validates_file_present() {
let plugin_dir = TempDir::new().unwrap();

let server = TestServer::configure()
.with_plugin_dir(plugin_dir.path().to_str().unwrap())
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "foo";

// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);

let trigger_path = "foo";
// creating the trigger should return a 404 error from github
let result = run_with_confirmation_and_err(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
"gh:not_a_file.py",
"--trigger-spec",
"request:foo",
"--trigger-arguments",
"test_arg=hello",
trigger_path,
]);
debug!(result = ?result, "create trigger");
assert_contains!(&result, "error reading file from Github: 404 Not Found");
}

#[test_log::test(tokio::test)]
async fn write_and_query_via_stdin() {
let server = TestServer::spawn().await;
Expand Down
31 changes: 18 additions & 13 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::manager::{ProcessingEngineError, ProcessingEngineManager};
use crate::plugins::Error;
#[cfg(feature = "system-py")]
use crate::plugins::PluginContext;
use crate::plugins::PluginError;
use anyhow::Context;
use bytes::Bytes;
use hashbrown::HashMap;
Expand Down Expand Up @@ -219,22 +219,23 @@ impl ProcessingEngineManagerImpl {
}
}

pub async fn read_plugin_code(&self, name: &str) -> Result<PluginCode, plugins::Error> {
pub async fn read_plugin_code(&self, name: &str) -> Result<PluginCode, plugins::PluginError> {
// if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main
if name.starts_with("gh:") {
let plugin_path = name.strip_prefix("gh:").unwrap();
// the filename should be the last part of the name after the last /
let plugin_name = plugin_path
.split('/')
.last()
.context("plugin name for github plugins must be <dir>/<name>")?;
let url = format!(
"https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}/{}.py",
plugin_path, plugin_name
"https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}",
plugin_path
);
let resp = reqwest::get(&url)
.await
.context("error getting plugin from github repo")?;

// verify the response is a success
if !resp.status().is_success() {
return Err(PluginError::FetchingFromGithub(resp.status(), url));
}

let resp_body = resp
.text()
.await
Expand Down Expand Up @@ -326,6 +327,10 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else {
return Err(ProcessingEngineError::DatabaseNotFound(db_name.to_string()));
};

// validate that we can actually read the plugin file
self.read_plugin_code(&plugin_filename).await?;

let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition {
trigger_name,
plugin_filename,
Expand Down Expand Up @@ -597,7 +602,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
&self,
request: WalPluginTestRequest,
query_executor: Arc<dyn QueryExecutor>,
) -> Result<WalPluginTestResponse, plugins::Error> {
) -> Result<WalPluginTestResponse, plugins::PluginError> {
#[cfg(feature = "system-py")]
{
// create a copy of the catalog so we don't modify the original
Expand All @@ -619,7 +624,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
}

#[cfg(not(feature = "system-py"))]
Err(plugins::Error::AnyhowError(anyhow::anyhow!(
Err(plugins::PluginError::AnyhowError(anyhow::anyhow!(
"system-py feature not enabled"
)))
}
Expand All @@ -629,7 +634,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
&self,
request: SchedulePluginTestRequest,
query_executor: Arc<dyn QueryExecutor>,
) -> Result<SchedulePluginTestResponse, Error> {
) -> Result<SchedulePluginTestResponse, PluginError> {
#[cfg(feature = "system-py")]
{
// create a copy of the catalog so we don't modify the original
Expand Down Expand Up @@ -657,7 +662,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
}

#[cfg(not(feature = "system-py"))]
Err(plugins::Error::AnyhowError(anyhow::anyhow!(
Err(plugins::PluginError::AnyhowError(anyhow::anyhow!(
"system-py feature not enabled"
)))
}
Expand Down
6 changes: 3 additions & 3 deletions influxdb3_processing_engine/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub enum ProcessingEngineError {
PluginNotFound(String),

#[error("plugin error: {0}")]
PluginError(#[from] crate::plugins::Error),
PluginError(#[from] crate::plugins::PluginError),

#[error("failed to shutdown trigger {trigger_name} in database {database}")]
TriggerShutdownError {
Expand Down Expand Up @@ -99,13 +99,13 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
&self,
request: WalPluginTestRequest,
query_executor: Arc<dyn QueryExecutor>,
) -> Result<WalPluginTestResponse, crate::plugins::Error>;
) -> Result<WalPluginTestResponse, crate::plugins::PluginError>;

async fn test_schedule_plugin(
&self,
request: SchedulePluginTestRequest,
query_executor: Arc<dyn QueryExecutor>,
) -> Result<SchedulePluginTestResponse, crate::plugins::Error>;
) -> Result<SchedulePluginTestResponse, crate::plugins::PluginError>;

async fn request_trigger(
&self,
Expand Down
50 changes: 29 additions & 21 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use thiserror::Error;
use tokio::sync::mpsc;

#[derive(Debug, Error)]
pub enum Error {
pub enum PluginError {
#[error("invalid database {0}")]
InvalidDatabase(String),

Expand Down Expand Up @@ -68,6 +68,9 @@ pub enum Error {

#[error("non-schedule plugin with schedule trigger: {0}")]
NonSchedulePluginWithScheduleTrigger(String),

#[error("error reading file from Github: {0} {1}")]
FetchingFromGithub(reqwest::StatusCode, String),
}

#[cfg(feature = "system-py")]
Expand Down Expand Up @@ -101,11 +104,11 @@ pub(crate) fn run_schedule_plugin(
time_provider: Arc<dyn TimeProvider>,
context: PluginContext,
plugin_receiver: mpsc::Receiver<ScheduleEvent>,
) -> Result<(), Error> {
) -> Result<(), PluginError> {
// Ensure that the plugin is a schedule plugin
let plugin_type = trigger_definition.trigger.plugin_type();
if !matches!(plugin_type, influxdb3_wal::PluginType::Schedule) {
return Err(Error::NonSchedulePluginWithScheduleTrigger(format!(
return Err(PluginError::NonSchedulePluginWithScheduleTrigger(format!(
"{:?}",
trigger_definition
)));
Expand Down Expand Up @@ -201,7 +204,7 @@ mod python_plugin {
pub(crate) async fn run_wal_contents_plugin(
&self,
mut receiver: Receiver<WalEvent>,
) -> Result<(), Error> {
) -> Result<(), PluginError> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting wal contents plugin");

loop {
Expand All @@ -220,7 +223,7 @@ mod python_plugin {
}
}
WalEvent::Shutdown(sender) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
sender.send(()).map_err(|_| PluginError::FailedToShutdown)?;
break;
}
}
Expand All @@ -234,7 +237,7 @@ mod python_plugin {
mut receiver: Receiver<ScheduleEvent>,
mut runner: ScheduleTriggerRunner,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), Error> {
) -> Result<(), PluginError> {
loop {
let Some(next_run_instant) = runner.next_run_time() else {
break;
Expand All @@ -243,7 +246,7 @@ mod python_plugin {
tokio::select! {
_ = time_provider.sleep_until(next_run_instant) => {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(Error::MissingDb);
return Err(PluginError::MissingDb);
};
runner.run_at_time(self, schema).await?;
}
Expand All @@ -254,7 +257,7 @@ mod python_plugin {
break;
}
Some(ScheduleEvent::Shutdown(sender)) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
sender.send(()).map_err(|_| PluginError::FailedToShutdown)?;
break;
}
}
Expand All @@ -268,7 +271,7 @@ mod python_plugin {
pub(crate) async fn run_request_plugin(
&self,
mut receiver: Receiver<RequestEvent>,
) -> Result<(), Error> {
) -> Result<(), PluginError> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting request plugin");

loop {
Expand All @@ -282,7 +285,7 @@ mod python_plugin {
self.write_buffer.catalog().db_schema(self.db_name.as_str())
else {
error!(?self.trigger_definition, "missing db schema");
return Err(Error::MissingDb);
return Err(PluginError::MissingDb);
};
let result = execute_request_trigger(
self.plugin_code.code().as_ref(),
Expand Down Expand Up @@ -340,7 +343,7 @@ mod python_plugin {
}
}
Some(RequestEvent::Shutdown(sender)) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
sender.send(()).map_err(|_| PluginError::FailedToShutdown)?;
break;
}
}
Expand All @@ -349,9 +352,12 @@ mod python_plugin {
Ok(())
}

async fn process_wal_contents(&self, wal_contents: Arc<WalContents>) -> Result<(), Error> {
async fn process_wal_contents(
&self,
wal_contents: Arc<WalContents>,
) -> Result<(), PluginError> {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(Error::MissingDb);
return Err(PluginError::MissingDb);
};

for wal_op in &wal_contents.ops {
Expand Down Expand Up @@ -481,7 +487,7 @@ mod python_plugin {
pub(crate) fn try_new(
trigger_spec: &TriggerSpecificationDefinition,
time_provider: Arc<dyn TimeProvider>,
) -> Result<Self, Error> {
) -> Result<Self, PluginError> {
match trigger_spec {
TriggerSpecificationDefinition::AllTablesWalWrite
| TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
Expand Down Expand Up @@ -538,7 +544,7 @@ mod python_plugin {
&mut self,
plugin: &TriggerPlugin,
db_schema: Arc<DatabaseSchema>,
) -> Result<(), Error> {
) -> Result<(), PluginError> {
let Some(trigger_time) = self.next_trigger_time else {
return Err(anyhow!("running a cron trigger that is finished.").into());
};
Expand Down Expand Up @@ -584,15 +590,15 @@ pub(crate) fn run_test_wal_plugin(
query_executor: Arc<dyn QueryExecutor>,
code: String,
request: WalPluginTestRequest,
) -> Result<WalPluginTestResponse, Error> {
) -> Result<WalPluginTestResponse, PluginError> {
use data_types::NamespaceName;
use influxdb3_wal::Gen1Duration;
use influxdb3_write::write_buffer::validator::WriteValidator;
use influxdb3_write::Precision;

let database = request.database;
let namespace = NamespaceName::new(database.clone())
.map_err(|_e| Error::InvalidDatabase(database.clone()))?;
.map_err(|_e| PluginError::InvalidDatabase(database.clone()))?;
// parse the lp into a write batch
let validator = WriteValidator::initialize(
namespace.clone(),
Expand All @@ -606,7 +612,7 @@ pub(crate) fn run_test_wal_plugin(
Precision::Nanosecond,
)?;
let data = data.convert_lines_to_buffer(Gen1Duration::new_1m());
let db = catalog.db_schema(&database).ok_or(Error::MissingDb)?;
let db = catalog.db_schema(&database).ok_or(PluginError::MissingDb)?;

let plugin_return_state = influxdb3_py_api::system_py::execute_python_with_batch(
&code,
Expand Down Expand Up @@ -722,15 +728,17 @@ pub(crate) fn run_test_schedule_plugin(
query_executor: Arc<dyn QueryExecutor>,
code: String,
request: influxdb3_client::plugin_development::SchedulePluginTestRequest,
) -> Result<influxdb3_client::plugin_development::SchedulePluginTestResponse, Error> {
) -> Result<influxdb3_client::plugin_development::SchedulePluginTestResponse, PluginError> {
let database = request.database;
let db = catalog.db_schema(&database).ok_or(Error::MissingDb)?;
let db = catalog.db_schema(&database).ok_or(PluginError::MissingDb)?;

let cron_schedule = request.schedule.as_deref().unwrap_or("* * * * * *");

let schedule = cron::Schedule::from_str(cron_schedule)?;
let Some(schedule_time) = schedule.after(&now_time.date_time()).next() else {
return Err(Error::CronScheduleNeverTriggers(cron_schedule.to_string()));
return Err(PluginError::CronScheduleNeverTriggers(
cron_schedule.to_string(),
));
};

let plugin_return_state = influxdb3_py_api::system_py::execute_schedule_trigger(
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ pub enum Error {
PythonPluginsNotEnabled,

#[error("Plugin error: {0}")]
Plugin(#[from] influxdb3_processing_engine::plugins::Error),
Plugin(#[from] influxdb3_processing_engine::plugins::PluginError),

#[error("Processing engine error: {0}")]
ProcessingEngine(#[from] influxdb3_processing_engine::manager::ProcessingEngineError),
Expand Down

0 comments on commit 52b8956

Please sign in to comment.