Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add NDJson sink for the new streaming engine #20805

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ Fo = "Fo"
ND = "ND"
nd = "nd"
NDJson = "NDJson"
NDJsonSinkNode = "NDJsonSinkNode"
NDJsonReadOptions = "NDJsonReadOptions"
opt_nd = "opt_nd"
123 changes: 123 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::cmp::Reverse;
use std::io::Write;
use std::path::{Path, PathBuf};

use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_io::json::BatchedWriter;
use polars_utils::priority::Priority;

use crate::async_primitives::linearizer::Linearizer;
use crate::nodes::{ComputeNode, JoinHandle, MorselSeq, PortState, TaskPriority, TaskScope};
use crate::pipe::{RecvPort, SendPort};
use crate::DEFAULT_LINEARIZER_BUFFER_SIZE;

pub struct NDJsonSinkNode {
path: PathBuf,
}

impl NDJsonSinkNode {
pub fn new(path: &Path) -> PolarsResult<Self> {
Ok(Self {
path: path.to_path_buf(),
})
}
}

impl ComputeNode for NDJsonSinkNode {
fn name(&self) -> &str {
"ndjson_sink"
}

fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
assert!(send.is_empty());
assert!(recv.len() == 1);

// We are always ready to receive, unless the sender is done, then we're
// also done.
if recv[0] != PortState::Done {
recv[0] = PortState::Ready;
}

Ok(())
}

fn spawn<'env, 's>(
&'env mut self,
scope: &'s TaskScope<'s, 'env>,
recv_ports: &mut [Option<RecvPort<'_>>],
send_ports: &mut [Option<SendPort<'_>>],
_state: &'s ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
assert!(recv_ports.len() == 1);
assert!(send_ports.is_empty());

// .. -> Encode task
let receivers = recv_ports[0].take().unwrap().parallel();
// Encode tasks -> IO task
let (mut linearizer, senders) = Linearizer::<Priority<Reverse<MorselSeq>, Vec<u8>>>::new(
receivers.len(),
DEFAULT_LINEARIZER_BUFFER_SIZE,
);

let slf = &*self;

// 16MB
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;

// Encode task.
//
// Task encodes the columns into their corresponding CSV encoding.
for (mut receiver, mut sender) in receivers.into_iter().zip(senders) {
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
// Amortize the allocations over time. If we see that we need to do way larger
// allocations, we adjust to that over time.
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;

while let Ok(morsel) = receiver.recv().await {
let (df, seq, _, _) = morsel.into_inner();

let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = BatchedWriter::new(&mut buffer);

writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());
sender.insert(Priority(Reverse(seq), buffer)).await.unwrap();
}

PolarsResult::Ok(())
}));
}

// IO task.
//
// Task that will actually do write to the target file.
let io_runtime = polars_io::pl_async::get_runtime();

let path = slf.path.clone();
let io_task = io_runtime.spawn(async move {
use tokio::fs::OpenOptions;

let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path.as_path())
.await
.map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;

let mut file = file.into_std().await;

// Write the header
while let Some(Priority(_, buffer)) = linearizer.get().await {
file.write_all(&buffer)?;
}

PolarsResult::Ok(())
});
join_handles
.push(scope.spawn_task(TaskPriority::Low, async move { io_task.await.unwrap() }));
}
}
2 changes: 2 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@
pub mod csv;
#[cfg(feature = "ipc")]
pub mod ipc;
#[cfg(feature = "json")]
pub mod json;
#[cfg(feature = "parquet")]
pub mod parquet;
10 changes: 9 additions & 1 deletion crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,15 @@ pub fn lower_ir(
input: phys_input,
}
},
_ => todo!(),
#[cfg(feature = "json")]
FileType::Json(_) => {
let phys_input = lower_ir!(*input)?;
PhysNodeKind::FileSink {
path,
file_type,
input: phys_input,
}
},
}
},
},
Expand Down
6 changes: 5 additions & 1 deletion crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ fn to_graph_rec<'a>(
nodes::io_sinks::ipc::IpcSinkNode::new(input_schema, path, ipc_writer_options)?,
[(input_key, input.port)],
),
#[cfg(feature = "json")]
FileType::Json(_) => ctx.graph.add_node(
nodes::io_sinks::json::NDJsonSinkNode::new(path)?,
[(input_key, input.port)],
),
#[cfg(feature = "parquet")]
FileType::Parquet(parquet_writer_options) => ctx.graph.add_node(
nodes::io_sinks::parquet::ParquetSinkNode::new(
Expand All @@ -235,7 +240,6 @@ fn to_graph_rec<'a>(
nodes::io_sinks::csv::CsvSinkNode::new(input_schema, path, csv_writer_options)?,
[(input_key, input.port)],
),
_ => todo!(),
}
},

Expand Down
Loading