
[SPARK-37915][SQL] Combine unions if there is a project between them #35214

Closed
wants to merge 7 commits into from

Conversation

@wangyum (Member) commented Jan 15, 2022

What changes were proposed in this pull request?

This PR makes CombineUnions combine unions even when there is a Project between them. For example:

```scala
spark.range(1).selectExpr("CAST(id AS decimal(18, 1)) AS id").write.saveAsTable("t1")
spark.range(2).selectExpr("CAST(id AS decimal(18, 2)) AS id").write.saveAsTable("t2")
spark.range(3).selectExpr("CAST(id AS decimal(18, 3)) AS id").write.saveAsTable("t3")
spark.range(4).selectExpr("CAST(id AS decimal(18, 4)) AS id").write.saveAsTable("t4")
spark.range(5).selectExpr("CAST(id AS decimal(18, 5)) AS id").write.saveAsTable("t5")

spark.sql("SELECT id FROM t1 UNION SELECT id FROM t2 UNION SELECT id FROM t3 UNION SELECT id FROM t4 UNION SELECT id FROM t5").explain(true)
```

Before this PR:

```
== Optimized Logical Plan ==
Aggregate [id#36], [id#36]
+- Union false, false
   :- Aggregate [id#34], [cast(id#34 as decimal(22,5)) AS id#36]
   :  +- Union false, false
   :     :- Aggregate [id#32], [cast(id#32 as decimal(21,4)) AS id#34]
   :     :  +- Union false, false
   :     :     :- Aggregate [id#30], [cast(id#30 as decimal(20,3)) AS id#32]
   :     :     :  +- Union false, false
   :     :     :     :- Project [cast(id#25 as decimal(19,2)) AS id#30]
   :     :     :     :  +- Relation default.t1[id#25] parquet
   :     :     :     +- Project [cast(id#26 as decimal(19,2)) AS id#31]
   :     :     :        +- Relation default.t2[id#26] parquet
   :     :     +- Project [cast(id#27 as decimal(20,3)) AS id#33]
   :     :        +- Relation default.t3[id#27] parquet
   :     +- Project [cast(id#28 as decimal(21,4)) AS id#35]
   :        +- Relation default.t4[id#28] parquet
   +- Project [cast(id#29 as decimal(22,5)) AS id#37]
      +- Relation default.t5[id#29] parquet
```

After this PR:

```
== Optimized Logical Plan ==
Aggregate [id#36], [id#36]
+- Union false, false
   :- Project [cast(id#25 as decimal(22,5)) AS id#36]
   :  +- Relation default.t1[id#25] parquet
   :- Project [cast(id#26 as decimal(22,5)) AS id#46]
   :  +- Relation default.t2[id#26] parquet
   :- Project [cast(id#27 as decimal(22,5)) AS id#45]
   :  +- Relation default.t3[id#27] parquet
   :- Project [cast(id#28 as decimal(22,5)) AS id#44]
   :  +- Relation default.t4[id#28] parquet
   +- Project [cast(id#29 as decimal(22,5)) AS id#37]
      +- Relation default.t5[id#29] parquet
```

Why are the changes needed?

Improve query performance by reducing shuffles.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

@github-actions github-actions bot added the SQL label Jan 15, 2022
@wangyum (Member, Author) commented Jan 17, 2022

cc @cloud-fan @viirya

@wangyum wangyum changed the title [SPARK-37915][SQL] Push down deterministic projection through SQL UNION and combine them [SPARK-37915][SQL] Combine unions if there is a project between them Jan 18, 2022
@viirya (Member) left a comment

Previously, couldn't PushProjectionThroughUnion + CombineUnions achieve the same effect of combining unions with a project between them? i.e.:

Original plan:

```
Project
  - Union
    - Union
      - Child1
    - Union
      - Child2
    - Union
      - Child3
    - ...
```

After PushProjectionThroughUnion:

```
Union
  - Project
    - Union
      - Child1
  - Project
    - Union
      - Child2
  - Project
    - Union
      - Child3
  - ...
```

Next iteration. After PushProjectionThroughUnion:

```
Union
  - Union
    - Project
      - Child1
  - Union
    - Project
      - Child2
  - Union
    - Project
      - Child3
  - ...
```

After CombineUnions:

```
Union
  - Project
    - Child1
  - Project
    - Child2
  - Project
    - Child3
  - ...
```
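
One way to watch this rule interplay locally is Spark's plan-change logging. A small sketch, assuming a `spark` session and the tables from the PR description:

```scala
// Log every optimizer rule application that changes the plan
// (the spark.sql.planChangeLog.* confs are available since Spark 3.1).
spark.conf.set("spark.sql.planChangeLog.level", "WARN")
// Optionally narrow the log to the two rules discussed here.
spark.conf.set("spark.sql.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion," +
  "org.apache.spark.sql.catalyst.optimizer.CombineUnions")
spark.sql("SELECT id FROM t1 UNION SELECT id FROM t2 UNION SELECT id FROM t3").explain(true)
```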

@wangyum (Member, Author) commented Jan 19, 2022

Another attempt that combines unions after ReplaceDistinctWithAggregate: #35249

@cloud-fan cloud-fan closed this in ac2b0df Jan 25, 2022
@cloud-fan (Contributor)
thanks, merging to master!

@wangyum wangyum deleted the SPARK-37915 branch January 25, 2022 11:16
@viirya (Member) left a comment

lgtm too

@tanelk (Contributor) commented Oct 5, 2022

Hi,

I found a regression in Spark 3.3.0 compared to Spark 3.2.0. Git bisect led me to this PR. It looks like this PR is not the direct cause, but instead revealed an existing bug.

Minimal test case:

  test("SPARK-40664: Cache with join, union and renames") {
    val df1 = Seq("1", "2").toDF("a")
    val df2 = Seq("2", "3").toDF("a")
      .withColumn("b", lit("b"))

    val joined = df1.join(broadcast(df2), "a")
      // Messing around the column can cause some problems with cache manager
      .withColumn("tmp_b", $"b")
      .drop("b")
      .withColumnRenamed("tmp_b", "b")
      .cache()

    val unioned = joined.union(joined)

    assertCached(unioned, 2)
  }

Error:

```
[info]   ArrayBuffer() had size 0 instead of expected size 2 Expected query to contain 2, but it actually had 0
[info]   Union false, false
[info]   :- Project [a#4, b#12 AS b#23]
[info]   :  +- Join Inner, (a#4 = a#10)
[info]   :     :- Project [value#1 AS a#4]
[info]   :     :  +- LocalRelation [value#1]
[info]   :     +- ResolvedHint (strategy=broadcast)
[info]   :        +- Project [a#10, b AS b#12]
[info]   :           +- Project [value#7 AS a#10]
[info]   :              +- LocalRelation [value#7]
[info]   +- Project [a#4 AS a#38, b#23 AS b#39]
[info]      +- Project [a#4, b#12 AS b#23]
[info]         +- Join Inner, (a#4 = a#10)
[info]            :- Project [value#36 AS a#4]
[info]            :  +- LocalRelation [value#36]
[info]            +- ResolvedHint (strategy=broadcast)
[info]               +- Project [a#10, b AS b#12]
[info]                  +- Project [value#37 AS a#10]
[info]                     +- LocalRelation [value#37] (QueryTest.scala:198)
```

As you can see, the InMemoryRelation is missing, but I have no idea where to start looking for it.
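
One place to start: `assertCached` essentially counts `InMemoryRelation` nodes in the plan after the cache manager has substituted cached data. A sketch of the same check, assuming the `unioned` frame from the test above:

```scala
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Collect InMemoryRelation nodes after cached-plan substitution;
// an empty result means the cached `joined` plan was never matched.
val cachedNodes = unioned.queryExecution.withCachedData.collect {
  case r: InMemoryRelation => r
}
println(cachedNodes.size) // expected 2, but 0 on Spark 3.3.0
```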

I created an issue for it: https://issues.apache.org/jira/browse/SPARK-40664

@wangyum (Member, Author) commented Oct 5, 2022

@tanelk This is a known issue; please see the comment in Dataset.union:

```scala
// This breaks caching, but it's usually ok because it addresses a very specific use case:
// using union to union many files or partitions.
CombineUnions(Union(logicalPlan, other.logicalPlan))
```

wangyum added a commit that referenced this pull request May 26, 2023
…between them (#855)

Closes #35214 from wangyum/SPARK-37915.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

(cherry picked from commit ac2b0df)

* [SPARK-37915][SQL] Combine unions if there is a project between them
@cloud-fan (Contributor) commented Aug 3, 2023

df.union can break caching by design, but that was a trade-off to optimize the special df pattern df1.union(df2).union(df3).union.... This PR does make it worse, as df.union can now break caching in more cases. I'm working on a fix that makes df.union target only the special df pattern it was designed to optimize.

UPDATE: #42315
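
For readers wondering what "only combine adjacent Unions" looks like, here is a minimal sketch against the public Catalyst `Union` node (an illustration of the idea, not the actual code in #42315):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}

// Flatten Union-of-Union chains bottom-up, but never look through
// Project or any other node, so a cached intermediate plan keeps the
// exact shape the cache manager registered.
def combineAdjacentUnions(plan: LogicalPlan): LogicalPlan = plan match {
  case u @ Union(children, byName, allowMissingCol) =>
    val flattened = children.map(combineAdjacentUnions).flatMap {
      // Merge a child Union only if its flags match the parent's.
      case Union(grandChildren, `byName`, `allowMissingCol`) => grandChildren
      case other => Seq(other)
    }
    u.copy(children = flattened)
  case other => other // e.g. a Project between Unions blocks flattening
}
```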

cloud-fan added a commit that referenced this pull request Aug 4, 2023
### What changes were proposed in this pull request?

We have a long-standing tricky optimization in `Dataset.union`, which invokes the optimizer rule `CombineUnions` to pre-optimize the analyzed plan. This is to avoid an overly large analyzed plan for a specific dataframe query pattern `df1.union(df2).union(df3).union...`.

This tricky optimization can break dataframe caching, but we thought it was fine because people usually won't cache the intermediate dataframe in a union chain. However, `CombineUnions` has been improved from time to time (e.g. #35214) and can now optimize a wide range of Union patterns. It's now possible that people union two dataframes, do something with `select`, and cache the result. When that dataframe is unioned again with other dataframes, people expect the df cache to kick in, but it won't, due to the tricky optimization in `Dataset.union`.

This PR updates `Dataset.union` to only combine adjacent Unions to match the original purpose.

### Why are the changes needed?

Fix perf regression due to breaking df caching

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #42315 from cloud-fan/union.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan added a commit that referenced this pull request Aug 4, 2023
cloud-fan added a commit that referenced this pull request Aug 4, 2023
cloud-fan added a commit that referenced this pull request Aug 4, 2023
viirya pushed a commit to viirya/spark-1 that referenced this pull request Oct 19, 2023
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024