Skip to content

Commit 6a79299

Browse files
committed
fix alias
1 parent a8dedc8 commit 6a79299

File tree

15 files changed

+214
-118
lines changed

15 files changed

+214
-118
lines changed

ballista/rust/core/proto/ballista.proto

+3
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,9 @@ message AvroTableScanNode {
311311
message ProjectionNode {
312312
LogicalPlanNode input = 1;
313313
repeated LogicalExprNode expr = 2;
314+
oneof optional_alias {
315+
string alias = 3;
316+
}
314317
}
315318

316319
message SelectionNode {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
6363
.map(|expr| expr.try_into())
6464
.collect::<Result<Vec<_>, _>>()?;
6565
LogicalPlanBuilder::from(input)
66-
.project(x)?
66+
.project_with_alias(
67+
x,
68+
projection.optional_alias.as_ref().map(|a| match a {
69+
protobuf::projection_node::OptionalAlias::Alias(alias) => {
70+
alias.clone()
71+
}
72+
}),
73+
)?
6774
.build()
6875
.map_err(|e| e.into())
6976
}

ballista/rust/core/src/serde/logical_plan/to_proto.rs

+17-13
Original file line numberDiff line numberDiff line change
@@ -801,19 +801,23 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
801801
)))
802802
}
803803
}
804-
LogicalPlan::Projection { expr, input, .. } => {
805-
Ok(protobuf::LogicalPlanNode {
806-
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
807-
protobuf::ProjectionNode {
808-
input: Some(Box::new(input.as_ref().try_into()?)),
809-
expr: expr
810-
.iter()
811-
.map(|expr| expr.try_into())
812-
.collect::<Result<Vec<_>, BallistaError>>()?,
813-
},
814-
))),
815-
})
816-
}
804+
LogicalPlan::Projection {
805+
expr, input, alias, ..
806+
} => Ok(protobuf::LogicalPlanNode {
807+
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
808+
protobuf::ProjectionNode {
809+
input: Some(Box::new(input.as_ref().try_into()?)),
810+
expr: expr.iter().map(|expr| expr.try_into()).collect::<Result<
811+
Vec<_>,
812+
BallistaError,
813+
>>(
814+
)?,
815+
optional_alias: alias
816+
.clone()
817+
.map(protobuf::projection_node::OptionalAlias::Alias),
818+
},
819+
))),
820+
}),
817821
LogicalPlan::Filter { predicate, input } => {
818822
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
819823
Ok(protobuf::LogicalPlanNode {

datafusion/src/execution/context.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -2047,7 +2047,7 @@ mod tests {
20472047
async fn join_partitioned() -> Result<()> {
20482048
// self join on partition id (workaround for duplicate column name)
20492049
let results = execute(
2050-
"SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) ON c1=id1",
2050+
"SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
20512051
4,
20522052
)
20532053
.await?;
@@ -2080,7 +2080,7 @@ mod tests {
20802080
let results = plan_and_collect(
20812081
&mut ctx,
20822082
"SELECT * FROM t as t1 \
2083-
JOIN (SELECT * FROM t as t2) \
2083+
JOIN (SELECT * FROM t) as t2 \
20842084
ON t1.nanos = t2.nanos",
20852085
)
20862086
.await
@@ -2090,7 +2090,7 @@ mod tests {
20902090
let results = plan_and_collect(
20912091
&mut ctx,
20922092
"SELECT * FROM t as t1 \
2093-
JOIN (SELECT * FROM t as t2) \
2093+
JOIN (SELECT * FROM t) as t2 \
20942094
ON t1.micros = t2.micros",
20952095
)
20962096
.await
@@ -2100,7 +2100,7 @@ mod tests {
21002100
let results = plan_and_collect(
21012101
&mut ctx,
21022102
"SELECT * FROM t as t1 \
2103-
JOIN (SELECT * FROM t as t2) \
2103+
JOIN (SELECT * FROM t) as t2 \
21042104
ON t1.millis = t2.millis",
21052105
)
21062106
.await
@@ -2965,12 +2965,12 @@ mod tests {
29652965
#[tokio::test]
29662966
async fn ctx_sql_should_optimize_plan() -> Result<()> {
29672967
let mut ctx = ExecutionContext::new();
2968-
let plan1 =
2969-
ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
2968+
let plan1 = ctx
2969+
.create_logical_plan("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
29702970

29712971
let opt_plan1 = ctx.optimize(&plan1)?;
29722972

2973-
let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
2973+
let plan2 = ctx.sql("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
29742974

29752975
assert_eq!(
29762976
format!("{:?}", opt_plan1),
@@ -3723,7 +3723,7 @@ mod tests {
37233723
SELECT i, 'a' AS cat FROM catalog_a.schema_a.table_a
37243724
UNION ALL
37253725
SELECT i, 'b' AS cat FROM catalog_b.schema_b.table_b
3726-
)
3726+
) AS all
37273727
GROUP BY cat
37283728
ORDER BY cat
37293729
",

datafusion/src/logical_plan/builder.rs

+53-29
Original file line numberDiff line numberDiff line change
@@ -231,34 +231,26 @@ impl LogicalPlanBuilder {
231231
Ok(Self::from(table_scan))
232232
}
233233

234-
/// Apply a projection.
235-
///
236-
/// # Errors
237-
/// This function errors under any of the following conditions:
238-
/// * Two or more expressions have the same name
239-
/// * An invalid expression is used (e.g. a `sort` expression)
234+
/// Apply a projection without alias.
240235
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
241-
let input_schema = self.plan.schema();
242-
let mut projected_expr = vec![];
243-
for e in expr {
244-
match e {
245-
Expr::Wildcard => {
246-
projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
247-
}
248-
_ => projected_expr
249-
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
250-
}
251-
}
252-
253-
validate_unique_names("Projections", projected_expr.iter(), input_schema)?;
254-
255-
let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
236+
Ok(Self::from(project_with_alias(
237+
self.plan.clone(),
238+
expr,
239+
None,
240+
)?))
241+
}
256242

257-
Ok(Self::from(LogicalPlan::Projection {
258-
expr: projected_expr,
259-
input: Arc::new(self.plan.clone()),
260-
schema: DFSchemaRef::new(schema),
261-
}))
243+
/// Apply a projection with alias
244+
pub fn project_with_alias(
245+
&self,
246+
expr: impl IntoIterator<Item = Expr>,
247+
alias: Option<String>,
248+
) -> Result<Self> {
249+
Ok(Self::from(project_with_alias(
250+
self.plan.clone(),
251+
expr,
252+
alias,
253+
)?))
262254
}
263255

264256
/// Apply a filter
@@ -472,12 +464,9 @@ impl LogicalPlanBuilder {
472464
let group_expr = normalize_cols(group_expr, &self.plan)?;
473465
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
474466
let all_expr = group_expr.iter().chain(aggr_expr.iter());
475-
476467
validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?;
477-
478468
let aggr_schema =
479469
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
480-
481470
Ok(Self::from(LogicalPlan::Aggregate {
482471
input: Arc::new(self.plan.clone()),
483472
group_expr,
@@ -610,6 +599,41 @@ pub fn union_with_alias(
610599
})
611600
}
612601

602+
/// Project with optional alias
603+
/// # Errors
604+
/// This function errors under any of the following conditions:
605+
/// * Two or more expressions have the same name
606+
/// * An invalid expression is used (e.g. a `sort` expression)
607+
pub fn project_with_alias(
608+
plan: LogicalPlan,
609+
expr: impl IntoIterator<Item = Expr>,
610+
alias: Option<String>,
611+
) -> Result<LogicalPlan> {
612+
let input_schema = plan.schema();
613+
let mut projected_expr = vec![];
614+
for e in expr {
615+
match e {
616+
Expr::Wildcard => {
617+
projected_expr.extend(expand_wildcard(input_schema, &plan)?)
618+
}
619+
_ => projected_expr
620+
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
621+
}
622+
}
623+
validate_unique_names("Projections", projected_expr.iter(), input_schema)?;
624+
let input_schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
625+
let schema = match alias {
626+
Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
627+
None => input_schema,
628+
};
629+
Ok(LogicalPlan::Projection {
630+
expr: projected_expr,
631+
input: Arc::new(plan.clone()),
632+
schema: DFSchemaRef::new(schema),
633+
alias,
634+
})
635+
}
636+
613637
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
614638
pub(crate) fn expand_wildcard(
615639
schema: &DFSchema,

datafusion/src/logical_plan/expr.rs

+9
Original file line numberDiff line numberDiff line change
@@ -1187,6 +1187,15 @@ pub fn col(ident: &str) -> Expr {
11871187
Expr::Column(ident.into())
11881188
}
11891189

1190+
/// Create a column expression based on a qualified or unqualified column name and table name
1191+
pub fn col_with_table_name(ident: &str, table_name: &str) -> Expr {
1192+
let col = Column {
1193+
relation: Some(table_name.to_string()),
1194+
name: ident.to_string(),
1195+
};
1196+
Expr::Column(col)
1197+
}
1198+
11901199
/// Convert an expression into Column expression if it's already provided as input plan.
11911200
///
11921201
/// For example, it rewrites:

datafusion/src/logical_plan/mod.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,16 @@ pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
3737
pub use display::display_schema;
3838
pub use expr::{
3939
abs, acos, and, array, ascii, asin, atan, avg, binary_expr, bit_length, btrim, case,
40-
ceil, character_length, chr, col, columnize_expr, combine_filters, concat, concat_ws,
41-
cos, count, count_distinct, create_udaf, create_udf, date_part, date_trunc, exp,
42-
exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano,
43-
ln, log10, log2, lower, lpad, ltrim, max, md5, min, normalize_col, normalize_cols,
44-
now, octet_length, or, random, regexp_match, regexp_replace, repeat, replace,
45-
replace_col, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512,
46-
signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
47-
translate, trim, trunc, unnormalize_col, unnormalize_cols, upper, when, Column, Expr,
48-
ExprRewriter, ExpressionVisitor, Literal, Recursion, RewriteRecursion,
40+
ceil, character_length, chr, col, col_with_table_name, columnize_expr,
41+
combine_filters, concat, concat_ws, cos, count, count_distinct, create_udaf,
42+
create_udf, date_part, date_trunc, exp, exprlist_to_fields, floor, in_list, initcap,
43+
left, length, lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5,
44+
min, normalize_col, normalize_cols, now, octet_length, or, random, regexp_match,
45+
regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, rtrim,
46+
sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
47+
substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, unnormalize_cols,
48+
upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion,
49+
RewriteRecursion,
4950
};
5051
pub use extension::UserDefinedLogicalNode;
5152
pub use operators::Operator;

datafusion/src/logical_plan/plan.rs

+2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ pub enum LogicalPlan {
7676
input: Arc<LogicalPlan>,
7777
/// The schema description of the output
7878
schema: DFSchemaRef,
79+
/// Projection output relation alias
80+
alias: Option<String>,
7981
},
8082
/// Filters rows from its input that do not match an
8183
/// expression (essentially a WHERE clause with a predicate

datafusion/src/optimizer/common_subexpr_eliminate.rs

+3
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
8181
expr,
8282
input,
8383
schema,
84+
alias,
8485
} => {
8586
let mut arrays = vec![];
8687
for e in expr {
@@ -103,6 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
103104
expr: new_expr.pop().unwrap(),
104105
input: Arc::new(new_input),
105106
schema: schema.clone(),
107+
alias: alias.clone(),
106108
})
107109
}
108110
LogicalPlan::Filter { predicate, input } => {
@@ -278,6 +280,7 @@ fn build_project_plan(
278280
expr: project_exprs,
279281
input: Arc::new(input),
280282
schema: Arc::new(schema),
283+
alias: None,
281284
})
282285
}
283286

datafusion/src/optimizer/filter_push_down.rs

+1
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
318318
input,
319319
expr,
320320
schema,
321+
alias: _,
321322
} => {
322323
// A projection is filter-commutable, but re-writes all predicate expressions
323324
// collect projection.

datafusion/src/optimizer/limit_push_down.rs

+2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ fn limit_push_down(
8080
expr,
8181
input,
8282
schema,
83+
alias,
8384
},
8485
upper_limit,
8586
) => {
@@ -93,6 +94,7 @@ fn limit_push_down(
9394
execution_props,
9495
)?),
9596
schema: schema.clone(),
97+
alias: alias.clone(),
9698
})
9799
}
98100
(

datafusion/src/optimizer/projection_push_down.rs

+3
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ fn optimize_plan(
140140
input,
141141
expr,
142142
schema,
143+
alias,
143144
} => {
144145
// projection:
145146
// * remove any expression that is not required
@@ -190,6 +191,7 @@ fn optimize_plan(
190191
expr: new_expr,
191192
input: Arc::new(new_input),
192193
schema: DFSchemaRef::new(DFSchema::new(new_fields)?),
194+
alias: alias.clone(),
193195
})
194196
}
195197
}
@@ -744,6 +746,7 @@ mod tests {
744746
expr,
745747
input: Arc::new(table_scan),
746748
schema: Arc::new(projected_schema),
749+
alias: None,
747750
};
748751

749752
assert_fields_eq(&plan, vec!["a", "b"]);

datafusion/src/optimizer/utils.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,11 @@ pub fn from_plan(
118118
inputs: &[LogicalPlan],
119119
) -> Result<LogicalPlan> {
120120
match plan {
121-
LogicalPlan::Projection { schema, .. } => Ok(LogicalPlan::Projection {
121+
LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection {
122122
expr: expr.to_vec(),
123123
input: Arc::new(inputs[0].clone()),
124124
schema: schema.clone(),
125+
alias: alias.clone(),
125126
}),
126127
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
127128
predicate: expr[0].clone(),

0 commit comments

Comments
 (0)