Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Explain, Analyze, Extension in LogicalPlan as independent struct #1317

Merged
merged 1 commit into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,24 +973,24 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)),
})
}
LogicalPlan::Analyze { verbose, input, .. } => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
LogicalPlan::Analyze(a) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to import the `AnalyzePlan` struct and destructure it directly in the match pattern, like this:

LogicalPlan::Analyze(AnalyzePlan {
 verbose, input, ..
})

Copy link
Member Author

@xudong963 xudong963 Nov 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong preference either way. When the struct has many fields, I prefer `StructVar(s) => { s.v1, s.v2, ... }` because it keeps the code concise. When there are only a few fields it doesn't matter; both styles make sense to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree either style seems fine to me

let input: protobuf::LogicalPlanNode = a.input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
protobuf::AnalyzeNode {
input: Some(Box::new(input)),
verbose: *verbose,
verbose: a.verbose,
},
))),
})
}
LogicalPlan::Explain { verbose, plan, .. } => {
let input: protobuf::LogicalPlanNode = plan.as_ref().try_into()?;
LogicalPlan::Explain(a) => {
let input: protobuf::LogicalPlanNode = a.plan.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
protobuf::ExplainNode {
input: Some(Box::new(input)),
verbose: *verbose,
verbose: a.verbose,
},
))),
})
Expand Down
53 changes: 22 additions & 31 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::logical_plan::plan::ExplainPlan;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -650,28 +651,23 @@ impl ExecutionContext {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
if let LogicalPlan::Explain {
verbose,
plan,
stringified_plans,
schema,
} = plan
{
let mut stringified_plans = stringified_plans.clone();
if let LogicalPlan::Explain(e) = plan {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan = self.optimize_internal(plan, |optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
})?;

Ok(LogicalPlan::Explain {
verbose: *verbose,
let plan =
self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
})?;

Ok(LogicalPlan::Explain(ExplainPlan {
verbose: e.verbose,
plan: Arc::new(plan),
stringified_plans,
schema: schema.clone(),
})
schema: e.schema.clone(),
}))
} else {
self.optimize_internal(plan, |_, _| {})
}
Expand Down Expand Up @@ -1221,33 +1217,28 @@ mod tests {
.build()
.unwrap();

if let LogicalPlan::Explain {
stringified_plans, ..
} = &plan
{
assert_eq!(stringified_plans.len(), 1);
if let LogicalPlan::Explain(e) = &plan {
assert_eq!(e.stringified_plans.len(), 1);
} else {
panic!("plan was not an explain: {:?}", plan);
}

// now optimize the plan and expect to see more plans
let optimized_plan = ExecutionContext::new().optimize(&plan).unwrap();
if let LogicalPlan::Explain {
stringified_plans, ..
} = &optimized_plan
{
if let LogicalPlan::Explain(e) = &optimized_plan {
// should have more than one plan
assert!(
stringified_plans.len() > 1,
e.stringified_plans.len() > 1,
"plans: {:#?}",
stringified_plans
e.stringified_plans
);
// should have at least one optimized plan
let opt = stringified_plans
let opt = e
.stringified_plans
.iter()
.any(|p| matches!(p.plan_type, PlanType::OptimizedLogicalPlan { .. }));

assert!(opt, "plans: {:#?}", stringified_plans);
assert!(opt, "plans: {:#?}", e.stringified_plans);
} else {
panic!("plan was not an explain: {:?}", plan);
}
Expand Down
12 changes: 7 additions & 5 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use crate::datasource::{
MemTable, TableProvider,
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{TableScanPlan, ToStringifiedPlan};
use crate::logical_plan::plan::{
AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan,
};
use crate::prelude::*;
use crate::scalar::ScalarValue;
use arrow::{
Expand Down Expand Up @@ -696,21 +698,21 @@ impl LogicalPlanBuilder {
let schema = schema.to_dfschema_ref()?;

if analyze {
Ok(Self::from(LogicalPlan::Analyze {
Ok(Self::from(LogicalPlan::Analyze(AnalyzePlan {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code style is better than the style used above.
It is the same style used in pull request #1290.

verbose,
input: Arc::new(self.plan.clone()),
schema,
}))
})))
} else {
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

Ok(Self::from(LogicalPlan::Explain {
Ok(Self::from(LogicalPlan::Explain(ExplainPlan {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
schema,
}))
})))
}
}

Expand Down
88 changes: 51 additions & 37 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,39 @@ pub struct TableScanPlan {
pub limit: Option<usize>,
}

/// Produces a relation with string representations of
/// various parts of the plan.
///
/// This struct is the payload of the `LogicalPlan::Explain` variant.
#[derive(Clone)]
pub struct ExplainPlan {
/// Should extra (detailed, intermediate plans) be included?
pub verbose: bool,
/// The logical plan that is being EXPLAIN'd
pub plan: Arc<LogicalPlan>,
/// Stringified representations of the various stages the plan has
/// gone through
pub stringified_plans: Vec<StringifiedPlan>,
/// The output schema of the explain (2 columns of text)
pub schema: DFSchemaRef,
}

/// Runs the actual plan, and then prints the physical plan
/// with execution metrics.
///
/// This struct is the payload of the `LogicalPlan::Analyze` variant.
#[derive(Clone)]
pub struct AnalyzePlan {
/// Should extra detail be included?
pub verbose: bool,
/// The logical plan that is being EXPLAIN ANALYZE'd
pub input: Arc<LogicalPlan>,
/// The output schema of the explain (2 columns of text)
pub schema: DFSchemaRef,
}

/// Extension operator defined outside of DataFusion.
///
/// This struct is the payload of the `LogicalPlan::Extension` variant.
#[derive(Clone)]
pub struct ExtensionPlan {
/// The runtime extension operator; its schema, expressions, and inputs
/// are obtained by calling the corresponding methods on this node
pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
}

/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
/// the SQL query planner and the DataFrame API.
Expand Down Expand Up @@ -236,31 +269,12 @@ pub enum LogicalPlan {
},
/// Produces a relation with string representations of
/// various parts of the plan
Explain {
/// Should extra (detailed, intermediate plans) be included?
verbose: bool,
/// The logical plan that is being EXPLAIN'd
plan: Arc<LogicalPlan>,
/// Represent the various stages plans have gone through
stringified_plans: Vec<StringifiedPlan>,
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
Explain(ExplainPlan),
/// Runs the actual plan, and then prints the physical plan
/// with execution metrics.
Analyze {
/// Should extra detail be included?
verbose: bool,
/// The logical plan that is being EXPLAIN ANALYZE'd
input: Arc<LogicalPlan>,
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
Analyze(AnalyzePlan),
/// Extension operator defined outside of DataFusion
Extension {
/// The runtime extension operator
node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
},
Extension(ExtensionPlan),
}

impl LogicalPlan {
Expand All @@ -282,9 +296,9 @@ impl LogicalPlan {
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => schema,
LogicalPlan::Explain { schema, .. } => schema,
LogicalPlan::Analyze { schema, .. } => schema,
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
LogicalPlan::Union { schema, .. } => schema,
LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
LogicalPlan::DropTable { schema, .. } => schema,
Expand Down Expand Up @@ -324,9 +338,9 @@ impl LogicalPlan {
LogicalPlan::Union { schema, .. } => {
vec![schema]
}
LogicalPlan::Extension { node } => vec![node.schema()],
LogicalPlan::Explain { schema, .. }
| LogicalPlan::Analyze { schema, .. }
LogicalPlan::Extension(extension) => vec![extension.node.schema()],
LogicalPlan::Explain(ExplainPlan { schema, .. })
| LogicalPlan::Analyze(AnalyzePlan { schema, .. })
| LogicalPlan::EmptyRelation { schema, .. }
| LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
LogicalPlan::Limit { input, .. }
Expand Down Expand Up @@ -374,7 +388,7 @@ impl LogicalPlan {
.flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
.collect(),
LogicalPlan::Sort { expr, .. } => expr.clone(),
LogicalPlan::Extension { node } => node.expressions(),
LogicalPlan::Extension(extension) => extension.node.expressions(),
// plans without expressions
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
Expand Down Expand Up @@ -404,10 +418,10 @@ impl LogicalPlan {
LogicalPlan::Join { left, right, .. } => vec![left, right],
LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
LogicalPlan::Limit { input, .. } => vec![input],
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain { plan, .. } => vec![plan],
LogicalPlan::Analyze { input: plan, .. } => vec![plan],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
// plans without inputs
LogicalPlan::TableScan { .. }
Expand Down Expand Up @@ -548,16 +562,16 @@ impl LogicalPlan {
}
LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
LogicalPlan::CreateMemoryTable { input, .. } => input.accept(visitor)?,
LogicalPlan::Extension { node } => {
for input in node.inputs() {
LogicalPlan::Extension(extension) => {
for input in extension.node.inputs() {
if !input.accept(visitor)? {
return Ok(false);
}
}
true
}
LogicalPlan::Explain { plan, .. } => plan.accept(visitor)?,
LogicalPlan::Analyze { input: plan, .. } => plan.accept(visitor)?,
LogicalPlan::Explain(explain) => explain.plan.accept(visitor)?,
LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
Expand Down Expand Up @@ -889,7 +903,7 @@ impl LogicalPlan {
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
LogicalPlan::Extension { ref node } => node.fmt_for_explain(f),
LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
}
}
}
Expand Down
21 changes: 9 additions & 12 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::plan::{AnalyzePlan, TableScanPlan};
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
LogicalPlanBuilder, ToDFSchema,
Expand Down Expand Up @@ -355,30 +355,27 @@ fn optimize_plan(
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Analyze {
input,
verbose,
schema,
} => {
LogicalPlan::Analyze(a) => {
// make sure we keep all the columns from the input plan
let required_columns = input
let required_columns = a
.input
.schema()
.fields()
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();

Ok(LogicalPlan::Analyze {
Ok(LogicalPlan::Analyze(AnalyzePlan {
input: Arc::new(optimize_plan(
optimizer,
input,
&a.input,
&required_columns,
false,
execution_props,
)?),
verbose: *verbose,
schema: schema.clone(),
})
verbose: a.verbose,
schema: a.schema.clone(),
}))
}
LogicalPlan::Union {
inputs,
Expand Down
Loading