Skip to content

Commit f9b54fb

Browse files
beliefer authored and chenzhx committed
[SPARK-37867][SQL][FOLLOWUP] Compile aggregate functions for built-in DB2 dialect
### What changes were proposed in this pull request? This PR follows up apache#35166. The previously referenced DB2 documentation is incorrect, resulting in missing compilation support for some aggregate functions. The correct documentation is https://www.ibm.com/docs/en/db2/11.5?topic=af-regression-functions-regr-avgx-regr-avgy-regr-count ### Why are the changes needed? Make the built-in DB2 dialect support complete aggregate push-down for more aggregate functions. ### Does this PR introduce _any_ user-facing change? 'Yes'. Users could use complete aggregate push-down with the built-in DB2 dialect. ### How was this patch tested? New tests. Closes apache#35520 from beliefer/SPARK-37867_followup. Authored-by: Jiaan Geng <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 4c2380b commit f9b54fb

File tree

11 files changed

+123
-83
lines changed

11 files changed

+123
-83
lines changed

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala

+9
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,13 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
9797
override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
9898

9999
testVarPop()
100+
testVarPop(true)
101+
testVarSamp()
102+
testVarSamp(true)
103+
testStddevPop()
104+
testStddevPop(true)
105+
testStddevSamp()
106+
testStddevSamp(true)
107+
testCovarPop()
108+
testCovarSamp()
100109
}

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala

+4
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,11 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD
9797
}
9898

9999
testVarPop()
100+
testVarPop(true)
100101
testVarSamp()
102+
testVarSamp(true)
101103
testStddevPop()
104+
testStddevPop(true)
102105
testStddevSamp()
106+
testStddevSamp(true)
103107
}

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala

+7
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
9191
override def indexOptions: String = "FILLFACTOR=70"
9292

9393
testVarPop()
94+
testVarPop(true)
9495
testVarSamp()
96+
testVarSamp(true)
9597
testStddevPop()
98+
testStddevPop(true)
9699
testStddevSamp()
100+
testStddevSamp(true)
97101
testCovarPop()
102+
testCovarPop(true)
98103
testCovarSamp()
104+
testCovarSamp(true)
99105
testCorr()
106+
testCorr(true)
100107
}

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

+35-28
Original file line numberDiff line numberDiff line change
@@ -386,10 +386,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
386386

387387
protected def caseConvert(tableName: String): String = tableName
388388

