diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 51e85e25e077..1c12f4697005 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1295,21 +1295,30 @@ fn construct_prefix_orderings( relevant_sort_expr: &PhysicalSortExpr, dependency_map: &DependencyMap, ) -> Vec { + let mut dep_enumerator = DependencyEnumerator::new(); dependency_map[relevant_sort_expr] .dependencies .iter() - .flat_map(|dep| construct_orderings(dep, dependency_map)) + .flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map)) .collect() } -/// Given a set of relevant dependencies (`relevant_deps`) and a map of dependencies -/// (`dependency_map`), this function generates all possible prefix orderings -/// based on the given dependencies. +/// Generates all possible orderings where dependencies are satisfied for the +/// current projection expression. +/// +/// # Example +/// If `dependencies` is `a + b ASC` and the dependency map holds dependencies +/// * `a ASC` --> `[c ASC]` +/// * `b ASC` --> `[d DESC]`, +/// +/// This function generates these two sort orders +/// * `[c ASC, d DESC, a + b ASC]` +/// * `[d DESC, c ASC, a + b ASC]` /// /// # Parameters /// -/// * `dependencies` - A reference to the dependencies. -/// * `dependency_map` - A reference to the map of dependencies for expressions. +/// * `dependencies` - Set of relevant expressions. +/// * `dependency_map` - Map of dependencies for expressions that may appear in `dependencies` /// /// # Returns /// @@ -1335,11 +1344,6 @@ fn generate_dependency_orderings( return vec![vec![]]; } - // Generate all possible orderings where dependencies are satisfied for the - // current projection expression. 
For example, if expression is `a + b ASC`, - // and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC` - // is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and - // `[d DESC, c ASC, a + b ASC]`. relevant_prefixes .into_iter() .multi_cartesian_product() @@ -1421,7 +1425,7 @@ struct DependencyNode { } impl DependencyNode { - // Insert dependency to the state (if exists). + /// Insert dependency to the state (if exists). fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) { if let Some(dep) = dependency { self.dependencies.insert(dep.clone()); @@ -1437,38 +1441,69 @@ impl DependencyNode { type DependencyMap = IndexMap; type Dependencies = IndexSet; -/// This function recursively analyzes the dependencies of the given sort -/// expression within the given dependency map to construct lexicographical -/// orderings that include the sort expression and its dependencies. -/// -/// # Parameters -/// -/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`) -/// for which lexicographical orderings satisfying its dependencies are to be -/// constructed. -/// - `dependency_map`: A reference to the `DependencyMap` that contains -/// dependencies for different `PhysicalSortExpr`s. -/// -/// # Returns -/// -/// A vector of lexicographical orderings (`Vec`) based on the given -/// sort expression and its dependencies. -fn construct_orderings( - referred_sort_expr: &PhysicalSortExpr, - dependency_map: &DependencyMap, -) -> Vec { - // We are sure that `referred_sort_expr` is inside `dependency_map`. - let node = &dependency_map[referred_sort_expr]; - // Since we work on intermediate nodes, we are sure `val.target_sort_expr` - // exists. 
- let target_sort_expr = node.target_sort_expr.clone().unwrap(); - if node.dependencies.is_empty() { - vec![vec![target_sort_expr]] - } else { +/// Contains a mapping of all dependencies we have processed for each sort expr +struct DependencyEnumerator<'a> { + /// Maps `expr` --> `[exprs]` that have previously been processed + seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>, +} + +impl<'a> DependencyEnumerator<'a> { + fn new() -> Self { + Self { + seen: IndexMap::new(), + } + } + + /// Insert a new dependency, + /// + /// returns false if the dependency was already in the map + /// returns true if the dependency was newly inserted + fn insert( + &mut self, + target: &'a PhysicalSortExpr, + dep: &'a PhysicalSortExpr, + ) -> bool { + self.seen.entry(target).or_default().insert(dep) + } + + /// This function recursively analyzes the dependencies of the given sort + /// expression within the given dependency map to construct lexicographical + /// orderings that include the sort expression and its dependencies. + /// + /// # Parameters + /// + /// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`) + /// for which lexicographical orderings satisfying its dependencies are to be + /// constructed. + /// - `dependency_map`: A reference to the `DependencyMap` that contains + /// dependencies for different `PhysicalSortExpr`s. + /// + /// # Returns + /// + /// A vector of lexicographical orderings (`Vec`) based on the given + /// sort expression and its dependencies. + fn construct_orderings( + &mut self, + referred_sort_expr: &'a PhysicalSortExpr, + dependency_map: &'a DependencyMap, + ) -> Vec { + // We are sure that `referred_sort_expr` is inside `dependency_map`. + let node = &dependency_map[referred_sort_expr]; + // Since we work on intermediate nodes, we are sure `node.target_sort_expr` + // exists. 
+ let target_sort_expr = node.target_sort_expr.as_ref().unwrap(); + if node.dependencies.is_empty() { + return vec![vec![target_sort_expr.clone()]]; + }; + node.dependencies .iter() .flat_map(|dep| { - let mut orderings = construct_orderings(dep, dependency_map); + let mut orderings = if self.insert(target_sort_expr, dep) { + self.construct_orderings(dep, dependency_map) + } else { + vec![] + }; for ordering in orderings.iter_mut() { ordering.push(target_sort_expr.clone()) } @@ -1763,6 +1798,51 @@ mod tests { Ok(()) } + #[test] + fn project_equivalence_properties_test_multi() -> Result<()> { + // test multiple input orderings with equivalence properties + let input_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + Field::new("d", DataType::Int64, true), + ])); + + let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema)); + // add equivalent ordering [a, b, c, d] + input_properties.add_new_ordering(vec![ + parse_sort_expr("a", &input_schema), + parse_sort_expr("b", &input_schema), + parse_sort_expr("c", &input_schema), + parse_sort_expr("d", &input_schema), + ]); + + // add equivalent ordering [a, c, b, d] + input_properties.add_new_ordering(vec![ + parse_sort_expr("a", &input_schema), + parse_sort_expr("c", &input_schema), + parse_sort_expr("b", &input_schema), // NB b and c are swapped + parse_sort_expr("d", &input_schema), + ]); + + // simply project all the columns in order + let proj_exprs = vec![ + (col("a", &input_schema)?, "a".to_string()), + (col("b", &input_schema)?, "b".to_string()), + (col("c", &input_schema)?, "c".to_string()), + (col("d", &input_schema)?, "d".to_string()), + ]; + let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?; + let out_properties = input_properties.project(&projection_mapping, input_schema); + + assert_eq!( + out_properties.to_string(), + "order: [[a@0 ASC,c@2 
ASC,b@1 ASC,d@3 ASC], [a@0 ASC,b@1 ASC,c@2 ASC,d@3 ASC]]" + ); + + Ok(()) + } + #[test] fn test_join_equivalence_properties() -> Result<()> { let schema = create_test_schema()?;