From d135f69ca5a9495ea963b00a0007a0598cfc4124 Mon Sep 17 00:00:00 2001 From: comphead Date: Sun, 26 Mar 2023 21:33:07 -0700 Subject: [PATCH 1/6] weaken schema check on mem table --- datafusion/common/src/utils.rs | 13 +++++++++++++ datafusion/core/src/datasource/memory.rs | 11 ++++++----- .../src/engines/datafusion/normalize.rs | 14 +------------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index a1226def8e56..ef32385828f0 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -20,6 +20,7 @@ use crate::{DataFusionError, Result, ScalarValue}; use arrow::array::ArrayRef; use arrow::compute::SortOptions; +use arrow::datatypes::SchemaRef; use sqlparser::ast::Ident; use sqlparser::dialect::GenericDialect; use sqlparser::parser::{Parser, ParserError}; @@ -234,6 +235,18 @@ pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec { .collect::>() } +/// Check two schemas for being equal for field names/types +pub fn equivalent_names_and_types(schema: &SchemaRef, other: SchemaRef) -> bool { + if schema.fields().len() != other.fields().len() { + return false; + } + let self_fields = schema.fields().iter(); + let other_fields = other.fields().iter(); + self_fields + .zip(other_fields) + .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) +} + #[cfg(test)] mod tests { use arrow::array::Float64Array; diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index b5fa33e38827..10edf9b9ef7e 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -19,6 +19,7 @@ //! queried by DataFusion. This allows data to be pre-loaded into memory and then //! repeatedly queried without incurring additional file I/O overhead. +use datafusion_common::utils::equivalent_names_and_types; use futures::{StreamExt, TryStreamExt}; use std::any::Any; use std::sync::Arc; @@ -51,11 +52,11 @@ pub struct MemTable { impl MemTable { /// Create a new in-memory table from the provided schema and record batches pub fn try_new(schema: SchemaRef, partitions: Vec>) -> Result { - if partitions - .iter() - .flatten() - .all(|batches| schema.contains(&batches.schema())) - { + if partitions.iter().flatten().all(|batches| { + dbg!(&schema); + dbg!(&batches.schema()); + equivalent_names_and_types(&schema, batches.schema()) + }) { Ok(Self { schema, batches: Arc::new(RwLock::new(partitions)), diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs index 5fad28f7338f..4dffec16a9d0 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use arrow::datatypes::SchemaRef; use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch}; +use datafusion_common::utils::equivalent_names_and_types; use datafusion_common::DFField; use datafusion_common::DataFusionError; use lazy_static::lazy_static; @@ -144,18 +144,6 @@ lazy_static! { static ref WORKSPACE_ROOT: object_store::path::Path = workspace_root(); } -/// Check two schemas for being equal for field names/types -fn equivalent_names_and_types(schema: &SchemaRef, other: SchemaRef) -> bool { - if schema.fields().len() != other.fields().len() { - return false; - } - let self_fields = schema.fields().iter(); - let other_fields = other.fields().iter(); - self_fields - .zip(other_fields) - .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) -} - /// Convert a single batch to a `Vec>` for comparison fn convert_batch(batch: RecordBatch) -> Result>> { (0..batch.num_rows()) From 1674be46db8e9149e7de4a70cc6c5dd0af841569 Mon Sep 17 00:00:00 2001 From: comphead Date: Sun, 26 Mar 2023 22:20:14 -0700 Subject: [PATCH 2/6] weaken schema check on mem table --- datafusion/core/src/datasource/memory.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 10edf9b9ef7e..e5d7b5250dd9 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -52,11 +52,11 @@ pub struct MemTable { impl MemTable { /// Create a new in-memory table from the provided schema and record batches pub fn try_new(schema: SchemaRef, partitions: Vec>) -> Result { - if partitions.iter().flatten().all(|batches| { - dbg!(&schema); - dbg!(&batches.schema()); - equivalent_names_and_types(&schema, batches.schema()) - }) { + if partitions + .iter() + .flatten() + .all(|batches| equivalent_names_and_types(&schema, batches.schema())) + { Ok(Self { schema, batches: Arc::new(RwLock::new(partitions)), From 3302426061ef432c9325cd18afad4c281e52babd Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 27 Mar 2023 17:03:54 -0700 Subject: [PATCH 3/6] fix LEFT join column null inference --- datafusion/core/src/datasource/memory.rs | 3 +- .../tests/sqllogictests/test_files/join.slt | 7 ++++ datafusion/expr/src/logical_plan/builder.rs | 32 +++++++++++++++---- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index e5d7b5250dd9..b5fa33e38827 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -19,7 +19,6 @@ //! queried by DataFusion. This allows data to be pre-loaded into memory and then //! repeatedly queried without incurring additional file I/O overhead. -use datafusion_common::utils::equivalent_names_and_types; use futures::{StreamExt, TryStreamExt}; use std::any::Any; use std::sync::Arc; @@ -55,7 +54,7 @@ impl MemTable { if partitions .iter() .flatten() - .all(|batches| equivalent_names_and_types(&schema, batches.schema())) + .all(|batches| schema.contains(&batches.schema())) { Ok(Self { schema, diff --git a/datafusion/core/tests/sqllogictests/test_files/join.slt b/datafusion/core/tests/sqllogictests/test_files/join.slt index f7c291f909f6..5f1c067ece52 100644 --- a/datafusion/core/tests/sqllogictests/test_files/join.slt +++ b/datafusion/core/tests/sqllogictests/test_files/join.slt @@ -530,6 +530,13 @@ FROM t1 ---- 11 a 55 +# test create table from query with LEFT join +statement ok +create table temp as +with t1 as (select 1 as col1, 'asd' as col2), + t2 as (select 1 as col3, 'sdf' as col4) +select col2, col4 from t1 left join t2 on col1 = col3 + statement ok drop table IF EXISTS t1; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c5e623785450..6e93f55532e4 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1041,20 +1041,40 @@ pub fn build_join_schema( right: &DFSchema, join_type: &JoinType, ) -> Result { + let right_fields = right.fields(); + let left_fields = left.fields(); + let fields: Vec = match join_type { - JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { - let right_fields = right.fields().iter(); - let left_fields = left.fields().iter(); + JoinType::Inner | JoinType::Full | JoinType::Right => { // left then right - left_fields.chain(right_fields).cloned().collect() + left_fields + .iter() + .chain(right_fields.iter()) + .cloned() + .collect() + } + JoinType::Left => { + // left then right, right set to nullable in case of not matched scenario + let right_fields_nullable: Vec = right_fields + .iter() + .map(|f| { + let ff = f.field().clone().with_nullable(true); + DFField::from_qualified(f.qualifier().unwrap(), ff) + }) + .collect(); + left_fields + .iter() + .chain(&right_fields_nullable) + .cloned() + .collect() } JoinType::LeftSemi | JoinType::LeftAnti => { // Only use the left side for the schema - left.fields().clone() + left_fields.clone() } JoinType::RightSemi | JoinType::RightAnti => { // Only use the right side for the schema - right.fields().clone() + right_fields.clone() } }; From b34e1b52de52b490c02ca9c611f870edc1da78cc Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 27 Mar 2023 17:32:06 -0700 Subject: [PATCH 4/6] fix LEFT join column null inference. fix test --- datafusion/expr/src/logical_plan/builder.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6e93f55532e4..421ad0f4457c 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1058,8 +1058,12 @@ pub fn build_join_schema( let right_fields_nullable: Vec = right_fields .iter() .map(|f| { - let ff = f.field().clone().with_nullable(true); - DFField::from_qualified(f.qualifier().unwrap(), ff) + let field = f.field().clone().with_nullable(true); + if let Some(q) = f.qualifier() { + DFField::from_qualified(q, field) + } else { + DFField::from(field) + } }) .collect(); left_fields From bf8ed7bf3d4b19d9c436d3b546171b5d17ee493f Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 27 Mar 2023 21:04:09 -0700 Subject: [PATCH 5/6] fix LEFT join column null inference. fix test --- .../src/extract_equijoin_predicate.rs | 20 +++++++++---------- .../optimizer/src/push_down_projection.rs | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 2f7a20d6e230..20b9c629712c 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -179,7 +179,7 @@ mod tests { Some(col("t1.a").eq(col("t2.a"))), )? .build()?; - let expected = "Left Join: t1.a = t2.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: t1.a = t2.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"; @@ -199,7 +199,7 @@ mod tests { Some((col("t1.a") + lit(10i64)).eq(col("t2.a") * lit(2u32))), )? .build()?; - let expected = "Left Join: t1.a + Int64(10) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: t1.a + Int64(10) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"; @@ -223,7 +223,7 @@ mod tests { ), )? .build()?; - let expected = "Left Join: Filter: t1.a + Int64(10) >= t2.a * UInt32(2) AND t1.b < Int32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: Filter: t1.a + Int64(10) >= t2.a * UInt32(2) AND t1.b < Int32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"; @@ -250,7 +250,7 @@ mod tests { ), )? .build()?; - let expected = "Left Join: t1.a + UInt32(11) = t2.a * UInt32(2), t1.a + Int64(10) = t2.a * UInt32(2) Filter: t1.b < Int32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: t1.a + UInt32(11) = t2.a * UInt32(2), t1.a + Int64(10) = t2.a * UInt32(2) Filter: t1.b < Int32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"; @@ -277,7 +277,7 @@ mod tests { ), )? .build()?; - let expected = "Left Join: t1.a = t2.a, t1.b = t2.b Filter: t1.c = t2.c OR t1.a + t1.b > t2.b + t2.c [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: t1.a = t2.a, t1.b = t2.b Filter: t1.c = t2.c OR t1.a + t1.b > t2.b + t2.c [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"; @@ -314,9 +314,9 @@ mod tests { ), )? .build()?; - let expected = "Left Join: t1.a = t2.a Filter: t1.c + t2.c + t3.c < UInt32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: t1.a = t2.a Filter: t1.c + t2.c + t3.c < UInt32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ - \n Left Join: t2.a = t3.a Filter: t2.a + t3.b > UInt32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + \n Left Join: t2.a = t3.a Filter: t2.a + t3.b > UInt32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]"; @@ -349,9 +349,9 @@ mod tests { Some(col("t1.a").eq(col("t2.a")).and(col("t2.c").eq(col("t3.c")))), )? .build()?; - let expected = "Left Join: t1.a = t2.a Filter: t2.c = t3.c [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: t1.a = t2.a Filter: t2.c = t3.c [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ - \n Left Join: t2.a = t3.a Filter: t2.a + t3.b > UInt32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + \n Left Join: t2.a = t3.a Filter: t2.a + t3.b > UInt32(100) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]"; @@ -380,7 +380,7 @@ mod tests { Some(filter), )? .build()?; - let expected = "Left Join: t1.a + CAST(Int64(1) AS UInt32) = t2.a + CAST(Int32(2) AS UInt32) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]\ + let expected = "Left Join: t1.a + CAST(Int64(1) AS UInt32) = t2.a + CAST(Int32(2) AS UInt32) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N, c:UInt32;N]\ \n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"; diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 767077aa0c02..fd8f4c011a17 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -733,7 +733,7 @@ mod tests { vec![ DFField::new(Some("test"), "a", DataType::UInt32, false), DFField::new(Some("test"), "b", DataType::UInt32, false), - DFField::new(Some("test2"), "c1", DataType::UInt32, false), + DFField::new(Some("test2"), "c1", DataType::UInt32, true), ], HashMap::new(), )?, @@ -776,7 +776,7 @@ mod tests { vec![ DFField::new(Some("test"), "a", DataType::UInt32, false), DFField::new(Some("test"), "b", DataType::UInt32, false), - DFField::new(Some("test2"), "c1", DataType::UInt32, false), + DFField::new(Some("test2"), "c1", DataType::UInt32, true), ], HashMap::new(), )?, @@ -817,7 +817,7 @@ mod tests { vec![ DFField::new(Some("test"), "a", DataType::UInt32, false), DFField::new(Some("test"), "b", DataType::UInt32, false), - DFField::new(Some("test2"), "a", DataType::UInt32, false), + DFField::new(Some("test2"), "a", DataType::UInt32, true), ], HashMap::new(), )?, From b8af34d4b995eecf79af1fb1f546903f380128ec Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 28 Mar 2023 10:33:14 -0700 Subject: [PATCH 6/6] fix LEFT join column null inference. rollback some code --- datafusion/common/src/utils.rs | 13 ------------- .../src/engines/datafusion/normalize.rs | 14 +++++++++++++- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index ef32385828f0..a1226def8e56 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -20,7 +20,6 @@ use crate::{DataFusionError, Result, ScalarValue}; use arrow::array::ArrayRef; use arrow::compute::SortOptions; -use arrow::datatypes::SchemaRef; use sqlparser::ast::Ident; use sqlparser::dialect::GenericDialect; use sqlparser::parser::{Parser, ParserError}; @@ -235,18 +234,6 @@ pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec { .collect::>() } -/// Check two schemas for being equal for field names/types -pub fn equivalent_names_and_types(schema: &SchemaRef, other: SchemaRef) -> bool { - if schema.fields().len() != other.fields().len() { - return false; - } - let self_fields = schema.fields().iter(); - let other_fields = other.fields().iter(); - self_fields - .zip(other_fields) - .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) -} - #[cfg(test)] mod tests { use arrow::array::Float64Array; diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs index 4dffec16a9d0..5fad28f7338f 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +use arrow::datatypes::SchemaRef; use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch}; -use datafusion_common::utils::equivalent_names_and_types; use datafusion_common::DFField; use datafusion_common::DataFusionError; use lazy_static::lazy_static; @@ -144,6 +144,18 @@ lazy_static! { static ref WORKSPACE_ROOT: object_store::path::Path = workspace_root(); } +/// Check two schemas for being equal for field names/types +fn equivalent_names_and_types(schema: &SchemaRef, other: SchemaRef) -> bool { + if schema.fields().len() != other.fields().len() { + return false; + } + let self_fields = schema.fields().iter(); + let other_fields = other.fields().iter(); + self_fields + .zip(other_fields) + .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) +} + /// Convert a single batch to a `Vec>` for comparison fn convert_batch(batch: RecordBatch) -> Result>> { (0..batch.num_rows())