Skip to content

Commit 9113f8f

Browse files
committed
fix alias
1 parent 77ca7db commit 9113f8f

File tree

15 files changed

+210
-118
lines changed

15 files changed

+210
-118
lines changed

ballista/rust/core/proto/ballista.proto

+3
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,9 @@ message AvroTableScanNode {
311311
message ProjectionNode {
312312
LogicalPlanNode input = 1;
313313
repeated LogicalExprNode expr = 2;
314+
oneof optional_alias {
315+
string alias = 3;
316+
}
314317
}
315318

316319
message SelectionNode {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
6363
.map(|expr| expr.try_into())
6464
.collect::<Result<Vec<_>, _>>()?;
6565
LogicalPlanBuilder::from(input)
66-
.project(x)?
66+
.project_with_alias(
67+
x,
68+
projection.optional_alias.as_ref().map(|a| match a {
69+
protobuf::projection_node::OptionalAlias::Alias(alias) => {
70+
alias.clone()
71+
}
72+
}),
73+
)?
6774
.build()
6875
.map_err(|e| e.into())
6976
}

ballista/rust/core/src/serde/logical_plan/to_proto.rs

+17-13
Original file line numberDiff line numberDiff line change
@@ -801,19 +801,23 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
801801
)))
802802
}
803803
}
804-
LogicalPlan::Projection { expr, input, .. } => {
805-
Ok(protobuf::LogicalPlanNode {
806-
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
807-
protobuf::ProjectionNode {
808-
input: Some(Box::new(input.as_ref().try_into()?)),
809-
expr: expr
810-
.iter()
811-
.map(|expr| expr.try_into())
812-
.collect::<Result<Vec<_>, BallistaError>>()?,
813-
},
814-
))),
815-
})
816-
}
804+
LogicalPlan::Projection {
805+
expr, input, alias, ..
806+
} => Ok(protobuf::LogicalPlanNode {
807+
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
808+
protobuf::ProjectionNode {
809+
input: Some(Box::new(input.as_ref().try_into()?)),
810+
expr: expr.iter().map(|expr| expr.try_into()).collect::<Result<
811+
Vec<_>,
812+
BallistaError,
813+
>>(
814+
)?,
815+
optional_alias: alias
816+
.clone()
817+
.map(protobuf::projection_node::OptionalAlias::Alias),
818+
},
819+
))),
820+
}),
817821
LogicalPlan::Filter { predicate, input } => {
818822
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
819823
Ok(protobuf::LogicalPlanNode {

datafusion/src/execution/context.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -2047,7 +2047,7 @@ mod tests {
20472047
async fn join_partitioned() -> Result<()> {
20482048
// self join on partition id (workaround for duplicate column name)
20492049
let results = execute(
2050-
"SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) ON c1=id1",
2050+
"SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
20512051
4,
20522052
)
20532053
.await?;
@@ -2080,7 +2080,7 @@ mod tests {
20802080
let results = plan_and_collect(
20812081
&mut ctx,
20822082
"SELECT * FROM t as t1 \
2083-
JOIN (SELECT * FROM t as t2) \
2083+
JOIN (SELECT * FROM t) as t2 \
20842084
ON t1.nanos = t2.nanos",
20852085
)
20862086
.await
@@ -2090,7 +2090,7 @@ mod tests {
20902090
let results = plan_and_collect(
20912091
&mut ctx,
20922092
"SELECT * FROM t as t1 \
2093-
JOIN (SELECT * FROM t as t2) \
2093+
JOIN (SELECT * FROM t) as t2 \
20942094
ON t1.micros = t2.micros",
20952095
)
20962096
.await
@@ -2100,7 +2100,7 @@ mod tests {
21002100
let results = plan_and_collect(
21012101
&mut ctx,
21022102
"SELECT * FROM t as t1 \
2103-
JOIN (SELECT * FROM t as t2) \
2103+
JOIN (SELECT * FROM t) as t2 \
21042104
ON t1.millis = t2.millis",
21052105
)
21062106
.await
@@ -2967,12 +2967,12 @@ mod tests {
29672967
#[tokio::test]
29682968
async fn ctx_sql_should_optimize_plan() -> Result<()> {
29692969
let mut ctx = ExecutionContext::new();
2970-
let plan1 =
2971-
ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
2970+
let plan1 = ctx
2971+
.create_logical_plan("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
29722972

29732973
let opt_plan1 = ctx.optimize(&plan1)?;
29742974

2975-
let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
2975+
let plan2 = ctx.sql("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
29762976

29772977
assert_eq!(
29782978
format!("{:?}", opt_plan1),
@@ -3727,7 +3727,7 @@ mod tests {
37273727
SELECT i, 'a' AS cat FROM catalog_a.schema_a.table_a
37283728
UNION ALL
37293729
SELECT i, 'b' AS cat FROM catalog_b.schema_b.table_b
3730-
)
3730+
) AS all
37313731
GROUP BY cat
37323732
ORDER BY cat
37333733
",

datafusion/src/logical_plan/builder.rs

+49-29
Original file line numberDiff line numberDiff line change
@@ -236,34 +236,22 @@ impl LogicalPlanBuilder {
236236
Ok(Self::from(table_scan))
237237
}
238238

239-
/// Apply a projection.
240-
///
241-
/// # Errors
242-
/// This function errors under any of the following conditions:
243-
/// * Two or more expressions have the same name
244-
/// * An invalid expression is used (e.g. a `sort` expression)
239+
/// Apply a projection without an output relation alias; see the free function [`project_with_alias`] for the error conditions.
245240
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
246-
let input_schema = self.plan.schema();
247-
let mut projected_expr = vec![];
248-
for e in expr {
249-
match e {
250-
Expr::Wildcard => {
251-
projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
252-
}
253-
_ => projected_expr
254-
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
255-
}
256-
}
257-
258-
validate_unique_names("Projections", projected_expr.iter(), input_schema)?;
259-
260-
let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
241+
self.project_with_alias(expr, None)
242+
}
261243

262-
Ok(Self::from(LogicalPlan::Projection {
263-
expr: projected_expr,
264-
input: Arc::new(self.plan.clone()),
265-
schema: DFSchemaRef::new(schema),
266-
}))
244+
/// Apply a projection, qualifying the output schema with `alias` when one is given.
245+
pub fn project_with_alias(
246+
&self,
247+
expr: impl IntoIterator<Item = Expr>,
248+
alias: Option<String>,
249+
) -> Result<Self> {
250+
Ok(Self::from(project_with_alias(
251+
self.plan.clone(),
252+
expr,
253+
alias,
254+
)?))
267255
}
268256

269257
/// Apply a filter
@@ -477,12 +465,9 @@ impl LogicalPlanBuilder {
477465
let group_expr = normalize_cols(group_expr, &self.plan)?;
478466
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
479467
let all_expr = group_expr.iter().chain(aggr_expr.iter());
480-
481468
validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?;
482-
483469
let aggr_schema =
484470
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
485-
486471
Ok(Self::from(LogicalPlan::Aggregate {
487472
input: Arc::new(self.plan.clone()),
488473
group_expr,
@@ -615,6 +600,41 @@ pub fn union_with_alias(
615600
})
616601
}
617602

603+
/// Create a projection over `plan`; when `alias` is given, the qualifier of the output schema is replaced with it.
604+
/// # Errors
605+
/// This function errors under any of the following conditions:
606+
/// * Two or more expressions have the same name
607+
/// * An invalid expression is used (e.g. a `sort` expression)
608+
pub fn project_with_alias(
609+
plan: LogicalPlan,
610+
expr: impl IntoIterator<Item = Expr>,
611+
alias: Option<String>,
612+
) -> Result<LogicalPlan> {
613+
let input_schema = plan.schema();
614+
let mut projected_expr = vec![];
615+
for e in expr {
616+
match e {
617+
Expr::Wildcard => {
618+
projected_expr.extend(expand_wildcard(input_schema, &plan)?)
619+
}
620+
_ => projected_expr
621+
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
622+
}
623+
}
624+
validate_unique_names("Projections", projected_expr.iter(), input_schema)?;
625+
let input_schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
626+
let schema = match alias {
627+
Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
628+
None => input_schema,
629+
};
630+
Ok(LogicalPlan::Projection {
631+
expr: projected_expr,
632+
input: Arc::new(plan.clone()),
633+
schema: DFSchemaRef::new(schema),
634+
alias,
635+
})
636+
}
637+
618638
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
619639
pub(crate) fn expand_wildcard(
620640
schema: &DFSchema,

datafusion/src/logical_plan/expr.rs

+9
Original file line numberDiff line numberDiff line change
@@ -1188,6 +1188,15 @@ pub fn col(ident: &str) -> Expr {
11881188
Expr::Column(ident.into())
11891189
}
11901190

1191+
/// Create a column expression named `ident` whose relation is set to `table_name` (`ident` is used verbatim, not parsed as a qualified name)
1192+
pub fn col_with_table_name(ident: &str, table_name: &str) -> Expr {
1193+
let col = Column {
1194+
relation: Some(table_name.to_string()),
1195+
name: ident.to_string(),
1196+
};
1197+
Expr::Column(col)
1198+
}
1199+
11911200
/// Convert an expression into Column expression if it's already provided as input plan.
11921201
///
11931202
/// For example, it rewrites:

datafusion/src/logical_plan/mod.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,16 @@ pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
3737
pub use display::display_schema;
3838
pub use expr::{
3939
abs, acos, and, array, ascii, asin, atan, avg, binary_expr, bit_length, btrim, case,
40-
ceil, character_length, chr, col, columnize_expr, combine_filters, concat, concat_ws,
41-
cos, count, count_distinct, create_udaf, create_udf, date_part, date_trunc, exp,
42-
exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano,
43-
ln, log10, log2, lower, lpad, ltrim, max, md5, min, normalize_col, normalize_cols,
44-
now, octet_length, or, random, regexp_match, regexp_replace, repeat, replace,
45-
replace_col, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512,
46-
signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
47-
translate, trim, trunc, unnormalize_col, unnormalize_cols, upper, when, Column, Expr,
48-
ExprRewriter, ExpressionVisitor, Literal, Recursion, RewriteRecursion,
40+
ceil, character_length, chr, col, col_with_table_name, columnize_expr,
41+
combine_filters, concat, concat_ws, cos, count, count_distinct, create_udaf,
42+
create_udf, date_part, date_trunc, exp, exprlist_to_fields, floor, in_list, initcap,
43+
left, length, lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5,
44+
min, normalize_col, normalize_cols, now, octet_length, or, random, regexp_match,
45+
regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, rtrim,
46+
sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
47+
substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, unnormalize_cols,
48+
upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion,
49+
RewriteRecursion,
4950
};
5051
pub use extension::UserDefinedLogicalNode;
5152
pub use operators::Operator;

datafusion/src/logical_plan/plan.rs

+2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ pub enum LogicalPlan {
7676
input: Arc<LogicalPlan>,
7777
/// The schema description of the output
7878
schema: DFSchemaRef,
79+
/// Optional relation alias for the projection's output
80+
alias: Option<String>,
7981
},
8082
/// Filters rows from its input that do not match an
8183
/// expression (essentially a WHERE clause with a predicate

datafusion/src/optimizer/common_subexpr_eliminate.rs

+3
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
8181
expr,
8282
input,
8383
schema,
84+
alias,
8485
} => {
8586
let mut arrays = vec![];
8687
for e in expr {
@@ -103,6 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
103104
expr: new_expr.pop().unwrap(),
104105
input: Arc::new(new_input),
105106
schema: schema.clone(),
107+
alias: alias.clone(),
106108
})
107109
}
108110
LogicalPlan::Filter { predicate, input } => {
@@ -278,6 +280,7 @@ fn build_project_plan(
278280
expr: project_exprs,
279281
input: Arc::new(input),
280282
schema: Arc::new(schema),
283+
alias: None,
281284
})
282285
}
283286

datafusion/src/optimizer/filter_push_down.rs

+1
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
318318
input,
319319
expr,
320320
schema,
321+
alias: _,
321322
} => {
322323
// A projection is filter-commutable, but re-writes all predicate expressions
323324
// collect projection.

datafusion/src/optimizer/limit_push_down.rs

+2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ fn limit_push_down(
8080
expr,
8181
input,
8282
schema,
83+
alias,
8384
},
8485
upper_limit,
8586
) => {
@@ -93,6 +94,7 @@ fn limit_push_down(
9394
execution_props,
9495
)?),
9596
schema: schema.clone(),
97+
alias: alias.clone(),
9698
})
9799
}
98100
(

datafusion/src/optimizer/projection_push_down.rs

+3
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ fn optimize_plan(
140140
input,
141141
expr,
142142
schema,
143+
alias,
143144
} => {
144145
// projection:
145146
// * remove any expression that is not required
@@ -190,6 +191,7 @@ fn optimize_plan(
190191
expr: new_expr,
191192
input: Arc::new(new_input),
192193
schema: DFSchemaRef::new(DFSchema::new(new_fields)?),
194+
alias: alias.clone(),
193195
})
194196
}
195197
}
@@ -744,6 +746,7 @@ mod tests {
744746
expr,
745747
input: Arc::new(table_scan),
746748
schema: Arc::new(projected_schema),
749+
alias: None,
747750
};
748751

749752
assert_fields_eq(&plan, vec!["a", "b"]);

datafusion/src/optimizer/utils.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,11 @@ pub fn from_plan(
118118
inputs: &[LogicalPlan],
119119
) -> Result<LogicalPlan> {
120120
match plan {
121-
LogicalPlan::Projection { schema, .. } => Ok(LogicalPlan::Projection {
121+
LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection {
122122
expr: expr.to_vec(),
123123
input: Arc::new(inputs[0].clone()),
124124
schema: schema.clone(),
125+
alias: alias.clone(),
125126
}),
126127
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
127128
predicate: expr[0].clone(),

0 commit comments

Comments
 (0)