diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index eb2ec33d08699..2edd0c7ee5e3d 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -63,9 +63,18 @@ use crate::physical_plan::coalesce_batches::concat_batches; use log::debug; // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value. +// +// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used +// to put the indices in a certain bucket. +// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the left side, +// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash value in this case). // E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 // As the key is a hash value, we need to check possible hash collisions in the probe stage -type JoinHashMap = HashMap, IdHashBuilder>; +// During this stage it might be the case that a row is mapped to the same hashmap value, +// but the values don't match. Those are checked in the [equal_rows] function +// TODO: speed up collision check and move away from using a hashbrown HashMap +// https://github.com/apache/arrow-datafusion/issues/50 +type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -255,34 +264,33 @@ impl ExecutionPlan for HashJoinExec { // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
- let initial = ( - JoinHashMap::with_hasher(IdHashBuilder {}), - Vec::new(), - 0, - Vec::new(), - ); - let (hashmap, batches, num_rows, _) = stream + let initial = (0, Vec::new()); + let (num_rows, batches) = stream .try_fold(initial, |mut acc, batch| async { - let hash = &mut acc.0; - let values = &mut acc.1; - let offset = acc.2; - acc.3.clear(); - acc.3.resize(batch.num_rows(), 0); - update_hash( - &on_left, - &batch, - hash, - offset, - &self.random_state, - &mut acc.3, - ) - .unwrap(); - acc.2 += batch.num_rows(); - values.push(batch); + acc.0 += batch.num_rows(); + acc.1.push(batch); Ok(acc) }) .await?; - + let mut hashmap = JoinHashMap::with_capacity_and_hasher( + num_rows, + IdHashBuilder {}, + ); + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + for batch in batches.iter() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + &batch, + &mut hashmap, + offset, + &self.random_state, + &mut hashes_buffer, + )?; + offset += batch.num_rows(); + } // Merge all batches into a single batch, so we // can directly index into the arrays let single_batch = @@ -311,34 +319,31 @@ impl ExecutionPlan for HashJoinExec { // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
- let initial = ( - JoinHashMap::with_hasher(IdHashBuilder {}), - Vec::new(), - 0, - Vec::new(), - ); - let (hashmap, batches, num_rows, _) = stream + let initial = (0, Vec::new()); + let (num_rows, batches) = stream .try_fold(initial, |mut acc, batch| async { - let hash = &mut acc.0; - let values = &mut acc.1; - let offset = acc.2; - acc.3.clear(); - acc.3.resize(batch.num_rows(), 0); - update_hash( - &on_left, - &batch, - hash, - offset, - &self.random_state, - &mut acc.3, - ) - .unwrap(); - acc.2 += batch.num_rows(); - values.push(batch); + acc.0 += batch.num_rows(); + acc.1.push(batch); Ok(acc) }) .await?; - + let mut hashmap = + JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {}); + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + for batch in batches.iter() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + &batch, + &mut hashmap, + offset, + &self.random_state, + &mut hashes_buffer, + )?; + offset += batch.num_rows(); + } // Merge all batches into a single batch, so we // can directly index into the arrays let single_batch = @@ -399,15 +404,23 @@ fn update_hash( .map(|name| Ok(col(name).evaluate(batch)?.into_array(batch.num_rows()))) .collect::>>()?; - // update the hash map + // calculate the hash values let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?; // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { - hash.raw_entry_mut() - .from_key_hashed_nocheck(*hash_value, hash_value) - .and_modify(|_, v| v.push((row + offset) as u64)) - .or_insert_with(|| (*hash_value, smallvec![(row + offset) as u64])); + match hash.raw_entry_mut().from_hash(*hash_value, |_| true) { + hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => { + entry.get_mut().push((row + offset) as u64); + } + hashbrown::hash_map::RawEntryMut::Vacant(entry) => { + entry.insert_hashed_nocheck( + *hash_value, + (), + smallvec![(row + offset) as u64], 
+ ); + } + }; } Ok(()) } @@ -574,7 +587,9 @@ fn build_join_indexes( // For every item on the left and right we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some(indices) = left.get(hash_value) { + if let Some((_, indices)) = + left.raw_entry().from_hash(*hash_value, |_| true) + { for &i in indices { // Check hash collisions if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -611,7 +626,9 @@ fn build_join_indexes( // First visit all of the rows for (row, hash_value) in hash_values.iter().enumerate() { - if let Some(indices) = left.get(hash_value) { + if let Some((_, indices)) = + left.raw_entry().from_hash(*hash_value, |_| true) + { for &i in indices { // Collision check if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -638,8 +655,8 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); for (row, hash_value) in hash_values.iter().enumerate() { - match left.get(hash_value) { - Some(indices) => { + match left.raw_entry().from_hash(*hash_value, |_| true) { + Some((_, indices)) => { for &i in indices { if equal_rows( i as usize, @@ -649,6 +666,9 @@ fn build_join_indexes( )? { left_indices.append_value(i)?; right_indices.append_value(row as u32)?; + } else { + left_indices.append_null()?; + right_indices.append_value(row as u32)?; } } } @@ -697,6 +717,7 @@ impl BuildHasher for IdHashBuilder { } // Combines two hashes into one hash +#[inline] fn combine_hashes(l: u64, r: u64) -> u64 { let hash = (17 * 37u64).wrapping_add(l); hash.wrapping_mul(37).wrapping_add(r) @@ -708,7 +729,6 @@ macro_rules! equal_rows_elem { let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap(); match (left_array.is_null($left), left_array.is_null($right)) { - (true, true) => true, (false, false) => left_array.value($left) == right_array.value($right), _ => false, } @@ -755,21 +775,75 @@ fn equal_rows( } macro_rules! 
hash_array { - ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident) => { + ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => { let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); if array.null_count() == 0 { - for (i, hash) in $hashes.iter_mut().enumerate() { - *hash = - combine_hashes($ty::get_hash(&array.value(i), $random_state), *hash); - } - } else { - for (i, hash) in $hashes.iter_mut().enumerate() { - if !array.is_null(i) { + if $multi_col { + for (i, hash) in $hashes.iter_mut().enumerate() { *hash = combine_hashes( $ty::get_hash(&array.value(i), $random_state), *hash, ); } + } else { + for (i, hash) in $hashes.iter_mut().enumerate() { + *hash = $ty::get_hash(&array.value(i), $random_state); + } + } + } else { + if $multi_col { + for (i, hash) in $hashes.iter_mut().enumerate() { + if !array.is_null(i) { + *hash = combine_hashes( + $ty::get_hash(&array.value(i), $random_state), + *hash, + ); + } + } + } else { + for (i, hash) in $hashes.iter_mut().enumerate() { + if !array.is_null(i) { + *hash = $ty::get_hash(&array.value(i), $random_state); + } + } + } + } + }; +} + +macro_rules! 
hash_array_primitive { + ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => { + let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + let values = array.values(); + + if array.null_count() == 0 { + if $multi_col { + for (hash, value) in $hashes.iter_mut().zip(values.iter()) { + *hash = combine_hashes($ty::get_hash(value, $random_state), *hash); + } + } else { + for (hash, value) in $hashes.iter_mut().zip(values.iter()) { + *hash = $ty::get_hash(value, $random_state) + } + } + } else { + if $multi_col { + for (i, (hash, value)) in + $hashes.iter_mut().zip(values.iter()).enumerate() + { + if !array.is_null(i) { + *hash = + combine_hashes($ty::get_hash(value, $random_state), *hash); + } + } + } else { + for (i, (hash, value)) in + $hashes.iter_mut().zip(values.iter()).enumerate() + { + if !array.is_null(i) { + *hash = $ty::get_hash(value, $random_state); + } + } } } }; @@ -781,58 +855,140 @@ pub fn create_hashes<'a>( random_state: &RandomState, hashes_buffer: &'a mut Vec, ) -> Result<&'a mut Vec> { + // combine hashes with `combine_hashes` if we have more than 1 column + let multi_col = arrays.len() > 1; + for col in arrays { match col.data_type() { DataType::UInt8 => { - hash_array!(UInt8Array, col, u8, hashes_buffer, random_state); + hash_array_primitive!( + UInt8Array, + col, + u8, + hashes_buffer, + random_state, + multi_col + ); } DataType::UInt16 => { - hash_array!(UInt16Array, col, u16, hashes_buffer, random_state); + hash_array_primitive!( + UInt16Array, + col, + u16, + hashes_buffer, + random_state, + multi_col + ); } DataType::UInt32 => { - hash_array!(UInt32Array, col, u32, hashes_buffer, random_state); + hash_array_primitive!( + UInt32Array, + col, + u32, + hashes_buffer, + random_state, + multi_col + ); } DataType::UInt64 => { - hash_array!(UInt64Array, col, u64, hashes_buffer, random_state); + hash_array_primitive!( + UInt64Array, + col, + u64, + hashes_buffer, + random_state, + 
multi_col + ); } DataType::Int8 => { - hash_array!(Int8Array, col, i8, hashes_buffer, random_state); + hash_array_primitive!( + Int8Array, + col, + i8, + hashes_buffer, + random_state, + multi_col + ); } DataType::Int16 => { - hash_array!(Int16Array, col, i16, hashes_buffer, random_state); + hash_array_primitive!( + Int16Array, + col, + i16, + hashes_buffer, + random_state, + multi_col + ); } DataType::Int32 => { - hash_array!(Int32Array, col, i32, hashes_buffer, random_state); + hash_array_primitive!( + Int32Array, + col, + i32, + hashes_buffer, + random_state, + multi_col + ); } DataType::Int64 => { - hash_array!(Int64Array, col, i64, hashes_buffer, random_state); + hash_array_primitive!( + Int64Array, + col, + i64, + hashes_buffer, + random_state, + multi_col + ); } DataType::Timestamp(TimeUnit::Microsecond, None) => { - hash_array!( + hash_array_primitive!( TimestampMicrosecondArray, col, i64, hashes_buffer, - random_state + random_state, + multi_col ); } DataType::Timestamp(TimeUnit::Nanosecond, None) => { - hash_array!( + hash_array_primitive!( TimestampNanosecondArray, col, i64, hashes_buffer, - random_state + random_state, + multi_col ); } DataType::Boolean => { - hash_array!(BooleanArray, col, u8, hashes_buffer, random_state); + hash_array!( + BooleanArray, + col, + u8, + hashes_buffer, + random_state, + multi_col + ); } DataType::Utf8 => { - hash_array!(StringArray, col, str, hashes_buffer, random_state); + hash_array!( + StringArray, + col, + str, + hashes_buffer, + random_state, + multi_col + ); } DataType::LargeUtf8 => { - hash_array!(LargeStringArray, col, str, hashes_buffer, random_state); + hash_array!( + LargeStringArray, + col, + str, + hashes_buffer, + random_state, + multi_col + ); } _ => { // This is internal because we should have caught this before. 
@@ -1218,7 +1374,7 @@ mod tests { #[test] fn join_with_hash_collision() -> Result<()> { - let mut hashmap_left = HashMap::with_hasher(IdHashBuilder {}); + let mut hashmap_left = HashMap::with_capacity_and_hasher(2, IdHashBuilder {}); let left = build_table_i32( ("a", &vec![10, 20]), ("x", &vec![100, 200]), @@ -1231,8 +1387,18 @@ mod tests { create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions - hashmap_left.insert(hashes[0], smallvec![0, 1]); - hashmap_left.insert(hashes[1], smallvec![0, 1]); + match hashmap_left.raw_entry_mut().from_hash(hashes[0], |_| true) { + hashbrown::hash_map::RawEntryMut::Vacant(entry) => { + entry.insert_hashed_nocheck(hashes[0], (), smallvec![0, 1]) + } + _ => unreachable!("Hash should not be vacant"), + }; + match hashmap_left.raw_entry_mut().from_hash(hashes[1], |_| true) { + hashbrown::hash_map::RawEntryMut::Vacant(entry) => { + entry.insert_hashed_nocheck(hashes[1], (), smallvec![0, 1]) + } + _ => unreachable!("Hash should not be vacant"), + }; let right = build_table_i32( ("a", &vec![10, 20]), diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 70baffc700ba2..79baeae35e961 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -2669,12 +2669,11 @@ async fn inner_join_qualified_names() -> Result<()> { } #[tokio::test] -#[ignore = "https://issues.apache.org/jira/browse/ARROW-12266"] async fn inner_join_nulls() { let sql = "SELECT * FROM (SELECT null AS id1) t1 INNER JOIN (SELECT null AS id2) t2 ON id1 = id2"; - let expected: &[&[&str]] = &[&[]]; + let expected: &[&[&str]] = &[]; let mut ctx = create_join_context_qualified().unwrap(); let actual = execute(&mut ctx, sql).await;