Skip to content

Commit

Permalink
perf: Split last rowgroup among all threads in new-streaming parquet …
Browse files Browse the repository at this point in the history
…reader (#21027)
  • Loading branch information
orlp authored Jan 31, 2025
1 parent 2df0404 commit c7888de
Showing 1 changed file with 41 additions and 5 deletions.
46 changes: 41 additions & 5 deletions crates/polars-stream/src/nodes/io_sources/parquet/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,45 @@ impl ParquetSourceNode {

// Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
// it is purely a dispatch loop. Run on the computational executor to reduce context switches.
let last_morsel_min_split = self.config.num_pipelines;
let distribute_task = async_executor::spawn(TaskPriority::High, async move {
let mut morsel_seq = MorselSeq::default();
while let Some(decode_fut) = decode_recv.recv().await {

// Decode first non-empty morsel.
let mut next = None;
loop {
let Some(decode_fut) = decode_recv.recv().await else {
break;
};
let df = decode_fut.await?;
if df.is_empty() {
continue;
}
next = Some(df);
break;
}

for df in split_to_morsels(&df, ideal_morsel_size) {
while let Some(df) = next.take() {
// Try to decode the next non-empty morsel first, so we know
// whether the df is the last morsel.
loop {
let Some(decode_fut) = decode_recv.recv().await else {
break;
};
let next_df = decode_fut.await?;
if next_df.is_empty() {
continue;
}
next = Some(next_df);
break;
}

for df in split_to_morsels(
&df,
ideal_morsel_size,
next.is_none(),
last_morsel_min_split,
) {
if raw_morsel_sender.send((df, morsel_seq)).await.is_err() {
return Ok(());
}
Expand Down Expand Up @@ -325,23 +355,29 @@ fn filtered_range(exclude: &[usize], len: usize) -> Vec<usize> {
.collect()
}

/// Note: The 2nd return is an upper bound on the number of morsels rather than an exact count.
fn split_to_morsels(
df: &DataFrame,
ideal_morsel_size: usize,
last_morsel: bool,
last_morsel_min_split: usize,
) -> impl Iterator<Item = DataFrame> + '_ {
let n_morsels = if df.height() > 3 * ideal_morsel_size / 2 {
let mut n_morsels = if df.height() > 3 * ideal_morsel_size / 2 {
// num_rows > (1.5 * ideal_morsel_size)
(df.height() / ideal_morsel_size).max(2)
} else {
1
};

let rows_per_morsel = 1 + df.height() / n_morsels;
if last_morsel {
n_morsels = n_morsels.max(last_morsel_min_split);
}

let rows_per_morsel = df.height().div_ceil(n_morsels).max(1);

(0..i64::try_from(df.height()).unwrap())
.step_by(rows_per_morsel)
.map(move |offset| df.slice(offset, rows_per_morsel))
.filter(|df| !df.is_empty())
}

mod tests {
Expand Down

0 comments on commit c7888de

Please sign in to comment.