diff --git a/benchmarks/expected-plans/q16.txt b/benchmarks/expected-plans/q16.txt index 11943cf2477b..b1efb18fa495 100644 --- a/benchmarks/expected-plans/q16.txt +++ b/benchmarks/expected-plans/q16.txt @@ -3,7 +3,7 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS Projection: group_alias_0 AS p_brand, group_alias_1 AS p_type, group_alias_2 AS p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey) Aggregate: groupBy=[[group_alias_0, group_alias_1, group_alias_2]], aggr=[[COUNT(alias1)]] Aggregate: groupBy=[[part.p_brand AS group_alias_0, part.p_type AS group_alias_1, part.p_size AS group_alias_2, partsupp.ps_suppkey AS alias1]], aggr=[[]] - Anti Join: partsupp.ps_suppkey = __sq_1.s_suppkey + LeftAnti Join: partsupp.ps_suppkey = __sq_1.s_suppkey Inner Join: partsupp.ps_partkey = part.p_partkey TableScan: partsupp projection=[ps_partkey, ps_suppkey] Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)]) diff --git a/benchmarks/expected-plans/q18.txt b/benchmarks/expected-plans/q18.txt index ebc22ea5d886..ce0b20c201e3 100644 --- a/benchmarks/expected-plans/q18.txt +++ b/benchmarks/expected-plans/q18.txt @@ -1,7 +1,7 @@ Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST Projection: customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity) Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]] - Semi Join: orders.o_orderkey = __sq_1.l_orderkey + LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey Inner Join: orders.o_orderkey = lineitem.l_orderkey Inner Join: customer.c_custkey = orders.o_custkey TableScan: customer projection=[c_custkey, c_name] diff --git a/benchmarks/expected-plans/q20.txt 
b/benchmarks/expected-plans/q20.txt index 6d3ef1f6cc75..e5398325e966 100644 --- a/benchmarks/expected-plans/q20.txt +++ b/benchmarks/expected-plans/q20.txt @@ -1,6 +1,6 @@ Sort: supplier.s_name ASC NULLS LAST Projection: supplier.s_name, supplier.s_address - Semi Join: supplier.s_suppkey = __sq_2.ps_suppkey + LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey Inner Join: supplier.s_nationkey = nation.n_nationkey TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] Filter: nation.n_name = Utf8("CANADA") @@ -8,7 +8,7 @@ Sort: supplier.s_name ASC NULLS LAST Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2 Filter: CAST(partsupp.ps_availqty AS Decimal128(38, 17)) > __sq_3.__value Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey - Semi Join: partsupp.ps_partkey = __sq_1.p_partkey + LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] Projection: part.p_partkey AS p_partkey, alias=__sq_1 Filter: part.p_name LIKE Utf8("forest%") diff --git a/benchmarks/expected-plans/q21.txt b/benchmarks/expected-plans/q21.txt index f5aa1dc84cf9..397e0a8d8cf6 100644 --- a/benchmarks/expected-plans/q21.txt +++ b/benchmarks/expected-plans/q21.txt @@ -1,8 +1,8 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST Projection: supplier.s_name, COUNT(UInt8(1)) AS numwait Aggregate: groupBy=[[supplier.s_name]], aggr=[[COUNT(UInt8(1))]] - Anti Join: l1.l_orderkey = l3.l_orderkey Filter: l3.l_suppkey != l1.l_suppkey - Semi Join: l1.l_orderkey = l2.l_orderkey Filter: l2.l_suppkey != l1.l_suppkey + LeftAnti Join: l1.l_orderkey = l3.l_orderkey Filter: l3.l_suppkey != l1.l_suppkey + LeftSemi Join: l1.l_orderkey = l2.l_orderkey Filter: l2.l_suppkey != l1.l_suppkey Inner Join: supplier.s_nationkey = nation.n_nationkey Inner Join: l1.l_orderkey = orders.o_orderkey Inner Join: supplier.s_suppkey = l1.l_suppkey diff --git 
a/benchmarks/expected-plans/q22.txt b/benchmarks/expected-plans/q22.txt index 919372f04a96..b56c8ff96fc7 100644 --- a/benchmarks/expected-plans/q22.txt +++ b/benchmarks/expected-plans/q22.txt @@ -5,7 +5,7 @@ Sort: custsale.cntrycode ASC NULLS LAST Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal, alias=custsale Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value CrossJoin: - Anti Join: customer.c_custkey = orders.o_custkey + LeftAnti Join: customer.c_custkey = orders.o_custkey Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) TableScan: customer projection=[c_custkey, c_phone, c_acctbal] TableScan: orders projection=[o_custkey] diff --git a/benchmarks/expected-plans/q4.txt b/benchmarks/expected-plans/q4.txt index a4339732e68f..3610ae175adc 100644 --- a/benchmarks/expected-plans/q4.txt +++ b/benchmarks/expected-plans/q4.txt @@ -1,7 +1,7 @@ Sort: orders.o_orderpriority ASC NULLS LAST Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]] - Semi Join: orders.o_orderkey = lineitem.l_orderkey + LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey Filter: orders.o_orderdate >= Date32("8582") AND orders.o_orderdate < Date32("8674") TableScan: orders projection=[o_orderkey, o_orderdate, o_orderpriority] Filter: lineitem.l_commitdate < lineitem.l_receiptdate diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs index 6817001d374a..0149210469af 100644 --- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs +++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs @@ -70,8 +70,13 @@ fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) - fn supports_swap(join_type: JoinType) -> bool { match join_type 
{ - JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => true, - JoinType::Semi | JoinType::Anti => false, + JoinType::Inner + | JoinType::Left + | JoinType::Right + | JoinType::Full + | JoinType::LeftSemi + | JoinType::RightSemi => true, + JoinType::LeftAnti => false, } } @@ -81,6 +86,8 @@ fn swap_join_type(join_type: JoinType) -> JoinType { JoinType::Full => JoinType::Full, JoinType::Left => JoinType::Right, JoinType::Right => JoinType::Left, + JoinType::LeftSemi => JoinType::RightSemi, + JoinType::RightSemi => JoinType::LeftSemi, _ => unreachable!(), } } diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index ff036b78b32e..b41c0df1eb0d 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -653,7 +653,7 @@ fn build_batch( (left_indices, right_indices) }; - if matches!(join_type, JoinType::Semi | JoinType::Anti) { + if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) { return Ok(( RecordBatch::new_empty(Arc::new(schema.clone())), left_filtered_indices, @@ -719,7 +719,7 @@ fn build_join_indexes( let left = &left_data.0; match join_type { - JoinType::Inner | JoinType::Semi | JoinType::Anti => { + JoinType::Inner | JoinType::LeftSemi | JoinType::LeftAnti => { // Using a buffer builder to avoid slower normal builder let mut left_indices = UInt64BufferBuilder::new(0); let mut right_indices = UInt32BufferBuilder::new(0); @@ -765,6 +765,54 @@ fn build_join_indexes( PrimitiveArray::::from(right), )) } + JoinType::RightSemi => { + let mut left_indices = UInt64BufferBuilder::new(0); + let mut right_indices = UInt32BufferBuilder::new(0); + + // Visit all of the right rows + for (row, hash_value) in hash_values.iter().enumerate() { + // Get the hash and find it in the build index + + // For every item on the left and right we check if it matches + // This possibly contains rows with hash collisions, + // 
So we have to check here whether rows are equal or not + // We only produce one row if there is a match + if let Some((_, indices)) = + left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) + { + for &i in indices { + // Check hash collisions + if equal_rows( + i as usize, + row, + &left_join_values, + &keys_values, + *null_equals_null, + )? { + left_indices.append(i); + right_indices.append(row as u32); + break; + } + } + } + } + + let left = ArrayData::builder(DataType::UInt64) + .len(left_indices.len()) + .add_buffer(left_indices.finish()) + .build() + .unwrap(); + let right = ArrayData::builder(DataType::UInt32) + .len(right_indices.len()) + .add_buffer(right_indices.finish()) + .build() + .unwrap(); + + Ok(( + PrimitiveArray::::from(left), + PrimitiveArray::::from(right), + )) + } JoinType::Left => { let mut left_indices = UInt64Builder::with_capacity(0); let mut right_indices = UInt32Builder::with_capacity(0); @@ -853,7 +901,11 @@ fn apply_join_filter( )?; match join_type { - JoinType::Inner | JoinType::Left | JoinType::Anti | JoinType::Semi => { + JoinType::Inner + | JoinType::Left + | JoinType::LeftAnti + | JoinType::LeftSemi + | JoinType::RightSemi => { // For both INNER and LEFT joins, input arrays contains only indices for matched data. 
// Due to this fact it's correct to simply apply filter to intermediate batch and return // indices for left/right rows satisfying filter predicate @@ -1280,14 +1332,19 @@ impl HashJoinStream { let visited_left_side = self.visited_left_side.get_or_insert_with(|| { let num_rows = left_data.1.num_rows(); match self.join_type { - JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => { + JoinType::Left + | JoinType::Full + | JoinType::LeftSemi + | JoinType::LeftAnti => { let mut buffer = BooleanBufferBuilder::new(num_rows); buffer.append_n(num_rows, false); buffer } - JoinType::Inner | JoinType::Right => BooleanBufferBuilder::new(0), + JoinType::Inner | JoinType::Right | JoinType::RightSemi => { + BooleanBufferBuilder::new(0) + } } }); @@ -1318,13 +1375,13 @@ impl HashJoinStream { match self.join_type { JoinType::Left | JoinType::Full - | JoinType::Semi - | JoinType::Anti => { + | JoinType::LeftSemi + | JoinType::LeftAnti => { left_side.iter().flatten().for_each(|x| { visited_left_side.set_bit(x as usize, true); }); } - JoinType::Inner | JoinType::Right => {} + JoinType::Inner | JoinType::Right | JoinType::RightSemi => {} } } Some(result.map(|x| x.0)) @@ -1335,8 +1392,8 @@ impl HashJoinStream { match self.join_type { JoinType::Left | JoinType::Full - | JoinType::Semi - | JoinType::Anti + | JoinType::LeftSemi + | JoinType::LeftAnti if !self.is_exhausted => { let result = produce_from_matched( @@ -1344,7 +1401,7 @@ impl HashJoinStream { &self.schema, &self.column_indices, left_data, - self.join_type != JoinType::Semi, + self.join_type != JoinType::LeftSemi, ); if let Ok(ref batch) = result { self.join_metrics.input_batches.add(1); @@ -1360,8 +1417,9 @@ impl HashJoinStream { } JoinType::Left | JoinType::Full - | JoinType::Semi - | JoinType::Anti + | JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::LeftAnti | JoinType::Inner | JoinType::Right => {} } @@ -2094,7 +2152,49 @@ mod tests { Column::new_with_schema("b1", &right.schema())?, )]; - let join = 
join(left, right, on, &JoinType::Semi, false)?; + let join = join(left, right, on, &JoinType::LeftSemi, false)?; + + let columns = columns(&join.schema()); + assert_eq!(columns, vec!["a1", "b1", "c1"]); + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + + let expected = vec![ + "+----+----+----+", + "| a1 | b1 | c1 |", + "+----+----+----+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 2 | 5 | 8 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + + #[tokio::test] + async fn join_right_semi() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let left = build_table( + ("a2", &vec![10, 20, 30, 40]), + ("b2", &vec![4, 5, 6, 5]), // 5 is double on the left + ("c2", &vec![70, 80, 90, 100]), + ); + let right = build_table( + ("a1", &vec![1, 2, 2, 3]), + ("b1", &vec![4, 5, 5, 7]), // 7 does not exist on the left + ("c1", &vec![7, 8, 8, 9]), + ); + + let on = vec![( + Column::new_with_schema("b2", &left.schema())?, + Column::new_with_schema("b1", &right.schema())?, + )]; + + let join = join(left, right, on, &JoinType::RightSemi, false)?; let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1"]); @@ -2135,7 +2235,7 @@ mod tests { Column::new_with_schema("b1", &right.schema())?, )]; - let join = join(left, right, on, &JoinType::Anti, false)?; + let join = join(left, right, on, &JoinType::LeftAnti, false)?; let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1"]); @@ -2196,7 +2296,7 @@ mod tests { let filter = JoinFilter::new(filter_expression, column_indices, intermediate_schema); - let join = join_with_filter(left, right, on, filter, &JoinType::Anti, false)?; + let join = join_with_filter(left, right, on, filter, &JoinType::LeftAnti, false)?; let columns = columns(&join.schema()); assert_eq!(columns, vec!["col1", "col2", "col3"]); diff --git 
a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 3de712745de4..dfcab88c277e 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -89,6 +89,12 @@ impl SortMergeJoinExec { let left_schema = left.schema(); let right_schema = right.schema(); + if join_type == JoinType::RightSemi { + return Err(DataFusionError::NotImplemented( + "SortMergeJoinExec does not support JoinType::RightSemi".to_string(), + )); + } + check_join_is_valid(&left_schema, &right_schema, &on)?; if sort_options.len() != on.len() { return Err(DataFusionError::Plan(format!( @@ -129,10 +135,11 @@ impl ExecutionPlan for SortMergeJoinExec { fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { match self.join_type { - JoinType::Inner | JoinType::Left | JoinType::Semi | JoinType::Anti => { - self.left.output_ordering() - } - JoinType::Right => self.right.output_ordering(), + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti => self.left.output_ordering(), + JoinType::Right | JoinType::RightSemi => self.right.output_ordering(), JoinType::Full => None, } } @@ -173,14 +180,14 @@ impl ExecutionPlan for SortMergeJoinExec { JoinType::Inner | JoinType::Left | JoinType::Full - | JoinType::Anti - | JoinType::Semi => ( + | JoinType::LeftAnti + | JoinType::LeftSemi => ( self.left.clone(), self.right.clone(), self.on.iter().map(|on| on.0.clone()).collect(), self.on.iter().map(|on| on.1.clone()).collect(), ), - JoinType::Right => ( + JoinType::Right | JoinType::RightSemi => ( self.right.clone(), self.left.clone(), self.on.iter().map(|on| on.1.clone()).collect(), @@ -767,13 +774,17 @@ impl SMJStream { Ordering::Less => { if matches!( self.join_type, - JoinType::Left | JoinType::Right | JoinType::Full | JoinType::Anti + JoinType::Left + | JoinType::Right + | JoinType::RightSemi + | JoinType::Full + | JoinType::LeftAnti ) { 
join_streamed = !self.streamed_joined; } } Ordering::Equal => { - if matches!(self.join_type, JoinType::Semi) { + if matches!(self.join_type, JoinType::LeftSemi) { join_streamed = !self.streamed_joined; } if matches!( @@ -915,7 +926,7 @@ impl SMJStream { let buffered_indices: UInt64Array = chunk.buffered_indices.finish(); let mut buffered_columns = - if matches!(self.join_type, JoinType::Semi | JoinType::Anti) { + if matches!(self.join_type, JoinType::LeftSemi | JoinType::LeftAnti) { vec![] } else if let Some(buffered_idx) = chunk.buffered_batch_idx { self.buffered_data.batches[buffered_idx] @@ -1732,7 +1743,7 @@ mod tests { Column::new_with_schema("b1", &right.schema())?, )]; - let (_, batches) = join_collect(left, right, on, JoinType::Anti).await?; + let (_, batches) = join_collect(left, right, on, JoinType::LeftAnti).await?; let expected = vec![ "+----+----+----+", "| a1 | b1 | c1 |", @@ -1763,7 +1774,7 @@ mod tests { Column::new_with_schema("b1", &right.schema())?, )]; - let (_, batches) = join_collect(left, right, on, JoinType::Semi).await?; + let (_, batches) = join_collect(left, right, on, JoinType::LeftSemi).await?; let expected = vec![ "+----+----+----+", "| a1 | b1 | c1 |", diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index f937dc1c42a9..1d1560478ae4 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -169,8 +169,9 @@ fn output_join_field(old_field: &Field, join_type: &JoinType, is_left: bool) -> JoinType::Left => !is_left, // right input is padded with nulls JoinType::Right => is_left, // left input is padded with nulls JoinType::Full => true, // both inputs can be padded with nulls - JoinType::Semi => false, // doesn't introduce nulls - JoinType::Anti => false, // doesn't introduce nulls (or can it??) 
+ JoinType::LeftSemi => false, // doesn't introduce nulls + JoinType::RightSemi => false, // doesn't introduce nulls + JoinType::LeftAnti => false, // doesn't introduce nulls (or can it??) }; if force_nullable { @@ -221,7 +222,7 @@ pub fn build_join_schema( // left then right left_fields.chain(right_fields).unzip() } - JoinType::Semi | JoinType::Anti => left + JoinType::LeftSemi | JoinType::LeftAnti => left .fields() .iter() .cloned() @@ -236,6 +237,21 @@ pub fn build_join_schema( ) }) .unzip(), + JoinType::RightSemi => right + .fields() + .iter() + .cloned() + .enumerate() + .map(|(index, f)| { + ( + f, + ColumnIndex { + index, + side: JoinSide::Right, + }, + ) + }) + .unzip(), }; (Schema::new(fields), column_indices) @@ -394,8 +410,9 @@ fn estimate_join_cardinality( }) } - JoinType::Semi => None, - JoinType::Anti => None, + JoinType::LeftSemi => None, + JoinType::LeftAnti => None, + JoinType::RightSemi => None, } } diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs index 8d4f31af5173..1204b442819a 100644 --- a/datafusion/core/tests/join_fuzz.rs +++ b/datafusion/core/tests/join_fuzz.rs @@ -78,7 +78,7 @@ async fn test_semi_join_1k() { run_join_test( make_staggered_batches(10000), make_staggered_batches(10000), - JoinType::Semi, + JoinType::LeftSemi, ) .await } @@ -88,7 +88,7 @@ async fn test_anti_join_1k() { run_join_test( make_staggered_batches(10000), make_staggered_batches(10000), - JoinType::Anti, + JoinType::LeftAnti, ) .await } diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index 8c77d860e2b4..ed65d43919b3 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -94,7 +94,7 @@ where o_orderstatus in ( let plan = ctx.optimize(&plan).unwrap(); let actual = format!("{}", plan.display_indent()); let expected = r#"Projection: orders.o_orderkey - Semi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey + 
LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey TableScan: orders projection=[o_orderkey, o_orderstatus] Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey, alias=__sq_1 TableScan: lineitem projection=[l_orderkey, l_linestatus]"# @@ -205,7 +205,7 @@ async fn tpch_q4_correlated() -> Result<()> { let expected = r#"Sort: orders.o_orderpriority ASC NULLS LAST Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]] - Semi Join: orders.o_orderkey = lineitem.l_orderkey + LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey TableScan: orders projection=[o_orderkey, o_orderpriority] Filter: lineitem.l_commitdate < lineitem.l_receiptdate TableScan: lineitem projection=[l_orderkey, l_commitdate, l_receiptdate]"# @@ -322,7 +322,7 @@ order by s_name; let actual = format!("{}", plan.display_indent()); let expected = r#"Sort: supplier.s_name ASC NULLS LAST Projection: supplier.s_name, supplier.s_address - Semi Join: supplier.s_suppkey = __sq_2.ps_suppkey + LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey Inner Join: supplier.s_nationkey = nation.n_nationkey TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] Filter: nation.n_name = Utf8("CANADA") @@ -330,7 +330,7 @@ order by s_name; Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2 Filter: CAST(partsupp.ps_availqty AS Decimal128(38, 17)) > __sq_3.__value Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey - Semi Join: partsupp.ps_partkey = __sq_1.p_partkey + LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] Projection: part.p_partkey AS p_partkey, alias=__sq_1 Filter: part.p_name LIKE Utf8("forest%") @@ -386,7 +386,7 @@ order by cntrycode;"#; Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, 
customer.c_acctbal, alias=custsale Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value CrossJoin: - Anti Join: customer.c_custkey = orders.o_custkey + LeftAnti Join: customer.c_custkey = orders.o_custkey Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])] TableScan: orders projection=[o_custkey] diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 631a64da6fd6..829aa6682baf 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -743,7 +743,7 @@ impl LogicalPlanBuilder { LogicalPlanBuilder::intersect_or_except( left_plan, right_plan, - JoinType::Semi, + JoinType::LeftSemi, is_all, ) } @@ -757,7 +757,7 @@ impl LogicalPlanBuilder { LogicalPlanBuilder::intersect_or_except( left_plan, right_plan, - JoinType::Anti, + JoinType::LeftAnti, is_all, ) } @@ -823,10 +823,11 @@ pub fn build_join_schema( // left then right left_fields.chain(right_fields).cloned().collect() } - JoinType::Semi | JoinType::Anti => { + JoinType::LeftSemi | JoinType::LeftAnti => { // Only use the left side for the schema left.fields().clone() } + JoinType::RightSemi => right.fields().clone(), }; let mut metadata = left.metadata().clone(); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 284e8c9aa429..38b3a789a8e5 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -976,10 +976,12 @@ pub enum JoinType { Right, /// Full Join Full, - /// Semi Join - Semi, - /// Anti Join - Anti, + /// Left Semi Join + LeftSemi, + /// Right Semi Join + RightSemi, + /// Left Anti Join + 
LeftAnti, } impl Display for JoinType { @@ -989,8 +991,9 @@ impl Display for JoinType { JoinType::Left => "Left", JoinType::Right => "Right", JoinType::Full => "Full", - JoinType::Semi => "Semi", - JoinType::Anti => "Anti", + JoinType::LeftSemi => "LeftSemi", + JoinType::RightSemi => "RightSemi", + JoinType::LeftAnti => "LeftAnti", }; write!(f, "{}", join_type) } diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 78e8fc32d474..c8102debe721 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -176,8 +176,8 @@ fn optimize_exists( // join our sub query into the main plan let join_type = match query_info.negated { - true => JoinType::Anti, - false => JoinType::Semi, + true => JoinType::LeftAnti, + false => JoinType::LeftSemi, }; let mut new_plan = LogicalPlanBuilder::from(outer_input.clone()).join( &subqry_plan, @@ -230,8 +230,8 @@ mod tests { .build()?; let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] - Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; @@ -266,9 +266,9 @@ mod tests { .build()?; let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] - 
Semi Join: orders.o_orderkey = lineitem.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] + LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#; @@ -296,7 +296,7 @@ mod tests { .build()?; let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; @@ -451,7 +451,7 @@ mod tests { // Doesn't matter we projected an expression, just that we returned a result let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; @@ -475,7 +475,7 @@ mod tests { let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8] - Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, 
o_orderstatus:Utf8, o_totalprice:Float64;N]"#; @@ -527,7 +527,7 @@ mod tests { .build()?; let expected = r#"Projection: test.c [c:UInt32] - Semi Join: test.a = sq.a [a:UInt32, b:UInt32, c:UInt32] + LeftSemi Join: test.a = sq.a [a:UInt32, b:UInt32, c:UInt32] TableScan: test [a:UInt32, b:UInt32, c:UInt32] TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index 052ed796a407..fa0367c454d7 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -189,8 +189,8 @@ fn optimize_where_in( // join our sub query into the main plan let join_type = match query_info.negated { - true => JoinType::Anti, - false => JoinType::Semi, + true => JoinType::LeftAnti, + false => JoinType::LeftSemi, }; let mut new_plan = LogicalPlanBuilder::from(outer_input.clone()).join( &subqry_plan, @@ -259,8 +259,8 @@ mod tests { debug!("plan to optimize:\n{}", plan.display_indent()); let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8] - Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] @@ -297,10 +297,10 @@ mod tests { .build()?; let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] 
Projection: orders.o_custkey AS o_custkey, alias=__sq_2 [o_custkey:Int64] - Semi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] + LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1 [l_orderkey:Int64] TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#; @@ -329,7 +329,7 @@ mod tests { .build()?; let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] @@ -356,7 +356,7 @@ mod tests { // Query will fail, but we can still transform the plan let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] @@ -382,7 +382,7 @@ mod tests { .build()?; let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = 
__sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] @@ -408,7 +408,7 @@ mod tests { .build()?; let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] - Semi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; @@ -585,7 +585,7 @@ mod tests { let expected = r#"Projection: customer.c_custkey [c_custkey:Int64] Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8] - Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] + LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8] TableScan: customer [c_custkey:Int64, c_name:Utf8] Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64] TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#; @@ -641,7 +641,7 @@ mod tests { .build()?; let expected = r#"Projection: test.b [b:UInt32] - Semi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32] + LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32] TableScan: test [a:UInt32, b:UInt32, c:UInt32] Projection: sq.c AS c, sq.a AS a, alias=__sq_1 [c:UInt32, a:UInt32] TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; @@ -660,7 +660,7 @@ mod tests { .build()?; let expected = r#"Projection: test.b [b:UInt32] - Semi Join: test.c = __sq_1.c 
[a:UInt32, b:UInt32, c:UInt32] + LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32] TableScan: test [a:UInt32, b:UInt32, c:UInt32] Projection: sq.c AS c, alias=__sq_1 [c:UInt32] TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; @@ -679,7 +679,7 @@ mod tests { .build()?; let expected = r#"Projection: test.b [b:UInt32] - Anti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32] + LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32] TableScan: test [a:UInt32, b:UInt32, c:UInt32] Projection: sq.c AS c, alias=__sq_1 [c:UInt32] TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#; diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 148ae6715ddb..902b02817f65 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -176,7 +176,10 @@ fn lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> { JoinType::Full => Ok((false, false)), // No columns from the right side of the join can be referenced in output // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::Semi | JoinType::Anti => Ok((true, false)), + JoinType::LeftSemi | JoinType::LeftAnti => Ok((true, false)), + // No columns from the left side of the join can be referenced in output + // predicates for semi/anti joins, so whether we specify t/f doesn't matter. 
+ JoinType::RightSemi => Ok((false, true)), }, LogicalPlan::CrossJoin(_) => Ok((true, true)), _ => Err(DataFusionError::Internal( @@ -195,7 +198,7 @@ fn on_lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> { JoinType::Left => Ok((false, true)), JoinType::Right => Ok((true, false)), JoinType::Full => Ok((false, false)), - JoinType::Semi | JoinType::Anti => { + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightSemi => { // filter_push_down does not yet support SEMI/ANTI joins with join conditions Ok((false, false)) } diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index 29f51a42f4e1..c1717da0dce7 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -116,9 +116,9 @@ impl OptimizerRule for SubqueryFilterToJoin { }; let join_type = if *negated { - JoinType::Anti + JoinType::LeftAnti } else { - JoinType::Semi + JoinType::LeftSemi }; let schema = build_join_schema( @@ -231,7 +231,7 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n Semi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: sq.c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; @@ -250,7 +250,7 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n Anti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftAnti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: sq.c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; @@ -272,8 +272,8 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n Semi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ - \n Semi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.b = sq_2.c [a:UInt32, 
b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: sq_1.c [c:UInt32]\ \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\ @@ -301,7 +301,7 @@ mod tests { let expected = "Projection: test.b [b:UInt32]\ \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ - \n Semi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: sq.c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; @@ -381,10 +381,10 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n Semi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: sq.a [a:UInt32]\ - \n Semi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: sq_nested.c [c:UInt32]\ \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]"; @@ -413,7 +413,7 @@ mod tests { \n Subquery: [c:UInt32]\n Projection: sq_outer.c [c:UInt32]\ \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: test.b, test.c, alias=wrapped [b:UInt32, c:UInt32]\ - \n Semi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ \n Projection: sq_inner.c [c:UInt32]\ \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index 8003d590ec10..71a18b62455d 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ 
-117,7 +117,7 @@ fn semi_join_with_join_filter() -> Result<()> { AND test.col_uint32 != t2.col_uint32)"; let plan = test_sql(sql)?; let expected = "Projection: test.col_utf8\ - \n Semi Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\ + \n LeftSemi Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\ \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ \n SubqueryAlias: t2\ \n TableScan: test projection=[col_int32, col_uint32, col_utf8]"; @@ -133,7 +133,7 @@ fn anti_join_with_join_filter() -> Result<()> { AND test.col_uint32 != t2.col_uint32)"; let plan = test_sql(sql)?; let expected = "Projection: test.col_utf8\ - \n Anti Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\ + \n LeftAnti Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\ \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ \n SubqueryAlias: t2\ \n TableScan: test projection=[col_int32, col_uint32, col_utf8]"; @@ -148,7 +148,7 @@ fn where_exists_distinct() -> Result<()> { SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 = t2.col_int32)"; let plan = test_sql(sql)?; let expected = "Projection: test.col_int32\ - \n Semi Join: test.col_int32 = t2.col_int32\ + \n LeftSemi Join: test.col_int32 = t2.col_int32\ \n TableScan: test projection=[col_int32]\ \n SubqueryAlias: t2\ \n TableScan: test projection=[col_int32]"; @@ -163,9 +163,9 @@ fn intersect() -> Result<()> { INTERSECT SELECT col_int32, col_utf8 FROM test"; let plan = test_sql(sql)?; let expected = - "Semi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\ + "LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\ \n Distinct:\ - \n Semi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\ + \n LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\ \n Distinct:\ \n TableScan: test projection=[col_int32, col_utf8]\ \n TableScan: 
test projection=[col_int32, col_utf8]\ diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index a878f285d94d..78b5669ed888 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -230,8 +230,9 @@ enum JoinType { LEFT = 1; RIGHT = 2; FULL = 3; - SEMI = 4; - ANTI = 5; + LEFTSEMI = 4; + LEFTANTI = 5; + RIGHTSEMI = 6; } enum JoinConstraint { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e6b095b8430a..e8110b431366 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -6402,8 +6402,9 @@ impl serde::Serialize for JoinType { Self::Left => "LEFT", Self::Right => "RIGHT", Self::Full => "FULL", - Self::Semi => "SEMI", - Self::Anti => "ANTI", + Self::Leftsemi => "LEFTSEMI", + Self::Leftanti => "LEFTANTI", + Self::Rightsemi => "RIGHTSEMI", }; serializer.serialize_str(variant) } @@ -6419,8 +6420,9 @@ impl<'de> serde::Deserialize<'de> for JoinType { "LEFT", "RIGHT", "FULL", - "SEMI", - "ANTI", + "LEFTSEMI", + "LEFTANTI", + "RIGHTSEMI", ]; struct GeneratedVisitor; @@ -6467,8 +6469,9 @@ impl<'de> serde::Deserialize<'de> for JoinType { "LEFT" => Ok(JoinType::Left), "RIGHT" => Ok(JoinType::Right), "FULL" => Ok(JoinType::Full), - "SEMI" => Ok(JoinType::Semi), - "ANTI" => Ok(JoinType::Anti), + "LEFTSEMI" => Ok(JoinType::Leftsemi), + "LEFTANTI" => Ok(JoinType::Leftanti), + "RIGHTSEMI" => Ok(JoinType::Rightsemi), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 149edf020a89..cf6978faf19e 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1110,8 +1110,9 @@ pub enum JoinType { Left = 1, Right = 2, Full = 3, - Semi = 4, - Anti = 5, + Leftsemi = 4, + Leftanti = 5, + Rightsemi = 6, } impl JoinType { /// String value of the enum field 
names used in the ProtoBuf definition. @@ -1124,8 +1125,9 @@ impl JoinType { JoinType::Left => "LEFT", JoinType::Right => "RIGHT", JoinType::Full => "FULL", - JoinType::Semi => "SEMI", - JoinType::Anti => "ANTI", + JoinType::Leftsemi => "LEFTSEMI", + JoinType::Leftanti => "LEFTANTI", + JoinType::Rightsemi => "RIGHTSEMI", } } } diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 278130b06fee..1f0e40895b2c 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -239,8 +239,9 @@ impl From<protobuf::JoinType> for JoinType { protobuf::JoinType::Left => JoinType::Left, protobuf::JoinType::Right => JoinType::Right, protobuf::JoinType::Full => JoinType::Full, - protobuf::JoinType::Semi => JoinType::Semi, - protobuf::JoinType::Anti => JoinType::Anti, + protobuf::JoinType::Leftsemi => JoinType::LeftSemi, + protobuf::JoinType::Rightsemi => JoinType::RightSemi, + protobuf::JoinType::Leftanti => JoinType::LeftAnti, } } } @@ -252,8 +253,9 @@ impl From<JoinType> for protobuf::JoinType { JoinType::Left => protobuf::JoinType::Left, JoinType::Right => protobuf::JoinType::Right, JoinType::Full => protobuf::JoinType::Full, - JoinType::Semi => protobuf::JoinType::Semi, - JoinType::Anti => protobuf::JoinType::Anti, + JoinType::LeftSemi => protobuf::JoinType::Leftsemi, + JoinType::RightSemi => protobuf::JoinType::Rightsemi, + JoinType::LeftAnti => protobuf::JoinType::Leftanti, } } }