diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 00a92dea4cf1..fff6f26305fa 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -30,7 +30,7 @@ use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::dataframe_impl::DataFrameImpl; -use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScanPlan}; +use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan}; use datafusion::prelude::{AvroReadOptions, CsvReadOptions}; use datafusion::sql::parser::FileType; @@ -212,7 +212,7 @@ impl BallistaContext { options: CsvReadOptions<'_>, ) -> Result<()> { match self.read_csv(path, options).await?.to_logical_plan() { - LogicalPlan::TableScan(TableScanPlan { source, .. }) => { + LogicalPlan::TableScan(TableScan { source, .. }) => { self.register_table(name, source) } _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())), @@ -221,7 +221,7 @@ impl BallistaContext { pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> { match self.read_parquet(path).await?.to_logical_plan() { - LogicalPlan::TableScan(TableScanPlan { source, .. }) => { + LogicalPlan::TableScan(TableScan { source, .. }) => { self.register_table(name, source) } _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())), @@ -235,7 +235,7 @@ impl BallistaContext { options: AvroReadOptions<'_>, ) -> Result<()> { match self.read_avro(path, options).await?.to_logical_plan() { - LogicalPlan::TableScan(TableScanPlan { source, .. }) => { + LogicalPlan::TableScan(TableScan { source, .. }) => { self.register_table(name, source) } _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())), diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 897558ca0a14..68ed7097632f 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -37,7 +37,7 @@ use datafusion::logical_plan::{ exprlist_to_fields, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit, - LogicalPlan, Repartition, TableScanPlan, Values, + LogicalPlan, Repartition, TableScan, Values, }; use datafusion::physical_plan::aggregates::AggregateFunction; use datafusion::physical_plan::functions::BuiltinScalarFunction; @@ -699,7 +699,7 @@ impl TryInto for &LogicalPlan { )), }) } - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { table_name, source, filters, diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 9e5ac1f19678..27116c0a4a95 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -77,7 +77,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use crate::physical_optimizer::repartition::Repartition; -use crate::logical_plan::plan::ExplainPlan; +use crate::logical_plan::plan::Explain; use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udf::ScalarUDF; @@ -664,7 +664,7 @@ impl ExecutionContext { stringified_plans.push(optimized_plan.to_stringified(plan_type)); })?; - Ok(LogicalPlan::Explain(ExplainPlan { + Ok(LogicalPlan::Explain(Explain { verbose: e.verbose, plan: Arc::new(plan), stringified_plans, @@ -1178,7 +1178,7 @@ impl FunctionRegistry for ExecutionContextState { mod tests { use super::*; use crate::logical_plan::plan::Projection; - use crate::logical_plan::TableScanPlan; + use crate::logical_plan::TableScan; use crate::logical_plan::{binary_expr, lit, Operator}; use crate::physical_plan::functions::{make_scalar_function, Volatility}; use crate::physical_plan::{collect, collect_partitioned}; @@ -1417,7 +1417,7 @@ mod tests { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { source, projected_schema, .. @@ -1490,7 +1490,7 @@ mod tests { let optimized_plan = ctx.optimize(&plan)?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { source, projected_schema, .. diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index e7f34412568d..fc7df595586f 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -26,8 +26,8 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_plan::plan::{ - Aggregate, AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Join, Projection, Sort, - TableScanPlan, ToStringifiedPlan, Union, Window, + Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, + TableScan, ToStringifiedPlan, Union, Window, }; use crate::prelude::*; use crate::scalar::ScalarValue; @@ -395,7 +395,7 @@ impl LogicalPlanBuilder { DFSchema::try_from_qualified_schema(&table_name, &schema) })?; - let table_scan = LogicalPlan::TableScan(TableScanPlan { + let table_scan = LogicalPlan::TableScan(TableScan { table_name, source: provider, projected_schema: Arc::new(projected_schema), @@ -699,7 +699,7 @@ impl LogicalPlanBuilder { let schema = schema.to_dfschema_ref()?; if analyze { - Ok(Self::from(LogicalPlan::Analyze(AnalyzePlan { + Ok(Self::from(LogicalPlan::Analyze(Analyze { verbose, input: Arc::new(self.plan.clone()), schema, @@ -708,7 +708,7 @@ impl LogicalPlanBuilder { let stringified_plans = vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)]; - Ok(Self::from(LogicalPlan::Explain(ExplainPlan { + Ok(Self::from(LogicalPlan::Explain(Explain { verbose, plan: Arc::new(self.plan.clone()), stringified_plans, diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs index 494501df0bb0..a20d57206749 100644 --- a/datafusion/src/logical_plan/mod.rs +++ b/datafusion/src/logical_plan/mod.rs @@ -53,7 +53,7 @@ pub use operators::Operator; pub use plan::{ CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, - Repartition, TableScanPlan, Union, Values, + Repartition, TableScan, Union, Values, }; pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan}; pub use registry::FunctionRegistry; diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 88bc7a28ae2d..952572f4dea3 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -100,7 +100,7 @@ pub struct Window { /// Produces rows from a table provider by reference or from the context #[derive(Clone)] -pub struct TableScanPlan { +pub struct TableScan { /// The name of the table pub table_name: String, /// The source of the table @@ -184,7 +184,7 @@ pub struct DropTable { /// Produces a relation with string representations of /// various parts of the plan #[derive(Clone)] -pub struct ExplainPlan { +pub struct Explain { /// Should extra (detailed, intermediate plans) be included? pub verbose: bool, /// The logical plan that is being EXPLAIN'd @@ -198,7 +198,7 @@ pub struct ExplainPlan { /// Runs the actual plan, and then prints the physical plan with /// with execution metrics. #[derive(Clone)] -pub struct AnalyzePlan { +pub struct Analyze { /// Should extra detail be included? pub verbose: bool, /// The logical plan that is being EXPLAIN ANALYZE'd @@ -209,7 +209,7 @@ pub struct AnalyzePlan { /// Extension operator defined outside of DataFusion #[derive(Clone)] -pub struct ExtensionPlan { +pub struct Extension { /// The runtime extension operator pub node: Arc, } @@ -322,7 +322,7 @@ pub enum LogicalPlan { /// Union multiple inputs Union(Union), /// Produces rows from a table provider by reference or from the context - TableScan(TableScanPlan), + TableScan(TableScan), /// Produces no rows: An empty relation with an empty schema EmptyRelation(EmptyRelation), /// Produces the first `n` tuples from its input and discards the rest. @@ -339,12 +339,12 @@ pub enum LogicalPlan { Values(Values), /// Produces a relation with string representations of /// various parts of the plan - Explain(ExplainPlan), + Explain(Explain), /// Runs the actual plan, and then prints the physical plan with /// with execution metrics. - Analyze(AnalyzePlan), + Analyze(Analyze), /// Extension operator defined outside of DataFusion - Extension(ExtensionPlan), + Extension(Extension), } impl LogicalPlan { @@ -353,7 +353,7 @@ impl LogicalPlan { match self { LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema, LogicalPlan::Values(Values { schema, .. }) => schema, - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { projected_schema, .. }) => projected_schema, LogicalPlan::Projection(Projection { schema, .. }) => schema, @@ -382,7 +382,7 @@ impl LogicalPlan { /// Get a vector of references to all schemas in every node of the logical plan pub fn all_schemas(&self) -> Vec<&DFSchemaRef> { match self { - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { projected_schema, .. }) => vec![projected_schema], LogicalPlan::Values(Values { schema, .. }) => vec![schema], @@ -413,8 +413,8 @@ impl LogicalPlan { vec![schema] } LogicalPlan::Extension(extension) => vec![extension.node.schema()], - LogicalPlan::Explain(ExplainPlan { schema, .. }) - | LogicalPlan::Analyze(AnalyzePlan { schema, .. }) + LogicalPlan::Explain(Explain { schema, .. }) + | LogicalPlan::Analyze(Analyze { schema, .. }) | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { vec![schema] @@ -865,7 +865,7 @@ impl LogicalPlan { write!(f, "Values: {}{}", str_values.join(", "), elipse) } - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { ref table_name, ref projection, ref filters, diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index 2c702971f86f..a11a15912d3d 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -18,7 +18,7 @@ use crate::datasource::datasource::TableProviderFilterPushDown; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection}; use crate::logical_plan::{ - and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan, + and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScan, }; use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; @@ -454,7 +454,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { optimize_join(state, plan, left, right) } - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { source, projected_schema, filters, @@ -490,7 +490,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { issue_filters( state, used_columns, - &LogicalPlan::TableScan(TableScanPlan { + &LogicalPlan::TableScan(TableScan { source: source.clone(), projection: projection.clone(), projected_schema: projected_schema.clone(), @@ -1177,7 +1177,7 @@ mod tests { ) -> Result { let test_provider = PushDownProvider { filter_support }; - let table_scan = LogicalPlan::TableScan(TableScanPlan { + let table_scan = LogicalPlan::TableScan(TableScan { table_name: "test".to_string(), filters: vec![], projected_schema: Arc::new(DFSchema::try_from( diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index e2c65de13f9f..15d5093abed9 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -21,7 +21,7 @@ use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::Projection; -use crate::logical_plan::{Limit, TableScanPlan}; +use crate::logical_plan::{Limit, TableScan}; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; use std::sync::Arc; @@ -58,7 +58,7 @@ fn limit_push_down( })) } ( - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { table_name, source, projection, @@ -67,7 +67,7 @@ fn limit_push_down( projected_schema, }), Some(upper_limit), - ) => Ok(LogicalPlan::TableScan(TableScanPlan { + ) => Ok(LogicalPlan::TableScan(TableScan { table_name: table_name.clone(), source: source.clone(), projection: projection.clone(), diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 1b331ba65b29..f92ab653fd4e 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -21,7 +21,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - Aggregate, AnalyzePlan, Join, Projection, TableScanPlan, Window, + Aggregate, Analyze, Join, Projection, TableScan, Window, }; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, @@ -330,7 +330,7 @@ fn optimize_plan( } // scans: // * remove un-used columns from the scan projection - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { table_name, source, filters, @@ -344,7 +344,7 @@ fn optimize_plan( has_projection, )?; // return the table scan with projection - Ok(LogicalPlan::TableScan(TableScanPlan { + Ok(LogicalPlan::TableScan(TableScan { table_name: table_name.clone(), source: source.clone(), projection: Some(projection), @@ -366,7 +366,7 @@ fn optimize_plan( .map(|f| f.qualified_column()) .collect::>(); - Ok(LogicalPlan::Analyze(AnalyzePlan { + Ok(LogicalPlan::Analyze(Analyze { input: Arc::new(optimize_plan( optimizer, &a.input, diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 60a81f084a42..39f8a73afeb2 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch; use super::optimizer::OptimizerRule; use crate::execution::context::{ExecutionContextState, ExecutionProps}; use crate::logical_plan::plan::{ - Aggregate, AnalyzePlan, ExtensionPlan, Filter, Join, Projection, Sort, Window, + Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Window, }; use crate::logical_plan::{ build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr, @@ -236,7 +236,7 @@ pub fn from_plan( name: name.clone(), })) } - LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(ExtensionPlan { + LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension { node: e.node.from_template(expr, inputs), })), LogicalPlan::Union(Union { schema, alias, .. }) => { @@ -249,7 +249,7 @@ pub fn from_plan( LogicalPlan::Analyze(a) => { assert!(expr.is_empty()); assert_eq!(inputs.len(), 1); - Ok(LogicalPlan::Analyze(AnalyzePlan { + Ok(LogicalPlan::Analyze(Analyze { verbose: a.verbose, schema: a.schema.clone(), input: Arc::new(inputs[0].clone()), diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index f2f526d9b85c..1302369886f8 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -24,7 +24,7 @@ use super::{ }; use crate::execution::context::ExecutionContextState; use crate::logical_plan::plan::{ - Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScanPlan, Window, + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window, }; use crate::logical_plan::{ unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, @@ -330,7 +330,7 @@ impl DefaultPhysicalPlanner { let batch_size = ctx_state.config.batch_size; let exec_plan: Result> = match logical_plan { - LogicalPlan::TableScan (TableScanPlan { + LogicalPlan::TableScan (TableScan { source, projection, filters, @@ -1460,7 +1460,7 @@ mod tests { use super::*; use crate::datasource::object_store::local::LocalFileSystem; use crate::execution::options::CsvReadOptions; - use crate::logical_plan::plan::ExtensionPlan; + use crate::logical_plan::plan::Extension; use crate::logical_plan::{DFField, DFSchema, DFSchemaRef}; use crate::physical_plan::{ expressions, DisplayFormatType, Partitioning, Statistics, @@ -1611,7 +1611,7 @@ mod tests { async fn default_extension_planner() { let ctx_state = make_ctx_state(); let planner = DefaultPhysicalPlanner::default(); - let logical_plan = LogicalPlan::Extension(ExtensionPlan { + let logical_plan = LogicalPlan::Extension(Extension { node: Arc::new(NoOpExtensionNode::default()), }); let plan = planner @@ -1640,7 +1640,7 @@ mod tests { BadExtensionPlanner {}, )]); - let logical_plan = LogicalPlan::Extension(ExtensionPlan { + let logical_plan = LogicalPlan::Extension(Extension { node: Arc::new(NoOpExtensionNode::default()), }); let plan = planner diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index ad84bdc11b70..ac3d3b176dcd 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -67,7 +67,7 @@ use super::{ }, }; use crate::logical_plan::builder::project_with_alias; -use crate::logical_plan::plan::{AnalyzePlan, ExplainPlan}; +use crate::logical_plan::plan::{Analyze, Explain}; /// The ContextProvider trait allows the query planner to obtain meta-data about tables and /// functions referenced in SQL statements @@ -331,7 +331,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let schema = schema.to_dfschema_ref()?; if analyze { - Ok(LogicalPlan::Analyze(AnalyzePlan { + Ok(LogicalPlan::Analyze(Analyze { verbose, input: plan, schema, @@ -339,7 +339,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } else { let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)]; - Ok(LogicalPlan::Explain(ExplainPlan { + Ok(LogicalPlan::Explain(Explain { verbose, plan, stringified_plans, diff --git a/datafusion/src/test/user_defined.rs b/datafusion/src/test/user_defined.rs index 394d865da1cb..19ed0580b91c 100644 --- a/datafusion/src/test/user_defined.rs +++ b/datafusion/src/test/user_defined.rs @@ -23,13 +23,13 @@ use std::{ sync::Arc, }; -use crate::logical_plan::plan::ExtensionPlan; +use crate::logical_plan::plan::Extension; use crate::logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode}; /// Create a new user defined plan node, for testing pub fn new(input: LogicalPlan) -> LogicalPlan { let node = Arc::new(TestUserDefinedPlanNode { input }); - LogicalPlan::Extension(ExtensionPlan { node }) + LogicalPlan::Extension(Extension { node }) } struct TestUserDefinedPlanNode { diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index a145ca3fdf88..b1288f7b5f63 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -31,7 +31,7 @@ use datafusion::{ use datafusion::execution::context::ExecutionContext; use datafusion::logical_plan::{ - col, Expr, LogicalPlan, LogicalPlanBuilder, TableScanPlan, UNNAMED_TABLE, + col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE, }; use datafusion::physical_plan::{ ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream, @@ -218,7 +218,7 @@ async fn custom_source_dataframe() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { source, projected_schema, .. diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 809018b86a58..9216bdb775bd 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -38,7 +38,7 @@ use datafusion::assert_contains; use datafusion::assert_not_contains; use datafusion::logical_plan::plan::{Aggregate, Projection}; use datafusion::logical_plan::LogicalPlan; -use datafusion::logical_plan::TableScanPlan; +use datafusion::logical_plan::TableScan; use datafusion::physical_plan::functions::Volatility; use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; @@ -93,7 +93,7 @@ async fn nyc() -> Result<()> { match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match input.as_ref() { LogicalPlan::Aggregate(Aggregate { input, .. }) => match input.as_ref() { - LogicalPlan::TableScan(TableScanPlan { + LogicalPlan::TableScan(TableScan { ref projected_schema, .. }) => { diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index d9319854719b..d3c6083adefb 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -86,7 +86,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; use datafusion::execution::context::ExecutionProps; -use datafusion::logical_plan::plan::{ExtensionPlan, Sort}; +use datafusion::logical_plan::plan::{Extension, Sort}; use datafusion::logical_plan::{DFSchemaRef, Limit}; /// Execute the specified sql and return the resulting record batches @@ -296,7 +296,7 @@ impl OptimizerRule for TopKOptimizerRule { { if expr.len() == 1 { // we found a sort with a single sort expr, replace with a a TopK - return Ok(LogicalPlan::Extension(ExtensionPlan { + return Ok(LogicalPlan::Extension(Extension { node: Arc::new(TopKPlanNode { k: *n, input: self.optimize(input.as_ref(), execution_props)?,