From eddb7644d02021f2a6a6d09a408c3d373f96cc04 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 28 Feb 2025 15:19:45 +0100 Subject: [PATCH 1/3] refactor: Improve new streaming sinks more --- .../polars-stream/src/nodes/io_sinks/ipc.rs | 73 ++--- .../polars-stream/src/nodes/io_sinks/mod.rs | 127 +++----- .../src/nodes/io_sinks/parquet.rs | 282 ++++++++---------- 3 files changed, 189 insertions(+), 293 deletions(-) diff --git a/crates/polars-stream/src/nodes/io_sinks/ipc.rs b/crates/polars-stream/src/nodes/io_sinks/ipc.rs index fa8422cb9e82..431085f73ab7 100644 --- a/crates/polars-stream/src/nodes/io_sinks/ipc.rs +++ b/crates/polars-stream/src/nodes/io_sinks/ipc.rs @@ -2,8 +2,6 @@ use std::cmp::Reverse; use std::io::BufWriter; use std::path::PathBuf; -use polars_core::config; -use polars_core::frame::DataFrame; use polars_core::prelude::CompatLevel; use polars_core::schema::{SchemaExt, SchemaRef}; use polars_core::utils::arrow; @@ -18,7 +16,10 @@ use polars_io::ipc::{IpcWriter, IpcWriterOptions}; use polars_io::SerWriter; use polars_utils::priority::Priority; -use super::{SinkNode, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE}; +use super::{ + buffer_and_distribute_columns_task, SinkNode, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, + DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, +}; use crate::async_executor::spawn; use crate::async_primitives::connector::connector; use crate::async_primitives::distributor_channel::distributor_channel; @@ -69,75 +70,37 @@ impl SinkNode for IpcSinkNode { join_handles: &mut Vec>>, ) { // .. 
-> Buffer task - let mut buffer_rx = recv_ports_recv.serial(join_handles); + let buffer_rx = recv_ports_recv.serial(join_handles); // Buffer task -> Encode tasks - let (mut dist_tx, dist_rxs) = + let (dist_tx, dist_rxs) = distributor_channel(num_pipelines, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE); // Encode tasks -> Collect task let (mut lin_rx, lin_txs) = Linearizer::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE); // Collect task -> IO task - let (mut io_task_tx, mut io_task_rx) = connector::<(Vec, EncodedData)>(); + let (mut io_tx, mut io_rx) = connector::<(Vec, EncodedData)>(); let options = WriteOptions { compression: self.write_options.compression.map(Into::into), }; - let path = self.path.clone(); - let input_schema = self.input_schema.clone(); let compat_level = self.compat_level; let chunk_size = self.chunk_size; - let ipc_fields = input_schema + let ipc_fields = self + .input_schema .iter_fields() .map(|f| f.to_arrow(compat_level)) .collect::>(); let ipc_fields = default_ipc_fields(ipc_fields.iter()); // Buffer task. - // - // This task linearizes and buffers morsels until a given a maximum chunk size is reached - // and then sends the whole record batch to be encoded and written. - join_handles.push(spawn(TaskPriority::High, async move { - let mut seq = 0usize; - let mut buffer = DataFrame::empty_with_schema(input_schema.as_ref()); - - while let Ok(morsel) = buffer_rx.recv().await { - let (df, _, _, consume_token) = morsel.into_inner(); - // @NOTE: This also performs schema validation. - buffer.vstack_mut(&df)?; - - while buffer.height() >= chunk_size { - let df; - (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64); - - for (i, column) in df.take_columns().into_iter().enumerate() { - if dist_tx.send((seq, i, column)).await.is_err() { - return Ok(()); - } - } - seq += 1; - } - drop(consume_token); // Increase the backpressure. Only free up a pipeline when the - // morsel has started encoding in its entirety. 
This still - // allows for parallelism of Morsels, but prevents large - // bunches of Morsels from stacking up here. - } - - if config::verbose() { - eprintln!("[ipc_sink]: Flushing last chunk for '{}'", path.display()); - } - - // Flush the remaining rows. - assert!(buffer.height() <= chunk_size); - for (i, column) in buffer.take_columns().into_iter().enumerate() { - if dist_tx.send((seq, i, column)).await.is_err() { - return Ok(()); - } - } - - PolarsResult::Ok(()) - })); + join_handles.push(buffer_and_distribute_columns_task( + buffer_rx, + dist_tx, + chunk_size, + self.input_schema.clone(), + )); // Encoding tasks. // @@ -202,7 +165,7 @@ impl SinkNode for IpcSinkNode { // // Collects all the encoded data and packs it together for the IO task to write it. let input_schema = self.input_schema.clone(); - join_handles.push(spawn(TaskPriority::Low, async move { + join_handles.push(spawn(TaskPriority::High, async move { let mut dictionary_tracker = DictionaryTracker { dictionaries: Default::default(), cannot_replace: false, @@ -308,7 +271,7 @@ impl SinkNode for IpcSinkNode { &mut encoded_data, ); - if io_task_tx + if io_tx .send(( std::mem::take(&mut current.encoded_dictionaries), encoded_data, @@ -346,7 +309,7 @@ impl SinkNode for IpcSinkNode { .with_parallel(false) .batched(&input_schema)?; - while let Ok((dicts, record_batch)) = io_task_rx.recv().await { + while let Ok((dicts, record_batch)) = io_rx.recv().await { // @TODO: At the moment this is a sync write, this is not ideal because we can only // have so many blocking threads in the tokio threadpool. 
writer.write_encoded(dicts.as_slice(), &record_batch)?; diff --git a/crates/polars-stream/src/nodes/io_sinks/mod.rs b/crates/polars-stream/src/nodes/io_sinks/mod.rs index 009f2f940c52..02d3bdfe793b 100644 --- a/crates/polars-stream/src/nodes/io_sinks/mod.rs +++ b/crates/polars-stream/src/nodes/io_sinks/mod.rs @@ -1,6 +1,9 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use polars_core::config; +use polars_core::frame::DataFrame; +use polars_core::prelude::Column; +use polars_core::schema::SchemaRef; use polars_error::PolarsResult; use polars_expr::state::ExecutionState; @@ -13,7 +16,6 @@ use crate::async_primitives::distributor_channel; use crate::async_primitives::linearizer::{Inserter, Linearizer}; use crate::async_primitives::wait_group::WaitGroup; use crate::nodes::TaskPriority; -use crate::DEFAULT_LINEARIZER_BUFFER_SIZE; #[cfg(feature = "csv")] pub mod csv; @@ -133,97 +135,52 @@ impl SinkRecvPort { })); rx } +} - /// Receive the [`RecvPort`] serially that distributes amongst workers then [`Linearize`] again - /// to the end. - /// - /// This is useful for sinks that process incoming [`Morsel`]s column-wise as the processing - /// of the columns can be done in parallel. 
- #[allow(clippy::type_complexity)] - pub fn serial_into_distribute( - mut self, - ) -> ( - JoinHandle>, - Receiver<( - PhaseOutcome, - Option>, - distributor_channel::Sender, - )>, - Vec, Inserter)>>, - Receiver<(PhaseOutcome, Linearizer)>, - ) - where - D: Send + Sync + 'static, - L: Send + Sync + Ord + 'static, - { - let (mut tx_linearizer, rx_linearizer) = connector(); - let (mut rx_senders, rx_receivers) = (0..self.num_pipelines) - .map(|_| connector()) - .collect::<(Vec<_>, Vec<_>)>(); - let (mut tx_end, rx_end) = connector(); - let handle = spawn(TaskPriority::High, async move { - let mut outcomes = Vec::with_capacity(self.num_pipelines + 2); - let wg = WaitGroup::default(); - - let mut stop = false; - while !stop { - let input = self.recv.recv().await; - stop |= input.is_err(); // We want to send one last message without receiver when - // the channel is dropped. This allows us to flush buffers. - let (phase_outcome, receiver) = match input { - Ok((outcome, port)) => (Some(outcome), Some(port.serial())), - Err(()) => (None, None), - }; - - let (dist_tx, dist_rxs) = distributor_channel::distributor_channel::( - self.num_pipelines, - DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, - ); - let (linearizer, senders) = - Linearizer::::new(self.num_pipelines, DEFAULT_LINEARIZER_BUFFER_SIZE); - - let (token, outcome) = PhaseOutcome::new_shared_wait(wg.token()); - if tx_linearizer - .send((outcome, receiver, dist_tx)) - .await - .is_err() - { - return Ok(()); - } - outcomes.push(token); - for ((dist_rx, rx_sender), sender) in - dist_rxs.into_iter().zip(rx_senders.iter_mut()).zip(senders) - { - let (token, outcome) = PhaseOutcome::new_shared_wait(wg.token()); - if rx_sender.send((outcome, dist_rx, sender)).await.is_err() { - return Ok(()); - } - outcomes.push(token); - } - let (token, outcome) = PhaseOutcome::new_shared_wait(wg.token()); - if tx_end.send((outcome, linearizer)).await.is_err() { - return Ok(()); - } - outcomes.push(token); - - wg.wait().await; - for outcome in 
&outcomes { - if outcome.did_finish() { +/// Spawn a task that linearizes and buffers morsels until a given maximum chunk size is reached +/// and then distributes the columns amongst worker tasks. +fn buffer_and_distribute_columns_task( + mut rx: Receiver, + mut dist_tx: distributor_channel::Sender<(usize, usize, Column)>, + chunk_size: usize, + schema: SchemaRef, +) -> JoinHandle> { + spawn(TaskPriority::High, async move { + let mut seq = 0usize; + let mut buffer = DataFrame::empty_with_schema(schema.as_ref()); + + while let Ok(morsel) = rx.recv().await { + let (df, _, _, consume_token) = morsel.into_inner(); + // @NOTE: This also performs schema validation. + buffer.vstack_mut(&df)?; + + while buffer.height() >= chunk_size { + let df; + (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64); + + for (i, column) in df.take_columns().into_iter().enumerate() { + if dist_tx.send((seq, i, column)).await.is_err() { return Ok(()); } } - - if let Some(outcome) = phase_outcome { - outcome.stopped() - } - outcomes.clear(); + seq += 1; } + drop(consume_token); // Increase the backpressure. Only free up a pipeline when the + // morsel has started encoding in its entirety. This still + // allows for parallelism of Morsels, but prevents large + // bunches of Morsels from stacking up here. + } - Ok(()) - }); + // Flush the remaining rows.
+ assert!(buffer.height() <= chunk_size); + for (i, column) in buffer.take_columns().into_iter().enumerate() { + if dist_tx.send((seq, i, column)).await.is_err() { + return Ok(()); + } + } - (handle, rx_linearizer, rx_receivers, rx_end) - } + PolarsResult::Ok(()) + }) } pub trait SinkNode { diff --git a/crates/polars-stream/src/nodes/io_sinks/parquet.rs b/crates/polars-stream/src/nodes/io_sinks/parquet.rs index 73e27e58f29c..45e9e88a9405 100644 --- a/crates/polars-stream/src/nodes/io_sinks/parquet.rs +++ b/crates/polars-stream/src/nodes/io_sinks/parquet.rs @@ -3,8 +3,7 @@ use std::io::BufWriter; use std::path::{Path, PathBuf}; use std::sync::Mutex; -use polars_core::frame::DataFrame; -use polars_core::prelude::{ArrowSchema, Column, CompatLevel}; +use polars_core::prelude::{ArrowSchema, CompatLevel}; use polars_core::schema::SchemaRef; use polars_error::PolarsResult; use polars_expr::state::ExecutionState; @@ -19,11 +18,16 @@ use polars_parquet::write::{ }; use polars_utils::priority::Priority; -use super::{SinkNode, SinkRecvPort}; +use super::{ + buffer_and_distribute_columns_task, SinkNode, SinkRecvPort, + DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, +}; use crate::async_executor::spawn; +use crate::async_primitives::connector::connector; +use crate::async_primitives::distributor_channel::distributor_channel; +use crate::async_primitives::linearizer::Linearizer; use crate::nodes::{JoinHandle, TaskPriority}; -type Linearized = Priority, Vec>>; pub struct ParquetSinkNode { path: PathBuf, @@ -72,16 +76,22 @@ impl SinkNode for ParquetSinkNode { fn spawn_sink( &mut self, - _num_pipelines: usize, + num_pipelines: usize, recv_ports_recv: SinkRecvPort, _state: &ExecutionState, join_handles: &mut Vec>>, ) { - let (handle, mut buffer_rx, worker_rxs, mut io_rx) = - recv_ports_recv.serial_into_distribute::<(usize, usize, Column), Linearized>(); - join_handles.push(handle); + // .. 
-> Buffer task + let buffer_rx = recv_ports_recv.serial(join_handles); + // Buffer task -> Encode tasks + let (dist_tx, dist_rxs) = + distributor_channel(num_pipelines, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE); + // Encode tasks -> Collect task + let (mut lin_rx, lin_txs) = + Linearizer::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE); + // Collect task -> IO task + let (mut io_tx, mut io_rx) = connector::>>(); - let input_schema = self.input_schema.clone(); let write_options = self.write_options; let options = WriteOptions { @@ -92,138 +102,123 @@ impl SinkNode for ParquetSinkNode { }; // Buffer task. - // - // This task linearizes and buffers morsels until a given a maximum chunk size is reached - // and then sends the whole record batch to be encoded and written. - join_handles.push(spawn(TaskPriority::High, async move { - let mut buffer = DataFrame::empty_with_schema(input_schema.as_ref()); - let row_group_size = write_options + join_handles.push(buffer_and_distribute_columns_task( + buffer_rx, + dist_tx, + write_options .row_group_size - .unwrap_or(DEFAULT_ROW_GROUP_SIZE) - .max(1); - let mut row_group_index = 0; - - while let Ok((outcome, receiver, mut sender)) = buffer_rx.recv().await { - match receiver { - None => { - // Flush the remaining rows. - while buffer.height() > 0 { - let row_group; - - (row_group, buffer) = - buffer.split_at(row_group_size.min(buffer.height()) as i64); - for (column_idx, column) in - row_group.take_columns().into_iter().enumerate() + .unwrap_or(DEFAULT_ROW_GROUP_SIZE), + self.input_schema.clone(), + )); + + // Encode task. + // + // Task encodes the columns into their corresponding Parquet encoding. 
+ join_handles.extend( + dist_rxs + .into_iter() + .zip(lin_txs) + .map(|(mut dist_rx, mut lin_tx)| { + let parquet_schema = self.parquet_schema.clone(); + let encodings = self.encodings.clone(); + + spawn(TaskPriority::High, async move { + while let Ok((rg_idx, col_idx, column)) = dist_rx.recv().await { + let type_ = &parquet_schema.fields()[col_idx]; + let encodings = &encodings[col_idx]; + + let array = column.as_materialized_series().rechunk(); + let array = array.to_arrow(0, CompatLevel::newest()); + + // @TODO: This causes all structs fields to be handled on a single thread. It + // would be preferable to split the encoding among multiple threads. + + // @NOTE: Since one Polars column might contain multiple Parquet columns (when + // it has a struct datatype), we return a Vec>. + + // Array -> Parquet pages. + let encoded_columns = + array_to_columns(array, type_.clone(), options, encodings)?; + + // Compress the pages. + let compressed_pages = encoded_columns + .into_iter() + .map(|encoded_pages| { + Compressor::new_from_vec( + encoded_pages.map(|result| { + result.map_err(|e| { + ParquetError::FeatureNotSupported(format!( + "reraised in polars: {e}", + )) + }) + }), + options.compression, + vec![], + ) + .collect::>>() + }) + .collect::>>()?; + + if lin_tx + .insert(Priority(Reverse(rg_idx), (col_idx, compressed_pages))) + .await + .is_err() { - if sender - .send((row_group_index, column_idx, column)) - .await - .is_err() - { - return Ok(()); - } + return Ok(()); } - row_group_index += 1; } - }, - Some(mut receiver) => { - while let Ok(morsel) = receiver.recv().await { - let (df, _, _, consume_token) = morsel.into_inner(); - // @NOTE: This also performs schema validation. 
- buffer.vstack_mut(&df)?; - - while buffer.height() >= row_group_size { - let row_group; - - (row_group, buffer) = - buffer.split_at(row_group_size.min(buffer.height()) as i64); - - for (column_idx, column) in - row_group.take_columns().into_iter().enumerate() - { - if sender - .send((row_group_index, column_idx, column)) - .await - .is_err() - { - return Ok(()); - } - } - - row_group_index += 1; - } - drop(consume_token); // Keep the consume_token until here to increase - // the backpressure. - } - }, - } - outcome.stopped(); + PolarsResult::Ok(()) + }) + }), + ); + + // Collect Task. + // + // Collects all the encoded data and packs it together for the IO task to write it. + let input_schema = self.input_schema.clone(); + let num_parquet_columns = self.parquet_schema.leaves().len(); + join_handles.push(spawn(TaskPriority::High, async move { + struct Current { + seq: usize, + num_columns_seen: usize, + columns: Vec>>>, } - PolarsResult::Ok(()) - })); + let mut current = Current { + seq: 0, + num_columns_seen: 0, + columns: (0..input_schema.len()).map(|_| None).collect(), + }; - // Encode task. - // - // Task encodes the columns into their corresponding Parquet encoding. - for mut worker_rx in worker_rxs { - let parquet_schema = self.parquet_schema.clone(); - let encodings = self.encodings.clone(); - - join_handles.push(spawn(TaskPriority::High, async move { - while let Ok((outcome, mut dist_rx, mut lin_tx)) = worker_rx.recv().await { - while let Ok((rg_idx, col_idx, column)) = dist_rx.recv().await { - let type_ = &parquet_schema.fields()[col_idx]; - let encodings = &encodings[col_idx]; - - let array = column.as_materialized_series().rechunk(); - let array = array.to_arrow(0, CompatLevel::newest()); - - // @TODO: This causes all structs fields to be handled on a single thread. It - // would be preferable to split the encoding among multiple threads. 
- - // @NOTE: Since one Polars column might contain multiple Parquet columns (when - // it has a struct datatype), we return a Vec>. - - // Array -> Parquet pages. - let encoded_columns = - array_to_columns(array, type_.clone(), options, encodings)?; - - // Compress the pages. - let compressed_pages = encoded_columns - .into_iter() - .map(|encoded_pages| { - Compressor::new_from_vec( - encoded_pages.map(|result| { - result.map_err(|e| { - ParquetError::FeatureNotSupported(format!( - "reraised in polars: {e}", - )) - }) - }), - options.compression, - vec![], - ) - .collect::>>() - }) - .collect::>>()?; - - if lin_tx - .insert(Priority(Reverse((rg_idx, col_idx)), compressed_pages)) - .await - .is_err() - { - return Ok(()); - } + // Linearize from all the Encoder tasks. + while let Some(Priority(Reverse(seq), (i, compressed_pages))) = lin_rx.get().await { + if current.num_columns_seen == 0 { + current.seq = seq; + } + + debug_assert_eq!(current.seq, seq); + debug_assert!(current.columns[i].is_none()); + current.columns[i] = Some(compressed_pages); + current.num_columns_seen += 1; + + if current.num_columns_seen == input_schema.len() { + // @Optimize: Keep track of these sizes so we can correctly preallocate + // them. + let mut current_row_group: Vec> = Vec::with_capacity(num_parquet_columns); + for column in current.columns.iter_mut() { + current_row_group.extend(column.take().unwrap()); } - outcome.stopped(); + if io_tx.send(current_row_group).await.is_err() { + return Ok(()); + } + current.num_columns_seen = 0; } + } - PolarsResult::Ok(()) - })); - } + Ok(()) + })); // IO task. // @@ -231,7 +226,6 @@ impl SinkNode for ParquetSinkNode { // spawned once. 
let path = self.path.clone(); let write_options = self.write_options; - let input_schema = self.input_schema.clone(); let arrow_schema = self.arrow_schema.clone(); let parquet_schema = self.parquet_schema.clone(); let encodings = self.encodings.clone(); @@ -262,29 +256,11 @@ impl SinkNode for ParquetSinkNode { let mut writer = BatchedWriter::new(file_writer, encodings, options, false); let num_parquet_columns = writer.parquet_schema().leaves().len(); - let mut current_row_group = Vec::with_capacity(num_parquet_columns); - - while let Ok((outcome, mut lin_rx)) = io_rx.recv().await { - // Linearize from all the Encoder tasks. - while let Some(Priority(Reverse((_, col_idx)), compressed_pages)) = - lin_rx.get().await - { - assert!(col_idx < input_schema.len()); - current_row_group.extend(compressed_pages); - - // Only if it is the last column of the row group, write the row group to the file. - if current_row_group.len() < num_parquet_columns { - continue; - } - - // @TODO: At the moment this is a sync write, this is not ideal because we can only - // have so many blocking threads in the tokio threadpool. - assert_eq!(current_row_group.len(), num_parquet_columns); - writer.write_row_group(&current_row_group)?; - current_row_group.clear(); - } - - outcome.stopped(); + while let Ok(current_row_group) = io_rx.recv().await { + // @TODO: At the moment this is a sync write, this is not ideal because we can only + // have so many blocking threads in the tokio threadpool.
+ assert_eq!(current_row_group.len(), num_parquet_columns); + writer.write_row_group(&current_row_group)?; } writer.finish()?; From e614a67fd8a83ef2506a37802db958812050cc5d Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 28 Feb 2025 15:40:14 +0100 Subject: [PATCH 2/3] refactor: simplify row-wise new streaming sinks --- .../polars-stream/src/nodes/io_sinks/csv.rs | 82 +++++++++---------- .../polars-stream/src/nodes/io_sinks/json.rs | 43 +++++----- .../polars-stream/src/nodes/io_sinks/mod.rs | 71 ++++++++-------- 3 files changed, 94 insertions(+), 102 deletions(-) diff --git a/crates/polars-stream/src/nodes/io_sinks/csv.rs b/crates/polars-stream/src/nodes/io_sinks/csv.rs index a6ca6ac6baff..23053f2e1d20 100644 --- a/crates/polars-stream/src/nodes/io_sinks/csv.rs +++ b/crates/polars-stream/src/nodes/io_sinks/csv.rs @@ -11,6 +11,8 @@ use polars_utils::priority::Priority; use super::{SinkNode, SinkRecvPort}; use crate::async_executor::spawn; +use crate::async_primitives::linearizer::Linearizer; +use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE; use crate::nodes::{JoinHandle, MorselSeq, TaskPriority}; type Linearized = Priority, Vec>; @@ -40,15 +42,16 @@ impl SinkNode for CsvSinkNode { fn spawn_sink( &mut self, - _num_pipelines: usize, + num_pipelines: usize, recv_ports_recv: SinkRecvPort, _state: &ExecutionState, join_handles: &mut Vec>>, ) { // .. -> Encode task - let (handle, recv_ports_recv, mut recv_linearizer) = - recv_ports_recv.parallel_into_linearize::(); - join_handles.push(handle); + let rxs = recv_ports_recv.parallel(join_handles); + // Encode tasks -> IO task + let (mut lin_rx, lin_txs) = + Linearizer::::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE); // 16MB const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24; @@ -56,51 +59,49 @@ impl SinkNode for CsvSinkNode { // Encode task. // // Task encodes the columns into their corresponding CSV encoding.
- for mut rx_receiver in recv_ports_recv { + join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| { let schema = self.schema.clone(); let options = self.write_options.clone(); - join_handles.push(spawn(TaskPriority::High, async move { + spawn(TaskPriority::High, async move { // Amortize the allocations over time. If we see that we need to do way larger // allocations, we adjust to that over time. let mut allocation_size = DEFAULT_ALLOCATION_SIZE; let options = options.clone(); - while let Ok((outcome, mut receiver, mut sender)) = rx_receiver.recv().await { - while let Ok(morsel) = receiver.recv().await { - let (df, seq, _, consume_token) = morsel.into_inner(); - - let mut buffer = Vec::with_capacity(allocation_size); - let mut writer = CsvWriter::new(&mut buffer) - .include_bom(false) // Handled once in the IO task. - .include_header(false) // Handled once in the IO task. - .with_separator(options.serialize_options.separator) - .with_line_terminator(options.serialize_options.line_terminator.clone()) - .with_quote_char(options.serialize_options.quote_char) - .with_datetime_format(options.serialize_options.datetime_format.clone()) - .with_date_format(options.serialize_options.date_format.clone()) - .with_time_format(options.serialize_options.time_format.clone()) - .with_float_scientific(options.serialize_options.float_scientific) - .with_float_precision(options.serialize_options.float_precision) - .with_null_value(options.serialize_options.null.clone()) - .with_quote_style(options.serialize_options.quote_style) - .n_threads(1) // Disable rayon parallelism - .batched(&schema)?; - - writer.write_batch(&df)?; - - allocation_size = allocation_size.max(buffer.len()); - sender.insert(Priority(Reverse(seq), buffer)).await.unwrap(); - drop(consume_token); // Keep the consume_token until here to increase the - // backpressure. 
+ while let Ok(morsel) = rx.recv().await { + let (df, seq, _, consume_token) = morsel.into_inner(); + + let mut buffer = Vec::with_capacity(allocation_size); + let mut writer = CsvWriter::new(&mut buffer) + .include_bom(false) // Handled once in the IO task. + .include_header(false) // Handled once in the IO task. + .with_separator(options.serialize_options.separator) + .with_line_terminator(options.serialize_options.line_terminator.clone()) + .with_quote_char(options.serialize_options.quote_char) + .with_datetime_format(options.serialize_options.datetime_format.clone()) + .with_date_format(options.serialize_options.date_format.clone()) + .with_time_format(options.serialize_options.time_format.clone()) + .with_float_scientific(options.serialize_options.float_scientific) + .with_float_precision(options.serialize_options.float_precision) + .with_null_value(options.serialize_options.null.clone()) + .with_quote_style(options.serialize_options.quote_style) + .n_threads(1) // Disable rayon parallelism + .batched(&schema)?; + + writer.write_batch(&df)?; + + allocation_size = allocation_size.max(buffer.len()); + if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() { + return Ok(()); } - - outcome.stopped(); + drop(consume_token); // Keep the consume_token until here to increase the + // backpressure. } PolarsResult::Ok(()) - })); - } + }) + })); // IO task. 
// @@ -133,11 +134,8 @@ impl SinkNode for CsvSinkNode { file = tokio::fs::File::from_std(std_file); } - while let Ok((outcome, mut linearizer)) = recv_linearizer.recv().await { - while let Some(Priority(_, buffer)) = linearizer.get().await { - file.write_all(&buffer).await?; - } - outcome.stopped(); + while let Some(Priority(_, buffer)) = lin_rx.get().await { + file.write_all(&buffer).await?; } PolarsResult::Ok(()) diff --git a/crates/polars-stream/src/nodes/io_sinks/json.rs b/crates/polars-stream/src/nodes/io_sinks/json.rs index 938a13ea566b..e8d85611f211 100644 --- a/crates/polars-stream/src/nodes/io_sinks/json.rs +++ b/crates/polars-stream/src/nodes/io_sinks/json.rs @@ -8,6 +8,8 @@ use polars_utils::priority::Priority; use super::{SinkNode, SinkRecvPort}; use crate::async_executor::spawn; +use crate::async_primitives::linearizer::Linearizer; +use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE; use crate::nodes::{JoinHandle, MorselSeq, TaskPriority}; type Linearized = Priority, Vec>; @@ -31,14 +33,16 @@ impl SinkNode for NDJsonSinkNode { fn spawn_sink( &mut self, - _num_pipelines: usize, + num_pipelines: usize, recv_ports_recv: SinkRecvPort, _state: &ExecutionState, join_handles: &mut Vec>>, ) { - let (handle, rx_receivers, mut rx_linearizer) = - recv_ports_recv.parallel_into_linearize::(); - join_handles.push(handle); + // .. -> Encode task + let rxs = recv_ports_recv.parallel(join_handles); + // Encode tasks -> IO task + let (mut lin_rx, lin_txs) = + Linearizer::::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE); // 16MB const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24; @@ -46,28 +50,26 @@ impl SinkNode for NDJsonSinkNode { // Encode task. // // Task encodes the columns into their corresponding JSON encoding. - join_handles.extend(rx_receivers.into_iter().map(|mut rx_receiver| { + join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| { spawn(TaskPriority::High, async move { // Amortize the allocations over time. 
If we see that we need to do way larger // allocations, we adjust to that over time. let mut allocation_size = DEFAULT_ALLOCATION_SIZE; - while let Ok((outcome, mut rx, mut sender)) = rx_receiver.recv().await { - while let Ok(morsel) = rx.recv().await { - let (df, seq, _, consume_token) = morsel.into_inner(); + while let Ok(morsel) = rx.recv().await { + let (df, seq, _, consume_token) = morsel.into_inner(); - let mut buffer = Vec::with_capacity(allocation_size); - let mut writer = BatchedWriter::new(&mut buffer); + let mut buffer = Vec::with_capacity(allocation_size); + let mut writer = BatchedWriter::new(&mut buffer); - writer.write_batch(&df)?; + writer.write_batch(&df)?; - allocation_size = allocation_size.max(buffer.len()); - sender.insert(Priority(Reverse(seq), buffer)).await.unwrap(); - drop(consume_token); // Keep the consume_token until here to increase the - // backpressure. + allocation_size = allocation_size.max(buffer.len()); + if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() { + return Ok(()); } - - outcome.stopped(); + drop(consume_token); // Keep the consume_token until here to increase the + // backpressure. 
} PolarsResult::Ok(()) @@ -91,11 +93,8 @@ impl SinkNode for NDJsonSinkNode { .await .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?; - while let Ok((outcome, mut linearizer)) = rx_linearizer.recv().await { - while let Some(Priority(_, buffer)) = linearizer.get().await { - file.write_all(&buffer).await?; - } - outcome.stopped(); + while let Some(Priority(_, buffer)) = lin_rx.get().await { + file.write_all(&buffer).await?; } PolarsResult::Ok(()) diff --git a/crates/polars-stream/src/nodes/io_sinks/mod.rs b/crates/polars-stream/src/nodes/io_sinks/mod.rs index 02d3bdfe793b..b8e0c39f3b0b 100644 --- a/crates/polars-stream/src/nodes/io_sinks/mod.rs +++ b/crates/polars-stream/src/nodes/io_sinks/mod.rs @@ -7,13 +7,13 @@ use polars_core::schema::SchemaRef; use polars_error::PolarsResult; use polars_expr::state::ExecutionState; +use super::io_sources::PhaseOutcomeToken; use super::{ ComputeNode, JoinHandle, Morsel, PhaseOutcome, PortState, RecvPort, SendPort, TaskScope, }; use crate::async_executor::{spawn, AbortOnDropHandle}; use crate::async_primitives::connector::{connector, Receiver, Sender}; use crate::async_primitives::distributor_channel; -use crate::async_primitives::linearizer::{Inserter, Linearizer}; use crate::async_primitives::wait_group::WaitGroup; use crate::nodes::TaskPriority; @@ -57,62 +57,57 @@ impl SinkInputPort { } impl SinkRecvPort { - /// Receive the [`RecvPort`] in parallel and create a [`Linearizer`] for each phase. - /// - /// This is useful for sinks that process incoming [`Morsel`]s row-wise as the processing can - /// be done in parallel and then be linearized into the actual sink. 
- #[allow(clippy::type_complexity)] - pub fn parallel_into_linearize( + pub fn parallel( mut self, - ) -> ( - JoinHandle>, - Vec, Inserter)>>, - Receiver<(PhaseOutcome, Linearizer)>, - ) { - let (mut rx_senders, rx_receivers) = (0..self.num_pipelines) + join_handles: &mut Vec>>, + ) -> Vec> { + let (txs, rxs) = (0..self.num_pipelines) .map(|_| connector()) .collect::<(Vec<_>, Vec<_>)>(); - let (mut tx_linearizer, rx_linearizer) = connector(); - let handle = spawn(TaskPriority::High, async move { - let mut outcomes = Vec::with_capacity(self.num_pipelines + 1); - let wg = WaitGroup::default(); - - while let Ok((phase_outcome, port)) = self.recv.recv().await { - let inputs = port.parallel(); - - let (linearizer, senders) = - Linearizer::::new(self.num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE); + let (mut pass_txs, pass_rxs) = (0..self.num_pipelines) + .map(|_| connector()) + .collect::<(Vec<_>, Vec<_>)>(); + let mut outcomes = Vec::::with_capacity(self.num_pipelines); + let wg = WaitGroup::default(); - for ((input, rx_sender), sender) in - inputs.into_iter().zip(rx_senders.iter_mut()).zip(senders) - { + join_handles.push(spawn(TaskPriority::High, async move { + while let Ok((outcome, port_rxs)) = self.recv.recv().await { + let port_rxs = port_rxs.parallel(); + for (pass_tx, port_rx) in pass_txs.iter_mut().zip(port_rxs) { let (token, outcome) = PhaseOutcome::new_shared_wait(wg.token()); - if rx_sender.send((outcome, input, sender)).await.is_err() { + if pass_tx.send((outcome, port_rx)).await.is_err() { return Ok(()); } outcomes.push(token); } - let (token, outcome) = PhaseOutcome::new_shared_wait(wg.token()); - if tx_linearizer.send((outcome, linearizer)).await.is_err() { - return Ok(()); - } - outcomes.push(token); wg.wait().await; - for outcome in &outcomes { - if outcome.did_finish() { + for outcome_token in &outcomes { + if outcome_token.did_finish() { return Ok(()); } } - - phase_outcome.stopped(); outcomes.clear(); + outcome.stopped(); } Ok(()) - }); + })); 
+ join_handles.extend(pass_rxs.into_iter().zip(txs).map(|(mut pass_rx, mut tx)| { + spawn(TaskPriority::High, async move { + while let Ok((outcome, mut rx)) = pass_rx.recv().await { + while let Ok(morsel) = rx.recv().await { + if tx.send(morsel).await.is_err() { + return Ok(()); + } + } + outcome.stopped(); + } + Ok(()) + }) + })); - (handle, rx_receivers, rx_linearizer) + rxs } /// Serialize the input and allow for long lived lasts to listen to a constant channel. From 34a479b7ccded055535c1ab211c5a748ccd67f8a Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 28 Feb 2025 15:40:57 +0100 Subject: [PATCH 3/3] fmt --- crates/polars-stream/src/nodes/io_sinks/parquet.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/polars-stream/src/nodes/io_sinks/parquet.rs b/crates/polars-stream/src/nodes/io_sinks/parquet.rs index 45e9e88a9405..f025b689e436 100644 --- a/crates/polars-stream/src/nodes/io_sinks/parquet.rs +++ b/crates/polars-stream/src/nodes/io_sinks/parquet.rs @@ -205,7 +205,8 @@ impl SinkNode for ParquetSinkNode { if current.num_columns_seen == input_schema.len() { // @Optimize: Keep track of these sizes so we can correctly preallocate // them. - let mut current_row_group: Vec> = Vec::with_capacity(num_parquet_columns); + let mut current_row_group: Vec> = + Vec::with_capacity(num_parquet_columns); for column in current.columns.iter_mut() { current_row_group.extend(column.take().unwrap()); }