389-
protected def testVarPop(): Unit = {
390-
test(s"scan with aggregate push-down: VAR_POP") {
391-
val df = sql(s"SELECT VAR_POP(bonus) FROM $catalogAndNamespace.${caseConvert("employee")}" +
392-
" WHERE dept > 0 GROUP BY dept ORDER BY dept")
389+
protected def testVarPop(isDistinct: Boolean = false): Unit = {
390+
val distinct = if (isDistinct) "DISTINCT " else ""
391+
test(s"scan with aggregate push-down: VAR_POP with distinct: $isDistinct") {
392+
val df = sql(s"SELECT VAR_POP(${distinct}bonus) FROM $catalogAndNamespace." +
393+
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
393394
checkFilterPushed(df)
394395
checkAggregateRemoved(df)
395396
checkAggregatePushed(df, "VAR_POP")
@@ -401,11 +402,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
401402
}
402403
}
403404

404-
protected def testVarSamp(): Unit = {
405-
test(s"scan with aggregate push-down: VAR_SAMP") {
405+
protected def testVarSamp(isDistinct: Boolean = false): Unit = {
406+
val distinct = if (isDistinct) "DISTINCT " else ""
407+
test(s"scan with aggregate push-down: VAR_SAMP with distinct: $isDistinct") {
406408
val df = sql(
407-
s"SELECT VAR_SAMP(bonus) FROM $catalogAndNamespace.${caseConvert("employee")}" +
408-
" WHERE dept > 0 GROUP BY dept ORDER BY dept")
409+
s"SELECT VAR_SAMP(${distinct}bonus) FROM $catalogAndNamespace." +
410+
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
409411
checkFilterPushed(df)
410412
checkAggregateRemoved(df)
411413
checkAggregatePushed(df, "VAR_SAMP")
@@ -417,11 +419,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
417419
}
418420
}
419421

420-
protected def testStddevPop(): Unit = {
421-
test("scan with aggregate push-down: STDDEV_POP") {
422+
protected def testStddevPop(isDistinct: Boolean = false): Unit = {
423+
val distinct = if (isDistinct) "DISTINCT " else ""
424+
test(s"scan with aggregate push-down: STDDEV_POP with distinct: $isDistinct") {
422425
val df = sql(
423-
s"SELECT STDDEV_POP(bonus) FROM $catalogAndNamespace.${caseConvert("employee")}" +
424-
" WHERE dept > 0 GROUP BY dept ORDER BY dept")
426+
s"SELECT STDDEV_POP(${distinct}bonus) FROM $catalogAndNamespace." +
427+
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
425428
checkFilterPushed(df)
426429
checkAggregateRemoved(df)
427430
checkAggregatePushed(df, "STDDEV_POP")
@@ -433,11 +436,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
433436
}
434437
}
435438

436-
protected def testStddevSamp(): Unit = {
437-
test("scan with aggregate push-down: STDDEV_SAMP") {
439+
protected def testStddevSamp(isDistinct: Boolean = false): Unit = {
440+
val distinct = if (isDistinct) "DISTINCT " else ""
441+
test(s"scan with aggregate push-down: STDDEV_SAMP with distinct: $isDistinct") {
438442
val df = sql(
439-
s"SELECT STDDEV_SAMP(bonus) FROM $catalogAndNamespace.${caseConvert("employee")}" +
440-
" WHERE dept > 0 GROUP BY dept ORDER BY dept")
443+
s"SELECT STDDEV_SAMP(${distinct}bonus) FROM $catalogAndNamespace." +
444+
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
441445
checkFilterPushed(df)
442446
checkAggregateRemoved(df)
443447
checkAggregatePushed(df, "STDDEV_SAMP")
@@ -449,11 +453,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
449453
}
450454
}
451455

452-
protected def testCovarPop(): Unit = {
453-
test("scan with aggregate push-down: COVAR_POP") {
456+
protected def testCovarPop(isDistinct: Boolean = false): Unit = {
457+
val distinct = if (isDistinct) "DISTINCT " else ""
458+
test(s"scan with aggregate push-down: COVAR_POP with distinct: $isDistinct") {
454459
val df = sql(
455-
s"SELECT COVAR_POP(bonus, bonus) FROM $catalogAndNamespace.${caseConvert("employee")}" +
456-
" WHERE dept > 0 GROUP BY dept ORDER BY dept")
460+
s"SELECT COVAR_POP(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
461+
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
457462
checkFilterPushed(df)
458463
checkAggregateRemoved(df)
459464
checkAggregatePushed(df, "COVAR_POP")
@@ -465,11 +470,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
465470
}
466471
}
467472

468-
protected def testCovarSamp(): Unit = {
469-
test("scan with aggregate push-down: COVAR_SAMP") {
473+
protected def testCovarSamp(isDistinct: Boolean = false): Unit = {
474+
val distinct = if (isDistinct) "DISTINCT " else ""
475+
test(s"scan with aggregate push-down: COVAR_SAMP with distinct: $isDistinct") {
470476
val df = sql(
471-
s"SELECT COVAR_SAMP(bonus, bonus) FROM $catalogAndNamespace.${caseConvert("employee")}" +
472-
" WHERE dept > 0 GROUP BY dept ORDER BY dept")
477+
s"SELECT COVAR_SAMP(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
478+
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
473479
checkFilterPushed(df)
474480
checkAggregateRemoved(df)
475481
checkAggregatePushed(df, "COVAR_SAMP")
@@ -481,11 +487,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
481487
}
482488
}
483489

484-
protected def testCorr(): Unit = {
485-
test("scan with aggregate push-down: CORR") {
490+
protected def testCorr(isDistinct: Boolean = false): Unit = {
491+
val distinct = if (isDistinct) "DISTINCT " else ""
492+
test(s"scan with aggregate push-down: CORR with distinct: $isDistinct") {
486493
val df = sql(
487-
s"SELECT CORR(bonus, bonus) FROM $catalogAndNamespace.${caseConvert("employee")}" +
488-
" WHERE dept > 0 GROUP BY dept ORDER BY dept")
494+
s"SELECT CORR(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
495+
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
489496
checkFilterPushed(df)
490497
checkAggregateRemoved(df)
491498
checkAggregatePushed(df, "CORR")

sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala

+19
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,32 @@ private object DB2Dialect extends JdbcDialect {
3030
override def canHandle(url: String): Boolean =
3131
url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2")
3232

33+
// See https://www.ibm.com/docs/en/db2/11.5?topic=functions-aggregate
3334
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
3435
super.compileAggregate(aggFunction).orElse(
3536
aggFunction match {
3637
case f: GeneralAggregateFunc if f.name() == "VAR_POP" =>
3738
assert(f.inputs().length == 1)
3839
val distinct = if (f.isDistinct) "DISTINCT " else ""
3940
Some(s"VARIANCE($distinct${f.inputs().head})")
41+
case f: GeneralAggregateFunc if f.name() == "VAR_SAMP" =>
42+
assert(f.inputs().length == 1)
43+
val distinct = if (f.isDistinct) "DISTINCT " else ""
44+
Some(s"VARIANCE_SAMP($distinct${f.inputs().head})")
45+
case f: GeneralAggregateFunc if f.name() == "STDDEV_POP" =>
46+
assert(f.inputs().length == 1)
47+
val distinct = if (f.isDistinct) "DISTINCT " else ""
48+
Some(s"STDDEV($distinct${f.inputs().head})")
49+
case f: GeneralAggregateFunc if f.name() == "STDDEV_SAMP" =>
50+
assert(f.inputs().length == 1)
51+
val distinct = if (f.isDistinct) "DISTINCT " else ""
52+
Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
53+
case f: GeneralAggregateFunc if f.name() == "COVAR_POP" && f.isDistinct == false =>
54+
assert(f.inputs().length == 2)
55+
Some(s"COVARIANCE(${f.inputs().head}, ${f.inputs().last})")
56+
case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" && f.isDistinct == false =>
57+
assert(f.inputs().length == 2)
58+
Some(s"COVARIANCE_SAMP(${f.inputs().head}, ${f.inputs().last})")
4059
case _ => None
4160
}
4261
)

sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala

+10-13
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,22 @@ private object DerbyDialect extends JdbcDialect {
3030
override def canHandle(url: String): Boolean =
3131
url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
3232

33+
// See https://db.apache.org/derby/docs/10.15/ref/index.html
3334
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
3435
super.compileAggregate(aggFunction).orElse(
3536
aggFunction match {
36-
case f: GeneralAggregateFunc if f.name() == "VAR_POP" =>
37+
case f: GeneralAggregateFunc if f.name() == "VAR_POP" && f.isDistinct == false =>
3738
assert(f.inputs().length == 1)
38-
val distinct = if (f.isDistinct) "DISTINCT " else ""
39-
Some(s"VAR_POP($distinct${f.inputs().head})")
40-
case f: GeneralAggregateFunc if f.name() == "VAR_SAMP" =>
39+
Some(s"VAR_POP(${f.inputs().head})")
40+
case f: GeneralAggregateFunc if f.name() == "VAR_SAMP" && f.isDistinct == false =>
4141
assert(f.inputs().length == 1)
42-
val distinct = if (f.isDistinct) "DISTINCT " else ""
43-
Some(s"VAR_SAMP($distinct${f.inputs().head})")
44-
case f: GeneralAggregateFunc if f.name() == "STDDEV_POP" =>
42+
Some(s"VAR_SAMP(${f.inputs().head})")
43+
case f: GeneralAggregateFunc if f.name() == "STDDEV_POP" && f.isDistinct == false =>
4544
assert(f.inputs().length == 1)
46-
val distinct = if (f.isDistinct) "DISTINCT " else ""
47-
Some(s"STDDEV_POP($distinct${f.inputs().head})")
48-
case f: GeneralAggregateFunc if f.name() == "STDDEV_SAMP" =>
45+
Some(s"STDDEV_POP(${f.inputs().head})")
46+
case f: GeneralAggregateFunc if f.name() == "STDDEV_SAMP" && f.isDistinct == false =>
4947
assert(f.inputs().length == 1)
50-
val distinct = if (f.isDistinct) "DISTINCT " else ""
51-
Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
48+
Some(s"STDDEV_SAMP(${f.inputs().head})")
5249
case _ => None
5350
}
5451
)
@@ -72,7 +69,7 @@ private object DerbyDialect extends JdbcDialect {
7269

7370
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
7471

75-
// See https://db.apache.org/derby/docs/10.5/ref/rrefsqljrenametablestatement.html
72+
// See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html
7673
override def renameTable(oldTable: String, newTable: String): String = {
7774
s"RENAME TABLE $oldTable TO $newTable"
7875
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ private object MsSqlServerDialect extends JdbcDialect {
4040
override def canHandle(url: String): Boolean =
4141
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
4242

43+
// scalastyle:off line.size.limit
44+
// See https://docs.microsoft.com/en-us/sql/t-sql/functions/aggregate-functions-transact-sql?view=sql-server-ver15
45+
// scalastyle:on line.size.limit
4346
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
4447
super.compileAggregate(aggFunction).orElse(
4548
aggFunction match {

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala

+9-12
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,22 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
3838
override def canHandle(url : String): Boolean =
3939
url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
4040

41+
// See https://dev.mysql.com/doc/refman/8.0/en/aggregate-functions.html
4142
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
4243
super.compileAggregate(aggFunction).orElse(
4344
aggFunction match {
44-
case f: GeneralAggregateFunc if f.name() == "VAR_POP" =>
45+
case f: GeneralAggregateFunc if f.name() == "VAR_POP" && f.isDistinct == false =>
4546
assert(f.inputs().length == 1)
46-
val distinct = if (f.isDistinct) "DISTINCT " else ""
47-
Some(s"VAR_POP($distinct${f.inputs().head})")
48-
case f: GeneralAggregateFunc if f.name() == "VAR_SAMP" =>
47+
Some(s"VAR_POP(${f.inputs().head})")
48+
case f: GeneralAggregateFunc if f.name() == "VAR_SAMP" && f.isDistinct == false =>
4949
assert(f.inputs().length == 1)
50-
val distinct = if (f.isDistinct) "DISTINCT " else ""
51-
Some(s"VAR_SAMP($distinct${f.inputs().head})")
52-
case f: GeneralAggregateFunc if f.name() == "STDDEV_POP" =>
50+
Some(s"VAR_SAMP(${f.inputs().head})")
51+
case f: GeneralAggregateFunc if f.name() == "STDDEV_POP" && f.isDistinct == false =>
5352
assert(f.inputs().length == 1)
54-
val distinct = if (f.isDistinct) "DISTINCT " else ""
55-
Some(s"STDDEV_POP($distinct${f.inputs().head})")
56-
case f: GeneralAggregateFunc if f.name() == "STDDEV_SAMP" =>
53+
Some(s"STDDEV_POP(${f.inputs().head})")
54+
case f: GeneralAggregateFunc if f.name() == "STDDEV_SAMP" && f.isDistinct == false =>
5755
assert(f.inputs().length == 1)
58-
val distinct = if (f.isDistinct) "DISTINCT " else ""
59-
Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
56+
Some(s"STDDEV_SAMP(${f.inputs().head})")
6057
case _ => None
6158
}
6259
)

sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala

+17-21
Original file line numberDiff line numberDiff line change
@@ -34,37 +34,33 @@ private case object OracleDialect extends JdbcDialect {
3434
override def canHandle(url: String): Boolean =
3535
url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")
3636

37+
// scalastyle:off line.size.limit
38+
// https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/Aggregate-Functions.html#GUID-62BE676B-AF18-4E63-BD14-25206FEA0848
39+
// scalastyle:on line.size.limit
3740
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
3841
super.compileAggregate(aggFunction).orElse(
3942
aggFunction match {
40-
case f: GeneralAggregateFunc if f.name() == "VAR_POP" =>
43+
case f: GeneralAggregateFunc if f.name() == "VAR_POP" && f.isDistinct == false =>
4144
assert(f.inputs().length == 1)
42-
val distinct = if (f.isDistinct) "DISTINCT " else ""
43-
Some(s"VAR_POP($distinct${f.inputs().head})")
44-
case f: GeneralAggregateFunc if f.name() == "VAR_SAMP" =>
45+
Some(s"VAR_POP(${f.inputs().head})")
46+
case f: GeneralAggregateFunc if f.name() == "VAR_SAMP" && f.isDistinct == false =>
4547
assert(f.inputs().length == 1)
46-
val distinct = if (f.isDistinct) "DISTINCT " else ""
47-
Some(s"VAR_SAMP($distinct${f.inputs().head})")
48-
case f: GeneralAggregateFunc if f.name() == "STDDEV_POP" =>
48+
Some(s"VAR_SAMP(${f.inputs().head})")
49+
case f: GeneralAggregateFunc if f.name() == "STDDEV_POP" && f.isDistinct == false =>
4950
assert(f.inputs().length == 1)
50-
val distinct = if (f.isDistinct) "DISTINCT " else ""
51-
Some(s"STDDEV_POP($distinct${f.inputs().head})")
52-
case f: GeneralAggregateFunc if f.name() == "STDDEV_SAMP" =>
51+
Some(s"STDDEV_POP(${f.inputs().head})")
52+
case f: GeneralAggregateFunc if f.name() == "STDDEV_SAMP" && f.isDistinct == false =>
5353
assert(f.inputs().length == 1)
54-
val distinct = if (f.isDistinct) "DISTINCT " else ""
55-
Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
56-
case f: GeneralAggregateFunc if f.name() == "COVAR_POP" =>
54+
Some(s"STDDEV_SAMP(${f.inputs().head})")
55+
case f: GeneralAggregateFunc if f.name() == "COVAR_POP" && f.isDistinct == false =>
5756
assert(f.inputs().length == 2)
58-
val distinct = if (f.isDistinct) "DISTINCT " else ""
59-
Some(s"COVAR_POP($distinct${f.inputs().head}, ${f.inputs().last})")
60-
case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" =>
57+
Some(s"COVAR_POP(${f.inputs().head}, ${f.inputs().last})")
58+
case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" && f.isDistinct == false =>
6159
assert(f.inputs().length == 2)
62-
val distinct = if (f.isDistinct) "DISTINCT " else ""
63-
Some(s"COVAR_SAMP($distinct${f.inputs().head}, ${f.inputs().last})")
64-
case f: GeneralAggregateFunc if f.name() == "CORR" =>
60+
Some(s"COVAR_SAMP(${f.inputs().head}, ${f.inputs().last})")
61+
case f: GeneralAggregateFunc if f.name() == "CORR" && f.isDistinct == false =>
6562
assert(f.inputs().length == 2)
66-
val distinct = if (f.isDistinct) "DISTINCT " else ""
67-
Some(s"CORR($distinct${f.inputs().head}, ${f.inputs().last})")
63+
Some(s"CORR(${f.inputs().head}, ${f.inputs().last})")
6864
case _ => None
6965
}
7066
)

sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
3636
override def canHandle(url: String): Boolean =
3737
url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql")
3838

39+
// See https://www.postgresql.org/docs/8.4/functions-aggregate.html
3940
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
4041
super.compileAggregate(aggFunction).orElse(
4142
aggFunction match {

sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala

+9-9
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ private case object TeradataDialect extends JdbcDialect {
2828
override def canHandle(url: String): Boolean =
2929
url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata")
3030

31+
// scalastyle:off line.size.limit
32+
// See https://docs.teradata.com/r/Teradata-VantageTM-SQL-Functions-Expressions-and-Predicates/March-2019/Aggregate-Functions
33+
// scalastyle:on line.size.limit
3134
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
3235
super.compileAggregate(aggFunction).orElse(
3336
aggFunction match {
@@ -47,18 +50,15 @@ private case object TeradataDialect extends JdbcDialect {
4750
assert(f.inputs().length == 1)
4851
val distinct = if (f.isDistinct) "DISTINCT " else ""
4952
Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
50-
case f: GeneralAggregateFunc if f.name() == "COVAR_POP" =>
53+
case f: GeneralAggregateFunc if f.name() == "COVAR_POP" && f.isDistinct == false =>
5154
assert(f.inputs().length == 2)
52-
val distinct = if (f.isDistinct) "DISTINCT " else ""
53-
Some(s"COVAR_POP($distinct${f.inputs().head}, ${f.inputs().last})")
54-
case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" =>
55+
Some(s"COVAR_POP(${f.inputs().head}, ${f.inputs().last})")
56+
case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" && f.isDistinct == false =>
5557
assert(f.inputs().length == 2)
56-
val distinct = if (f.isDistinct) "DISTINCT " else ""
57-
Some(s"COVAR_SAMP($distinct${f.inputs().head}, ${f.inputs().last})")
58-
case f: GeneralAggregateFunc if f.name() == "CORR" =>
58+
Some(s"COVAR_SAMP(${f.inputs().head}, ${f.inputs().last})")
59+
case f: GeneralAggregateFunc if f.name() == "CORR" && f.isDistinct == false =>
5960
assert(f.inputs().length == 2)
60-
val distinct = if (f.isDistinct) "DISTINCT " else ""
61-
Some(s"CORR($distinct${f.inputs().head}, ${f.inputs().last})")
61+
Some(s"CORR(${f.inputs().head}, ${f.inputs().last})")
6262
case _ => None
6363
}
6464
)

0 commit comments

Comments
 (0)