Skip to content

Commit

Permalink
fix: writes with incorrect schema should fail (#25022)
Browse files Browse the repository at this point in the history
* test: add reproducer for #25006
* fix: validate schema of lines in lp and return error for invalid fields
  • Loading branch information
hiltontj authored May 29, 2024
1 parent 2ac986a commit faab7a0
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 15 deletions.
48 changes: 48 additions & 0 deletions influxdb3/tests/server/write.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use hyper::StatusCode;
use influxdb3_client::Precision;
use pretty_assertions::assert_eq;

use crate::TestServer;
Expand Down Expand Up @@ -272,3 +273,50 @@ async fn api_v2_write_round_trip() {
+------------------+-------------------------------+------+-------+"
);
}

/// Reproducer for [#25006][issue]
///
/// [issue]: https://github.com/influxdata/influxdb/issues/25006
#[tokio::test]
async fn writes_with_different_schema_should_fail() {
let server = TestServer::spawn().await;
// send a valid write request with the field t0_f0 as an integer:
server
.write_lp_to_db(
"foo",
"\
t0,t0_tag0=initTag t0_f0=0i 1715694000\n\
t0,t0_tag0=initTag t0_f0=1i 1715694001\n\
t0,t0_tag1=initTag t0_f0=0i 1715694000",
Precision::Second,
)
.await
.expect("writes LP with integer field");

// send another write request, to the same db, but with field t0_f0 as an unsigned integer:
let error = server
.write_lp_to_db(
"foo",
"\
t0,t0_tag0=initTag t0_f0=0u 1715694000\n\
t0,t0_tag0=initTag t0_f0=1u 1715694001\n\
t0,t0_tag1=initTag t0_f0=0u 1715694000",
Precision::Second,
)
.await
.expect_err("should fail when writing LP with same field as unsigned integer");

println!("error: {error:#?}");

// the request should have failed with an API error indicating incorrect schema for the field:
assert!(
matches!(
error,
influxdb3_client::Error::ApiError {
code: StatusCode::BAD_REQUEST,
message: _
}
),
"the request should hae failed with an API Error"
);
}
6 changes: 6 additions & 0 deletions influxdb3_write/src/write_buffer/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ impl WriteBufferFlusher {
&self,
segmented_data: Vec<ValidSegmentedData>,
) -> crate::write_buffer::Result<()> {
// Check for presence of valid segment data, otherwise, the await on the response receiver
// will hang below.
if segmented_data.is_empty() {
return Ok(());
}

let (response_tx, response_rx) = oneshot::channel();

self.buffer_tx
Expand Down
61 changes: 46 additions & 15 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,25 +469,22 @@ pub(crate) fn parse_validate_and_update_schema(
let mut valid_parsed_and_raw_lines: Vec<(ParsedLine, &str)> = vec![];

for (line_idx, maybe_line) in parse_lines(lp).enumerate() {
let line = match maybe_line {
let line = match maybe_line
.map_err(|e| WriteLineError {
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
original_line: lp_lines.next().unwrap().to_string(),
line_number: line_idx + 1,
error_message: e.to_string(),
})
.and_then(|l| validate_line_schema(line_idx, l, schema))
{
Ok(line) => line,
Err(e) => {
if !accept_partial {
return Err(Error::ParseError(WriteLineError {
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
original_line: lp_lines.next().unwrap().to_string(),
line_number: line_idx + 1,
error_message: e.to_string(),
}));
return Err(Error::ParseError(e));
} else {
errors.push(WriteLineError {
original_line: lp_lines.next().unwrap().to_string(),
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
line_number: line_idx + 1,
error_message: e.to_string(),
});
errors.push(e);
}
continue;
}
Expand All @@ -512,6 +509,40 @@ pub(crate) fn parse_validate_and_update_schema(
})
}

/// Validate a line of line protocol against the given schema definition
///
/// This is for scenarios where a write comes in for a table that exists, but may have invalid field
/// types, based on the pre-existing schema.
fn validate_line_schema<'a>(
line_number: usize,
line: ParsedLine<'a>,
schema: &DatabaseSchema,
) -> Result<ParsedLine<'a>, WriteLineError> {
let table_name = line.series.measurement.as_str();
if let Some(table_schema) = schema.get_table_schema(table_name) {
for (field_name, field_val) in line.field_set.iter() {
if let Some(schema_col_type) = table_schema.field_type_by_name(field_name) {
let field_col_type = column_type_from_field(field_val);
if field_col_type != schema_col_type {
let field_name = field_name.to_string();
return Err(WriteLineError {
original_line: line.to_string(),
line_number: line_number + 1,
error_message: format!(
"invalid field value in line protocol for field '{field_name}' on line \
{line_number}: expected type {expected}, but got {got}",
expected = ColumnType::from(schema_col_type),
got = field_col_type,
),
});
}
}
}
}

Ok(line)
}

/// Takes parsed lines, validates their schema. If new tables or columns are defined, they
/// are passed back as a new DatabaseSchema as part of the ValidationResult. Lines are split
/// into partitions and the validation result contains the data that can then be serialized
Expand Down

0 comments on commit faab7a0

Please sign in to comment.