fix: Create new linearizer between rowwise new streaming sink phases (p…
coastalwhite authored and anath2 committed Mar 5, 2025
1 parent c899d13 commit c91a8f8
Showing 3 changed files with 86 additions and 52 deletions.
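
Read together, the three diffs below make one change: the CSV and NDJSON sink nodes no longer build a single Linearizer that lives for the whole query via SinkRecvPort::parallel. The new SinkRecvPort::parallel_into_linearize instead creates a fresh Linearizer and its Inserters for every phase, hands each parallel encode task an Inserter together with its per-phase morsel receiver, and sends the matching Linearizer to the IO task over an extra channel. The encode tasks also keep each morsel's consume token alive until the encoded buffer has been inserted, and the new DEFAULT_SINK_LINEARIZER_BUFFER_SIZE of 1 keeps the per-phase buffer small; both are commented in the diff as being there to increase backpressure.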
32 changes: 20 additions & 12 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -11,9 +11,7 @@ use polars_utils::priority::Priority;

use super::{SinkNode, SinkRecvPort};
use crate::async_executor::spawn;
use crate::async_primitives::linearizer::Linearizer;
use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
use crate::DEFAULT_LINEARIZER_BUFFER_SIZE;

type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
pub struct CsvSinkNode {
@@ -42,25 +40,23 @@ impl SinkNode for CsvSinkNode {

fn spawn_sink(
&mut self,
num_pipelines: usize,
_num_pipelines: usize,
recv_ports_recv: SinkRecvPort,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
// .. -> Encode task
let (handle, recv_ports_recv) = recv_ports_recv.parallel();
let (handle, recv_ports_recv, mut recv_linearizer) =
recv_ports_recv.parallel_into_linearize::<Linearized>();
join_handles.push(handle);
// Encode tasks -> IO task
let (mut linearizer, senders) =
Linearizer::<Linearized>::new(num_pipelines, DEFAULT_LINEARIZER_BUFFER_SIZE);

// 16MB
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;

// Encode task.
//
// Task encodes the columns into their corresponding CSV encoding.
for (mut rx_receiver, mut sender) in recv_ports_recv.into_iter().zip(senders.into_iter()) {
for mut rx_receiver in recv_ports_recv {
let schema = self.schema.clone();
let options = self.write_options.clone();

@@ -70,9 +66,11 @@ impl SinkNode for CsvSinkNode {
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
let options = options.clone();

while let Ok((_token, outcome, mut receiver)) = rx_receiver.recv().await {
while let Ok((phase_consume_token, outcome, mut receiver, mut sender)) =
rx_receiver.recv().await
{
while let Ok(morsel) = receiver.recv().await {
let (df, seq, _, _) = morsel.into_inner();
let (df, seq, _, consume_token) = morsel.into_inner();

let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = CsvWriter::new(&mut buffer)
@@ -95,9 +93,12 @@

allocation_size = allocation_size.max(buffer.len());
sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}

outcome.stop();
drop(phase_consume_token);
}

PolarsResult::Ok(())
Expand Down Expand Up @@ -135,8 +136,15 @@ impl SinkNode for CsvSinkNode {
file = tokio::fs::File::from_std(std_file);
}

while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer).await?;
while let Ok((phase_consume_token, outcome, mut linearizer)) =
recv_linearizer.recv().await
{
while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer).await?;
}

outcome.stop();
drop(phase_consume_token);
}

PolarsResult::Ok(())
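
The sinks rely on Linearizer and Inserter from crate::async_primitives::linearizer, which the diff does not show. As a rough mental model only (not the polars-stream implementation, which is async and bounded by DEFAULT_SINK_LINEARIZER_BUFFER_SIZE), a linearizer is a k-way ordered merge: every encode task pushes buffers tagged with their morsel sequence number into its own bounded queue in increasing order, and a single consumer repeatedly takes the globally smallest sequence number, so output lands in morsel order no matter which task finished first. A minimal synchronous stand-in, with all names hypothetical:

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// One inserter per encode task; each task must insert in increasing `seq` order.
struct Inserter(SyncSender<(u64, Vec<u8>)>);

impl Inserter {
    fn insert(&self, seq: u64, buf: Vec<u8>) {
        self.0.send((seq, buf)).unwrap();
    }
}

// Merges the per-task streams back into one stream in global `seq` order.
struct Linearizer(Vec<Receiver<(u64, Vec<u8>)>>);

impl Linearizer {
    fn new(n: usize, bound: usize) -> (Self, Vec<Inserter>) {
        let (txs, rxs): (Vec<_>, Vec<_>) = (0..n).map(|_| sync_channel(bound)).unzip();
        (Self(rxs), txs.into_iter().map(Inserter).collect())
    }

    // Calls `f` for every item, in ascending `seq` order across all inserters.
    fn for_each(self, mut f: impl FnMut(u64, Vec<u8>)) {
        // Min-heap holding at most one pending item per still-open input.
        let mut heap = BinaryHeap::new();
        for (idx, rx) in self.0.iter().enumerate() {
            if let Ok((seq, buf)) = rx.recv() {
                heap.push(Reverse((seq, idx, buf)));
            }
        }
        while let Some(Reverse((seq, idx, buf))) = heap.pop() {
            f(seq, buf);
            // Refill from the input whose head we just consumed.
            if let Ok((seq, buf)) = self.0[idx].recv() {
                heap.push(Reverse((seq, idx, buf)));
            }
        }
    }
}

fn main() {
    // Four "encode tasks", bounded queues of size 1 to keep backpressure high.
    let (linearizer, inserters) = Linearizer::new(4, 1);
    let workers: Vec<_> = inserters
        .into_iter()
        .enumerate()
        .map(|(worker, ins)| {
            thread::spawn(move || {
                for seq in (worker as u64..16).step_by(4) {
                    ins.insert(seq, format!("chunk {seq}\n").into_bytes());
                }
            })
        })
        .collect();
    // The single "IO task": buffers arrive in seq order 0..=15.
    linearizer.for_each(|_, buf| print!("{}", String::from_utf8(buf).unwrap()));
    for w in workers {
        w.join().unwrap();
    }
}

Running this prints chunk 0 through chunk 15 in order even though four threads race to produce them, which is the property the CSV and NDJSON IO tasks depend on when writing the file.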
72 changes: 37 additions & 35 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
@@ -8,9 +8,7 @@ use polars_utils::priority::Priority;

use super::{SinkNode, SinkRecvPort};
use crate::async_executor::spawn;
use crate::async_primitives::linearizer::Linearizer;
use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
use crate::DEFAULT_LINEARIZER_BUFFER_SIZE;

type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
pub struct NDJsonSinkNode {
@@ -33,51 +31,51 @@ impl SinkNode for NDJsonSinkNode {

fn spawn_sink(
&mut self,
num_pipelines: usize,
_num_pipelines: usize,
recv_ports_recv: SinkRecvPort,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
let (handle, rx_receivers) = recv_ports_recv.parallel();
let (handle, rx_receivers, mut rx_linearizer) =
recv_ports_recv.parallel_into_linearize::<Linearized>();
join_handles.push(handle);

// Encode tasks -> IO task
let (mut linearizer, senders) =
Linearizer::<Linearized>::new(num_pipelines, DEFAULT_LINEARIZER_BUFFER_SIZE);

// 16MB
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;

// Encode task.
//
// Task encodes the columns into their corresponding JSON encoding.
join_handles.extend(rx_receivers.into_iter().zip(senders).map(
|(mut rx_receiver, mut sender)| {
spawn(TaskPriority::High, async move {
// Amortize the allocations over time. If we see that we need to do way larger
// allocations, we adjust to that over time.
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;

while let Ok((_token, outcome, mut rx)) = rx_receiver.recv().await {
while let Ok(morsel) = rx.recv().await {
let (df, seq, _, _) = morsel.into_inner();

let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = BatchedWriter::new(&mut buffer);

writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());
sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
}

outcome.stop();
join_handles.extend(rx_receivers.into_iter().map(|mut rx_receiver| {
spawn(TaskPriority::High, async move {
// Amortize the allocations over time. If we see that we need to do way larger
// allocations, we adjust to that over time.
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;

while let Ok((phase_consume_token, outcome, mut rx, mut sender)) =
rx_receiver.recv().await
{
while let Ok(morsel) = rx.recv().await {
let (df, seq, _, consume_token) = morsel.into_inner();

let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = BatchedWriter::new(&mut buffer);

writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());
sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}

PolarsResult::Ok(())
})
},
));
outcome.stop();
drop(phase_consume_token);
}

PolarsResult::Ok(())
})
}));

let path = self.path.clone();

@@ -96,8 +94,12 @@ impl SinkNode for NDJsonSinkNode {
.await
.map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;

while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer).await?;
while let Ok((consume_token, outcome, mut linearizer)) = rx_linearizer.recv().await {
while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer).await?;
}
outcome.stop();
drop(consume_token);
}

PolarsResult::Ok(())
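
Both sinks now hold two kinds of tokens per phase: the phase_consume_token handed out by the coordinating task in mod.rs, and the per-morsel consume_token that used to be discarded immediately (the old `let (df, seq, _, _) = morsel.into_inner()`). The inline comments state that keeping them alive longer increases backpressure. A plausible mental model, sketched below with hypothetical names (the real WaitGroup/WaitToken in crate::async_primitives::wait_group are async primitives and may differ), is a wait group: the producer hands out one token per unit of work and blocks until every token has been dropped, so dropping a token only after the expensive step holds the producer back accordingly.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Simplified wait group: `wait` blocks until every outstanding token is dropped.
#[derive(Default)]
struct WaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

struct WaitToken {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl WaitGroup {
    fn token(&self) -> WaitToken {
        *self.inner.0.lock().unwrap() += 1;
        WaitToken { inner: Arc::clone(&self.inner) }
    }

    fn wait(&self) {
        let (count, cvar) = &*self.inner;
        let mut n = count.lock().unwrap();
        while *n > 0 {
            n = cvar.wait(n).unwrap();
        }
    }
}

impl Drop for WaitToken {
    fn drop(&mut self) {
        let (count, cvar) = &*self.inner;
        *count.lock().unwrap() -= 1;
        cvar.notify_all();
    }
}

fn main() {
    let wg = WaitGroup::default();
    let token = wg.token();
    let worker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50)); // stand-in for "encode + insert"
        drop(token); // dropping late: the producer below is held back this long
    });
    wg.wait(); // producer side: cannot start the next phase until all tokens drop
    worker.join().unwrap();
    println!("all tokens dropped; next phase may start");
}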
34 changes: 29 additions & 5 deletions crates/polars-stream/src/nodes/io_sinks/mod.rs
@@ -8,6 +8,7 @@ use super::io_sources::PhaseOutcomeToken;
use super::{ComputeNode, JoinHandle, Morsel, PortState, RecvPort, SendPort, TaskScope};
use crate::async_executor::{spawn, AbortOnDropHandle};
use crate::async_primitives::connector::{connector, Receiver, Sender};
use crate::async_primitives::linearizer::{Inserter, Linearizer};
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
use crate::nodes::TaskPriority;

@@ -20,6 +21,9 @@ pub mod json;
#[cfg(feature = "parquet")]
pub mod parquet;

// This needs to be low to increase the backpressure.
const DEFAULT_SINK_LINEARIZER_BUFFER_SIZE: usize = 1;

pub enum SinkInputPort {
Serial(Receiver<Morsel>),
Parallel(Vec<Receiver<Morsel>>),
@@ -56,34 +60,54 @@ impl SinkInputPort {
}

impl SinkRecvPort {
/// Receive the [`RecvPort`] in parallel and create a [`Linearizer`] for each phase.
///
/// This is useful for sinks that process incoming [`Morsel`]s row-wise as the processing can
/// be done in parallel and then be linearized into the actual sink.
#[allow(clippy::type_complexity)]
pub fn parallel(
pub fn parallel_into_linearize<T: Send + Sync + Ord + 'static>(
mut self,
) -> (
JoinHandle<PolarsResult<()>>,
Vec<Receiver<(WaitToken, PhaseOutcomeToken, Receiver<Morsel>)>>,
Vec<Receiver<(WaitToken, PhaseOutcomeToken, Receiver<Morsel>, Inserter<T>)>>,
Receiver<(WaitToken, PhaseOutcomeToken, Linearizer<T>)>,
) {
let (mut rx_senders, rx_receivers) = (0..self.num_pipelines)
.map(|_| connector())
.collect::<(Vec<_>, Vec<_>)>();
let (mut tx_linearizer, rx_linearizer) = connector();
let handle = spawn(TaskPriority::High, async move {
let wg = WaitGroup::default();

while let Ok(input) = self.recv.recv().await {
let inputs = input.port.parallel();

let (linearizer, senders) =
Linearizer::<T>::new(self.num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);

let mut outcomes = Vec::with_capacity(inputs.len());
for (input, rx_sender) in inputs.into_iter().zip(rx_senders.iter_mut()) {
for ((input, rx_sender), sender) in
inputs.into_iter().zip(rx_senders.iter_mut()).zip(senders)
{
let outcome = PhaseOutcomeToken::new();
if rx_sender
.send((wg.token(), outcome.clone(), input))
.send((wg.token(), outcome.clone(), input, sender))
.await
.is_err()
{
return Ok(());
}
outcomes.push(outcome);
}
let outcome = PhaseOutcomeToken::new();
if tx_linearizer
.send((wg.token(), outcome.clone(), linearizer))
.await
.is_err()
{
return Ok(());
}
outcomes.push(outcome);

wg.wait().await;
for outcome in outcomes {
@@ -97,7 +121,7 @@ impl SinkRecvPort {
Ok(())
});

(handle, rx_receivers)
(handle, rx_receivers, rx_linearizer)
}
fn serial(self) -> Receiver<SinkInput> {
self.recv
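
Structurally, parallel_into_linearize keeps the per-pipeline worker tasks alive across phases while giving them fresh per-phase state (a morsel receiver plus an inserter) through long-lived channels. Stripped of the outcome tokens, wait group, and linearizer, the control flow reduces to the fan-out pattern below: a coordinator owns one long-lived channel per worker and, for each phase, sends a freshly created per-phase channel down it. All names here are illustrative, not polars-stream APIs.

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// What a worker receives once per phase: its private input for that phase.
type PhaseInput = Receiver<u64>;

fn main() {
    let num_workers: usize = 3;
    let num_phases: usize = 2;

    // Long-lived channel per worker, carrying one PhaseInput per phase.
    let (phase_txs, phase_rxs): (Vec<Sender<PhaseInput>>, Vec<Receiver<PhaseInput>>) =
        (0..num_workers).map(|_| channel()).unzip();

    // Long-lived workers: outer loop over phases, inner loop over that phase's items.
    let workers: Vec<_> = phase_rxs
        .into_iter()
        .enumerate()
        .map(|(id, phase_rx)| {
            thread::spawn(move || {
                while let Ok(items) = phase_rx.recv() {
                    while let Ok(item) = items.recv() {
                        println!("worker {id}: item {item}");
                    }
                    // `items` is dropped here, so per-phase state never outlives its phase.
                }
            })
        })
        .collect();

    // Coordinator: every phase gets brand-new per-phase channels, fanned out to the
    // same set of workers.
    for phase in 0..num_phases {
        let mut item_txs = Vec::new();
        for tx in &phase_txs {
            let (item_tx, item_rx) = channel();
            tx.send(item_rx).unwrap();
            item_txs.push(item_tx);
        }
        for (i, item_tx) in item_txs.into_iter().enumerate() {
            item_tx.send((phase * 10 + i) as u64).unwrap();
            // Dropping item_tx closes this worker's input for the phase.
        }
    }

    drop(phase_txs); // no more phases: workers leave their outer loop and exit
    for w in workers {
        w.join().unwrap();
    }
}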
