Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix subquery alias #1067

Merged
merged 1 commit into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ message AvroTableScanNode {
// Serialized form of a logical-plan projection node.
message ProjectionNode {
// Input plan of the projection.
LogicalPlanNode input = 1;
// Expressions of the projection.
repeated LogicalExprNode expr = 2;
// Relation alias of the projection output. It is wrapped in a
// single-member oneof so that an unset alias is representable; the Rust
// serde code converts it to and from an optional string.
oneof optional_alias {
string alias = 3;
}
}

message SelectionNode {
Expand Down
9 changes: 8 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(input)
.project(x)?
.project_with_alias(
x,
projection.optional_alias.as_ref().map(|a| match a {
protobuf::projection_node::OptionalAlias::Alias(alias) => {
alias.clone()
}
}),
)?
.build()
.map_err(|e| e.into())
}
Expand Down
30 changes: 17 additions & 13 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,19 +801,23 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)))
}
}
LogicalPlan::Projection { expr, input, .. } => {
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
protobuf::ProjectionNode {
input: Some(Box::new(input.as_ref().try_into()?)),
expr: expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
},
))),
})
}
LogicalPlan::Projection {
expr, input, alias, ..
} => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
protobuf::ProjectionNode {
input: Some(Box::new(input.as_ref().try_into()?)),
expr: expr.iter().map(|expr| expr.try_into()).collect::<Result<
Vec<_>,
BallistaError,
>>(
)?,
optional_alias: alias
.clone()
.map(protobuf::projection_node::OptionalAlias::Alias),
},
))),
}),
LogicalPlan::Filter { predicate, input } => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
Expand Down
16 changes: 8 additions & 8 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2047,7 +2047,7 @@ mod tests {
async fn join_partitioned() -> Result<()> {
// self join on partition id (workaround for duplicate column name)
let results = execute(
"SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) ON c1=id1",
"SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
4,
)
.await?;
Expand Down Expand Up @@ -2080,7 +2080,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
JOIN (SELECT * FROM t as t2) \
JOIN (SELECT * FROM t) as t2 \
ON t1.nanos = t2.nanos",
)
.await
Expand All @@ -2090,7 +2090,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
JOIN (SELECT * FROM t as t2) \
JOIN (SELECT * FROM t) as t2 \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

ON t1.micros = t2.micros",
)
.await
Expand All @@ -2100,7 +2100,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
JOIN (SELECT * FROM t as t2) \
JOIN (SELECT * FROM t) as t2 \
ON t1.millis = t2.millis",
)
.await
Expand Down Expand Up @@ -2967,12 +2967,12 @@ mod tests {
#[tokio::test]
async fn ctx_sql_should_optimize_plan() -> Result<()> {
let mut ctx = ExecutionContext::new();
let plan1 =
ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
let plan1 = ctx
.create_logical_plan("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;

let opt_plan1 = ctx.optimize(&plan1)?;

let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
let plan2 = ctx.sql("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;

assert_eq!(
format!("{:?}", opt_plan1),
Expand Down Expand Up @@ -3727,7 +3727,7 @@ mod tests {
SELECT i, 'a' AS cat FROM catalog_a.schema_a.table_a
UNION ALL
SELECT i, 'b' AS cat FROM catalog_b.schema_b.table_b
)
) AS all
GROUP BY cat
ORDER BY cat
",
Expand Down
78 changes: 49 additions & 29 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,34 +236,22 @@ impl LogicalPlanBuilder {
Ok(Self::from(table_scan))
}

/// Apply a projection.
///
/// # Errors
/// This function errors under any of the following conditions:
/// * Two or more expressions have the same name
/// * An invalid expression is used (e.g. a `sort` expression)
/// Apply a projection without alias.
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
let input_schema = self.plan.schema();
let mut projected_expr = vec![];
for e in expr {
match e {
Expr::Wildcard => {
projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
}
_ => projected_expr
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
}
}

validate_unique_names("Projections", projected_expr.iter(), input_schema)?;

let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
self.project_with_alias(expr, None)
}

Ok(Self::from(LogicalPlan::Projection {
expr: projected_expr,
input: Arc::new(self.plan.clone()),
schema: DFSchemaRef::new(schema),
}))
/// Apply a projection over the current plan, optionally attaching a
/// relation alias to the projection's output.
///
/// Delegates to the free function `project_with_alias` with a clone of the
/// builder's plan; an error from it is propagated with `?`.
pub fn project_with_alias(
    &self,
    expr: impl IntoIterator<Item = Expr>,
    alias: Option<String>,
) -> Result<Self> {
    let projection = project_with_alias(self.plan.clone(), expr, alias)?;
    Ok(Self::from(projection))
}

/// Apply a filter
Expand Down Expand Up @@ -477,12 +465,9 @@ impl LogicalPlanBuilder {
let group_expr = normalize_cols(group_expr, &self.plan)?;
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
let all_expr = group_expr.iter().chain(aggr_expr.iter());

validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?;

let aggr_schema =
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;

Ok(Self::from(LogicalPlan::Aggregate {
input: Arc::new(self.plan.clone()),
group_expr,
Expand Down Expand Up @@ -615,6 +600,41 @@ pub fn union_with_alias(
})
}

/// Builds a `LogicalPlan::Projection` over `plan`, with an optional relation
/// alias for the projection's output.
///
/// Each `Expr::Wildcard` in `expr` is expanded against the input schema via
/// `expand_wildcard`; every other expression is passed through `normalize_col`
/// and then `columnize_expr`. When `alias` is `Some`, the projected schema is
/// passed through `replace_qualifier` with that alias, and the alias is also
/// stored on the resulting projection node.
///
/// # Errors
/// This function errors under any of the following conditions:
/// * Two or more expressions have the same name
/// * An invalid expression is used (e.g. a `sort` expression)
pub fn project_with_alias(
    plan: LogicalPlan,
    expr: impl IntoIterator<Item = Expr>,
    alias: Option<String>,
) -> Result<LogicalPlan> {
    let input_schema = plan.schema();
    let mut projected_expr = vec![];
    for e in expr {
        match e {
            Expr::Wildcard => {
                projected_expr.extend(expand_wildcard(input_schema, &plan)?)
            }
            _ => projected_expr
                .push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
        }
    }
    validate_unique_names("Projections", projected_expr.iter(), input_schema)?;

    // Kept under its own name so the input schema and the schema produced by
    // the projection are not confused with each other.
    let projected_schema =
        DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
    let schema = match &alias {
        Some(alias) => projected_schema.replace_qualifier(alias.as_str()),
        None => projected_schema,
    };

    Ok(LogicalPlan::Projection {
        expr: projected_expr,
        // `plan` is owned and its last borrow (`input_schema`) ended above,
        // so it is moved into the `Arc` rather than cloned.
        input: Arc::new(plan),
        schema: DFSchemaRef::new(schema),
        alias,
    })
}

/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
pub(crate) fn expand_wildcard(
schema: &DFSchema,
Expand Down
9 changes: 8 additions & 1 deletion datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum LogicalPlan {
input: Arc<LogicalPlan>,
/// The schema description of the output
schema: DFSchemaRef,
/// Projection output relation alias
alias: Option<String>,
},
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
Expand Down Expand Up @@ -723,14 +725,19 @@ impl LogicalPlan {

Ok(())
}
LogicalPlan::Projection { ref expr, .. } => {
LogicalPlan::Projection {
ref expr, alias, ..
} => {
write!(f, "Projection: ")?;
for (i, expr_item) in expr.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{:?}", expr_item)?;
}
if let Some(a) = alias {
write!(f, ", alias={}", a)?;
}
Ok(())
}
LogicalPlan::Filter {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
expr,
input,
schema,
alias,
} => {
let mut arrays = vec![];
for e in expr {
Expand All @@ -103,6 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
expr: new_expr.pop().unwrap(),
input: Arc::new(new_input),
schema: schema.clone(),
alias: alias.clone(),
})
}
LogicalPlan::Filter { predicate, input } => {
Expand Down Expand Up @@ -278,6 +280,7 @@ fn build_project_plan(
expr: project_exprs,
input: Arc::new(input),
schema: Arc::new(schema),
alias: None,
})
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
input,
expr,
schema,
alias: _,
} => {
// A projection is filter-commutable, but re-writes all predicate expressions
// collect projection.
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/optimizer/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ fn limit_push_down(
expr,
input,
schema,
alias,
},
upper_limit,
) => {
Expand All @@ -93,6 +94,7 @@ fn limit_push_down(
execution_props,
)?),
schema: schema.clone(),
alias: alias.clone(),
})
}
(
Expand Down
3 changes: 3 additions & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ fn optimize_plan(
input,
expr,
schema,
alias,
} => {
// projection:
// * remove any expression that is not required
Expand Down Expand Up @@ -190,6 +191,7 @@ fn optimize_plan(
expr: new_expr,
input: Arc::new(new_input),
schema: DFSchemaRef::new(DFSchema::new(new_fields)?),
alias: alias.clone(),
})
}
}
Expand Down Expand Up @@ -744,6 +746,7 @@ mod tests {
expr,
input: Arc::new(table_scan),
schema: Arc::new(projected_schema),
alias: None,
};

assert_fields_eq(&plan, vec!["a", "b"]);
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ pub fn from_plan(
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Projection { schema, .. } => Ok(LogicalPlan::Projection {
LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection {
expr: expr.to_vec(),
input: Arc::new(inputs[0].clone()),
schema: schema.clone(),
alias: alias.clone(),
}),
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
predicate: expr[0].clone(),
Expand Down
Loading