Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert target columns empty fix #5079

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ pub struct ConfigOptions {
pub catalog: CatalogOptions,
/// Execution options
pub execution: ExecutionOptions,
/// Explain options
/// Optimizer options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ Nice drive by cleanup

pub optimizer: OptimizerOptions,
/// Explain options
pub explain: ExplainOptions,
Expand Down
42 changes: 28 additions & 14 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::utils::normalize_ident;
use arrow_schema::DataType;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
Column, DFSchema, DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference,
Result, TableReference, ToDFSchema,
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema,
OwnedTableReference, Result, TableReference, ToDFSchema,
};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::builder::project;
Expand Down Expand Up @@ -791,6 +791,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let arrow_schema = (*provider.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;

let fields = if columns.is_empty() {
// Empty means we're inserting into all columns of the table
table_schema.fields().clone()
} else {
let fields = columns
.iter()
.map(|c| {
Ok(table_schema
.field_with_unqualified_name(&normalize_ident(c.clone()))?
.clone())
})
.collect::<Result<Vec<DFField>>>()?;
// Validate no duplicate fields
let table_schema =
DFSchema::new_with_metadata(fields, table_schema.metadata().clone())?;
table_schema.fields().clone()
};

// infer types for Values clause... other types should be resolvable the regular way
let mut prepare_param_data_types = BTreeMap::new();
if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
Expand All @@ -803,14 +821,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"Can't parse placeholder: {name}"
))
})? - 1;
let col = columns.get(idx).ok_or_else(|| {
let field = fields.get(idx).ok_or_else(|| {
DataFusionError::Plan(format!(
"Placeholder ${} refers to a non existent column",
idx + 1
))
})?;
let field =
table_schema.field_with_name(None, col.value.as_str())?;
let dt = field.field().data_type().clone();
let _ = prepare_param_data_types.insert(name, dt);
}
Expand All @@ -823,21 +839,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut planner_context =
PlannerContext::new_with_prepare_param_data_types(prepare_param_data_types);
let source = self.query_to_plan(*source, &mut planner_context)?;
if columns.len() != source.schema().fields().len() {
if fields.len() != source.schema().fields().len() {
Err(DataFusionError::Plan(
"Column count doesn't match insert query!".to_owned(),
))?;
}
let values_schema = source.schema();
let exprs = columns
let exprs = fields
.iter()
.zip(source.schema().fields().iter())
.map(|(c, f)| {
let col_name = c.value.clone();
let col = table_schema.field_with_name(None, col_name.as_str())?;
let expr = datafusion_expr::Expr::Column(Column::from(f.name().clone()))
.alias(col_name)
.cast_to(col.data_type(), values_schema)?;
.map(|(target_field, source_field)| {
let expr =
datafusion_expr::Expr::Column(source_field.unqualified_column())
.cast_to(target_field.data_type(), source.schema())?
.alias(target_field.name());
Ok(expr)
})
.collect::<Result<Vec<datafusion_expr::Expr>>>()?;
Expand Down
79 changes: 48 additions & 31 deletions datafusion/sql/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,60 @@ fn plan_insert() {
"insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
let plan = r#"
Dml: op=[Insert] table=[person]
Projection: CAST(column1 AS id AS UInt32), column2 AS first_name, column3 AS last_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name
Values: (Int64(1), Utf8("Alan"), Utf8("Turing"))
"#
.trim();
quick_test(sql, plan);
}

/// Plans an `INSERT` statement that omits the target column list.
///
/// The expected plan text shows both `VALUES` columns being cast and aliased
/// to `id` (Int32) and `price` (Decimal128(10, 2)) for table `test_decimal`.
#[test]
fn plan_insert_no_target_columns() {
let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
// Expected plan text; `.trim()` drops the whitespace surrounding the raw string.
let plan = r#"
Dml: op=[Insert] table=[test_decimal]
Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price
Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
"#
.trim();
// NOTE(review): `quick_test` is defined outside this view; presumably it
// plans `sql` and compares the result with `plan` — confirm.
quick_test(sql, plan);
}

/// Parameterized check of the error messages produced for invalid `INSERT`
/// statements.
///
/// Each `#[case]` pairs a SQL string with the exact error text expected from
/// `logical_plan`; the test fails if planning succeeds or the message differs.
#[rstest]
// Target column list names the same column twice.
#[case::duplicate_columns(
"INSERT INTO test_decimal (id, price, price) VALUES (1, 2, 3), (4, 5, 6)",
"Schema error: Schema contains duplicate unqualified field name 'price'"
)]
// Target column list names a column the table does not have.
#[case::non_existing_column(
"INSERT INTO test_decimal (nonexistent, price) VALUES (1, 2), (4, 5)",
"Schema error: No field named 'nonexistent'. Valid fields are 'id', 'price'."
)]
// Source column type cannot be cast to the target column type.
#[case::type_mismatch(
"INSERT INTO test_decimal SELECT '2022-01-01', to_timestamp('2022-01-01T12:00:00')",
"Error during planning: Cannot automatically convert Timestamp(Nanosecond, None) to Decimal128(10, 2)"
)]
// Fewer source values than explicitly listed target columns.
#[case::target_column_count_mismatch(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
"Error during planning: Column count doesn't match insert query!"
)]
// No target column list, and the source value count does not match the table.
#[case::source_column_count_mismatch(
"INSERT INTO person VALUES ($1, $2)",
"Error during planning: Column count doesn't match insert query!"
)]
// Placeholder index exceeds the number of target columns.
#[case::extra_placeholder(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
"Error during planning: Placeholder $4 refers to a non existent column"
)]
// Placeholder numbering skips indices ($2, $4, $6).
#[case::placeholder_type_unresolved(
"INSERT INTO person (id, first_name, last_name) VALUES ($2, $4, $6)",
"Error during planning: Placeholder type could not be resolved"
)]
#[test]
fn test_insert_schema_errors(#[case] sql: &str, #[case] error: &str) {
// `unwrap_err` panics (failing the test) if planning unexpectedly succeeds.
let err = logical_plan(sql).unwrap_err();
// Compare the error's `Display` output with the expected message exactly.
assert_eq!(err.to_string(), error)
}

