diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 3d5d7cce5673..ff45ff43003d 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -55,6 +55,7 @@ use crate::variation_const::{ use datafusion::arrow::array::{new_empty_array, AsArray}; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::dataframe::DataFrame; +use datafusion::logical_expr::builder::project; use datafusion::logical_expr::expr::InList; use datafusion::logical_expr::{ col, expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning, @@ -79,6 +80,7 @@ use substrait::proto::expression::literal::{ use substrait::proto::expression::subquery::SubqueryType; use substrait::proto::expression::{self, FieldReference, Literal, ScalarFunction}; use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile; +use substrait::proto::rel_common::{Emit, EmitKind}; use substrait::proto::{ aggregate_function::AggregationInvocation, expression::{ @@ -92,9 +94,9 @@ use substrait::proto::{ join_rel, plan_rel, r#type, read_rel::ReadType, rel::RelType, - set_rel, + rel_common, set_rel, sort_field::{SortDirection, SortKind::*}, - AggregateFunction, Expression, NamedStruct, Plan, Rel, Type, + AggregateFunction, Expression, NamedStruct, Plan, Rel, RelCommon, Type, }; use substrait::proto::{ExtendedExpression, FunctionArgument, SortField}; @@ -560,42 +562,51 @@ pub async fn from_substrait_rel( rel: &Rel, extensions: &Extensions, ) -> Result { - match &rel.rel_type { + let plan: Result = match &rel.rel_type { Some(RelType::Project(p)) => { if let Some(input) = p.input.as_ref() { let mut input = LogicalPlanBuilder::from( from_substrait_rel(ctx, input, extensions).await?, ); - let mut names: HashSet = HashSet::new(); - let mut exprs: Vec = vec![]; - for e in &p.expressions { - let x = - from_substrait_rex(ctx, e, input.clone().schema(), extensions) + let original_schema = input.schema().clone(); + + // Ensure that all expressions have a unique display name, so that + // validate_unique_names does not fail when constructing the project. + let mut name_tracker = NameTracker::new(); + + // By default, a Substrait Project emits all inputs fields followed by all expressions. + // We build the explicit expressions first, and then the input expressions to avoid + // adding aliases to the explicit expressions (as part of ensuring unique names). + // + // This is helpful for plan visualization and tests, because when DataFusion produces + // Substrait Projects it adds an output mapping that excludes all input columns + // leaving only explicit expressions. + + let mut explicit_exprs: Vec = vec![]; + for expr in &p.expressions { + let e = + from_substrait_rex(ctx, expr, input.clone().schema(), extensions) .await?; // if the expression is WindowFunction, wrap in a Window relation - if let Expr::WindowFunction(_) = &x { + if let Expr::WindowFunction(_) = &e { // Adding the same expression here and in the project below // works because the project's builder uses columnize_expr(..) // to transform it into a column reference - input = input.window(vec![x.clone()])? + input = input.window(vec![e.clone()])? } - // Ensure the expression has a unique display name, so that project's - // validate_unique_names doesn't fail - let name = x.schema_name().to_string(); - let mut new_name = name.clone(); - let mut i = 0; - while names.contains(&new_name) { - new_name = format!("{}__temp__{}", name, i); - i += 1; - } - if new_name != name { - exprs.push(x.alias(new_name.clone())); - } else { - exprs.push(x); - } - names.insert(new_name); + explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?); } - input.project(exprs)?.build() + + let mut final_exprs: Vec = vec![]; + for index in 0..original_schema.fields().len() { + let e = Expr::Column(Column::from( + original_schema.qualified_field(index), + )); + final_exprs.push(name_tracker.get_uniquely_named_expr(e)?); + } + final_exprs.append(&mut explicit_exprs); + + input.project(final_exprs)?.build() } else { not_impl_err!("Projection without an input is not supported") } @@ -1059,6 +1070,143 @@ pub async fn from_substrait_rel( })) } _ => not_impl_err!("Unsupported RelType: {:?}", rel.rel_type), + }; + apply_emit_kind(retrieve_rel_common(rel), plan?) +} + +fn retrieve_rel_common(rel: &Rel) -> Option<&RelCommon> { + match rel.rel_type.as_ref() { + None => None, + Some(rt) => match rt { + RelType::Read(r) => r.common.as_ref(), + RelType::Filter(f) => f.common.as_ref(), + RelType::Fetch(f) => f.common.as_ref(), + RelType::Aggregate(a) => a.common.as_ref(), + RelType::Sort(s) => s.common.as_ref(), + RelType::Join(j) => j.common.as_ref(), + RelType::Project(p) => p.common.as_ref(), + RelType::Set(s) => s.common.as_ref(), + RelType::ExtensionSingle(e) => e.common.as_ref(), + RelType::ExtensionMulti(e) => e.common.as_ref(), + RelType::ExtensionLeaf(e) => e.common.as_ref(), + RelType::Cross(c) => c.common.as_ref(), + RelType::Reference(_) => None, + RelType::Write(w) => w.common.as_ref(), + RelType::Ddl(d) => d.common.as_ref(), + RelType::HashJoin(j) => j.common.as_ref(), + RelType::MergeJoin(j) => j.common.as_ref(), + RelType::NestedLoopJoin(j) => j.common.as_ref(), + RelType::Window(w) => w.common.as_ref(), + RelType::Exchange(e) => e.common.as_ref(), + RelType::Expand(e) => e.common.as_ref(), + }, + } +} + +fn retrieve_emit_kind(rel_common: Option<&RelCommon>) -> EmitKind { + // the default EmitKind is Direct if it is not set explicitly + let default = EmitKind::Direct(rel_common::Direct {}); + rel_common + .and_then(|rc| rc.emit_kind.as_ref()) + .map_or(default, |ek| ek.clone()) +} + +fn contains_volatile_expr(proj: &Projection) -> Result { + for expr in proj.expr.iter() { + if expr.is_volatile()? { + return Ok(true); + } + } + Ok(false) +} + +fn apply_emit_kind( + rel_common: Option<&RelCommon>, + plan: LogicalPlan, +) -> Result { + match retrieve_emit_kind(rel_common) { + EmitKind::Direct(_) => Ok(plan), + EmitKind::Emit(Emit { output_mapping }) => { + // It is valid to reference the same field multiple times in the Emit + // In this case, we need to provide unique names to avoid collisions + let mut name_tracker = NameTracker::new(); + match plan { + // To avoid adding a projection on top of a projection, we apply special case + // handling to flatten Substrait Emits. This is only applicable if none of the + // expressions in the projection are volatile. This is to avoid issues like + // converting a single call of the random() function into multiple calls due to + // duplicate fields in the output_mapping. + LogicalPlan::Projection(proj) if !contains_volatile_expr(&proj)? => { + let mut exprs: Vec = vec![]; + for field in output_mapping { + let expr = proj.expr + .get(field as usize) + .ok_or_else(|| substrait_datafusion_err!( + "Emit output field {} cannot be resolved in input schema {}", + field, proj.input.schema().clone() + ))?; + exprs.push(name_tracker.get_uniquely_named_expr(expr.clone())?); + } + + let input = Arc::unwrap_or_clone(proj.input); + project(input, exprs) + } + // Otherwise we just handle the output_mapping as a projection + _ => { + let input_schema = plan.schema(); + + let mut exprs: Vec = vec![]; + for index in output_mapping.into_iter() { + let column = Expr::Column(Column::from( + input_schema.qualified_field(index as usize), + )); + let expr = name_tracker.get_uniquely_named_expr(column)?; + exprs.push(expr); + } + + project(plan, exprs) + } + } + } + } +} + +struct NameTracker { + seen_names: HashSet, +} + +enum NameTrackerStatus { + NeverSeen, + SeenBefore, +} + +impl NameTracker { + fn new() -> Self { + NameTracker { + seen_names: HashSet::default(), + } + } + fn get_unique_name(&mut self, name: String) -> (String, NameTrackerStatus) { + match self.seen_names.insert(name.clone()) { + true => (name, NameTrackerStatus::NeverSeen), + false => { + let mut counter = 0; + loop { + let candidate_name = format!("{}__temp__{}", name, counter); + if self.seen_names.insert(candidate_name.clone()) { + return (candidate_name, NameTrackerStatus::SeenBefore); + } + counter += 1; + } + } + } + } + + fn get_uniquely_named_expr(&mut self, expr: Expr) -> Result { + match self.get_unique_name(expr.name_for_alias()?) { + (_, NameTrackerStatus::NeverSeen) => Ok(expr), + (name, NameTrackerStatus::SeenBefore) => Ok(expr.alias(name)), + } } } diff --git a/datafusion/substrait/tests/cases/emit_kind_tests.rs b/datafusion/substrait/tests/cases/emit_kind_tests.rs new file mode 100644 index 000000000000..ac66177ed796 --- /dev/null +++ b/datafusion/substrait/tests/cases/emit_kind_tests.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for Emit Kind usage + +#[cfg(test)] +mod tests { + use crate::utils::test::{add_plan_schemas_to_ctx, read_json}; + + use datafusion::common::Result; + use datafusion::execution::SessionStateBuilder; + use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; + use datafusion_substrait::logical_plan::consumer::from_substrait_plan; + use datafusion_substrait::logical_plan::producer::to_substrait_plan; + + #[tokio::test] + async fn project_respects_direct_emit_kind() -> Result<()> { + let proto_plan = read_json( + "tests/testdata/test_plans/emit_kind/direct_on_project.substrait.json", + ); + let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?; + let plan = from_substrait_plan(&ctx, &proto_plan).await?; + + let plan_str = format!("{}", plan); + + assert_eq!( + plan_str, + "Projection: DATA.A AS a, DATA.B AS b, DATA.A + Int64(1) AS add1\ + \n TableScan: DATA" + ); + Ok(()) + } + + #[tokio::test] + async fn handle_emit_as_project() -> Result<()> { + let proto_plan = read_json( + "tests/testdata/test_plans/emit_kind/emit_on_filter.substrait.json", + ); + let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?; + let plan = from_substrait_plan(&ctx, &proto_plan).await?; + + let plan_str = format!("{}", plan); + + assert_eq!( + plan_str, + // Note that duplicate references in the remap are aliased + "Projection: DATA.B, DATA.A AS A1, DATA.A AS DATA.A__temp__0 AS A2\ + \n Filter: DATA.B = Int64(2)\ + \n TableScan: DATA" + ); + Ok(()) + } + + async fn make_context() -> Result { + let state = SessionStateBuilder::new() + .with_config(SessionConfig::default()) + .with_default_features() + .build(); + let ctx = SessionContext::new_with_state(state); + ctx.register_csv("data", "tests/testdata/data.csv", CsvReadOptions::default()) + .await?; + Ok(ctx) + } + + #[tokio::test] + async fn handle_emit_as_project_with_volatile_expr() -> Result<()> { + let ctx = make_context().await?; + + let df = ctx + .sql("SELECT random() AS c1, a + 1 AS c2 FROM data") + .await?; + + let plan = df.into_unoptimized_plan(); + assert_eq!( + format!("{}", plan), + "Projection: random() AS c1, data.a + Int64(1) AS c2\ + \n TableScan: data" + ); + + let proto = to_substrait_plan(&plan, &ctx)?; + let plan2 = from_substrait_plan(&ctx, &proto).await?; + // note how the Projections are not flattened + assert_eq!( + format!("{}", plan2), + "Projection: random() AS c1, data.a + Int64(1) AS c2\ + \n Projection: data.a, data.b, data.c, data.d, data.e, data.f, random(), data.a + Int64(1)\ + \n TableScan: data" + ); + Ok(()) + } + + #[tokio::test] + async fn handle_emit_as_project_without_volatile_exprs() -> Result<()> { + let ctx = make_context().await?; + let df = ctx.sql("SELECT a + 1, b + 2 FROM data").await?; + + let plan = df.into_unoptimized_plan(); + assert_eq!( + format!("{}", plan), + "Projection: data.a + Int64(1), data.b + Int64(2)\ + \n TableScan: data" + ); + + let proto = to_substrait_plan(&plan, &ctx)?; + let plan2 = from_substrait_plan(&ctx, &proto).await?; + + let plan1str = format!("{plan}"); + let plan2str = format!("{plan2}"); + assert_eq!(plan1str, plan2str); + + Ok(()) + } +} diff --git a/datafusion/substrait/tests/cases/mod.rs b/datafusion/substrait/tests/cases/mod.rs index 42aa23626106..b1f4b95df66f 100644 --- a/datafusion/substrait/tests/cases/mod.rs +++ b/datafusion/substrait/tests/cases/mod.rs @@ -16,6 +16,7 @@ // under the License. mod consumer_integration; +mod emit_kind_tests; mod function_test; mod logical_plans; mod roundtrip_logical_plan; diff --git a/datafusion/substrait/tests/testdata/test_plans/emit_kind/direct_on_project.substrait.json b/datafusion/substrait/tests/testdata/test_plans/emit_kind/direct_on_project.substrait.json new file mode 100644 index 000000000000..63b275e1723f --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/emit_kind/direct_on_project.substrait.json @@ -0,0 +1,90 @@ +{ + "extensionUris": [{ + "extensionUriAnchor": 1, + "uri": "/functions_arithmetic.yaml" + }], + "extensions": [{ + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "add:i64_i64" + } + }], + "relations": [{ + "root": { + "input": { + "project": { + "common": { + "direct": { + } + }, + "input": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["A", "B"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "namedTable": { + "names": ["DATA"] + } + } + }, + "expressions": [{ + "scalarFunction": { + "functionReference": 0, + "args": [], + "outputType": { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "literal": { + "i64": 1, + "nullable": false, + "typeVariationReference": 0 + } + } + }], + "options": [] + } + }] + } + }, + "names": ["a", "b", "add1"] + } + }], + "expectedTypeUrls": [] +} \ No newline at end of file diff --git a/datafusion/substrait/tests/testdata/test_plans/emit_kind/emit_on_filter.substrait.json b/datafusion/substrait/tests/testdata/test_plans/emit_kind/emit_on_filter.substrait.json new file mode 100644 index 000000000000..2fc970155955 --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/emit_kind/emit_on_filter.substrait.json @@ -0,0 +1,91 @@ +{ + "extensionUris": [{ + "extensionUriAnchor": 1, + "uri": "/functions_comparison.yaml" + }], + "extensions": [{ + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "equal:any_any" + } + }], + "relations": [{ + "root": { + "input": { + "filter": { + "common": { + "emit": { + "outputMapping": [1, 0, 0] + } + }, + "input": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["A", "B"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "namedTable": { + "names": ["DATA"] + } + } + }, + "condition": { + "scalarFunction": { + "functionReference": 0, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "literal": { + "i64": "2", + "nullable": false, + "typeVariationReference": 0 + } + } + }], + "options": [] + } + } + } + }, + "names": ["B", "A1", "A2"] + } + }], + "expectedTypeUrls": [] +} \ No newline at end of file