
Commit 3cac7e6

beliefer authored and chenzhx committed
[SPARK-37867][SQL] Compile aggregate functions of built-in JDBC dialect
### What changes were proposed in this pull request?
DS V2 translates many standard aggregate functions, but currently only H2Dialect compiles them. This PR compiles these standard aggregate functions for the other built-in JDBC dialects as well.

### Why are the changes needed?
Make the built-in JDBC dialects support complete aggregate push-down.

### Does this PR introduce _any_ user-facing change?
Yes. Users can use complete aggregate push-down with the built-in JDBC dialects.

### How was this patch tested?
New tests.

Closes apache#35166 from beliefer/SPARK-37867.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
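To make "compile" concrete: with aggregate push-down enabled, the optimizer hands the dialect a description of a standard aggregate (VAR_POP, VAR_SAMP, STDDEV_POP, STDDEV_SAMP, COVAR_POP, COVAR_SAMP, CORR), and the dialect either returns the SQL text its database understands or declines, in which case Spark computes the aggregate itself. The sketch below is a self-contained toy under assumed names, not code from this commit: `Agg` is a stand-in for Spark's DS V2 `AggregateFunc` description, and only the Option-returning dispatch mirrors `JdbcDialect.compileAggregate`.

// Toy sketch (assumed shape, not Spark's API): `Agg` stands in for the
// DS V2 aggregate description the optimizer hands to a JDBC dialect.
case class Agg(name: String, isDistinct: Boolean, columns: Seq[String])

object ToyDialect {
  // The standard aggregates this particular database can evaluate remotely.
  private val supported = Set("VAR_POP", "VAR_SAMP", "STDDEV_POP", "STDDEV_SAMP")

  // Return the remote database's SQL text for the aggregate, or None so
  // Spark keeps evaluating it itself (no push-down). A real dialect may
  // also respell the function name for its database.
  def compileAggregate(f: Agg): Option[String] =
    if (supported.contains(f.name)) {
      val distinct = if (f.isDistinct) "DISTINCT " else ""
      Some(s"${f.name}($distinct${f.columns.mkString(", ")})")
    } else {
      None
    }
}

// Example: Some("VAR_POP(bonus)"); the second call yields None (not pushed).
ToyDialect.compileAggregate(Agg("VAR_POP", isDistinct = false, Seq("bonus")))
ToyDialect.compileAggregate(Agg("COVAR_POP", isDistinct = false, Seq("salary", "bonus")))

This also explains why the suites below register different test sets: MySQL and MS SQL Server add only the variance/stddev tests, while Oracle and Postgres also add testCovarPop(), testCovarSamp() and testCorr(), presumably because each dialect compiles only the aggregates its database supports.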
1 parent ac8fd9c commit 3cac7e6

14 files changed: +490 −38 lines

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala

+14 −3
@@ -18,13 +18,14 @@
 package org.apache.spark.sql.jdbc.v2
 
 import java.sql.Connection
+import java.util.Locale
 
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
+import org.apache.spark.sql.jdbc.DatabaseOnDocker
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
@@ -36,8 +37,9 @@ import org.apache.spark.tags.DockerTest
  * }}}
  */
 @DockerTest
-class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
+class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override val catalogName: String = "db2"
+  override val namespaceOpt: Option[String] = Some("DB2INST1")
   override val db = new DatabaseOnDocker {
     override val imageName = sys.env.getOrElse("DB2_DOCKER_IMAGE_NAME", "ibmcom/db2:11.5.4.0")
     override val env = Map(
@@ -59,8 +61,13 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.db2", classOf[JDBCTableCatalog].getName)
     .set("spark.sql.catalog.db2.url", db.getJdbcUrl(dockerIp, externalPort))
+    .set("spark.sql.catalog.db2.pushDownAggregate", "true")
 
-  override def dataPreparation(conn: Connection): Unit = {}
+  override def tablePreparation(connection: Connection): Unit = {
+    connection.prepareStatement(
+      "CREATE TABLE employee (dept INTEGER, name VARCHAR(10), salary DECIMAL(20, 2), bonus DOUBLE)")
+      .executeUpdate()
+  }
 
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
@@ -86,4 +93,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
   }
+
+  override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
+
+  testVarPop()
 }
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DockerJDBCIntegrationV2Suite.scala (new file)

+44 −0

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc.v2
+
+import java.sql.Connection
+
+import org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite
+
+abstract class DockerJDBCIntegrationV2Suite extends DockerJDBCIntegrationSuite {
+
+  /**
+   * Prepare databases and tables for testing.
+   */
+  override def dataPreparation(connection: Connection): Unit = {
+    tablePreparation(connection)
+    connection.prepareStatement("INSERT INTO employee VALUES (1, 'amy', 10000, 1000)")
+      .executeUpdate()
+    connection.prepareStatement("INSERT INTO employee VALUES (2, 'alex', 12000, 1200)")
+      .executeUpdate()
+    connection.prepareStatement("INSERT INTO employee VALUES (1, 'cathy', 9000, 1200)")
+      .executeUpdate()
+    connection.prepareStatement("INSERT INTO employee VALUES (2, 'david', 10000, 1300)")
+      .executeUpdate()
+    connection.prepareStatement("INSERT INTO employee VALUES (6, 'jen', 12000, 1200)")
+      .executeUpdate()
+  }
+
+  def tablePreparation(connection: Connection): Unit
+}

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala

+13 −3
@@ -24,7 +24,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
+import org.apache.spark.sql.jdbc.DatabaseOnDocker
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
@@ -36,7 +36,7 @@ import org.apache.spark.tags.DockerTest
  * }}}
  */
 @DockerTest
-class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
+class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
 
   override val catalogName: String = "mssql"
 
@@ -57,10 +57,15 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBC
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.mssql", classOf[JDBCTableCatalog].getName)
     .set("spark.sql.catalog.mssql.url", db.getJdbcUrl(dockerIp, externalPort))
+    .set("spark.sql.catalog.mssql.pushDownAggregate", "true")
 
   override val connectionTimeout = timeout(7.minutes)
 
-  override def dataPreparation(conn: Connection): Unit = {}
+  override def tablePreparation(connection: Connection): Unit = {
+    connection.prepareStatement(
+      "CREATE TABLE employee (dept INT, name VARCHAR(32), salary NUMERIC(20, 2), bonus FLOAT)")
+      .executeUpdate()
+  }
 
   override def notSupportsTableComment: Boolean = true
 
@@ -90,4 +95,9 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBC
 
     assert(msg.contains("UpdateColumnNullability is not supported"))
   }
+
+  testVarPop()
+  testVarSamp()
+  testStddevPop()
+  testStddevSamp()
 }

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala

+13 −4
@@ -24,7 +24,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
+import org.apache.spark.sql.jdbc.DatabaseOnDocker
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
@@ -39,7 +39,7 @@ import org.apache.spark.tags.DockerTest
  *
  */
 @DockerTest
-class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
+class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override val catalogName: String = "mysql"
   override val db = new DatabaseOnDocker {
     override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:5.7.36")
@@ -57,13 +57,17 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.mysql", classOf[JDBCTableCatalog].getName)
     .set("spark.sql.catalog.mysql.url", db.getJdbcUrl(dockerIp, externalPort))
+    .set("spark.sql.catalog.mysql.pushDownAggregate", "true")
 
   override val connectionTimeout = timeout(7.minutes)
 
   private var mySQLVersion = -1
 
-  override def dataPreparation(conn: Connection): Unit = {
-    mySQLVersion = conn.getMetaData.getDatabaseMajorVersion
+  override def tablePreparation(connection: Connection): Unit = {
+    mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
+    connection.prepareStatement(
+      "CREATE TABLE employee (dept INT, name VARCHAR(32), salary DECIMAL(20, 2)," +
+        " bonus DOUBLE)").executeUpdate()
   }
 
   override def testUpdateColumnType(tbl: String): Unit = {
@@ -119,4 +123,9 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def supportsIndex: Boolean = true
 
   override def indexOptions: String = "KEY_BLOCK_SIZE=10"
+
+  testVarPop()
+  testVarSamp()
+  testStddevPop()
+  testStddevSamp()
 }

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala

+21 −3
@@ -18,13 +18,14 @@
 package org.apache.spark.sql.jdbc.v2
 
 import java.sql.Connection
+import java.util.Locale
 
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
+import org.apache.spark.sql.jdbc.DatabaseOnDocker
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
@@ -53,8 +54,9 @@ import org.apache.spark.tags.DockerTest
  * It has been validated with 18.4.0 Express Edition.
  */
 @DockerTest
-class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
+class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override val catalogName: String = "oracle"
+  override val namespaceOpt: Option[String] = Some("SYSTEM")
   override val db = new DatabaseOnDocker {
     lazy override val imageName = sys.env("ORACLE_DOCKER_IMAGE_NAME")
     override val env = Map(
@@ -69,9 +71,15 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.oracle", classOf[JDBCTableCatalog].getName)
     .set("spark.sql.catalog.oracle.url", db.getJdbcUrl(dockerIp, externalPort))
+    .set("spark.sql.catalog.oracle.pushDownAggregate", "true")
 
   override val connectionTimeout = timeout(7.minutes)
-  override def dataPreparation(conn: Connection): Unit = {}
+
+  override def tablePreparation(connection: Connection): Unit = {
+    connection.prepareStatement(
+      "CREATE TABLE employee (dept NUMBER(32), name VARCHAR2(32), salary NUMBER(20, 2)," +
+        " bonus BINARY_DOUBLE)").executeUpdate()
+  }
 
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
@@ -89,4 +97,14 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest
     assert(msg1.contains(
       s"Cannot update $catalogName.alt_table field ID: string cannot be cast to int"))
   }
+
+  override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
+
+  testVarPop()
+  testVarSamp()
+  testStddevPop()
+  testStddevSamp()
+  testCovarPop()
+  testCovarSamp()
+  testCorr()
 }

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala

+16 −3
@@ -22,7 +22,7 @@ import java.sql.Connection
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
+import org.apache.spark.sql.jdbc.DatabaseOnDocker
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
@@ -34,7 +34,7 @@ import org.apache.spark.tags.DockerTest
  * }}}
  */
 @DockerTest
-class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
+class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override val catalogName: String = "postgresql"
   override val db = new DatabaseOnDocker {
     override val imageName = sys.env.getOrElse("POSTGRES_DOCKER_IMAGE_NAME", "postgres:13.0-alpine")
@@ -51,8 +51,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes
     .set("spark.sql.catalog.postgresql.url", db.getJdbcUrl(dockerIp, externalPort))
     .set("spark.sql.catalog.postgresql.pushDownTableSample", "true")
     .set("spark.sql.catalog.postgresql.pushDownLimit", "true")
+    .set("spark.sql.catalog.postgresql.pushDownAggregate", "true")
 
-  override def dataPreparation(conn: Connection): Unit = {}
+  override def tablePreparation(connection: Connection): Unit = {
+    connection.prepareStatement(
+      "CREATE TABLE employee (dept INTEGER, name VARCHAR(32), salary NUMERIC(20, 2)," +
+        " bonus double precision)").executeUpdate()
+  }
 
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
@@ -84,4 +89,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes
   override def supportsIndex: Boolean = true
 
   override def indexOptions: String = "FILLFACTOR=70"
+
+  testVarPop()
+  testVarSamp()
+  testStddevPop()
+  testStddevSamp()
+  testCovarPop()
+  testCovarSamp()
+  testCorr()
 }
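Taken together, these suites exercise the new push-down end to end against real databases in Docker. As a rough sketch of the user-facing effect, consider a hypothetical session mirroring the PostgresIntegrationSuite configuration above (the JDBC URL is a placeholder, not one from this PR): once pushDownAggregate is enabled, aggregates such as VAR_POP should be compiled into the SQL sent to the remote database rather than computed by Spark after a full scan.

import org.apache.spark.sql.SparkSession

// Hypothetical session; catalog name and options mirror the Postgres test
// configuration above, the URL is a placeholder.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.postgresql",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.postgresql.url", "jdbc:postgresql://host:5432/db")
  .config("spark.sql.catalog.postgresql.pushDownAggregate", "true")
  .getOrCreate()

// With this patch, VAR_POP and STDDEV_SAMP should appear as pushed
// aggregates in the EXPLAIN output instead of being evaluated by Spark.
spark.sql(
  "SELECT dept, VAR_POP(bonus), STDDEV_SAMP(salary) " +
    "FROM postgresql.employee GROUP BY dept").show()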
