From e76f50b7af66227b2c205fbab5a46c6e722a9e7b Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 31 Jul 2023 17:46:33 +0300 Subject: [PATCH 1/5] Order fixing --- datafusion/core/src/physical_plan/joins/hash_join.rs | 4 +++- .../test_files/join_disable_repartition_joins.slt | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index ce1d6dbcc083..8c364e55b361 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -757,7 +757,7 @@ pub fn build_equal_condition_join_indices( let mut build_indices = UInt64BufferBuilder::new(0); let mut probe_indices = UInt32BufferBuilder::new(0); // Visit all of the probe rows - for (row, hash_value) in hash_values.iter().enumerate() { + for (row, hash_value) in hash_values.iter().enumerate().rev() { // Get the hash and find it in the build index // For every item on the build and probe we check if it matches @@ -781,6 +781,8 @@ pub fn build_equal_condition_join_indices( } } } + build_indices.as_slice_mut().reverse(); + probe_indices.as_slice_mut().reverse(); let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); diff --git a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt index fcc6d665c6da..4b33cab8def8 100644 --- a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt +++ b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt @@ -70,11 +70,11 @@ SELECT t1.a, t2.a as a2, t2.b ON t1.d = t2.d ORDER BY a2, t2.b LIMIT 5 ---- -1 0 0 -1 0 0 -1 0 0 -1 0 0 -1 0 0 +0 0 0 +0 0 0 +0 0 0 +0 0 0 +0 0 0 query TT EXPLAIN SELECT t2.a as a2, t2.b From 553a08c473f6f4fe2a658545e951ac2869bdb673 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 31 Jul 2023 17:54:59 +0300 Subject: [PATCH 2/5] Update join_disable_repartition_joins.slt --- .../test_files/join_disable_repartition_joins.slt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt index 4b33cab8def8..daeb7aad9aa5 100644 --- a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt +++ b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt @@ -63,18 +63,18 @@ GlobalLimitExec: skip=0, fetch=5 --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true # preserve_inner_join -query III nosort -SELECT t1.a, t2.a as a2, t2.b +query IIII nosort +SELECT t1.a, t1.b, t1.c, t2.a as a2 FROM annotated_data as t1 INNER JOIN annotated_data as t2 ON t1.d = t2.d ORDER BY a2, t2.b LIMIT 5 ---- -0 0 0 -0 0 0 -0 0 0 -0 0 0 -0 0 0 +0 0 0 0 +0 0 2 0 +0 0 3 0 +0 0 6 0 +0 0 20 0 query TT EXPLAIN SELECT t2.a as a2, t2.b From f831f9138dd1e70b5e74bd3eda4a0a7443390309 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 1 Aug 2023 10:08:25 +0300 Subject: [PATCH 3/5] Update hash_join.rs --- .../core/src/physical_plan/joins/hash_join.rs | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 8c364e55b361..ea954a746953 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -756,7 +756,39 @@ pub fn build_equal_condition_join_indices( // Using a buffer builder to avoid slower normal builder let mut build_indices = UInt64BufferBuilder::new(0); let mut probe_indices = UInt32BufferBuilder::new(0); - // Visit all of the probe rows + // The chained list algorithm generates build indices for each probe row in a reversed sequence: + // + // Probe Indices: [1, 1, 1] + // Build Indices: [5, 4, 3] + // + // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. + // Let's consider probe rows [0,1] as an example: + // + // When the probe iteration sequence is reversed, the following pairings can be derived: + // + // Probe Row 1: + // (5, 1) + // (4, 1) + // (3, 1) + // + // Probe Row 0: + // (5, 0) + // (4, 0) + // (3, 0) + // + // After reversing both sets of indices, we obtain: + // + // Reversed Indices for Probe Row 0: + // (3,0) + // (4,0) + // (5,0) + // + // Reversed Indices for Probe Row 1: + // (3,1) + // (4,1) + // (5,1) + // + // With this approach, the lexicographic order on both the probe side and the build side is preserved. for (row, hash_value) in hash_values.iter().enumerate().rev() { // Get the hash and find it in the build index From 44f40301979ebeb8f2ca901172d3eac4aaad52d2 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 1 Aug 2023 10:41:50 +0300 Subject: [PATCH 4/5] Update hash_join.rs --- datafusion/core/src/physical_plan/joins/hash_join.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index ea954a746953..251195d9de53 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -718,14 +718,14 @@ impl RecordBatchStream for HashJoinStream { // "+----+----+-----+----+----+-----+", // "| a1 | b1 | c1 | a2 | b2 | c2 |", // "+----+----+-----+----+----+-----+", +// "| 9 | 8 | 90 | 8 | 8 | 80 |", // "| 11 | 8 | 110 | 8 | 8 | 80 |", // "| 13 | 10 | 130 | 10 | 10 | 100 |", // "| 13 | 10 | 130 | 12 | 10 | 120 |", -// "| 9 | 8 | 90 | 8 | 8 | 80 |", // "+----+----+-----+----+----+-----+" // And the result of build and probe indices are: -// Build indices: 5, 6, 6, 4 -// Probe indices: 3, 4, 5, 3 +// Build indices: 4, 5, 6, 6 +// Probe indices: 3, 3, 4, 5 #[allow(clippy::too_many_arguments)] pub fn build_equal_condition_join_indices( build_hashmap: &JoinHashMap, From 1a744edabb20a06984b4101d5036e073190b9bb2 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 1 Aug 2023 10:47:25 +0300 Subject: [PATCH 5/5] Update hash_join.rs --- .../core/src/physical_plan/joins/hash_join.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 251195d9de53..83259e378326 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -756,34 +756,30 @@ pub fn build_equal_condition_join_indices( // Using a buffer builder to avoid slower normal builder let mut build_indices = UInt64BufferBuilder::new(0); let mut probe_indices = UInt32BufferBuilder::new(0); - // The chained list algorithm generates build indices for each probe row in a reversed sequence: - // - // Probe Indices: [1, 1, 1] + // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: // Build Indices: [5, 4, 3] + // Probe Indices: [1, 1, 1] // // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. // Let's consider probe rows [0,1] as an example: // // When the probe iteration sequence is reversed, the following pairings can be derived: // - // Probe Row 1: + // For probe row 1: // (5, 1) // (4, 1) // (3, 1) // - // Probe Row 0: + // For probe row 0: // (5, 0) // (4, 0) // (3, 0) // - // After reversing both sets of indices, we obtain: + // After reversing both sets of indices, we obtain reversed indices: // - // Reversed Indices for Probe Row 0: // (3,0) // (4,0) // (5,0) - // - // Reversed Indices for Probe Row 1: // (3,1) // (4,1) // (5,1) @@ -813,6 +809,7 @@ pub fn build_equal_condition_join_indices( } } } + // Reversing both sets of indices build_indices.as_slice_mut().reverse(); probe_indices.as_slice_mut().reverse();