refactor: Simplify the phase handling of new streaming sinks (pola-rs…
coastalwhite authored and anath2 committed Mar 5, 2025
1 parent 0cd015b commit 1467491
Showing 5 changed files with 284 additions and 395 deletions.
82 changes: 40 additions & 42 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -11,6 +11,8 @@ use polars_utils::priority::Priority;
 
 use super::{SinkNode, SinkRecvPort};
 use crate::async_executor::spawn;
+use crate::async_primitives::linearizer::Linearizer;
+use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE;
 use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
 
 type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
@@ -40,67 +42,66 @@ impl SinkNode for CsvSinkNode {
 
     fn spawn_sink(
         &mut self,
-        _num_pipelines: usize,
+        num_pipelines: usize,
         recv_ports_recv: SinkRecvPort,
         _state: &ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
         // .. -> Encode task
-        let (handle, recv_ports_recv, mut recv_linearizer) =
-            recv_ports_recv.parallel_into_linearize::<Linearized>();
-        join_handles.push(handle);
+        let rxs = recv_ports_recv.parallel(join_handles);
+        // Encode tasks -> IO task
+        let (mut lin_rx, lin_txs) =
+            Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
 
         // 16MB
         const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
 
         // Encode task.
         //
         // Task encodes the columns into their corresponding CSV encoding.
-        for mut rx_receiver in recv_ports_recv {
+        join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
             let schema = self.schema.clone();
             let options = self.write_options.clone();
 
-            join_handles.push(spawn(TaskPriority::High, async move {
+            spawn(TaskPriority::High, async move {
                 // Amortize the allocations over time. If we see that we need to do way larger
                 // allocations, we adjust to that over time.
                 let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
-                let options = options.clone();
 
-                while let Ok((outcome, mut receiver, mut sender)) = rx_receiver.recv().await {
-                    while let Ok(morsel) = receiver.recv().await {
-                        let (df, seq, _, consume_token) = morsel.into_inner();
-
-                        let mut buffer = Vec::with_capacity(allocation_size);
-                        let mut writer = CsvWriter::new(&mut buffer)
-                            .include_bom(false) // Handled once in the IO task.
-                            .include_header(false) // Handled once in the IO task.
-                            .with_separator(options.serialize_options.separator)
-                            .with_line_terminator(options.serialize_options.line_terminator.clone())
-                            .with_quote_char(options.serialize_options.quote_char)
-                            .with_datetime_format(options.serialize_options.datetime_format.clone())
-                            .with_date_format(options.serialize_options.date_format.clone())
-                            .with_time_format(options.serialize_options.time_format.clone())
-                            .with_float_scientific(options.serialize_options.float_scientific)
-                            .with_float_precision(options.serialize_options.float_precision)
-                            .with_null_value(options.serialize_options.null.clone())
-                            .with_quote_style(options.serialize_options.quote_style)
-                            .n_threads(1) // Disable rayon parallelism
-                            .batched(&schema)?;
-
-                        writer.write_batch(&df)?;
-
-                        allocation_size = allocation_size.max(buffer.len());
-                        sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
-                        drop(consume_token); // Keep the consume_token until here to increase the
-                                             // backpressure.
-                    }
-                    outcome.stopped();
-                }
+                while let Ok(morsel) = rx.recv().await {
+                    let (df, seq, _, consume_token) = morsel.into_inner();
+
+                    let mut buffer = Vec::with_capacity(allocation_size);
+                    let mut writer = CsvWriter::new(&mut buffer)
+                        .include_bom(false) // Handled once in the IO task.
+                        .include_header(false) // Handled once in the IO task.
+                        .with_separator(options.serialize_options.separator)
+                        .with_line_terminator(options.serialize_options.line_terminator.clone())
+                        .with_quote_char(options.serialize_options.quote_char)
+                        .with_datetime_format(options.serialize_options.datetime_format.clone())
+                        .with_date_format(options.serialize_options.date_format.clone())
+                        .with_time_format(options.serialize_options.time_format.clone())
+                        .with_float_scientific(options.serialize_options.float_scientific)
+                        .with_float_precision(options.serialize_options.float_precision)
+                        .with_null_value(options.serialize_options.null.clone())
+                        .with_quote_style(options.serialize_options.quote_style)
+                        .n_threads(1) // Disable rayon parallelism
+                        .batched(&schema)?;
+
+                    writer.write_batch(&df)?;
+
+                    allocation_size = allocation_size.max(buffer.len());
+                    if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
+                        return Ok(());
+                    }
+
+                    drop(consume_token); // Keep the consume_token until here to increase the
+                                         // backpressure.
+                }
 
                 PolarsResult::Ok(())
-            }));
-        }
+            })
+        }));
 
         // IO task.
         //
@@ -133,11 +134,8 @@ impl SinkNode for CsvSinkNode {
                 file = tokio::fs::File::from_std(std_file);
             }
 
-            while let Ok((outcome, mut linearizer)) = recv_linearizer.recv().await {
-                while let Some(Priority(_, buffer)) = linearizer.get().await {
-                    file.write_all(&buffer).await?;
-                }
-                outcome.stopped();
-            }
+            while let Some(Priority(_, buffer)) = lin_rx.get().await {
+                file.write_all(&buffer).await?;
+            }
 
             PolarsResult::Ok(())
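
The change all three sinks share is visible here: instead of phase-scoped `(outcome, receiver, sender)` bundles handed out by `parallel_into_linearize`, the parallel encode workers now feed one long-lived `Linearizer` keyed on the morsel sequence number, and the IO task simply drains it in order. As a minimal illustration of that ordering idea, the following is a synchronous k-way merge over already-sorted per-worker streams. It is a sketch only, not the crate's async `Linearizer` (which additionally bounds its buffers for backpressure); the names `linearize` and `workers` are invented for the example.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merge per-worker streams of (sequence, payload) pairs into one stream
// ordered by sequence. Each worker's own stream is already sorted, which
// is what makes a k-way merge valid.
fn linearize(workers: Vec<Vec<(u64, Vec<u8>)>>) -> Vec<(u64, Vec<u8>)> {
    // `Reverse` turns Rust's max-heap `BinaryHeap` into a min-heap keyed on
    // the sequence number -- the same trick as `Priority<Reverse<MorselSeq>, _>`.
    let mut heap = BinaryHeap::new();
    let mut streams: Vec<_> = workers.into_iter().map(|w| w.into_iter()).collect();

    // Seed the heap with the head of every worker stream.
    for (worker, stream) in streams.iter_mut().enumerate() {
        if let Some((seq, buf)) = stream.next() {
            heap.push(Reverse((seq, worker, buf)));
        }
    }

    let mut out = Vec::new();
    // Pop the globally smallest sequence, then refill from the same worker.
    while let Some(Reverse((seq, worker, buf))) = heap.pop() {
        out.push((seq, buf));
        if let Some((next_seq, next_buf)) = streams[worker].next() {
            heap.push(Reverse((next_seq, worker, next_buf)));
        }
    }
    out
}

fn main() {
    // Two encode workers that each produced an ordered subset of morsels.
    let w0 = vec![(0, b"a".to_vec()), (2, b"c".to_vec())];
    let w1 = vec![(1, b"b".to_vec()), (3, b"d".to_vec())];
    let merged = linearize(vec![w0, w1]);
    assert_eq!(merged.iter().map(|(s, _)| *s).collect::<Vec<_>>(), [0, 1, 2, 3]);
}
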
73 changes: 18 additions & 55 deletions crates/polars-stream/src/nodes/io_sinks/ipc.rs
@@ -2,8 +2,6 @@ use std::cmp::Reverse;
 use std::io::BufWriter;
 use std::path::PathBuf;
 
-use polars_core::config;
-use polars_core::frame::DataFrame;
 use polars_core::prelude::CompatLevel;
 use polars_core::schema::{SchemaExt, SchemaRef};
 use polars_core::utils::arrow;
@@ -18,7 +16,10 @@ use polars_io::ipc::{IpcWriter, IpcWriterOptions};
 use polars_io::SerWriter;
 use polars_utils::priority::Priority;
 
-use super::{SinkNode, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE};
+use super::{
+    buffer_and_distribute_columns_task, SinkNode, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE,
+    DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
+};
 use crate::async_executor::spawn;
 use crate::async_primitives::connector::connector;
 use crate::async_primitives::distributor_channel::distributor_channel;
@@ -69,75 +70,37 @@ impl SinkNode for IpcSinkNode {
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
         // .. -> Buffer task
-        let mut buffer_rx = recv_ports_recv.serial(join_handles);
+        let buffer_rx = recv_ports_recv.serial(join_handles);
         // Buffer task -> Encode tasks
-        let (mut dist_tx, dist_rxs) =
+        let (dist_tx, dist_rxs) =
             distributor_channel(num_pipelines, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
         // Encode tasks -> Collect task
         let (mut lin_rx, lin_txs) =
             Linearizer::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
         // Collect task -> IO task
-        let (mut io_task_tx, mut io_task_rx) = connector::<(Vec<EncodedData>, EncodedData)>();
+        let (mut io_tx, mut io_rx) = connector::<(Vec<EncodedData>, EncodedData)>();
 
         let options = WriteOptions {
             compression: self.write_options.compression.map(Into::into),
         };
 
         let path = self.path.clone();
-        let input_schema = self.input_schema.clone();
         let compat_level = self.compat_level;
         let chunk_size = self.chunk_size;
 
-        let ipc_fields = input_schema
+        let ipc_fields = self
+            .input_schema
             .iter_fields()
             .map(|f| f.to_arrow(compat_level))
             .collect::<Vec<_>>();
         let ipc_fields = default_ipc_fields(ipc_fields.iter());
 
-        // Buffer task.
-        //
-        // This task linearizes and buffers morsels until a given a maximum chunk size is reached
-        // and then sends the whole record batch to be encoded and written.
-        join_handles.push(spawn(TaskPriority::High, async move {
-            let mut seq = 0usize;
-            let mut buffer = DataFrame::empty_with_schema(input_schema.as_ref());
-
-            while let Ok(morsel) = buffer_rx.recv().await {
-                let (df, _, _, consume_token) = morsel.into_inner();
-                // @NOTE: This also performs schema validation.
-                buffer.vstack_mut(&df)?;
-
-                while buffer.height() >= chunk_size {
-                    let df;
-                    (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);
-
-                    for (i, column) in df.take_columns().into_iter().enumerate() {
-                        if dist_tx.send((seq, i, column)).await.is_err() {
-                            return Ok(());
-                        }
-                    }
-                    seq += 1;
-                }
-                drop(consume_token); // Increase the backpressure. Only free up a pipeline when the
-                                     // morsel has started encoding in its entirety. This still
-                                     // allows for parallelism of Morsels, but prevents large
-                                     // bunches of Morsels from stacking up here.
-            }
-
-            if config::verbose() {
-                eprintln!("[ipc_sink]: Flushing last chunk for '{}'", path.display());
-            }
-
-            // Flush the remaining rows.
-            assert!(buffer.height() <= chunk_size);
-            for (i, column) in buffer.take_columns().into_iter().enumerate() {
-                if dist_tx.send((seq, i, column)).await.is_err() {
-                    return Ok(());
-                }
-            }
-
-            PolarsResult::Ok(())
-        }));
+        join_handles.push(buffer_and_distribute_columns_task(
+            buffer_rx,
+            dist_tx,
+            chunk_size,
+            self.input_schema.clone(),
+        ));
 
         // Encoding tasks.
         //
@@ -202,7 +165,7 @@ impl SinkNode for IpcSinkNode {
         //
         // Collects all the encoded data and packs it together for the IO task to write it.
         let input_schema = self.input_schema.clone();
-        join_handles.push(spawn(TaskPriority::Low, async move {
+        join_handles.push(spawn(TaskPriority::High, async move {
             let mut dictionary_tracker = DictionaryTracker {
                 dictionaries: Default::default(),
                 cannot_replace: false,
@@ -308,7 +271,7 @@ impl SinkNode for IpcSinkNode {
                     &mut encoded_data,
                 );
 
-                if io_task_tx
+                if io_tx
                     .send((
                         std::mem::take(&mut current.encoded_dictionaries),
                         encoded_data,
@@ -346,7 +309,7 @@ impl SinkNode for IpcSinkNode {
                 .with_parallel(false)
                 .batched(&input_schema)?;
 
-            while let Ok((dicts, record_batch)) = io_task_rx.recv().await {
+            while let Ok((dicts, record_batch)) = io_rx.recv().await {
                 // @TODO: At the moment this is a sync write, this is not ideal because we can only
                 // have so many blocking threads in the tokio threadpool.
                 writer.write_encoded(dicts.as_slice(), &record_batch)?;
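
The inline buffer task deleted above moves into a shared helper, `buffer_and_distribute_columns_task`. Per the deleted body, its job is: vstack incoming morsels into a pending DataFrame, cut full `chunk_size` chunks as soon as enough rows accumulate, tag each chunk with a sequence number, send the chunk's columns through the distributor, and flush the short remainder at the end. Below is a sketch of just that buffering arithmetic, with rows reduced to `i64` values and the distributor replaced by a returned `Vec`; `chunk_batches` is a hypothetical stand-in, not the helper's actual signature.

// A stand-in for the buffering half of the new helper: stack incoming
// batches, emit full `chunk_size` chunks tagged with a sequence number,
// and flush the remainder at the end.
fn chunk_batches(batches: Vec<Vec<i64>>, chunk_size: usize) -> Vec<(usize, Vec<i64>)> {
    let mut chunks = Vec::new();
    let mut buffer: Vec<i64> = Vec::new();
    let mut seq = 0usize;

    for batch in batches {
        // Counterpart of `buffer.vstack_mut(&df)?` in the deleted task.
        buffer.extend(batch);

        // Counterpart of the `while buffer.height() >= chunk_size` split loop.
        while buffer.len() >= chunk_size {
            let rest = buffer.split_off(chunk_size);
            chunks.push((seq, std::mem::replace(&mut buffer, rest)));
            seq += 1;
        }
    }

    // Counterpart of the final "Flush the remaining rows" block.
    if !buffer.is_empty() {
        chunks.push((seq, buffer));
    }
    chunks
}

fn main() {
    let chunks = chunk_batches(vec![vec![1, 2, 3], vec![4, 5], vec![6, 7, 8, 9]], 4);
    // Two full chunks of four rows, then a remainder of one.
    assert_eq!(
        chunks,
        vec![(0, vec![1, 2, 3, 4]), (1, vec![5, 6, 7, 8]), (2, vec![9])]
    );
}
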
43 changes: 21 additions & 22 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
@@ -8,6 +8,8 @@ use polars_utils::priority::Priority;
 
 use super::{SinkNode, SinkRecvPort};
 use crate::async_executor::spawn;
+use crate::async_primitives::linearizer::Linearizer;
+use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE;
 use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
 
 type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
@@ -31,43 +33,43 @@ impl SinkNode for NDJsonSinkNode {
 
     fn spawn_sink(
         &mut self,
-        _num_pipelines: usize,
+        num_pipelines: usize,
        recv_ports_recv: SinkRecvPort,
         _state: &ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
-        let (handle, rx_receivers, mut rx_linearizer) =
-            recv_ports_recv.parallel_into_linearize::<Linearized>();
-        join_handles.push(handle);
+        // .. -> Encode task
+        let rxs = recv_ports_recv.parallel(join_handles);
+        // Encode tasks -> IO task
+        let (mut lin_rx, lin_txs) =
+            Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
 
         // 16MB
         const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
 
         // Encode task.
         //
         // Task encodes the columns into their corresponding JSON encoding.
-        join_handles.extend(rx_receivers.into_iter().map(|mut rx_receiver| {
+        join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
             spawn(TaskPriority::High, async move {
                 // Amortize the allocations over time. If we see that we need to do way larger
                 // allocations, we adjust to that over time.
                 let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
 
-                while let Ok((outcome, mut rx, mut sender)) = rx_receiver.recv().await {
-                    while let Ok(morsel) = rx.recv().await {
-                        let (df, seq, _, consume_token) = morsel.into_inner();
-
-                        let mut buffer = Vec::with_capacity(allocation_size);
-                        let mut writer = BatchedWriter::new(&mut buffer);
-
-                        writer.write_batch(&df)?;
-
-                        allocation_size = allocation_size.max(buffer.len());
-                        sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
-                        drop(consume_token); // Keep the consume_token until here to increase the
-                                             // backpressure.
-                    }
-
-                    outcome.stopped();
+                while let Ok(morsel) = rx.recv().await {
+                    let (df, seq, _, consume_token) = morsel.into_inner();
+
+                    let mut buffer = Vec::with_capacity(allocation_size);
+                    let mut writer = BatchedWriter::new(&mut buffer);
+
+                    writer.write_batch(&df)?;
+
+                    allocation_size = allocation_size.max(buffer.len());
+                    if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
+                        return Ok(());
+                    }
+
+                    drop(consume_token); // Keep the consume_token until here to increase the
+                                         // backpressure.
                 }
 
                 PolarsResult::Ok(())
@@ -91,11 +93,8 @@ impl SinkNode for NDJsonSinkNode {
             .await
             .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;
 
-        while let Ok((outcome, mut linearizer)) = rx_linearizer.recv().await {
-            while let Some(Priority(_, buffer)) = linearizer.get().await {
-                file.write_all(&buffer).await?;
-            }
-            outcome.stopped();
-        }
+        while let Some(Priority(_, buffer)) = lin_rx.get().await {
+            file.write_all(&buffer).await?;
+        }
 
         PolarsResult::Ok(())
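
One detail the refactor leaves untouched in every encode task is the allocation ratchet: each buffer is preallocated to the largest encoding seen so far (`allocation_size = allocation_size.max(buffer.len())`), so steady-state batches serialize without mid-write reallocation. A self-contained sketch of just that pattern, with a toy default size standing in for the sinks' 16MB constant and a string copy standing in for `writer.write_batch(&df)?`; `encode_all` is an invented name for the example.

// The allocation ratchet kept by every encode task: preallocate to the
// largest encoding seen so far, so steady-state batches encode without
// reallocating mid-write. Toy default; the real sinks use 1 << 24 (16MB).
const DEFAULT_ALLOCATION_SIZE: usize = 64;

fn encode_all(batches: &[&str]) -> Vec<Vec<u8>> {
    let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
    let mut encoded = Vec::new();

    for batch in batches {
        // Fresh buffer at the current high-water mark, as each loop
        // iteration does with `Vec::with_capacity(allocation_size)`.
        let mut buffer: Vec<u8> = Vec::with_capacity(allocation_size);
        // Stand-in for the actual batch serialization.
        buffer.extend_from_slice(batch.as_bytes());

        // Ratchet the mark up, never down.
        allocation_size = allocation_size.max(buffer.len());
        encoded.push(buffer);
    }
    encoded
}

fn main() {
    let out = encode_all(&["short", &"x".repeat(200), "short again"]);
    // After a 200-byte batch, later buffers start at capacity >= 200.
    assert!(out.last().unwrap().capacity() >= 200);
}
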