refactor: Simplify the phase handling of new streaming sinks #21530

Merged
82 changes: 40 additions & 42 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -11,6 +11,8 @@ use polars_utils::priority::Priority;
 
 use super::{SinkNode, SinkRecvPort};
 use crate::async_executor::spawn;
+use crate::async_primitives::linearizer::Linearizer;
+use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE;
 use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
 
 type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
@@ -40,67 +42,66 @@ impl SinkNode for CsvSinkNode {
 
     fn spawn_sink(
         &mut self,
-        _num_pipelines: usize,
+        num_pipelines: usize,
         recv_ports_recv: SinkRecvPort,
         _state: &ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
         // .. -> Encode task
-        let (handle, recv_ports_recv, mut recv_linearizer) =
-            recv_ports_recv.parallel_into_linearize::<Linearized>();
-        join_handles.push(handle);
+        let rxs = recv_ports_recv.parallel(join_handles);
+        // Encode tasks -> IO task
+        let (mut lin_rx, lin_txs) =
+            Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
 
         // 16MB
         const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
 
         // Encode task.
         //
         // Task encodes the columns into their corresponding CSV encoding.
-        for mut rx_receiver in recv_ports_recv {
+        join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
             let schema = self.schema.clone();
             let options = self.write_options.clone();
 
-            join_handles.push(spawn(TaskPriority::High, async move {
+            spawn(TaskPriority::High, async move {
                 // Amortize the allocations over time. If we see that we need to do way larger
                 // allocations, we adjust to that over time.
                 let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
                 let options = options.clone();
 
-                while let Ok((outcome, mut receiver, mut sender)) = rx_receiver.recv().await {
-                    while let Ok(morsel) = receiver.recv().await {
-                        let (df, seq, _, consume_token) = morsel.into_inner();
-
-                        let mut buffer = Vec::with_capacity(allocation_size);
-                        let mut writer = CsvWriter::new(&mut buffer)
-                            .include_bom(false) // Handled once in the IO task.
-                            .include_header(false) // Handled once in the IO task.
-                            .with_separator(options.serialize_options.separator)
-                            .with_line_terminator(options.serialize_options.line_terminator.clone())
-                            .with_quote_char(options.serialize_options.quote_char)
-                            .with_datetime_format(options.serialize_options.datetime_format.clone())
-                            .with_date_format(options.serialize_options.date_format.clone())
-                            .with_time_format(options.serialize_options.time_format.clone())
-                            .with_float_scientific(options.serialize_options.float_scientific)
-                            .with_float_precision(options.serialize_options.float_precision)
-                            .with_null_value(options.serialize_options.null.clone())
-                            .with_quote_style(options.serialize_options.quote_style)
-                            .n_threads(1) // Disable rayon parallelism
-                            .batched(&schema)?;
-
-                        writer.write_batch(&df)?;
-
-                        allocation_size = allocation_size.max(buffer.len());
-                        sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
-                        drop(consume_token); // Keep the consume_token until here to increase the
-                                             // backpressure.
+                while let Ok(morsel) = rx.recv().await {
+                    let (df, seq, _, consume_token) = morsel.into_inner();
+
+                    let mut buffer = Vec::with_capacity(allocation_size);
+                    let mut writer = CsvWriter::new(&mut buffer)
+                        .include_bom(false) // Handled once in the IO task.
+                        .include_header(false) // Handled once in the IO task.
+                        .with_separator(options.serialize_options.separator)
+                        .with_line_terminator(options.serialize_options.line_terminator.clone())
+                        .with_quote_char(options.serialize_options.quote_char)
+                        .with_datetime_format(options.serialize_options.datetime_format.clone())
+                        .with_date_format(options.serialize_options.date_format.clone())
+                        .with_time_format(options.serialize_options.time_format.clone())
+                        .with_float_scientific(options.serialize_options.float_scientific)
+                        .with_float_precision(options.serialize_options.float_precision)
+                        .with_null_value(options.serialize_options.null.clone())
+                        .with_quote_style(options.serialize_options.quote_style)
+                        .n_threads(1) // Disable rayon parallelism
+                        .batched(&schema)?;
+
+                    writer.write_batch(&df)?;
+
+                    allocation_size = allocation_size.max(buffer.len());
+                    if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
+                        return Ok(());
                     }
-
-                    outcome.stopped();
+                    drop(consume_token); // Keep the consume_token until here to increase the
+                                         // backpressure.
                 }
 
                 PolarsResult::Ok(())
-            }));
-        }
+            })
+        }));
 
         // IO task.
         //
Expand Down Expand Up @@ -133,11 +134,8 @@ impl SinkNode for CsvSinkNode {
file = tokio::fs::File::from_std(std_file);
}

while let Ok((outcome, mut linearizer)) = recv_linearizer.recv().await {
while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer).await?;
}
outcome.stopped();
while let Some(Priority(_, buffer)) = lin_rx.get().await {
file.write_all(&buffer).await?;
}

PolarsResult::Ok(())
Expand Down
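After this change the CSV sink has the shape shared with the NDJSON sink: the port is split into per-pipeline receivers with `parallel()`, every encode worker serializes morsels into byte buffers, and a `Linearizer` merges those buffers back into `MorselSeq` order for a single IO task. The snippet below is not the polars-stream `Linearizer`; it is a minimal, synchronous sketch of the same idea (re-ordering per-worker output by sequence number before one writer consumes it) using only the standard library, with hypothetical `Encoded` and `write_in_order` names.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// A chunk of encoded bytes tagged with the morsel sequence number it came from.
struct Encoded {
    seq: u64,
    bytes: Vec<u8>,
}

/// Merge per-worker output back into sequence order before writing.
/// Each worker's queue is already sorted by `seq`, so a min-heap over the
/// head of every queue always yields the next chunk to write.
fn write_in_order(per_worker: Vec<Vec<Encoded>>, out: &mut Vec<u8>) {
    // Entries are (Reverse(seq), worker index, position within that worker's queue).
    let mut heap: BinaryHeap<(Reverse<u64>, usize, usize)> = per_worker
        .iter()
        .enumerate()
        .filter(|(_, q)| !q.is_empty())
        .map(|(w, q)| (Reverse(q[0].seq), w, 0))
        .collect();

    while let Some((Reverse(_), w, i)) = heap.pop() {
        out.extend_from_slice(&per_worker[w][i].bytes);
        if i + 1 < per_worker[w].len() {
            heap.push((Reverse(per_worker[w][i + 1].seq), w, i + 1));
        }
    }
}

fn main() {
    // Two "workers" that encoded interleaved morsels (0, 2) and (1, 3).
    let per_worker = vec![
        vec![
            Encoded { seq: 0, bytes: b"a,b\n".to_vec() },
            Encoded { seq: 2, bytes: b"5,6\n".to_vec() },
        ],
        vec![
            Encoded { seq: 1, bytes: b"3,4\n".to_vec() },
            Encoded { seq: 3, bytes: b"7,8\n".to_vec() },
        ],
    ];
    let mut out = Vec::new();
    write_in_order(per_worker, &mut out);
    assert_eq!(out, b"a,b\n3,4\n5,6\n7,8\n");
}
```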
73 changes: 18 additions & 55 deletions crates/polars-stream/src/nodes/io_sinks/ipc.rs
@@ -2,8 +2,6 @@ use std::cmp::Reverse;
 use std::io::BufWriter;
 use std::path::PathBuf;
 
-use polars_core::config;
-use polars_core::frame::DataFrame;
 use polars_core::prelude::CompatLevel;
 use polars_core::schema::{SchemaExt, SchemaRef};
 use polars_core::utils::arrow;
@@ -18,7 +16,10 @@ use polars_io::ipc::{IpcWriter, IpcWriterOptions};
 use polars_io::SerWriter;
 use polars_utils::priority::Priority;
 
-use super::{SinkNode, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE};
+use super::{
+    buffer_and_distribute_columns_task, SinkNode, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE,
+    DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
+};
 use crate::async_executor::spawn;
 use crate::async_primitives::connector::connector;
 use crate::async_primitives::distributor_channel::distributor_channel;
@@ -69,75 +70,37 @@ impl SinkNode for IpcSinkNode {
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
         // .. -> Buffer task
-        let mut buffer_rx = recv_ports_recv.serial(join_handles);
+        let buffer_rx = recv_ports_recv.serial(join_handles);
         // Buffer task -> Encode tasks
-        let (mut dist_tx, dist_rxs) =
+        let (dist_tx, dist_rxs) =
             distributor_channel(num_pipelines, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
         // Encode tasks -> Collect task
         let (mut lin_rx, lin_txs) =
             Linearizer::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
         // Collect task -> IO task
-        let (mut io_task_tx, mut io_task_rx) = connector::<(Vec<EncodedData>, EncodedData)>();
+        let (mut io_tx, mut io_rx) = connector::<(Vec<EncodedData>, EncodedData)>();
 
         let options = WriteOptions {
             compression: self.write_options.compression.map(Into::into),
         };
 
         let path = self.path.clone();
-        let input_schema = self.input_schema.clone();
         let compat_level = self.compat_level;
         let chunk_size = self.chunk_size;
 
-        let ipc_fields = input_schema
+        let ipc_fields = self
+            .input_schema
             .iter_fields()
             .map(|f| f.to_arrow(compat_level))
             .collect::<Vec<_>>();
         let ipc_fields = default_ipc_fields(ipc_fields.iter());
 
-        // Buffer task.
-        //
-        // This task linearizes and buffers morsels until a given a maximum chunk size is reached
-        // and then sends the whole record batch to be encoded and written.
-        join_handles.push(spawn(TaskPriority::High, async move {
-            let mut seq = 0usize;
-            let mut buffer = DataFrame::empty_with_schema(input_schema.as_ref());
-
-            while let Ok(morsel) = buffer_rx.recv().await {
-                let (df, _, _, consume_token) = morsel.into_inner();
-                // @NOTE: This also performs schema validation.
-                buffer.vstack_mut(&df)?;
-
-                while buffer.height() >= chunk_size {
-                    let df;
-                    (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);
-
-                    for (i, column) in df.take_columns().into_iter().enumerate() {
-                        if dist_tx.send((seq, i, column)).await.is_err() {
-                            return Ok(());
-                        }
-                    }
-                    seq += 1;
-                }
-                drop(consume_token); // Increase the backpressure. Only free up a pipeline when the
-                                     // morsel has started encoding in its entirety. This still
-                                     // allows for parallelism of Morsels, but prevents large
-                                     // bunches of Morsels from stacking up here.
-            }
-
-            if config::verbose() {
-                eprintln!("[ipc_sink]: Flushing last chunk for '{}'", path.display());
-            }
-
-            // Flush the remaining rows.
-            assert!(buffer.height() <= chunk_size);
-            for (i, column) in buffer.take_columns().into_iter().enumerate() {
-                if dist_tx.send((seq, i, column)).await.is_err() {
-                    return Ok(());
-                }
-            }
-
-            PolarsResult::Ok(())
-        }));
+        join_handles.push(buffer_and_distribute_columns_task(
+            buffer_rx,
+            dist_tx,
+            chunk_size,
+            self.input_schema.clone(),
+        ));
 
         // Encoding tasks.
         //
@@ -202,7 +165,7 @@ impl SinkNode for IpcSinkNode {
         //
         // Collects all the encoded data and packs it together for the IO task to write it.
         let input_schema = self.input_schema.clone();
-        join_handles.push(spawn(TaskPriority::Low, async move {
+        join_handles.push(spawn(TaskPriority::High, async move {
             let mut dictionary_tracker = DictionaryTracker {
                 dictionaries: Default::default(),
                 cannot_replace: false,
@@ -308,7 +271,7 @@
                         &mut encoded_data,
                     );
 
-                    if io_task_tx
+                    if io_tx
                         .send((
                             std::mem::take(&mut current.encoded_dictionaries),
                             encoded_data,
@@ -346,7 +309,7 @@
                 .with_parallel(false)
                 .batched(&input_schema)?;
 
-            while let Ok((dicts, record_batch)) = io_task_rx.recv().await {
+            while let Ok((dicts, record_batch)) = io_rx.recv().await {
                 // @TODO: At the moment this is a sync write, this is not ideal because we can only
                 // have so many blocking threads in the tokio threadpool.
                 writer.write_encoded(dicts.as_slice(), &record_batch)?;
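The buffering logic deleted above now lives in the shared helper `buffer_and_distribute_columns_task`: accumulate incoming morsels, cut them into chunks of `chunk_size` rows, and fan each chunk's columns out to the encode workers. The sketch below illustrates only the chunking half of that idea under simplified assumptions: morsels are plain `Vec<u64>` row batches instead of `DataFrame`s, the distributor fan-out is omitted, and `buffer_into_chunks` is a hypothetical name.

```rust
/// Minimal stand-in for a morsel: just a vector of rows.
type Batch = Vec<u64>;

/// Accumulate incoming batches and emit full chunks of exactly `chunk_size`
/// rows; whatever is left at the end is flushed as one final short chunk.
fn buffer_into_chunks(batches: impl IntoIterator<Item = Batch>, chunk_size: usize) -> Vec<Batch> {
    let mut chunks = Vec::new();
    let mut buffer: Batch = Vec::new();

    for batch in batches {
        buffer.extend(batch);
        while buffer.len() >= chunk_size {
            // Split off the tail and keep it as the new buffer; the full chunk is emitted.
            let rest = buffer.split_off(chunk_size);
            chunks.push(std::mem::replace(&mut buffer, rest));
        }
    }
    // Flush the remaining rows (may be shorter than `chunk_size`).
    if !buffer.is_empty() {
        chunks.push(buffer);
    }
    chunks
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![4, 5, 6, 7], vec![8]];
    let chunks = buffer_into_chunks(batches, 3);
    assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]);
}
```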
43 changes: 21 additions & 22 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
@@ -8,6 +8,8 @@ use polars_utils::priority::Priority;
 
 use super::{SinkNode, SinkRecvPort};
 use crate::async_executor::spawn;
+use crate::async_primitives::linearizer::Linearizer;
+use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE;
 use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
 
 type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
@@ -31,43 +33,43 @@
 
     fn spawn_sink(
         &mut self,
-        _num_pipelines: usize,
+        num_pipelines: usize,
         recv_ports_recv: SinkRecvPort,
         _state: &ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
-        let (handle, rx_receivers, mut rx_linearizer) =
-            recv_ports_recv.parallel_into_linearize::<Linearized>();
-        join_handles.push(handle);
+        // .. -> Encode task
+        let rxs = recv_ports_recv.parallel(join_handles);
+        // Encode tasks -> IO task
+        let (mut lin_rx, lin_txs) =
+            Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
 
         // 16MB
         const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
 
         // Encode task.
         //
         // Task encodes the columns into their corresponding JSON encoding.
-        join_handles.extend(rx_receivers.into_iter().map(|mut rx_receiver| {
+        join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
             spawn(TaskPriority::High, async move {
                 // Amortize the allocations over time. If we see that we need to do way larger
                 // allocations, we adjust to that over time.
                 let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
 
-                while let Ok((outcome, mut rx, mut sender)) = rx_receiver.recv().await {
-                    while let Ok(morsel) = rx.recv().await {
-                        let (df, seq, _, consume_token) = morsel.into_inner();
+                while let Ok(morsel) = rx.recv().await {
+                    let (df, seq, _, consume_token) = morsel.into_inner();
 
-                        let mut buffer = Vec::with_capacity(allocation_size);
-                        let mut writer = BatchedWriter::new(&mut buffer);
+                    let mut buffer = Vec::with_capacity(allocation_size);
+                    let mut writer = BatchedWriter::new(&mut buffer);
 
-                        writer.write_batch(&df)?;
+                    writer.write_batch(&df)?;
 
-                        allocation_size = allocation_size.max(buffer.len());
-                        sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
-                        drop(consume_token); // Keep the consume_token until here to increase the
-                                             // backpressure.
+                    allocation_size = allocation_size.max(buffer.len());
+                    if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
+                        return Ok(());
                     }
-
-                    outcome.stopped();
+                    drop(consume_token); // Keep the consume_token until here to increase the
+                                         // backpressure.
                 }
 
                 PolarsResult::Ok(())
@@ -91,11 +93,8 @@
                 .await
                 .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;
 
-            while let Ok((outcome, mut linearizer)) = rx_linearizer.recv().await {
-                while let Some(Priority(_, buffer)) = linearizer.get().await {
-                    file.write_all(&buffer).await?;
-                }
-                outcome.stopped();
+            while let Some(Priority(_, buffer)) = lin_rx.get().await {
+                file.write_all(&buffer).await?;
             }
 
             PolarsResult::Ok(())
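One detail both encode tasks keep across this refactor is the amortized scratch-buffer sizing: each task starts from a 16 MiB default and raises its estimate to the largest encoded chunk seen so far, so later `Vec::with_capacity` calls rarely reallocate mid-write. Below is a toy, single-threaded sketch of that pattern; the real encode step is replaced by copying a string, and `encode_all` is a hypothetical name.

```rust
/// Default scratch-buffer size; mirrors the 16 MiB constant used by the sinks.
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;

/// Encode each chunk into its own buffer, pre-sized to the running
/// high-water mark of previously encoded chunks.
fn encode_all(chunks: &[&str]) -> Vec<Vec<u8>> {
    let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
    let mut out = Vec::with_capacity(chunks.len());

    for chunk in chunks {
        // Pre-size the buffer so the (stand-in) encode step below rarely reallocates.
        let mut buffer: Vec<u8> = Vec::with_capacity(allocation_size);
        buffer.extend_from_slice(chunk.as_bytes()); // stand-in for the real encode step
        // Grow the estimate whenever a chunk turns out larger than anything seen before.
        allocation_size = allocation_size.max(buffer.len());
        out.push(buffer);
    }
    out
}

fn main() {
    let encoded = encode_all(&["a,b\n", "1,2\n"]);
    assert_eq!(encoded.concat(), b"a,b\n1,2\n".to_vec());
}
```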