diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 15157a1d3546e..7842ab36bb1b7 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -153,7 +153,7 @@
     <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
-      <version>1.4.195</version>
+      <version>2.0.204</version>
       <scope>test</scope>
     </dependency>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 87bdc2e721ad8..f4b18f1adfdec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -24,7 +24,6 @@ import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
 
 import scala.collection.JavaConverters._
 
-import org.h2.jdbc.JdbcSQLException
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -54,7 +53,8 @@ class JDBCSuite extends QueryTest
   val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
   var conn: java.sql.Connection = null
 
-  val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte)
+  val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte) ++
+    Array.fill(15)(0.toByte)
 
   val testH2Dialect = new JdbcDialect {
     override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
@@ -87,7 +87,6 @@ class JDBCSuite extends QueryTest
     val properties = new Properties()
     properties.setProperty("user", "testUser")
     properties.setProperty("password", "testPass")
-    properties.setProperty("rowId", "false")
 
     conn = DriverManager.getConnection(url, properties)
     conn.prepareStatement("create schema test").executeUpdate()
@@ -162,7 +161,7 @@ class JDBCSuite extends QueryTest
         |OPTIONS (url '$url', dbtable 'TEST.STRTYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
 
-    conn.prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP)"
+    conn.prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP(7))"
       ).executeUpdate()
     conn.prepareStatement("insert into test.timetypes values ('12:34:56', "
       + "'1996-01-01', '2002-02-20 11:22:33.543543543')").executeUpdate()
@@ -177,12 +176,12 @@ class JDBCSuite extends QueryTest
       """.stripMargin.replaceAll("\n", " "))
 
     conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME ZONE) " +
-      "AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'")
+      "AS SELECT '1999-01-08 04:05:06.543543543-08:00'")
       .executeUpdate()
     conn.commit()
 
-    conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " +
-      "AS SELECT '(1, 2, 3)'")
+    conn.prepareStatement("CREATE TABLE test.array_table (ar Integer ARRAY) " +
+      "AS SELECT ARRAY[1, 2, 3]")
       .executeUpdate()
     conn.commit()
 
@@ -638,7 +637,7 @@ class JDBCSuite extends QueryTest
     assert(rows(0).getAs[Array[Byte]](0).sameElements(testBytes))
     assert(rows(0).getString(1).equals("Sensitive"))
     assert(rows(0).getString(2).equals("Insensitive"))
-    assert(rows(0).getString(3).equals("Twenty-byte CHAR"))
+    assert(rows(0).getString(3).equals("Twenty-byte CHAR    "))
     assert(rows(0).getAs[Array[Byte]](4).sameElements(testBytes))
     assert(rows(0).getString(5).equals("I am a clob!"))
   }
@@ -729,20 +728,6 @@ class JDBCSuite extends QueryTest
     assert(math.abs(rows(0).getDouble(1) - 1.00000023841859331) < 1e-12)
   }
 
-  test("Pass extra properties via OPTIONS") {
-    // We set rowId to false during setup, which means that _ROWID_ column should be absent from
-    // all tables. If rowId is true (default), the query below doesn't throw an exception.
-    intercept[JdbcSQLException] {
-      sql(
-        s"""
-          |CREATE OR REPLACE TEMPORARY VIEW abc
-          |USING org.apache.spark.sql.jdbc
-          |OPTIONS (url '$url', dbtable '(SELECT _ROWID_ FROM test.people)',
-          |         user 'testUser', password 'testPass')
-        """.stripMargin.replaceAll("\n", " "))
-    }
-  }
-
   test("Remap types via JdbcDialects") {
     JdbcDialects.registerDialect(testH2Dialect)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
@@ -1375,7 +1360,7 @@ class JDBCSuite extends QueryTest
     }.getMessage
     assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
     e = intercept[SQLException] {
-      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
+      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY_TABLE", new Properties()).collect()
     }.getMessage
     assert(e.contains("Unsupported type ARRAY"))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 8808321323602..f99e40a6cbed6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -432,7 +432,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         "PushedGroupByColumns: []"
       checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
-    checkAnswer(df, Seq(Row(2, 1.0)))
+    checkAnswer(df, Seq(Row(2, 1.5)))
   }
 
   test("partitioned scan with aggregate push-down: complete push-down only") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index efa2773bfd692..79952e5a6c288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -227,7 +227,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     JdbcDialects.registerDialect(testH2Dialect)
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
 
-    val m = intercept[org.h2.jdbc.JdbcSQLException] {
+    val m = intercept[org.h2.jdbc.JdbcSQLSyntaxErrorException] {
       df.write.option("createTableOptions", "ENGINE tableEngineName")
        .jdbc(url1, "TEST.CREATETBLOPTS", properties)
     }.getMessage
@@ -326,7 +326,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
   test("save errors if wrong user/password combination") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
 
-    val e = intercept[org.h2.jdbc.JdbcSQLException] {
+    val e = intercept[org.h2.jdbc.JdbcSQLInvalidAuthorizationSpecException] {
       df.write.format("jdbc")
         .option("dbtable", "TEST.SAVETEST")
         .option("url", url1)
@@ -427,7 +427,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     // verify the data types of the created table by reading the database catalog of H2
     val query =
       """
-        |(SELECT column_name, type_name, character_maximum_length
+        |(SELECT column_name, data_type, character_maximum_length
        | FROM information_schema.columns WHERE table_name = 'DBCOLTYPETEST')
      """.stripMargin
     val rows = spark.read.jdbc(url1, query, properties).collect()
@@ -436,7 +436,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
       val typeName = row.getString(1)
       // For CHAR and VARCHAR, we also compare the max length
       if (typeName.contains("CHAR")) {
-        val charMaxLength = row.getInt(2)
+        val charMaxLength = row.getLong(2)
         assert(expectedTypes(row.getString(0)) == s"$typeName($charMaxLength)")
       } else {
         assert(expectedTypes(row.getString(0)) == typeName)
@@ -452,15 +452,18 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
 
     // out-of-order
-    val expected1 = Map("id" -> "BIGINT", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)")
+    val expected1 =
+      Map("id" -> "BIGINT", "first#name" -> "CHARACTER VARYING(123)", "city" -> "CHARACTER(20)")
     testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), id BIGINT, city CHAR(20)", expected1)
     // partial schema
-    val expected2 = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CHAR(20)")
+    val expected2 =
+      Map("id" -> "INTEGER", "first#name" -> "CHARACTER VARYING(123)", "city" -> "CHARACTER(20)")
     testUserSpecifiedColTypes(df, "`first#name` VARCHAR(123), city CHAR(20)", expected2)
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       // should still respect the original column names
-      val expected = Map("id" -> "INTEGER", "first#name" -> "VARCHAR(123)", "city" -> "CLOB")
+      val expected = Map("id" -> "INTEGER", "first#name" -> "CHARACTER VARYING(123)",
+        "city" -> "CHARACTER LARGE OBJECT(9223372036854775807)")
       testUserSpecifiedColTypes(df, "`FiRsT#NaMe` VARCHAR(123)", expected)
     }
 
@@ -470,7 +473,9 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
       StructField("First#Name", StringType) ::
       StructField("city", StringType) :: Nil)
     val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-    val expected = Map("id" -> "INTEGER", "First#Name" -> "VARCHAR(123)", "city" -> "CLOB")
+    val expected =
+      Map("id" -> "INTEGER", "First#Name" -> "CHARACTER VARYING(123)",
+        "city" -> "CHARACTER LARGE OBJECT(9223372036854775807)")
     testUserSpecifiedColTypes(df, "`First#Name` VARCHAR(123)", expected)
   }
 }