From ea1ea5aff3702df270e28ce886f1e5d0dc6e5d4e Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Thu, 30 Jan 2025 19:27:49 +0100 Subject: [PATCH] refactor(rust): Spawn threads on our rayon pool in new-streaming (#21012) --- .../src/nodes/joins/equi_join.rs | 129 +++++++++--------- .../src/utils/in_memory_linearize.rs | 2 +- 2 files changed, 68 insertions(+), 63 deletions(-) diff --git a/crates/polars-stream/src/nodes/joins/equi_join.rs b/crates/polars-stream/src/nodes/joins/equi_join.rs index 8bf61b7ca22c..ae91d7087c9f 100644 --- a/crates/polars-stream/src/nodes/joins/equi_join.rs +++ b/crates/polars-stream/src/nodes/joins/equi_join.rs @@ -4,6 +4,7 @@ use polars_core::prelude::*; use polars_core::schema::{Schema, SchemaExt}; use polars_core::series::IsSorted; use polars_core::utils::accumulate_dataframes_vertical_unchecked; +use polars_core::POOL; use polars_expr::chunked_idx_table::{new_chunked_idx_table, ChunkedIdxTable}; use polars_expr::hash_keys::HashKeys; use polars_ops::frame::{JoinArgs, JoinType, MaintainOrderJoin}; @@ -214,51 +215,37 @@ impl BuildState { } } - let track_unmatchable = params.emit_unmatched_build(); - let table_per_partition: Vec<_> = results_per_partition - .into_par_iter() - .with_max_len(1) - .map(|results| { - // Estimate sizes and cardinality. - let mut sketch = CardinalitySketch::new(); - let mut num_frames = 0; - for result in &results { - sketch.combine(result.sketch.as_ref().unwrap()); - num_frames += result.frames.len(); - } - - // Build table for this partition. 
- let mut combined_frames = Vec::with_capacity(num_frames); - let mut chunk_seq_ids = Vec::with_capacity(num_frames); - let mut table = table.new_empty(); - table.reserve(sketch.estimate() * 5 / 4); - if params.preserve_order_build { - let mut combined = Vec::with_capacity(num_frames); - for result in results { - for (hash_keys, (seq, frame)) in - result.hash_keys.into_iter().zip(result.frames) - { - combined.push((seq, hash_keys, frame)); - } + POOL.install(|| { + let track_unmatchable = params.emit_unmatched_build(); + let table_per_partition: Vec<_> = results_per_partition + .into_par_iter() + .with_max_len(1) + .map(|results| { + // Estimate sizes and cardinality. + let mut sketch = CardinalitySketch::new(); + let mut num_frames = 0; + for result in &results { + sketch.combine(result.sketch.as_ref().unwrap()); + num_frames += result.frames.len(); } - combined.sort_unstable_by_key(|c| c.0); - for (seq, hash_keys, frame) in combined { - // Zero-sized chunks can get deleted, so skip entirely to avoid messing - // up the chunk counter. - if frame.height() == 0 { - continue; + // Build table for this partition. 
+ let mut combined_frames = Vec::with_capacity(num_frames); + let mut chunk_seq_ids = Vec::with_capacity(num_frames); + let mut table = table.new_empty(); + table.reserve(sketch.estimate() * 5 / 4); + if params.preserve_order_build { + let mut combined = Vec::with_capacity(num_frames); + for result in results { + for (hash_keys, (seq, frame)) in + result.hash_keys.into_iter().zip(result.frames) + { + combined.push((seq, hash_keys, frame)); + } } - table.insert_key_chunk(hash_keys, track_unmatchable); - combined_frames.push(frame); - chunk_seq_ids.push(seq); - } - } else { - for result in results { - for (hash_keys, (_, frame)) in - result.hash_keys.into_iter().zip(result.frames) - { + combined.sort_unstable_by_key(|c| c.0); + for (seq, hash_keys, frame) in combined { // Zero-sized chunks can get deleted, so skip entirely to avoid messing // up the chunk counter. if frame.height() == 0 { @@ -267,31 +254,47 @@ impl BuildState { table.insert_key_chunk(hash_keys, track_unmatchable); combined_frames.push(frame); + chunk_seq_ids.push(seq); + } + } else { + for result in results { + for (hash_keys, (_, frame)) in + result.hash_keys.into_iter().zip(result.frames) + { + // Zero-sized chunks can get deleted, so skip entirely to avoid messing + // up the chunk counter. 
+ if frame.height() == 0 { + continue; + } + + table.insert_key_chunk(hash_keys, track_unmatchable); + combined_frames.push(frame); + } } } - } - let df = if combined_frames.is_empty() { - if params.left_is_build { - DataFrame::empty_with_schema(&params.left_payload_schema) + let df = if combined_frames.is_empty() { + if params.left_is_build { + DataFrame::empty_with_schema(&params.left_payload_schema) + } else { + DataFrame::empty_with_schema(&params.right_payload_schema) + } } else { - DataFrame::empty_with_schema(&params.right_payload_schema) + accumulate_dataframes_vertical_unchecked(combined_frames) + }; + ProbeTable { + table, + df, + chunk_seq_ids, } - } else { - accumulate_dataframes_vertical_unchecked(combined_frames) - }; - ProbeTable { - table, - df, - chunk_seq_ids, - } - }) - .collect(); + }) + .collect(); - ProbeState { - table_per_partition, - max_seq_sent: MorselSeq::default(), - } + ProbeState { + table_per_partition, + max_seq_sent: MorselSeq::default(), + } + }) } } @@ -542,8 +545,10 @@ impl ProbeState { impl Drop for ProbeState { fn drop(&mut self) { - // Parallel drop as the state might be quite big. - self.table_per_partition.par_drain(..).for_each(drop); + POOL.install(|| { + // Parallel drop as the state might be quite big. + self.table_per_partition.par_drain(..).for_each(drop); + }) } } diff --git a/crates/polars-stream/src/utils/in_memory_linearize.rs b/crates/polars-stream/src/utils/in_memory_linearize.rs index 2cf1159cd286..ec341b93dde7 100644 --- a/crates/polars-stream/src/utils/in_memory_linearize.rs +++ b/crates/polars-stream/src/utils/in_memory_linearize.rs @@ -35,7 +35,7 @@ pub fn linearize(mut morsels_per_pipe: Vec>) -> Vec { let morsels_per_p = &morsels_per_pipe; let mut dataframes: Vec = Vec::with_capacity(num_morsels); let dataframes_ptr = unsafe { SyncPtr::new(dataframes.as_mut_ptr()) }; - rayon::scope(|s| { + POOL.scope(|s| { let mut out_offset = 0; let mut stop_idx_per_pipe = vec![0; morsels_per_p.len()]; for t in 0..n_threads {