#[test]
fn plan_update() {
let sql = "update person set last_name='Kay' where id=1";
Expand Down Expand Up @@ -3464,36 +3511,6 @@ Dml: op=[Insert] table=[person]
prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
}

/// An `INSERT` with four placeholders for three target columns is expected to
/// panic with a "Placeholder $4 refers to a non existent column" message.
#[test]
#[should_panic(expected = "Placeholder $4 refers to a non existent column")]
fn test_prepare_statement_insert_infer_gt() {
let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3, $4)";

// NOTE(review): the expected plan is empty; presumably the helper panics
// before these expectations are compared — confirm.
let expected_plan = r#""#.trim();
let expected_dt = "[Int32]";
let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
}

/// An `INSERT` with two placeholders for three target columns is expected to
/// panic with a "Column count doesn't match insert query!" plan error.
#[test]
#[should_panic(expected = "value: Plan(\"Column count doesn't match insert query!\")")]
fn test_prepare_statement_insert_infer_lt() {
let sql = "insert into person (id, first_name, last_name) values ($1, $2)";

// NOTE(review): the expected plan is empty; presumably the helper panics
// before these expectations are compared — confirm.
let expected_plan = r#""#.trim();
let expected_dt = "[Int32]";
let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
}

/// An `INSERT` whose placeholders skip indices ($2, $4, $6) is expected to
/// panic with a "Placeholder type could not be resolved" plan error.
#[test]
#[should_panic(expected = "value: Plan(\"Placeholder type could not be resolved\")")]
fn test_prepare_statement_insert_infer_gap() {
let sql = "insert into person (id, first_name, last_name) values ($2, $4, $6)";

// NOTE(review): the expected plan is empty; presumably the helper panics
// before these expectations are compared — confirm.
let expected_plan = r#""#.trim();
let expected_dt = "[Int32]";
let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
}

#[test]
fn test_prepare_statement_to_plan_one_param() {
let sql = "PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age = $1";
Expand Down