diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 00edda8fa3a8..43179583c373 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -212,24 +212,36 @@ pub trait JoinHashMapType { deleted_offset: usize, ) { let (mut_map, mut_list) = self.get_mut(); - for (row, hash_value) in iter { - let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, index)) = item { - // Already exists: add index to next array - let prev_index = *index; - // Store new value inside hashmap - *index = (row + 1) as u64; - // Update chained Vec at `row` with previous value - mut_list[row - deleted_offset] = prev_index; - } else { - mut_map.insert( - *hash_value, + for (row, &hash_value) in iter { + let item = mut_map.find_or_find_insert_slot( + hash_value, + |&(hash, _)| hash_value == hash, + |&(hash, _)| hash, + ); + match item { + Ok(bucket) => { + // Already exists: add index to next array + // SAFETY: is already initialized + let index = unsafe { bucket.as_mut() }; + let prev_index = index.1; + // Store new value inside hashmap + index.1 = (row + 1) as u64; + // Update chained Vec at `row` with previous value + mut_list[row - deleted_offset] = prev_index; + } + Err(slot) => { // store the value + 1 as 0 value reserved for end of list - (*hash_value, (row + 1) as u64), - |(hash, _)| *hash, - ); - // chained list at `row` is already initialized with 0 - // meaning end of list + // SAFETY: slot is valid (retrieved above) and not mutated + unsafe { + mut_map.insert_in_slot( + hash_value, + slot, + (hash_value, (row + 1) as u64), + ) + }; + // chained list at `row` is already initialized with 0 + // meaning end of list + } } } }