diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs index 5759d72a485f..d5345edb9ac8 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs @@ -158,15 +158,45 @@ impl ParquetSourceNode { // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work - // it is purely a dispatch loop. Run on the computational executor to reduce context switches. + let last_morsel_min_split = self.config.num_pipelines; let distribute_task = async_executor::spawn(TaskPriority::High, async move { let mut morsel_seq = MorselSeq::default(); - while let Some(decode_fut) = decode_recv.recv().await { + + // Decode first non-empty morsel. + let mut next = None; + loop { + let Some(decode_fut) = decode_recv.recv().await else { + break; + }; let df = decode_fut.await?; if df.is_empty() { continue; } + next = Some(df); + break; + } - for df in split_to_morsels(&df, ideal_morsel_size) { + while let Some(df) = next.take() { + // Try to decode the next non-empty morsel first, so we know + // whether the df is the last morsel. + loop { + let Some(decode_fut) = decode_recv.recv().await else { + break; + }; + let next_df = decode_fut.await?; + if next_df.is_empty() { + continue; + } + next = Some(next_df); + break; + } + + for df in split_to_morsels( + &df, + ideal_morsel_size, + next.is_none(), + last_morsel_min_split, + ) { if raw_morsel_sender.send((df, morsel_seq)).await.is_err() { return Ok(()); } @@ -325,23 +355,29 @@ fn filtered_range(exclude: &[usize], len: usize) -> Vec { .collect() } -/// Note: The 2nd return is an upper bound on the number of morsels rather than an exact count. fn split_to_morsels( df: &DataFrame, ideal_morsel_size: usize, + last_morsel: bool, + last_morsel_min_split: usize, ) -> impl Iterator + '_ { - let n_morsels = if df.height() > 3 * ideal_morsel_size / 2 { + let mut n_morsels = if df.height() > 3 * ideal_morsel_size / 2 { // num_rows > (1.5 * ideal_morsel_size) (df.height() / ideal_morsel_size).max(2) } else { 1 }; - let rows_per_morsel = 1 + df.height() / n_morsels; + if last_morsel { + n_morsels = n_morsels.max(last_morsel_min_split); + } + + let rows_per_morsel = df.height().div_ceil(n_morsels).max(1); (0..i64::try_from(df.height()).unwrap()) .step_by(rows_per_morsel) .map(move |offset| df.slice(offset, rows_per_morsel)) + .filter(|df| !df.is_empty()) } mod tests {