Skip to content

Commit

Permalink
HashJoin order fixing (#7155)
Browse files Browse the repository at this point in the history
* Order fixing

* Update join_disable_repartition_joins.slt

* Update hash_join.rs

* Update hash_join.rs

* Update hash_join.rs
  • Loading branch information
metesynnada authored Aug 2, 2023
1 parent 5faa10b commit 1e9816b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
41 changes: 36 additions & 5 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,14 +716,14 @@ impl RecordBatchStream for HashJoinStream {
// "+----+----+-----+----+----+-----+",
// "| a1 | b1 | c1 | a2 | b2 | c2 |",
// "+----+----+-----+----+----+-----+",
// "| 9 | 8 | 90 | 8 | 8 | 80 |",
// "| 11 | 8 | 110 | 8 | 8 | 80 |",
// "| 13 | 10 | 130 | 10 | 10 | 100 |",
// "| 13 | 10 | 130 | 12 | 10 | 120 |",
// "| 9 | 8 | 90 | 8 | 8 | 80 |",
// "+----+----+-----+----+----+-----+"
// And the resulting build and probe indices are:
// Build indices: 5, 6, 6, 4
// Probe indices: 3, 4, 5, 3
// Build indices: 4, 5, 6, 6
// Probe indices: 3, 3, 4, 5
#[allow(clippy::too_many_arguments)]
pub fn build_equal_condition_join_indices(
build_hashmap: &JoinHashMap,
Expand Down Expand Up @@ -754,8 +754,36 @@ pub fn build_equal_condition_join_indices(
// Using a buffer builder to avoid slower normal builder
let mut build_indices = UInt64BufferBuilder::new(0);
let mut probe_indices = UInt32BufferBuilder::new(0);
// Visit all of the probe rows
for (row, hash_value) in hash_values.iter().enumerate() {
// The chained list algorithm yields the build indices for each probe row in reverse order, for example:
// Build Indices: [5, 4, 3]
// Probe Indices: [1, 1, 1]
//
// This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side.
// Let's consider probe rows [0,1] as an example:
//
// When the probe iteration sequence is reversed, the following pairings can be derived:
//
// For probe row 1:
// (5, 1)
// (4, 1)
// (3, 1)
//
// For probe row 0:
// (5, 0)
// (4, 0)
// (3, 0)
//
// After reversing both sets of indices, we obtain the pairs in ascending order:
//
// (3,0)
// (4,0)
// (5,0)
// (3,1)
// (4,1)
// (5,1)
//
// With this approach, the lexicographic order on both the probe side and the build side is preserved.
for (row, hash_value) in hash_values.iter().enumerate().rev() {
// Get the hash and find it in the build index

// For every item on the build and probe we check if it matches
Expand All @@ -779,6 +807,9 @@ pub fn build_equal_condition_join_indices(
}
}
}
// Reverse both sets of indices so the pairs come out in ascending probe order, with ascending build order within each probe row
build_indices.as_slice_mut().reverse();
probe_indices.as_slice_mut().reverse();

let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None);
let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,18 @@ GlobalLimitExec: skip=0, fetch=5
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true

# preserve_inner_join
query III nosort
SELECT t1.a, t2.a as a2, t2.b
query IIII nosort
SELECT t1.a, t1.b, t1.c, t2.a as a2
FROM annotated_data as t1
INNER JOIN annotated_data as t2
ON t1.d = t2.d ORDER BY a2, t2.b
LIMIT 5
----
1 0 0
1 0 0
1 0 0
1 0 0
1 0 0
0 0 0 0
0 0 2 0
0 0 3 0
0 0 6 0
0 0 20 0

query TT
EXPLAIN SELECT t2.a as a2, t2.b
Expand Down

0 comments on commit 1e9816b

Please sign in to comment.