Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract logical plan: rename the plan name (follow up) #1354

Merged
merged 1 commit into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScanPlan};
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
use datafusion::sql::parser::FileType;

Expand Down Expand Up @@ -212,7 +212,7 @@ impl BallistaContext {
options: CsvReadOptions<'_>,
) -> Result<()> {
match self.read_csv(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
Expand All @@ -221,7 +221,7 @@ impl BallistaContext {

pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
match self.read_parquet(path).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
Expand All @@ -235,7 +235,7 @@ impl BallistaContext {
options: AvroReadOptions<'_>,
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use datafusion::logical_plan::{
exprlist_to_fields,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit,
LogicalPlan, Repartition, TableScanPlan, Values,
LogicalPlan, Repartition, TableScan, Values,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
Expand Down Expand Up @@ -699,7 +699,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)),
})
}
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
table_name,
source,
filters,
Expand Down
10 changes: 5 additions & 5 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::logical_plan::plan::ExplainPlan;
use crate::logical_plan::plan::Explain;
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
Expand Down Expand Up @@ -664,7 +664,7 @@ impl ExecutionContext {
stringified_plans.push(optimized_plan.to_stringified(plan_type));
})?;

Ok(LogicalPlan::Explain(ExplainPlan {
Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan: Arc::new(plan),
stringified_plans,
Expand Down Expand Up @@ -1178,7 +1178,7 @@ impl FunctionRegistry for ExecutionContextState {
mod tests {
use super::*;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::TableScanPlan;
use crate::logical_plan::TableScan;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::{make_scalar_function, Volatility};
use crate::physical_plan::{collect, collect_partitioned};
Expand Down Expand Up @@ -1417,7 +1417,7 @@ mod tests {
let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
Expand Down Expand Up @@ -1490,7 +1490,7 @@ mod tests {
let optimized_plan = ctx.optimize(&plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
Expand Down
10 changes: 5 additions & 5 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::datasource::{
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{
Aggregate, AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Join, Projection, Sort,
TableScanPlan, ToStringifiedPlan, Union, Window,
Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
TableScan, ToStringifiedPlan, Union, Window,
};
use crate::prelude::*;
use crate::scalar::ScalarValue;
Expand Down Expand Up @@ -395,7 +395,7 @@ impl LogicalPlanBuilder {
DFSchema::try_from_qualified_schema(&table_name, &schema)
})?;

let table_scan = LogicalPlan::TableScan(TableScanPlan {
let table_scan = LogicalPlan::TableScan(TableScan {
table_name,
source: provider,
projected_schema: Arc::new(projected_schema),
Expand Down Expand Up @@ -699,7 +699,7 @@ impl LogicalPlanBuilder {
let schema = schema.to_dfschema_ref()?;

if analyze {
Ok(Self::from(LogicalPlan::Analyze(AnalyzePlan {
Ok(Self::from(LogicalPlan::Analyze(Analyze {
verbose,
input: Arc::new(self.plan.clone()),
schema,
Expand All @@ -708,7 +708,7 @@ impl LogicalPlanBuilder {
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

Ok(Self::from(LogicalPlan::Explain(ExplainPlan {
Ok(Self::from(LogicalPlan::Explain(Explain {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use operators::Operator;
pub use plan::{
CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor,
Repartition, TableScanPlan, Union, Values,
Repartition, TableScan, Union, Values,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
26 changes: 13 additions & 13 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct Window {

/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScanPlan {
pub struct TableScan {
/// The name of the table
pub table_name: String,
/// The source of the table
Expand Down Expand Up @@ -184,7 +184,7 @@ pub struct DropTable {
/// Produces a relation with string representations of
/// various parts of the plan
#[derive(Clone)]
pub struct ExplainPlan {
pub struct Explain {
/// Should extra (detailed, intermediate plans) be included?
pub verbose: bool,
/// The logical plan that is being EXPLAIN'd
Expand All @@ -198,7 +198,7 @@ pub struct ExplainPlan {
/// Runs the actual plan, and then prints the physical plan with
/// with execution metrics.
#[derive(Clone)]
pub struct AnalyzePlan {
pub struct Analyze {
/// Should extra detail be included?
pub verbose: bool,
/// The logical plan that is being EXPLAIN ANALYZE'd
Expand All @@ -209,7 +209,7 @@ pub struct AnalyzePlan {

/// Extension operator defined outside of DataFusion
#[derive(Clone)]
pub struct ExtensionPlan {
pub struct Extension {
/// The runtime extension operator
pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
}
Expand Down Expand Up @@ -322,7 +322,7 @@ pub enum LogicalPlan {
/// Union multiple inputs
Union(Union),
/// Produces rows from a table provider by reference or from the context
TableScan(TableScanPlan),
TableScan(TableScan),
/// Produces no rows: An empty relation with an empty schema
EmptyRelation(EmptyRelation),
/// Produces the first `n` tuples from its input and discards the rest.
Expand All @@ -339,12 +339,12 @@ pub enum LogicalPlan {
Values(Values),
/// Produces a relation with string representations of
/// various parts of the plan
Explain(ExplainPlan),
Explain(Explain),
/// Runs the actual plan, and then prints the physical plan with
/// with execution metrics.
Analyze(AnalyzePlan),
Analyze(Analyze),
/// Extension operator defined outside of DataFusion
Extension(ExtensionPlan),
Extension(Extension),
}

impl LogicalPlan {
Expand All @@ -353,7 +353,7 @@ impl LogicalPlan {
match self {
LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
LogicalPlan::Values(Values { schema, .. }) => schema,
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
projected_schema, ..
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
Expand Down Expand Up @@ -382,7 +382,7 @@ impl LogicalPlan {
/// Get a vector of references to all schemas in every node of the logical plan
pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
match self {
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
projected_schema, ..
}) => vec![projected_schema],
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
Expand Down Expand Up @@ -413,8 +413,8 @@ impl LogicalPlan {
vec![schema]
}
LogicalPlan::Extension(extension) => vec![extension.node.schema()],
LogicalPlan::Explain(ExplainPlan { schema, .. })
| LogicalPlan::Analyze(AnalyzePlan { schema, .. })
LogicalPlan::Explain(Explain { schema, .. })
| LogicalPlan::Analyze(Analyze { schema, .. })
| LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
vec![schema]
Expand Down Expand Up @@ -865,7 +865,7 @@ impl LogicalPlan {
write!(f, "Values: {}{}", str_values.join(", "), elipse)
}

LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
ref table_name,
ref projection,
ref filters,
Expand Down
8 changes: 4 additions & 4 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection};
use crate::logical_plan::{
and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan,
and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScan,
};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
Expand Down Expand Up @@ -454,7 +454,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {

optimize_join(state, plan, left, right)
}
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
filters,
Expand Down Expand Up @@ -490,7 +490,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
issue_filters(
state,
used_columns,
&LogicalPlan::TableScan(TableScanPlan {
&LogicalPlan::TableScan(TableScan {
source: source.clone(),
projection: projection.clone(),
projected_schema: projected_schema.clone(),
Expand Down Expand Up @@ -1177,7 +1177,7 @@ mod tests {
) -> Result<LogicalPlan> {
let test_provider = PushDownProvider { filter_support };

let table_scan = LogicalPlan::TableScan(TableScanPlan {
let table_scan = LogicalPlan::TableScan(TableScan {
table_name: "test".to_string(),
filters: vec![],
projected_schema: Arc::new(DFSchema::try_from(
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/optimizer/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::{Limit, TableScanPlan};
use crate::logical_plan::{Limit, TableScan};
use crate::logical_plan::{LogicalPlan, Union};
use crate::optimizer::optimizer::OptimizerRule;
use std::sync::Arc;
Expand Down Expand Up @@ -58,7 +58,7 @@ fn limit_push_down(
}))
}
(
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
Expand All @@ -67,7 +67,7 @@ fn limit_push_down(
projected_schema,
}),
Some(upper_limit),
) => Ok(LogicalPlan::TableScan(TableScanPlan {
) => Ok(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection: projection.clone(),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{
Aggregate, AnalyzePlan, Join, Projection, TableScanPlan, Window,
Aggregate, Analyze, Join, Projection, TableScan, Window,
};
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
Expand Down Expand Up @@ -330,7 +330,7 @@ fn optimize_plan(
}
// scans:
// * remove un-used columns from the scan projection
LogicalPlan::TableScan(TableScanPlan {
LogicalPlan::TableScan(TableScan {
table_name,
source,
filters,
Expand All @@ -344,7 +344,7 @@ fn optimize_plan(
has_projection,
)?;
// return the table scan with projection
Ok(LogicalPlan::TableScan(TableScanPlan {
Ok(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection: Some(projection),
Expand All @@ -366,7 +366,7 @@ fn optimize_plan(
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();

Ok(LogicalPlan::Analyze(AnalyzePlan {
Ok(LogicalPlan::Analyze(Analyze {
input: Arc::new(optimize_plan(
optimizer,
&a.input,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch;
use super::optimizer::OptimizerRule;
use crate::execution::context::{ExecutionContextState, ExecutionProps};
use crate::logical_plan::plan::{
Aggregate, AnalyzePlan, ExtensionPlan, Filter, Join, Projection, Sort, Window,
Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Window,
};
use crate::logical_plan::{
build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
Expand Down Expand Up @@ -236,7 +236,7 @@ pub fn from_plan(
name: name.clone(),
}))
}
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(ExtensionPlan {
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
node: e.node.from_template(expr, inputs),
})),
LogicalPlan::Union(Union { schema, alias, .. }) => {
Expand All @@ -249,7 +249,7 @@ pub fn from_plan(
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
assert_eq!(inputs.len(), 1);
Ok(LogicalPlan::Analyze(AnalyzePlan {
Ok(LogicalPlan::Analyze(Analyze {
verbose: a.verbose,
schema: a.schema.clone(),
input: Arc::new(inputs[0].clone()),
Expand Down
Loading