Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Capture nullability in Values node planning #14472

Merged
merged 7 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,10 @@ impl LogicalPlanBuilder {
schema: &DFSchema,
) -> Result<Self> {
let n_cols = values[0].len();
let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
let mut fields = ValuesFields::new();
for j in 0..n_cols {
let field_type = schema.field(j).data_type();
let field_nullable = schema.field(j).is_nullable();
for row in values.iter() {
let value = &row[j];
let data_type = value.get_type(schema)?;
Expand All @@ -260,17 +261,17 @@ impl LogicalPlanBuilder {
}
}
}
field_types.push(field_type.to_owned());
fields.push(field_type.to_owned(), field_nullable);
}

Self::infer_inner(values, &field_types, schema)
Self::infer_inner(values, fields, schema)
}

fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
let n_cols = values[0].len();
let schema = DFSchema::empty();
let mut fields = ValuesFields::new();

let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
for j in 0..n_cols {
let mut common_type: Option<DataType> = None;
for (i, row) in values.iter().enumerate() {
Expand All @@ -293,37 +294,30 @@ impl LogicalPlanBuilder {
}
// assuming common_type was not set, and no error, therefore the type should be NULL
// since the code loop skips NULL
field_types.push(common_type.unwrap_or(DataType::Null));
fields.push(common_type.unwrap_or(DataType::Null), true);
}

Self::infer_inner(values, &field_types, &schema)
Self::infer_inner(values, fields, &schema)
}

fn infer_inner(
mut values: Vec<Vec<Expr>>,
field_types: &[DataType],
fields: ValuesFields,
schema: &DFSchema,
) -> Result<Self> {
let fields = fields.into_fields();
// wrap cast if data type is not same as common type.
for row in &mut values {
for (j, field_type) in field_types.iter().enumerate() {
for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
if let Expr::Literal(ScalarValue::Null) = row[j] {
row[j] = Expr::Literal(ScalarValue::try_from(field_type)?);
} else {
row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
}
}
}
let fields = field_types
.iter()
.enumerate()
.map(|(j, data_type)| {
// naming is following convention https://www.postgresql.org/docs/current/queries-values.html
let name = &format!("column{}", j + 1);
Field::new(name, data_type.clone(), true)
})
.collect::<Vec<_>>();
let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?;

let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
let schema = DFSchemaRef::new(dfschema);

Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
Expand Down Expand Up @@ -1320,6 +1314,29 @@ impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
}
}

/// Container used when building fields for a `VALUES` node.
#[derive(Default)]
struct ValuesFields {
inner: Vec<Field>,
}

impl ValuesFields {
pub fn new() -> Self {
Self::default()
}

pub fn push(&mut self, data_type: DataType, nullable: bool) {
// Naming follows the convention described here:
// https://www.postgresql.org/docs/current/queries-values.html
let name = format!("column{}", self.inner.len() + 1);
self.inner.push(Field::new(name, data_type, nullable));
}

pub fn into_fields(self) -> Fields {
self.inner.into()
}
}

pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
let mut name_map = HashMap::new();
fields
Expand Down
4 changes: 4 additions & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let column_index = table_schema
.index_of_column_by_name(None, &c)
.ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;

if value_indices[column_index].is_some() {
return schema_err!(SchemaError::DuplicateUnqualifiedField {
name: c,
Expand Down Expand Up @@ -1937,6 +1938,9 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// Projection
let mut planner_context =
PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
planner_context.set_table_schema(Some(DFSchemaRef::new(
DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
)));
let source = self.query_to_plan(*source, &mut planner_context)?;
if fields.len() != source.schema().fields().len() {
plan_err!("Column count doesn't match insert query!")?;
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,10 @@ fn plan_insert() {
let sql =
"insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
let plan = "Dml: op=[Insert Into] table=[person]\
\n Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name, \
\n Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \
CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
\n Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
\n Values: (CAST(Int64(1) AS UInt32), Utf8(\"Alan\"), Utf8(\"Turing\"))";
quick_test(sql, plan);
}

Expand All @@ -480,8 +480,8 @@ fn plan_insert_no_target_columns() {
let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
let plan = r#"
Dml: op=[Insert Into] table=[test_decimal]
Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price
Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
Projection: column1 AS id, column2 AS price
Values: (CAST(Int64(1) AS Int32), CAST(Int64(2) AS Decimal128(10, 2))), (CAST(Int64(3) AS Int32), CAST(Int64(4) AS Decimal128(10, 2)))
"#
.trim();
quick_test(sql, plan);
Expand All @@ -499,11 +499,11 @@ Dml: op=[Insert Into] table=[test_decimal]
)]
#[case::target_column_count_mismatch(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
"Error during planning: Column count doesn't match insert query!"
"Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 3"
)]
#[case::source_column_count_mismatch(
"INSERT INTO person VALUES ($1, $2)",
"Error during planning: Column count doesn't match insert query!"
"Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 8"
)]
#[case::extra_placeholder(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/insert.slt
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ insert into table_without_values(id, id) values(3, 3);
statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
insert into table_without_values(name, id) values(4, 'zoo');

statement error Error during planning: Column count doesn't match insert query!
statement error Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem significantly better errors ❤️

insert into table_without_values(id) values(4, 'zoo');

# insert NULL values for the missing column (name)
Expand Down Expand Up @@ -299,10 +299,10 @@ insert into table_without_values(field1) values(3);
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values(field2) values(300);

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(3, 300), (NULL, 400);

query II rowsort
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ insert into table_without_values(id, id) values(3, 3);
statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
insert into table_without_values(name, id) values(4, 'zoo');

statement error Error during planning: Column count doesn't match insert query!
statement error Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 1
insert into table_without_values(id) values(4, 'zoo');

# insert NULL values for the missing column (name)
Expand Down Expand Up @@ -546,10 +546,10 @@ insert into table_without_values(field1) values(3);
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values(field2) values(300);

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(3, 300), (NULL, 400);

query II rowsort
Expand Down