Skip to content

Commit

Permalink
feat: return better plugin execution errors (#25842)
Browse files Browse the repository at this point in the history
* feat: return better plugin execution errors

This sets up the framework for fleshing out more useful plugin execution errors that get returned to the user during testing. We'll also want to capture these for logging in system tables.

Also fixes a test that was broken by a previous commit related to time limits. The breakage didn't show up earlier because the test was behind the feature flag.

* fix: compile errors without system-py feature
  • Loading branch information
pauldix authored Jan 16, 2025
1 parent 6ebbf26 commit 1f71750
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 112 additions & 5 deletions influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use pretty_assertions::assert_eq;
use serde_json::{json, Value};
use test_helpers::assert_contains;
use test_helpers::tempfile::NamedTempFile;
#[cfg(feature = "system-py")]
use test_helpers::tempfile::TempDir;

pub fn run(args: &[&str]) -> String {
let process = Command::cargo_bin("influxdb3")
Expand Down Expand Up @@ -945,7 +947,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("arg1: " + args["arg1"])
query_params = {"host": args["host"]}
query_result = influxdb3_local.query("SELECT * FROM cpu where host = $host", query_params)
query_result = influxdb3_local.query("SELECT host, region, usage FROM cpu where host = $host", query_params)
influxdb3_local.info("query result: " + str(query_result))
for table_batch in table_batches:
Expand Down Expand Up @@ -984,9 +986,9 @@ def process_writes(influxdb3_local, table_batches, args=None):
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s2,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
"cpu,host=s1,region=us-east usage=0.9\n\
cpu,host=s2,region=us-east usage=0.89\n\
cpu,host=s1,region=us-east usage=0.85",
Precision::Nanosecond,
)
.await
Expand Down Expand Up @@ -1015,7 +1017,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
let expected_result = r#"{
"log_lines": [
"INFO: arg1: arg1_value",
"INFO: query result: [{'host': 's2', 'region': 'us-east', 'time': 2, 'usage': 0.89}]",
"INFO: query result: [{'host': 's2', 'region': 'us-east', 'usage': 0.89}]",
"INFO: table: test_input",
"INFO: row: {'tag1': 'tag1_value', 'tag2': 'tag2_value', 'field1': 1, 'time': 500}",
"INFO: done"
Expand All @@ -1033,3 +1035,108 @@ def process_writes(influxdb3_local, table_batches, args=None):
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}

// End-to-end CLI test: runs `influxdb3 test wal_plugin` against a set of
// intentionally broken plugins and asserts that each one surfaces the exact
// user-facing execution error.
// NOTE(review): the expected error strings embed CPython diagnostics
// (IndentationError/TypeError text) — assumed stable for the interpreter
// version pinned by pyo3; confirm if the embedded Python is upgraded.
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_wal_plugin_errors() {
use crate::ConfigProvider;
use influxdb3_client::Precision;

// One table-driven case: a plugin name (for failure messages), its Python
// source, and the exact error string the CLI must report back.
struct Test {
name: &'static str,
plugin_code: &'static str,
expected_error: &'static str,
}

let tests = vec![
// Syntactically invalid Python — the interpreter itself rejects it.
Test {
name: "invalid_python",
plugin_code: r#"
lkjasdf9823
jjjjj / sss"#,
expected_error: "error executing plugin: IndentationError: unexpected indent (<string>, line 2)",
},
// Valid Python, but the required `process_writes` entry point is absent.
Test {
name: "no_process_writes",
plugin_code: r#"
def not_process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("done")"#,
expected_error: "error executing plugin: the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)",
},
// Entry point exists but is missing the `args` parameter.
Test {
name: "no_args",
plugin_code: r#"
def process_writes(influxdb3_local, table_batches):
influxdb3_local.info("done")
"#,
expected_error: "error executing plugin: TypeError: process_writes() takes 2 positional arguments but 3 were given",
},
// Entry point missing the `table_batches` parameter.
Test {
name: "no_table_batches",
plugin_code: r#"
def process_writes(influxdb3_local, args=None):
influxdb3_local.info("done")
"#,
expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given",
},
// Entry point missing the `influxdb3_local` parameter.
Test {
name: "no_influxdb3_local",
plugin_code: r#"
def process_writes(table_batches, args=None):
influxdb3_local.info("done")
"#,
expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given",
}];

// Plugins are loaded from this directory; each test case writes its code here.
let plugin_dir = TempDir::new().unwrap();

let server = TestServer::configure()
.with_plugin_dir(plugin_dir.path().to_str().unwrap())
.spawn()
.await;
let server_addr = server.client_addr();

// Seed a few rows — presumably this ensures the "foo" database exists before
// the plugin tests run (the plugin test supplies its own LP via --lp); TODO confirm.
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9\n\
cpu,host=s2,region=us-east usage=0.89\n\
cpu,host=s1,region=us-east usage=0.85",
Precision::Nanosecond,
)
.await
.unwrap();

let db_name = "foo";

for test in tests {
// Each plugin gets its own temp file inside the plugin dir; the generated
// file name doubles as the plugin name passed to the CLI.
let mut plugin_file = NamedTempFile::new_in(plugin_dir.path()).unwrap();
writeln!(plugin_file, "{}", test.plugin_code).unwrap();
let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();

let result = run_with_confirmation(&[
"test",
"wal_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--lp",
"test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500",
"--input-arguments",
"arg1=arg1_value,host=s2",
plugin_name,
]);
debug!(result = ?result, "test wal plugin");

println!("{}", result);
// The CLI responds with JSON; execution failures land in the "errors" array.
let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();
let errors = res.get("errors").unwrap().as_array().unwrap();
let error = errors[0].as_str().unwrap();
// Include the case name and full response so a mismatch is easy to diagnose.
assert_eq!(
error, test.expected_error,
"test: {}, response was: {}",
test.name, result
);
}
}
3 changes: 3 additions & 0 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub enum Error {

#[error("reading plugin file: {0}")]
ReadPluginError(#[from] std::io::Error),

#[error("error executing plugin: {0}")]
PluginExecutionError(#[from] influxdb3_py_api::ExecutePluginError),
}

#[cfg(feature = "system-py")]
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_py_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license.workspace = true
system-py = ["pyo3"]

[dependencies]
anyhow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
hashbrown.workspace = true
Expand All @@ -20,6 +21,7 @@ iox_query_params.workspace = true
observability_deps.workspace = true
parking_lot.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio.workspace = true

[dependencies.pyo3]
Expand Down
9 changes: 9 additions & 0 deletions influxdb3_py_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
/// Errors produced while executing a user-supplied processing-engine plugin.
/// The `#[error]` messages are user-facing: they are returned verbatim to the
/// client during plugin testing (see the CLI tests asserting on this text).
#[derive(Debug, thiserror::Error)]
pub enum ExecutePluginError {
/// The plugin source compiled, but never defined the required
/// `process_writes` entry point with the expected signature.
#[error("the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)")]
MissingProcessWritesFunction,

/// Any other failure raised while running the plugin (Python syntax or
/// runtime errors, schema lookups, etc.), carried with anyhow context.
#[error("{0}")]
PluginError(#[from] anyhow::Error),
}

#[cfg(feature = "system-py")]
pub mod system_py;
76 changes: 52 additions & 24 deletions influxdb3_py_api/src/system_py.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::ExecutePluginError;
use anyhow::Context;
use arrow_array::types::Int32Type;
use arrow_array::{
BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray,
Expand Down Expand Up @@ -353,7 +355,7 @@ pub fn execute_python_with_batch(
query_executor: Arc<dyn QueryExecutor>,
table_filter: Option<TableId>,
args: &Option<HashMap<String, String>>,
) -> PyResult<PluginReturnState> {
) -> Result<PluginReturnState, ExecutePluginError> {
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
Expand All @@ -362,7 +364,8 @@ pub fn execute_python_with_batch(
&CString::new(LINE_BUILDER_CODE).unwrap(),
Some(&globals),
None,
)?;
)
.map_err(|e| anyhow::Error::new(e).context("failed to eval the LineBuilder API code"))?;

// convert the write batch into a python object
let mut table_batches = Vec::with_capacity(write_batch.table_chunks.len());
Expand All @@ -373,43 +376,61 @@ pub fn execute_python_with_batch(
continue;
}
}
let table_def = schema.tables.get(table_id).unwrap();
let table_def = schema.tables.get(table_id).context("table not found")?;

let dict = PyDict::new(py);
dict.set_item("table_name", table_def.table_name.as_ref())
.unwrap();
.context("failed to set table_name")?;

let mut rows: Vec<PyObject> = Vec::new();
for chunk in table_chunks.chunk_time_to_chunk.values() {
for row in &chunk.rows {
let py_row = PyDict::new(py);

for field in &row.fields {
let field_name = table_def.column_id_to_name(&field.id).unwrap();
let field_name = table_def
.column_id_to_name(&field.id)
.context("field not found")?;
match &field.value {
FieldData::String(s) => {
py_row.set_item(field_name.as_ref(), s.as_str()).unwrap();
py_row
.set_item(field_name.as_ref(), s.as_str())
.context("failed to set string field")?;
}
FieldData::Integer(i) => {
py_row.set_item(field_name.as_ref(), i).unwrap();
py_row
.set_item(field_name.as_ref(), i)
.context("failed to set integer field")?;
}
FieldData::UInteger(u) => {
py_row.set_item(field_name.as_ref(), u).unwrap();
py_row
.set_item(field_name.as_ref(), u)
.context("failed to set unsigned integer field")?;
}
FieldData::Float(f) => {
py_row.set_item(field_name.as_ref(), f).unwrap();
py_row
.set_item(field_name.as_ref(), f)
.context("failed to set float field")?;
}
FieldData::Boolean(b) => {
py_row.set_item(field_name.as_ref(), b).unwrap();
py_row
.set_item(field_name.as_ref(), b)
.context("failed to set boolean field")?;
}
FieldData::Tag(t) => {
py_row.set_item(field_name.as_ref(), t.as_str()).unwrap();
py_row
.set_item(field_name.as_ref(), t.as_str())
.context("failed to set tag field")?;
}
FieldData::Key(k) => {
py_row.set_item(field_name.as_ref(), k.as_str()).unwrap();
py_row
.set_item(field_name.as_ref(), k.as_str())
.context("failed to set key field")?;
}
FieldData::Timestamp(t) => {
py_row.set_item(field_name.as_ref(), t).unwrap();
py_row
.set_item(field_name.as_ref(), t)
.context("failed to set timestamp")?;
}
};
}
Expand All @@ -418,21 +439,23 @@ pub fn execute_python_with_batch(
}
}

let rows = PyList::new(py, rows).unwrap();
let rows = PyList::new(py, rows).context("failed to create rows list")?;

dict.set_item("rows", rows.unbind()).unwrap();
dict.set_item("rows", rows.unbind())
.context("failed to set rows")?;
table_batches.push(dict);
}

let py_batches = PyList::new(py, table_batches).unwrap();
let py_batches =
PyList::new(py, table_batches).context("failed to create table_batches list")?;

let api = PyPluginCallApi {
db_schema: schema,
query_executor,
return_state: Default::default(),
};
let return_state = Arc::clone(&api.return_state);
let local_api = api.into_pyobject(py)?;
let local_api = api.into_pyobject(py).map_err(anyhow::Error::from)?;

// turn args into an optional dict to pass into python
let args = args.as_ref().map(|args| {
Expand All @@ -444,14 +467,19 @@ pub fn execute_python_with_batch(
});

// run the code and get the python function to call
py.run(&CString::new(code).unwrap(), Some(&globals), None)?;
let py_func = py.eval(
&CString::new(PROCESS_WRITES_CALL_SITE).unwrap(),
Some(&globals),
None,
)?;
py.run(&CString::new(code).unwrap(), Some(&globals), None)
.map_err(anyhow::Error::from)?;
let py_func = py
.eval(
&CString::new(PROCESS_WRITES_CALL_SITE).unwrap(),
Some(&globals),
None,
)
.map_err(|_| ExecutePluginError::MissingProcessWritesFunction)?;

py_func.call1((local_api, py_batches.unbind(), args))?;
py_func
.call1((local_api, py_batches.unbind(), args))
.map_err(anyhow::Error::from)?;

// swap with an empty return state to avoid cloning
let empty_return_state = PluginReturnState::default();
Expand Down

0 comments on commit 1f71750

Please sign in to comment.