From cb06b898a043b2d06ea012c2116d1ad42a5f2741 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 9 Oct 2023 15:07:35 +0200 Subject: [PATCH 1/6] File sink additions --- .../core/src/datasource/file_format/csv.rs | 9 ++++++ .../core/src/datasource/file_format/json.rs | 9 ++++++ .../src/datasource/file_format/parquet.rs | 9 ++++++ datafusion/core/src/datasource/memory.rs | 9 ++++++ datafusion/physical-plan/src/insert.rs | 28 +++++++++++++++++++ 5 files changed, 64 insertions(+) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 897174659e13..05df99af9a0b 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -33,6 +33,7 @@ use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; use bytes::{Buf, Bytes}; +use datafusion_physical_plan::metrics::MetricsSet; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; @@ -478,6 +479,14 @@ impl CsvSink { #[async_trait] impl DataSink for CsvSink { + fn as_any(&self) -> &dyn Any { + self + } + + fn metrics(&self) -> Option<MetricsSet> { + None + } + async fn write_all( &self, data: Vec<SendableRecordBatchStream>, diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index c715317a9527..585ab82bc804 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -24,6 +24,7 @@ use datafusion_common::not_impl_err; use datafusion_common::DataFusionError; use datafusion_common::FileType; use datafusion_execution::TaskContext; +use datafusion_physical_plan::metrics::MetricsSet; use rand::distributions::Alphanumeric; use rand::distributions::DistString; use std::fmt; @@ -269,6 +270,14 @@ impl JsonSink { #[async_trait] impl DataSink for JsonSink { + fn as_any(&self) 
-> &dyn Any { + self + } + + fn metrics(&self) -> Option<MetricsSet> { + None + } + async fn write_all( &self, data: Vec<SendableRecordBatchStream>, diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index ebdf3ea444b1..ea430b81e96b 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -17,6 +17,7 @@ //! Parquet format abstractions +use datafusion_physical_plan::metrics::MetricsSet; use parquet::column::writer::ColumnCloseResult; use parquet::file::writer::SerializedFileWriter; use rand::distributions::DistString; @@ -751,6 +752,14 @@ impl ParquetSink { #[async_trait] impl DataSink for ParquetSink { + fn as_any(&self) -> &dyn Any { + self + } + + fn metrics(&self) -> Option<MetricsSet> { + None + } + async fn write_all( &self, mut data: Vec<SendableRecordBatchStream>, diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 337a8cabc269..ee01c0360300 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -17,6 +17,7 @@ //! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion. 
+use datafusion_physical_plan::metrics::MetricsSet; use futures::StreamExt; use log::debug; use std::any::Any; @@ -260,6 +261,14 @@ impl MemSink { #[async_trait] impl DataSink for MemSink { + fn as_any(&self) -> &dyn Any { + self + } + + fn metrics(&self) -> Option<MetricsSet> { + None + } + async fn write_all( &self, mut data: Vec<SendableRecordBatchStream>, diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 8b467461ddad..6c1a16b53309 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -35,6 +35,7 @@ use std::any::Any; use std::fmt::Debug; use std::sync::Arc; +use crate::metrics::MetricsSet; use crate::stream::RecordBatchStreamAdapter; use datafusion_common::{exec_err, internal_err, DataFusionError}; use datafusion_execution::TaskContext; @@ -46,6 +47,23 @@ use datafusion_execution::TaskContext; /// output. #[async_trait] pub trait DataSink: DisplayAs + Debug + Send + Sync { + /// Returns the data sink as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Return a snapshot of the set of [`Metric`]s for this + /// [`ExecutionPlan`]. + /// + /// While the values of the metrics in the returned + /// [`MetricsSet`]s may change as execution progresses, the + /// specific metrics will not. + /// + /// Once `self.execute()` has returned (technically the future is + /// resolved) for all available partitions, the set of metrics + /// should be complete. If this function is called prior to + /// `execute()` new metrics may appear in subsequent calls. + fn metrics(&self) -> Option<MetricsSet>; + // TODO add desired input ordering // How does this sink want its input ordered? 
@@ -147,6 +165,16 @@ impl FileSinkExec { } Ok(streams) } + + /// Returns insert sink + pub fn sink(&self) -> &dyn DataSink { + self.sink.as_ref() + } + + /// Returns the metrics of the underlying [DataSink] + pub fn metrics(&self) -> Option<MetricsSet> { + self.sink.metrics() + } } impl DisplayAs for FileSinkExec { From 4d853f28fc52bc89c798021772fab7fc524101e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 9 Oct 2023 15:16:26 +0200 Subject: [PATCH 2/6] Fmt --- datafusion/core/src/datasource/file_format/json.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 1807cf7ff81c..fa8fb5a72331 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -24,8 +24,8 @@ use datafusion_common::not_impl_err; use datafusion_common::DataFusionError; use datafusion_common::FileType; use datafusion_execution::TaskContext; -use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_expr::PhysicalSortRequirement; +use datafusion_physical_plan::metrics::MetricsSet; use rand::distributions::Alphanumeric; use rand::distributions::DistString; use std::fmt; From 18bf27fc8028ef8fb3f68c61ba2102c408880990 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 9 Oct 2023 16:00:21 +0200 Subject: [PATCH 3/6] Clippy --- datafusion/physical-plan/src/insert.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index ee6ca325df2b..46da4831476f 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -51,11 +51,11 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { /// downcast to a specific implementation. 
fn as_any(&self) -> &dyn Any; - /// Return a snapshot of the set of [`Metric`]s for this + /// Return a snapshot of the [`MetricsSet`] for this /// [`ExecutionPlan`]. /// /// While the values of the metrics in the returned - /// [`MetricsSet`]s may change as execution progresses, the + /// [`MetricsSet`] may change as execution progresses, the /// specific metrics will not. /// /// Once `self.execute()` has returned (technically the future is From 818687555b0fab9c3ee1ba7e9204fca06dbed82e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Oct 2023 09:29:33 +0200 Subject: [PATCH 4/6] Update datafusion/physical-plan/src/insert.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/insert.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 46da4831476f..4d62fe3f7cb4 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -52,7 +52,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { fn as_any(&self) -> &dyn Any; /// Return a snapshot of the [`MetricsSet`] for this - /// [`ExecutionPlan`]. + /// [`DataSink`]. 
/// /// While the values of the metrics in the returned /// [`MetricsSet`] may change as execution progresses, the From 52ffb7e0034f146046288867df6c64bdf578ec6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Oct 2023 11:55:19 +0200 Subject: [PATCH 5/6] Feedback --- datafusion/physical-plan/src/insert.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 46da4831476f..bbbe9524c63e 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -40,6 +40,7 @@ use crate::stream::RecordBatchStreamAdapter; use datafusion_common::{exec_err, internal_err, DataFusionError}; use datafusion_execution::TaskContext; + /// `DataSink` implements writing streams of [`RecordBatch`]es to /// user defined destinations. /// @@ -51,17 +52,10 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; - /// Return a snapshot of the [`MetricsSet`] for this - /// [`ExecutionPlan`]. - /// - /// While the values of the metrics in the returned - /// [`MetricsSet`] may change as execution progresses, the - /// specific metrics will not. + /// Return a snapshot of the [MetricsSet] for this + /// [DataSink]. /// - /// Once `self.execute()` has returned (technically the future is - /// resolved) for all available partitions, the set of metrics - /// should be complete. If this function is called prior to - /// `execute()` new metrics may appear in subsequent calls. 
+ /// See [ExecutionPlan::metrics()] for more details fn metrics(&self) -> Option<MetricsSet>; // TODO add desired input ordering From e27f8c4b53505c8d8abda699b4a69659d783036e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 10 Oct 2023 14:36:12 +0200 Subject: [PATCH 6/6] Fmt --- datafusion/physical-plan/src/insert.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index bbbe9524c63e..bff20e85b7ff 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -40,7 +40,6 @@ use crate::stream::RecordBatchStreamAdapter; use datafusion_common::{exec_err, internal_err, DataFusionError}; use datafusion_execution::TaskContext; - /// `DataSink` implements writing streams of [`RecordBatch`]es to /// user defined destinations. ///