diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6c61af76a80b..3d67ddb7edc0 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -323,7 +323,7 @@ pub struct ConfigOptions { pub catalog: CatalogOptions, /// Execution options pub execution: ExecutionOptions, - /// Explain options + /// Optimizer options pub optimizer: OptimizerOptions, /// Explain options pub explain: ExplainOptions, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 1428e92a581d..8fe3dfdbafa3 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -26,8 +26,8 @@ use crate::utils::normalize_ident; use arrow_schema::DataType; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - Column, DFSchema, DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference, - Result, TableReference, ToDFSchema, + Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema, + OwnedTableReference, Result, TableReference, ToDFSchema, }; use datafusion_expr::expr_rewriter::normalize_col_with_schemas; use datafusion_expr::logical_plan::builder::project; @@ -792,6 +792,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let arrow_schema = (*provider.schema()).clone(); let table_schema = DFSchema::try_from(arrow_schema)?; + let fields = if columns.is_empty() { + // Empty means we're inserting into all columns of the table + table_schema.fields().clone() + } else { + let fields = columns + .iter() + .map(|c| { + Ok(table_schema + .field_with_unqualified_name(&normalize_ident(c.clone()))? + .clone()) + }) + .collect::>>()?; + // Validate no duplicate fields + let table_schema = + DFSchema::new_with_metadata(fields, table_schema.metadata().clone())?; + table_schema.fields().clone() + }; + // infer types for Values clause... other types should be resolvable the regular way let mut prepare_param_data_types = BTreeMap::new(); if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() { @@ -804,14 +822,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { "Can't parse placeholder: {name}" )) })? - 1; - let col = columns.get(idx).ok_or_else(|| { + let field = fields.get(idx).ok_or_else(|| { DataFusionError::Plan(format!( "Placeholder ${} refers to a non existent column", idx + 1 )) })?; - let field = - table_schema.field_with_name(None, col.value.as_str())?; let dt = field.field().data_type().clone(); let _ = prepare_param_data_types.insert(name, dt); } @@ -824,21 +840,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut planner_context = PlannerContext::new_with_prepare_param_data_types(prepare_param_data_types); let source = self.query_to_plan(*source, &mut planner_context)?; - if columns.len() != source.schema().fields().len() { + if fields.len() != source.schema().fields().len() { Err(DataFusionError::Plan( "Column count doesn't match insert query!".to_owned(), ))?; } - let values_schema = source.schema(); - let exprs = columns + let exprs = fields .iter() .zip(source.schema().fields().iter()) - .map(|(c, f)| { - let col_name = c.value.clone(); - let col = table_schema.field_with_name(None, col_name.as_str())?; - let expr = datafusion_expr::Expr::Column(Column::from(f.name().clone())) - .alias(col_name) - .cast_to(col.data_type(), values_schema)?; + .map(|(target_field, source_field)| { + let expr = + datafusion_expr::Expr::Column(source_field.unqualified_column()) + .cast_to(target_field.data_type(), source.schema())? + .alias(target_field.name()); Ok(expr) }) .collect::>>()?; diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs index f2af65e87875..270727259177 100644 --- a/datafusion/sql/tests/integration_test.rs +++ b/datafusion/sql/tests/integration_test.rs @@ -165,13 +165,60 @@ fn plan_insert() { "insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')"; let plan = r#" Dml: op=[Insert] table=[person] - Projection: CAST(column1 AS id AS UInt32), column2 AS first_name, column3 AS last_name + Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name Values: (Int64(1), Utf8("Alan"), Utf8("Turing")) "# .trim(); quick_test(sql, plan); } +#[test] +fn plan_insert_no_target_columns() { + let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)"; + let plan = r#" +Dml: op=[Insert] table=[test_decimal] + Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price + Values: (Int64(1), Int64(2)), (Int64(3), Int64(4)) + "# + .trim(); + quick_test(sql, plan); +} + +#[rstest] +#[case::duplicate_columns( + "INSERT INTO test_decimal (id, price, price) VALUES (1, 2, 3), (4, 5, 6)", + "Schema error: Schema contains duplicate unqualified field name 'price'" +)] +#[case::non_existing_column( + "INSERT INTO test_decimal (nonexistent, price) VALUES (1, 2), (4, 5)", + "Schema error: No field named 'nonexistent'. Valid fields are 'id', 'price'." +)] +#[case::type_mismatch( + "INSERT INTO test_decimal SELECT '2022-01-01', to_timestamp('2022-01-01T12:00:00')", + "Error during planning: Cannot automatically convert Timestamp(Nanosecond, None) to Decimal128(10, 2)" +)] +#[case::target_column_count_mismatch( + "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)", + "Error during planning: Column count doesn't match insert query!" +)] +#[case::source_column_count_mismatch( + "INSERT INTO person VALUES ($1, $2)", + "Error during planning: Column count doesn't match insert query!" +)] +#[case::extra_placeholder( + "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)", + "Error during planning: Placeholder $4 refers to a non existent column" +)] +#[case::placeholder_type_unresolved( + "INSERT INTO person (id, first_name, last_name) VALUES ($2, $4, $6)", + "Error during planning: Placeholder type could not be resolved" +)] +#[test] +fn test_insert_schema_errors(#[case] sql: &str, #[case] error: &str) { + let err = logical_plan(sql).unwrap_err(); + assert_eq!(err.to_string(), error) +} + #[test] fn plan_update() { let sql = "update person set last_name='Kay' where id=1"; @@ -3464,36 +3511,6 @@ Dml: op=[Insert] table=[person] prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan); } -#[test] -#[should_panic(expected = "Placeholder $4 refers to a non existent column")] -fn test_prepare_statement_insert_infer_gt() { - let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3, $4)"; - - let expected_plan = r#""#.trim(); - let expected_dt = "[Int32]"; - let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt); -} - -#[test] -#[should_panic(expected = "value: Plan(\"Column count doesn't match insert query!\")")] -fn test_prepare_statement_insert_infer_lt() { - let sql = "insert into person (id, first_name, last_name) values ($1, $2)"; - - let expected_plan = r#""#.trim(); - let expected_dt = "[Int32]"; - let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt); -} - -#[test] -#[should_panic(expected = "value: Plan(\"Placeholder type could not be resolved\")")] -fn test_prepare_statement_insert_infer_gap() { - let sql = "insert into person (id, first_name, last_name) values ($2, $4, $6)"; - - let expected_plan = r#""#.trim(); - let expected_dt = "[Int32]"; - let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt); -} - #[test] fn test_prepare_statement_to_plan_one_param() { let sql = "PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age = $1";