Skip to content

Commit 6aeb2a5

Browse files
belieferchenzhx
authored andcommitted
[SPARK-37527][SQL] Compile COVAR_POP, COVAR_SAMP and CORR in H2Dialet
### What changes were proposed in this pull request? apache#35101 translate `COVAR_POP`, `COVAR_SAMP` and `CORR`, but the H2 lower version cannot support them. After apache#35013, we can compile the three aggregate functions in `H2Dialet` now. ### Why are the changes needed? Supplement the implement of `H2Dialet`. ### Does this PR introduce _any_ user-facing change? 'Yes'. Spark could complete push-down `COVAR_POP`, `COVAR_SAMP` and `CORR` into H2. ### How was this patch tested? Test updated. Closes apache#35145 from beliefer/SPARK-37527_followup. Authored-by: Jiaan Geng <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 470371c commit 6aeb2a5

File tree

2 files changed

+20
-4
lines changed

2 files changed

+20
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala

+12
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@ private object H2Dialect extends JdbcDialect {
4747
assert(f.inputs().length == 1)
4848
val distinct = if (f.isDistinct) "DISTINCT " else ""
4949
Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
50+
case f: GeneralAggregateFunc if f.name() == "COVAR_POP" =>
51+
assert(f.inputs().length == 2)
52+
val distinct = if (f.isDistinct) "DISTINCT " else ""
53+
Some(s"COVAR_POP($distinct${f.inputs().head}, ${f.inputs().last})")
54+
case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" =>
55+
assert(f.inputs().length == 2)
56+
val distinct = if (f.isDistinct) "DISTINCT " else ""
57+
Some(s"COVAR_SAMP($distinct${f.inputs().head}, ${f.inputs().last})")
58+
case f: GeneralAggregateFunc if f.name() == "CORR" =>
59+
assert(f.inputs().length == 2)
60+
val distinct = if (f.isDistinct) "DISTINCT " else ""
61+
Some(s"CORR($distinct${f.inputs().head}, ${f.inputs().last})")
5062
case _ => None
5163
}
5264
)

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala

+8-4
Original file line numberDiff line numberDiff line change
@@ -749,11 +749,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
749749
val df = sql("select COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
750750
" FROM h2.test.employee where dept > 0 group by DePt")
751751
checkFiltersRemoved(df)
752-
checkAggregateRemoved(df, false)
752+
checkAggregateRemoved(df)
753753
df.queryExecution.optimizedPlan.collect {
754754
case _: DataSourceV2ScanRelation =>
755755
val expected_plan_fragment =
756-
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]"
756+
"PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
757+
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
758+
"PushedGroupByColumns: [DEPT]"
757759
checkKeywordsExistsInExplain(df, expected_plan_fragment)
758760
}
759761
checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
@@ -763,11 +765,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
763765
val df = sql("select CORR(bonus, bonus) FROM h2.test.employee where dept > 0" +
764766
" group by DePt")
765767
checkFiltersRemoved(df)
766-
checkAggregateRemoved(df, false)
768+
checkAggregateRemoved(df)
767769
df.queryExecution.optimizedPlan.collect {
768770
case _: DataSourceV2ScanRelation =>
769771
val expected_plan_fragment =
770-
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]"
772+
"PushedAggregates: [CORR(BONUS, BONUS)], " +
773+
"PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
774+
"PushedGroupByColumns: [DEPT]"
771775
checkKeywordsExistsInExplain(df, expected_plan_fragment)
772776
}
773777
checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))

0 commit comments

Comments
 (0)