From bb5f61b72fc71fd9a04a62a20fb3075d9cffb734 Mon Sep 17 00:00:00 2001 From: Tiphaine Ruy Date: Tue, 24 Aug 2021 21:27:19 +0200 Subject: [PATCH 1/2] fixes #933 replace placeholder fmt_as fr ExecutionPlan impls --- .../src/execution_plans/distributed_query.rs | 19 ++++++++- ballista/rust/executor/src/collect.rs | 16 +++++++- datafusion/src/physical_plan/json.rs | 13 +++++++ datafusion/src/physical_plan/planner.rs | 13 +++++++ datafusion/src/physical_plan/union.rs | 14 ++++++- .../physical_plan/windows/window_agg_exec.rs | 16 +++++++- datafusion/src/test/exec.rs | 39 ++++++++++++++++++- datafusion/tests/provider_filter_pushdown.rs | 16 +++++++- 8 files changed, 139 insertions(+), 7 deletions(-) diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs index 8abfe6678893..7793ad9e9244 100644 --- a/ballista/rust/core/src/execution_plans/distributed_query.rs +++ b/ballista/rust/core/src/execution_plans/distributed_query.rs @@ -34,7 +34,8 @@ use datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::{ - ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, }; use async_trait::async_trait; @@ -186,6 +187,22 @@ impl ExecutionPlan for DistributedQueryExec { }; } } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "DistributedQueryExec: scheduler_url={}", + self.scheduler_url + ) + } + } + } } async fn fetch_partition( diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs index a4c544f8c47b..e9448c82d861 100644 --- a/ballista/rust/executor/src/collect.rs +++ b/ballista/rust/executor/src/collect.rs @@ -27,7 +27,9 @@ use datafusion::arrow::{ datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch, }; use datafusion::error::DataFusionError; -use datafusion::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, +}; use datafusion::{error::Result, physical_plan::RecordBatchStream}; use futures::stream::SelectAll; use futures::Stream; @@ -102,6 +104,18 @@ impl ExecutionPlan for CollectExec { select_all: Box::pin(futures::stream::select_all(streams)), })) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "CollectExec") + } + } + } } struct MergedRecordBatchStream { diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs index ed9b0b03a38e..24631c57739e 100644 --- a/datafusion/src/physical_plan/json.rs +++ b/datafusion/src/physical_plan/json.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use futures::Stream; +use super::DisplayFormatType; use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream}; use crate::error::{DataFusionError, Result}; use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; @@ -311,6 +312,18 @@ impl ExecutionPlan for NdJsonExec { } } } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "NdJsonExec: source={:?}", self.source) + } + } + } } struct NdJsonStream { diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 02ab15d1a652..81063280b48e 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1376,6 +1376,7 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { mod tests { use super::*; use crate::logical_plan::{DFField, DFSchema, DFSchemaRef}; + use crate::physical_plan::DisplayFormatType; use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning}; use crate::scalar::ScalarValue; use crate::{ @@ -1777,6 +1778,18 @@ mod tests { async fn execute(&self, _partition: usize) -> Result { unimplemented!("NoOpExecutionPlan::execute"); } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "NoOpExecutionPlan") + } + } + } } // Produces an execution plan where the schema is mismatched from diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index cbab728a8428..932bd5c5c0f5 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -25,7 +25,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; -use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use super::{DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream}; use crate::error::Result; use async_trait::async_trait; @@ -94,6 +94,18 @@ impl ExecutionPlan for UnionExec { partition ))) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "UnionExec") + } + } + } } #[cfg(test)] diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index 2ff1f34ce4c7..d25eebf4de8c 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -19,8 +19,8 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - common, Distribution, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, WindowExpr, + common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, WindowExpr, }; use arrow::{ array::ArrayRef, @@ -143,6 +143,18 @@ impl ExecutionPlan for WindowAggExec { )); Ok(stream) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "WindowAggExec") + } + } + } } fn create_schema( diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs index 247dab1fb5bd..fa1f36c230f9 100644 --- a/datafusion/src/test/exec.rs +++ b/datafusion/src/test/exec.rs @@ -33,7 +33,8 @@ use arrow::{ use futures::Stream; use crate::physical_plan::{ - ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, }; use crate::{ error::{DataFusionError, Result}, @@ -190,6 +191,18 @@ impl ExecutionPlan for MockExec { // returned stream simply reads off the rx stream Ok(RecordBatchReceiverStream::create(&self.schema, rx)) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "MockExec") + } + } + } } fn clone_error(e: &ArrowError) -> ArrowError { @@ -281,6 +294,18 @@ impl ExecutionPlan for BarrierExec { // returned stream simply reads off the rx stream Ok(RecordBatchReceiverStream::create(&self.schema, rx)) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "BarrierExec") + } + } + } } /// A mock execution plan that errors on a call to execute @@ -331,4 +356,16 @@ impl ExecutionPlan for ErrorExec { partition ))) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "ErrorExec") + } + } + } } diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs index 0bf67bea8b9d..07b0eb2bb2ce 100644 --- a/datafusion/tests/provider_filter_pushdown.rs +++ b/datafusion/tests/provider_filter_pushdown.rs @@ -26,7 +26,9 @@ use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; use datafusion::logical_plan::Expr; use datafusion::physical_plan::common::SizedRecordBatchStream; -use datafusion::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, +}; use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use std::sync::Arc; @@ -84,6 +86,18 @@ impl ExecutionPlan for CustomPlan { self.batches.clone(), ))) } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "CustomPlan: batch_size={}", self.batches.len(),) + } + } + } } #[derive(Clone)] From c5e46b131535b21698860f1e2c5aaec40b53c4ce Mon Sep 17 00:00:00 2001 From: Tiphaine Ruy Date: Wed, 25 Aug 2021 09:15:45 +0200 Subject: [PATCH 2/2] Add window_expr vec to fmt_as for window_agg_exec --- datafusion/src/physical_plan/windows/window_agg_exec.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index d25eebf4de8c..c7466477ce79 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -151,9 +151,16 @@ impl ExecutionPlan for WindowAggExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default => { - write!(f, "WindowAggExec") + write!(f, "WindowAggExec: ")?; + let g: Vec = self + .window_expr + .iter() + .map(|e| format!("{}: {:?}", e.name().to_owned(), e.field())) + .collect(); + write!(f, "wdw=[{}]", g.join(", "))?; } } + Ok(()) } }