diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 6159a64be36b..b97f2806e695 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -716,14 +716,14 @@ impl RecordBatchStream for HashJoinStream { // "+----+----+-----+----+----+-----+", // "| a1 | b1 | c1 | a2 | b2 | c2 |", // "+----+----+-----+----+----+-----+", +// "| 9 | 8 | 90 | 8 | 8 | 80 |", // "| 11 | 8 | 110 | 8 | 8 | 80 |", // "| 13 | 10 | 130 | 10 | 10 | 100 |", // "| 13 | 10 | 130 | 12 | 10 | 120 |", -// "| 9 | 8 | 90 | 8 | 8 | 80 |", // "+----+----+-----+----+----+-----+" // And the result of build and probe indices are: -// Build indices: 5, 6, 6, 4 -// Probe indices: 3, 4, 5, 3 +// Build indices: 4, 5, 6, 6 +// Probe indices: 3, 3, 4, 5 #[allow(clippy::too_many_arguments)] pub fn build_equal_condition_join_indices( build_hashmap: &JoinHashMap, @@ -754,8 +754,36 @@ pub fn build_equal_condition_join_indices( // Using a buffer builder to avoid slower normal builder let mut build_indices = UInt64BufferBuilder::new(0); let mut probe_indices = UInt32BufferBuilder::new(0); - // Visit all of the probe rows - for (row, hash_value) in hash_values.iter().enumerate() { + // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: + // Build Indices: [5, 4, 3] + // Probe Indices: [1, 1, 1] + // + // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. + // Let's consider probe rows [0,1] as an example: + // + // When the probe iteration sequence is reversed, the following pairings can be derived: + // + // For probe row 1: + // (5, 1) + // (4, 1) + // (3, 1) + // + // For probe row 0: + // (5, 0) + // (4, 0) + // (3, 0) + // + // After reversing both sets of indices, we obtain reversed indices: + // + // (3,0) + // (4,0) + // (5,0) + // (3,1) + // (4,1) + // (5,1) + // + // With this approach, the lexicographic order on both the probe side and the build side is preserved. + for (row, hash_value) in hash_values.iter().enumerate().rev() { // Get the hash and find it in the build index // For every item on the build and probe we check if it matches @@ -779,6 +807,9 @@ pub fn build_equal_condition_join_indices( } } } + // Reversing both sets of indices + build_indices.as_slice_mut().reverse(); + probe_indices.as_slice_mut().reverse(); let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); diff --git a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt index fcc6d665c6da..daeb7aad9aa5 100644 --- a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt +++ b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt @@ -63,18 +63,18 @@ GlobalLimitExec: skip=0, fetch=5 --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true # preserve_inner_join -query III nosort -SELECT t1.a, t2.a as a2, t2.b +query IIII nosort +SELECT t1.a, t1.b, t1.c, t2.a as a2 FROM annotated_data as t1 INNER JOIN annotated_data as t2 ON t1.d = t2.d ORDER BY a2, t2.b LIMIT 5 ---- -1 0 0 -1 0 0 -1 0 0 -1 0 0 -1 0 0 +0 0 0 0 +0 0 2 0 +0 0 3 0 +0 0 6 0 +0 0 20 0 query TT EXPLAIN SELECT t2.a as a2, t2.b