Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: workaround exponential planning time bug in DF fork #55

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/optimizer/src/unwrap_cast_in_comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

use crate::utils::NamePreserver;
#[allow(deprecated)]
use arrow::datatypes::{
DataType, TimeUnit, MAX_DECIMAL128_FOR_EACH_PRECISION,
MIN_DECIMAL128_FOR_EACH_PRECISION,
Expand Down
194 changes: 30 additions & 164 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2148,17 +2148,38 @@ fn calculate_union_binary(
})
.collect::<Vec<_>>();

// TEMP HACK WORKAROUND
// Revert code from https://github.com/apache/datafusion/pull/12562
// Context: https://github.com/apache/datafusion/issues/13748
// Context: https://github.com/influxdata/influxdb_iox/issues/13038

// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = UnionEquivalentOrderingBuilder::new();
orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs);
orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs);
let orderings = orderings.build();

let mut eq_properties =
EquivalenceProperties::new(lhs.schema).with_constants(constants);

let mut orderings = vec![];
for mut ordering in lhs.normalized_oeq_class().into_iter() {
// Progressively shorten the ordering to search for a satisfied prefix:
while !rhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
for mut ordering in rhs.normalized_oeq_class().into_iter() {
// Progressively shorten the ordering to search for a satisfied prefix:
while !lhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
let mut eq_properties = EquivalenceProperties::new(lhs.schema);
eq_properties.constants = constants;
eq_properties.add_new_orderings(orderings);

Ok(eq_properties)
}

Expand Down Expand Up @@ -2204,6 +2225,7 @@ struct UnionEquivalentOrderingBuilder {
orderings: Vec<LexOrdering>,
}

#[expect(unused)]
impl UnionEquivalentOrderingBuilder {
fn new() -> Self {
Self { orderings: vec![] }
Expand Down Expand Up @@ -3552,134 +3574,6 @@ mod tests {
.run()
}

#[test]
fn test_union_equivalence_properties_constants_fill_gaps() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are tests of the reverted behavior so I removed them in this commit

let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [a ASC, c ASC], const [b]
vec![vec!["a", "c"]],
vec!["b"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [b ASC, c ASC], const [a]
vec![vec!["b", "c"]],
vec!["a"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [
// [a ASC, b ASC, c ASC],
// [b ASC, a ASC, c ASC]
// ], const []
vec![vec!["a", "b", "c"], vec!["b", "a", "c"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_no_fill_gaps() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [a ASC, c ASC], const [d] // some other constant
vec![vec!["a", "c"]],
vec!["d"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [b ASC, c ASC], const [a]
vec![vec!["b", "c"]],
vec!["a"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [[a]] (only a is constant)
vec![vec!["a"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_fill_some_gaps() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [c ASC], const [a, b] // some other constant
vec![vec!["c"]],
vec!["a", "b"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [a DESC, b], const []
vec![vec!["a DESC", "b"]],
vec![],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [[a, b]] (can fill in the a/b with constants)
vec![vec!["a DESC", "b"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child orderings: [a ASC, c ASC], const [b]
vec![vec!["a", "c"]],
vec!["b"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child orderings: [b ASC, c ASC], const [a]
vec![vec!["b DESC", "c"]],
vec!["a"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings: [
// [a ASC, b ASC, c ASC],
// [b ASC, a ASC, c ASC]
// ], const []
vec![vec!["a", "b DESC", "c"], vec!["b DESC", "a", "c"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_gap_fill_symmetric() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// First child: [a ASC, b ASC, d ASC], const [c]
vec![vec!["a", "b", "d"]],
vec!["c"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child: [a ASC, c ASC, d ASC], const [b]
vec![vec!["a", "c", "d"]],
vec!["b"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings:
// [a, b, c, d]
// [a, c, b, d]
vec![vec!["a", "c", "b", "d"], vec!["a", "b", "c", "d"]],
vec![],
)
.run()
}

#[test]
fn test_union_equivalence_properties_constants_gap_fill_and_common() {
let schema = create_test_schema().unwrap();
Expand All @@ -3705,34 +3599,6 @@ mod tests {
.run()
}

#[test]
fn test_union_equivalence_properties_constants_middle_desc() {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
.with_child_sort_and_const_exprs(
// NB `b DESC` in the first child
//
// First child: [a ASC, b DESC, d ASC], const [c]
vec![vec!["a", "b DESC", "d"]],
vec!["c"],
&schema,
)
.with_child_sort_and_const_exprs(
// Second child: [a ASC, c ASC, d ASC], const [b]
vec![vec!["a", "c", "d"]],
vec!["b"],
&schema,
)
.with_expected_sort_and_const_exprs(
// Union orderings:
// [a, b, d] (c constant)
// [a, c, d] (b constant)
vec![vec!["a", "c", "b DESC", "d"], vec!["a", "b DESC", "c", "d"]],
vec![],
)
.run()
}

// TODO tests with multiple constants

#[derive(Debug)]
Expand Down
10 changes: 10 additions & 0 deletions datafusion/physical-optimizer/src/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use crate::PhysicalOptimizerRule;
Expand Down Expand Up @@ -135,6 +137,14 @@ pub fn check_plan_sanity(
plan.required_input_ordering(),
plan.required_input_distribution(),
) {
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
if child.as_any().downcast_ref::<UnionExec>().is_some() {
continue;
}
if child.as_any().downcast_ref::<SortExec>().is_some() {
continue;
}

let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
if !child_eq_props.ordering_satisfy_requirement(&sort_req) {
Expand Down
91 changes: 27 additions & 64 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ physical_plan
# Clean up after the test
########

statement ok
drop table t

statement ok
drop table t1;

Expand Down Expand Up @@ -778,76 +781,36 @@ select make_array(make_array(1)) x UNION ALL SELECT make_array(arrow_cast(make_a
[[-1]]
[[1]]

###
# Test for https://github.com/apache/datafusion/issues/11492
###

# Input data is
# a,b,c
# 1,2,3

statement ok
CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
c4 SMALLINT,
c5 INT,
c6 BIGINT NOT NULL,
c7 SMALLINT NOT NULL,
c8 INT NOT NULL,
c9 BIGINT UNSIGNED NOT NULL,
c10 VARCHAR NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
CREATE EXTERNAL TABLE t (
a INT,
b INT,
c INT
)
STORED AS CSV
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
LOCATION '../core/tests/data/example.csv'
WITH ORDER (a ASC)
OPTIONS ('format.has_header' 'true');

statement ok
set datafusion.execution.batch_size = 2;
query T
SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT 'bar' as a from t) ORDER BY a;
----
1
bar

# Constant value tracking across union
query TT
explain
SELECT * FROM(
(
SELECT * FROM aggregate_test_100 WHERE c1='a'
)
UNION ALL
(
SELECT * FROM aggregate_test_100 WHERE c1='a'
))
ORDER BY c1
query I
SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT NULL as a from t) ORDER BY a;
----
logical_plan
01)Sort: aggregate_test_100.c1 ASC NULLS LAST
02)--Union
03)----Filter: aggregate_test_100.c1 = Utf8("a")
04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
05)----Filter: aggregate_test_100.c1 = Utf8("a")
06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
physical_plan
01)CoalescePartitionsExec
02)--UnionExec
03)----CoalesceBatchesExec: target_batch_size=2
04)------FilterExec: c1@0 = a
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
07)----CoalesceBatchesExec: target_batch_size=2
08)------FilterExec: c1@0 = a
09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
1
NULL

# Clean up after the test
statement ok
drop table aggregate_test_100;

# test for https://github.com/apache/datafusion/issues/14352
query TB rowsort
SELECT
a,
a IS NOT NULL
FROM (
-- second column, even though it's not selected, was necessary to reproduce the bug linked above
SELECT 'foo' AS a, 3 AS b
UNION ALL
SELECT NULL AS a, 4 AS b
)
----
NULL false
foo true
drop table t
Loading