From 8e26f9b83de069c2bb3b28a53064253f16d382bf Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sun, 19 Jan 2025 18:54:14 +0200 Subject: [PATCH] fix: add build-side join keys to memory accounting --- .../physical-plan/src/joins/hash_join.rs | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 8ccefb0397944..b1318c853849c 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -56,7 +56,7 @@ use arrow::array::{ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array, }; use arrow::compute::kernels::cmp::{eq, not_distinct}; -use arrow::compute::{and, concat_batches, take, FilterBuilder}; +use arrow::compute::{and, concat, concat_batches, take, FilterBuilder}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -914,6 +914,47 @@ async fn collect_left_input( }) .await?; + let batches_iter = batches.iter().rev(); + + let left_values = on_left + .iter() + .map(|expr| { + let mut key_reservation = 0; + let join_key_arrays = batches_iter + .clone() + .map(|batch| { + let array: Arc = expr + .evaluate(&batch) + .unwrap() + .into_array(batch.num_rows())?; + let array_size = array.get_array_memory_size(); + reservation.try_grow(array_size)?; + key_reservation += array_size; + Ok(array) + }) + .collect::>>()?; + + // `concat` function is non-consuming, so reserving x2 memory + // required for collected join key arrays (assuming worst case scenario) + reservation.try_grow(key_reservation)?; + let concatenated = concat( + join_key_arrays + .iter() + .map(AsRef::as_ref) + .collect::>() + .as_ref(), + )?; + + // Resizing reservation to its original size + concatenated array size + let build_size_mem = reservation.size() - key_reservation * 2 + + concatenated.get_array_memory_size(); + reservation.resize(build_size_mem); + metrics.build_mem_used.set(build_size_mem); + + Ok(concatenated) + }) + .collect::>>()?; + // Estimation of memory size, required for hashtable, prior to allocation. // Final result can be verified using `RawTable.allocation_info()` let fixed_size = size_of::(); @@ -928,7 +969,6 @@ async fn collect_left_input( let mut offset = 0; // Updating hashmap starting from the last batch - let batches_iter = batches.iter().rev(); for batch in batches_iter.clone() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); @@ -960,14 +1000,6 @@ async fn collect_left_input( BooleanBufferBuilder::new(0) }; - let left_values = on_left - .iter() - .map(|c| { - c.evaluate(&single_batch)? - .into_array(single_batch.num_rows()) - }) - .collect::>>()?; - let data = JoinLeftData::new( hashmap, single_batch,