Fix output_partitioning(), output_ordering(), equivalence_properties() in WindowAggExec, shift the Column indexes #4455

Merged

2 commits merged into apache:master on Dec 2, 2022

Conversation

mingmwang
Contributor

Which issue does this PR close?

Closes #4438.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

…) in WindowAggExec, shift the Column indexes
@github-actions github-actions bot added the core Core DataFusion crate label Dec 1, 2022
@mingmwang
Contributor Author

@ozankabak @metesynnada @alamb

Please take a look.

@mingmwang
Contributor Author

@mustafasrepo

Contributor

@metesynnada metesynnada left a comment


This solves the column indexing issue, so the unnecessary sorting goes away. Thanks for fixing that.

However, we still cannot rely on these APIs while we are inside create_initial_plan(). Shouldn't we expect output_ordering() and output_partitioning() to work correctly inside create_initial_plan()? They did before, but they no longer do.

Not being able to rely on this API at that stage prevents us from implementing certain operators that depend on this information (i.e. ordering and partitioning).

Contributor

@ozankabak ozankabak left a comment


Thank you for fixing this bug! I agree that this helps with @mustafasrepo's example, but I also agree with @metesynnada that it does not change the fact that the output_ordering and output_partitioning APIs do not work inside create_initial_plan (do not work in the specific sense that they may return non-final values, which can be inconsistent with other APIs like required_output_ordering).

So things boil down to whether we expect them to work (or not) while inside create_initial_plan. @mingmwang, what do you think? Should we expect them to return reliable values (e.g. consistent with required_input_ordering() API), or should we not expect this behavior?

Comment on lines 89 to 98
let new_expr = e.expr.clone().transform_down(&|e| match e
    .as_any()
    .downcast_ref::<Column>()
{
    Some(col) => Ok(Some(Arc::new(Column::new(
        col.name(),
        window_expr_len + col.index(),
    )))),
    None => Ok(None),
});
Contributor


This is not very important, but it may improve readability and be a little more idiomatic:

  let new_expr = e.expr.clone().transform_down(&|e| {
      Ok(e.as_any().downcast_ref::<Column>().map(|col| {
          Arc::new(Column::new(
              col.name(),
              window_expr_len + col.index(),
          ))
              as Arc<dyn PhysicalExpr>
      }))
  });
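To see the shape of this rewrite outside the diff context, here is a minimal, dependency-free sketch (this `Column` struct is a stand-in for illustration, not DataFusion's actual type). The `window_expr_len + index` shift reflects that the window expression columns precede the input columns in the operator's output schema, so every input column reference must move right by `window_expr_len`:

```rust
// Stand-in for a physical column reference (illustrative, not DataFusion's type).
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

/// Shift a column reference past the window expression columns:
/// input index `i` becomes `window_expr_len + i` in the output schema.
fn shift_column(col: &Column, window_expr_len: usize) -> Column {
    Column {
        name: col.name.clone(),
        index: window_expr_len + col.index,
    }
}

fn main() {
    // Input schema [a, b]; one window expression => output schema [w0, a, b].
    let a = Column { name: "a".to_string(), index: 0 };
    let b = Column { name: "b".to_string(), index: 1 };
    assert_eq!(shift_column(&a, 1).index, 1);
    assert_eq!(shift_column(&b, 1).index, 2);
    println!("ok");
}
```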

Contributor Author


Sure, I will follow your advice and change it.

let new_exprs = exprs
    .into_iter()
    .map(|expr| {
        expr.transform_down(
Contributor


In the same vein, it may be a good idea to use:

expr.transform_down(&|e| {
    Ok(e.as_any().downcast_ref::<Column>().map(|col| {
        Arc::new(Column::new(
            col.name(),
            window_expr_len + col.index(),
        ))
            as Arc<dyn PhysicalExpr>
    }))
})

Contributor Author


> Thank you for fixing this bug! I agree that this helps with @mustafasrepo's example, but I also agree with @metesynnada that it does not change the fact that the output_ordering and output_partitioning APIs do not work inside create_initial_plan (do not work in the specific sense that they may return non-final values, which can be inconsistent with other APIs like required_output_ordering).
>
> So things boil down to whether we expect them to work (or not) while inside create_initial_plan. @mingmwang, what do you think? Should we expect them to return reliable values (e.g. consistent with required_input_ordering() API), or should we not expect this behavior?

Based on the current design, we cannot expect them to return fixed, reliable values. The physical plan tree is adjusted and changed very dynamically, so the real output partitioning and output ordering are not fixed until the Enforcement rule has run.

For your use cases, if you have specific physical operators with requirements on ordering and partitioning, just specify those requirements through the methods required_input_ordering() and required_input_distribution(). And if you have rules that need to cooperate with the output partitioning and output ordering, you need to add those rules after the Enforcement rule.
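As a rough mental model of this contract (a toy sketch, not DataFusion's actual ExecutionPlan trait — the `Plan` enum and `enforce` function here are invented for illustration): operators only *declare* requirements, and the enforcement pass is what makes output_ordering trustworthy by inserting sorts where a requirement is unmet:

```rust
// Toy plan tree; orderings are modeled as a single column name for simplicity.
#[derive(Debug, Clone)]
enum Plan {
    Scan { ordering: Option<String> },             // leaf with some native ordering
    Sort { by: String, input: Box<Plan> },         // establishes an ordering
    Window { required: String, input: Box<Plan> }, // declares a requirement on its input
}

impl Plan {
    /// The ordering this node produces; only trustworthy after enforcement.
    fn output_ordering(&self) -> Option<String> {
        match self {
            Plan::Scan { ordering } => ordering.clone(),
            Plan::Sort { by, .. } => Some(by.clone()),
            Plan::Window { input, .. } => input.output_ordering(),
        }
    }
}

/// Enforcement pass: walk the tree bottom-up and insert a Sort wherever a
/// node's declared requirement is not satisfied by its child's output ordering.
fn enforce(plan: Plan) -> Plan {
    match plan {
        Plan::Window { required, input } => {
            let input = enforce(*input);
            let input = if input.output_ordering().as_deref() == Some(required.as_str()) {
                input
            } else {
                Plan::Sort { by: required.clone(), input: Box::new(input) }
            };
            Plan::Window { required, input: Box::new(input) }
        }
        Plan::Sort { by, input } => Plan::Sort { by, input: Box::new(enforce(*input)) },
        leaf => leaf,
    }
}

fn main() {
    // The scan is unordered, so enforcement must add a Sort under the window.
    let plan = Plan::Window {
        required: "a".to_string(),
        input: Box::new(Plan::Scan { ordering: None }),
    };
    let enforced = enforce(plan);
    assert!(matches!(&enforced, Plan::Window { input, .. }
        if matches!(**input, Plan::Sort { .. })));
    println!("ok");
}
```

Before this pass runs, `output_ordering()` may report the pre-enforcement (non-final) value — which is exactly why it cannot be relied upon inside create_initial_plan.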

Contributor Author


If you do not mind, could you also show me your code? Let's take a look and figure out how to make it work consistently with the current design.

Contributor


Thanks for clarifying, much appreciated. @mustafasrepo will provide a pointer to our relevant code so that you can comment.

To give you a little more context beforehand: We are implementing a pipelineable version of WindowAggExec, but whether we can use this more efficient version in the physical plan depends on the output ordering of the previous operator. My understanding of your suggestion is that we should do this within the context of a new rule that replaces the pipeline-breaking operator with the new pipelineable operator.

Contributor

@mustafasrepo mustafasrepo Dec 1, 2022


> If you do not mind, could you also show me your code? Let's take a look and figure out how to make it work consistently with the current design.

Sure, we do check for the input ordering here (inside create_initial_plan)

https://github.com/apache/arrow-datafusion/blob/617f39e2d52915ee9cd91f9e19e44f46f567d303/datafusion/core/src/physical_plan/planner.rs#L573-L594

Contributor Author


Sure. If I understand correctly, this is very similar to choosing whether to use SortMergeJoin or HashJoin as the physical join implementation: if the input plan can produce the required ordering, prefer SortMergeJoin. This kind of optimization is usually done in a rule.

You can also implement this rule in a more cost-based manner. For each WindowAggExec, you can choose either WindowAggExec or your pipelined version; the pipelined version has requirements on its input ordering, and the Enforcement rule will add a SortExec if the input plan's ordering cannot satisfy them. At the end, count the number of SortExecs in each tree; the plan with the fewest SortExecs is the best plan. Alternatively, you can tolerate one or two additional SortExecs and still prefer the pipelined version.
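The counting idea can be sketched with a toy plan type (illustrative only — none of these names are DataFusion APIs): generate both candidate plans, let enforcement add whatever sorts each one needs, then pick the candidate with the fewest sorts.

```rust
// Toy physical plan tree for comparing candidates by sort count.
#[derive(Debug, Clone)]
enum Plan {
    Scan,
    Sort(Box<Plan>),
    Window { pipelined: bool, input: Box<Plan> },
}

/// Recursively count the Sort nodes in a plan tree.
fn count_sorts(plan: &Plan) -> usize {
    match plan {
        Plan::Scan => 0,
        Plan::Sort(input) => 1 + count_sorts(input),
        Plan::Window { input, .. } => count_sorts(input),
    }
}

fn main() {
    // Candidate A: plain WindowAggExec, no sort needed.
    let a = Plan::Window { pipelined: false, input: Box::new(Plan::Scan) };
    // Candidate B: pipelined version; enforcement had to add a Sort below it.
    let b = Plan::Window {
        pipelined: true,
        input: Box::new(Plan::Sort(Box::new(Plan::Scan))),
    };
    let candidates = [a, b];
    // Pick the candidate with the fewest sorts (candidate A here).
    let best = candidates.iter().min_by_key(|p| count_sorts(p)).unwrap();
    assert_eq!(count_sorts(best), 0);
    if let Plan::Window { pipelined, .. } = best {
        println!("chose pipelined = {pipelined}");
    }
}
```

A tolerance variant would simply compare `count_sorts(pipelined_plan) <= count_sorts(plain_plan) + k` for some small k and still prefer the pipelined candidate.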

Contributor


Great! It makes sense to go through this route -- this PR provides the necessary fix for us to implement such a rule, so our original PR is now unnecessary. We will close it and utilize this fix in a rule.

@alamb
Contributor

alamb commented Dec 2, 2022

LGTM -- thank you @mingmwang @ozankabak and @mustafasrepo

let logical_plan = state.optimize(&plan)?;
let physical_plan = state.create_physical_plan(&logical_plan).await?;
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
// Only 1 SortExec was added
Contributor


👍

@alamb alamb merged commit 40e44a6 into apache:master Dec 2, 2022
@ursabot

ursabot commented Dec 2, 2022

Benchmark runs are scheduled for baseline = 9bee14e and contender = 40e44a6. 40e44a6 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Successfully merging this pull request may close these issues.

Fix output_ordering of WindowAggExec
6 participants