
Commit 40cb28f

dchvn authored and chenzhx committed
[SPARK-37343][SQL] Implement createIndex, IndexExists and dropIndex in JDBC (Postgres dialect)
### What changes were proposed in this pull request?

Implement `createIndex`/`IndexExists`/`dropIndex` in DS V2 JDBC for the Postgres dialect.

### Why are the changes needed?

This is a subtask of the V2 index support. This PR implements `createIndex`, `IndexExists` and `dropIndex` for the Postgres dialect only; after this PR is reviewed, I will either open a new PR for `listIndexes` or add it to this one.

### Does this PR introduce _any_ user-facing change?

Yes, `createIndex`/`IndexExists`/`dropIndex` in DS V2 JDBC.

### How was this patch tested?

New test.

Closes apache#34673 from dchvn/Dsv2_index_postgres.

Authored-by: dch nguyen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
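For context, here is a minimal sketch of the user-facing flow this enables, pieced together from the SQL statements exercised in the updated V2JDBCTest below. The catalog name `postgresql` and table name `tbl` are illustrative and assume a DS V2 JDBC catalog is registered in the Spark session:

    // Illustrative only: assumes a JDBC catalog named "postgresql" is configured,
    // e.g. spark.sql.catalog.postgresql set to the JDBCTableCatalog plus connection options.
    spark.sql("CREATE INDEX i1 ON postgresql.tbl USING BTREE (col1)")
    spark.sql("CREATE INDEX i2 ON postgresql.tbl (col2, col3) OPTIONS (FILLFACTOR=70)")
    spark.sql("DROP INDEX IF EXISTS i1 ON postgresql.tbl")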
1 parent 774f1df commit 40cb28f

File tree: 6 files changed, +170 −124 lines

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala

+4 −22

@@ -18,24 +18,21 @@
 package org.apache.spark.sql.jdbc.v2

 import java.sql.{Connection, SQLFeatureNotSupportedException}
-import java.util

 import org.scalatest.time.SpanSugar._

 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.connector.catalog.index.SupportsIndex
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest

 /**
  *
- * To run this test suite for a specific version (e.g., mysql:5.7.31):
+ * To run this test suite for a specific version (e.g., mysql:5.7.36):
  * {{{
- *   ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:5.7.31
+ *   ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:5.7.36
  *     ./build/sbt -Pdocker-integration-tests "testOnly *v2*MySQLIntegrationSuite"
  *
  * }}}
@@ -45,7 +42,7 @@ import org.apache.spark.tags.DockerTest
 class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override val catalogName: String = "mysql"
   override val db = new DatabaseOnDocker {
-    override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:5.7.31")
+    override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:5.7.36")
     override val env = Map(
       "MYSQL_ROOT_PASSWORD" -> "rootpass"
     )
@@ -121,20 +118,5 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {

   override def supportsIndex: Boolean = true

-  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
-    val properties = new util.Properties();
-    properties.put("KEY_BLOCK_SIZE", "10")
-    properties.put("COMMENT", "'this is a comment'")
-    // MySQL doesn't allow property set on individual column, so use empty Array for
-    // column properties
-    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-      Array.empty[util.Map[NamedReference, util.Properties]], properties)
-
-    var index = jdbcTable.listIndexes()
-    // The index property size is actually 1. Even though the index is created
-    // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
-    // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
-    assert(index(0).properties.size == 1)
-    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
-  }
+  override def indexOptions: String = "KEY_BLOCK_SIZE=10"
 }

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala

+4

@@ -80,4 +80,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes
   }

   override def supportsTableSample: Boolean = true
+
+  override def supportsIndex: Boolean = true
+
+  override def indexOptions: String = "FILLFACTOR=70"
 }
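The suite opts into the shared index tests by overriding `supportsIndex`, and the OPTIONS clause in those tests then carries the Postgres-specific `FILLFACTOR=70`. For reference, a sketch of how the shared V2JDBCTest trait (next file) reaches the DS V2 index API programmatically; the names mirror the test code, and `conf` is the session's SQLConf from SharedSparkSession:

    import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex

    // Mirrors the loading pattern in V2JDBCTest; "new_table" is the test table.
    val loaded = Catalogs.load("postgresql", conf)
    val jdbcTable = loaded.asInstanceOf[TableCatalog]
      .loadTable(Identifier.of(Array.empty[String], "new_table"))
      .asInstanceOf[SupportsIndex]
    assert(!jdbcTable.indexExists("i1"))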

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

+26 −62

@@ -17,11 +17,10 @@

 package org.apache.spark.sql.jdbc.v2

-import java.util
-
 import org.apache.log4j.Level

 import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample}
 import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
@@ -191,91 +190,56 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
   }

   def supportsIndex: Boolean = false
-  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}

-  test("SPARK-36913: Test INDEX") {
+  def indexOptions: String = ""
+
+  test("SPARK-36895: Test INDEX Using SQL") {
     if (supportsIndex) {
       withTable(s"$catalogName.new_table") {
         sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
-          s" col4 INT, col5 INT)")
+          " col4 INT, col5 INT)")
         val loaded = Catalogs.load(catalogName, conf)
         val jdbcTable = loaded.asInstanceOf[TableCatalog]
           .loadTable(Identifier.of(Array.empty[String], "new_table"))
           .asInstanceOf[SupportsIndex]
         assert(jdbcTable.indexExists("i1") == false)
         assert(jdbcTable.indexExists("i2") == false)

-        val properties = new util.Properties();
         val indexType = "DUMMY"
         var m = intercept[UnsupportedOperationException] {
-          jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
-            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+          sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)")
         }.getMessage
         assert(m.contains(s"Index Type $indexType is not supported." +
-          s" The supported Index Types are: BTREE and HASH"))
-
-        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+          s" The supported Index Types are:"))

-        jdbcTable.createIndex("i2", "",
-          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
+        sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
+          s" OPTIONS ($indexOptions)")

         assert(jdbcTable.indexExists("i1") == true)
         assert(jdbcTable.indexExists("i2") == true)

+        // This should pass without exception
+        sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
+
         m = intercept[IndexAlreadyExistsException] {
-          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+          sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
         }.getMessage
-        assert(m.contains("Failed to create index: i1 in new_table"))
-
-        var index = jdbcTable.listIndexes()
-        assert(index.length == 2)
-
-        assert(index(0).indexName.equals("i1"))
-        assert(index(0).indexType.equals("BTREE"))
-        var cols = index(0).columns
-        assert(cols.length == 1)
-        assert(cols(0).describe().equals("col1"))
-        assert(index(0).properties.size == 0)
-
-        assert(index(1).indexName.equals("i2"))
-        assert(index(1).indexType.equals("BTREE"))
-        cols = index(1).columns
-        assert(cols.length == 3)
-        assert(cols(0).describe().equals("col2"))
-        assert(cols(1).describe().equals("col3"))
-        assert(cols(2).describe().equals("col5"))
-        assert(index(1).properties.size == 0)
-
-        jdbcTable.dropIndex("i1")
-        assert(jdbcTable.indexExists("i1") == false)
-        assert(jdbcTable.indexExists("i2") == true)
-
-        index = jdbcTable.listIndexes()
-        assert(index.length == 1)
+        assert(m.contains("Failed to create index i1 in new_table"))

-        assert(index(0).indexName.equals("i2"))
-        assert(index(0).indexType.equals("BTREE"))
-        cols = index(0).columns
-        assert(cols.length == 3)
-        assert(cols(0).describe().equals("col2"))
-        assert(cols(1).describe().equals("col3"))
-        assert(cols(2).describe().equals("col5"))
+        sql(s"DROP index i1 ON $catalogName.new_table")
+        sql(s"DROP index i2 ON $catalogName.new_table")

-        jdbcTable.dropIndex("i2")
         assert(jdbcTable.indexExists("i1") == false)
         assert(jdbcTable.indexExists("i2") == false)
-        index = jdbcTable.listIndexes()
-        assert(index.length == 0)
+
+        // This should pass without exception
+        sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")

         m = intercept[NoSuchIndexException] {
-          jdbcTable.dropIndex("i2")
+          sql(s"DROP index i1 ON $catalogName.new_table")
         }.getMessage
-        assert(m.contains("Failed to drop index: i2"))
-
-        testIndexProperties(jdbcTable)
+        assert(m.contains("Failed to drop index i1 in new_table"))
       }
     }
   }
@@ -338,23 +302,23 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     assert(samplePushed(df3))
     assert(limitPushed(df3, 2))
     assert(columnPruned(df3, "col1"))
-    assert(df3.collect().length == 2)
+    assert(df3.collect().length <= 2)

     // sample(... PERCENT) push down + limit push down + column pruning
     val df4 = sql(s"SELECT col1 FROM $catalogName.new_table" +
       " TABLESAMPLE (50 PERCENT) REPEATABLE (12345) LIMIT 2")
     assert(samplePushed(df4))
     assert(limitPushed(df4, 2))
     assert(columnPruned(df4, "col1"))
-    assert(df4.collect().length == 2)
+    assert(df4.collect().length <= 2)

     // sample push down + filter push down + limit push down
     val df5 = sql(s"SELECT * FROM $catalogName.new_table" +
       " TABLESAMPLE (BUCKET 6 OUT OF 10) WHERE col1 > 0 LIMIT 2")
     assert(samplePushed(df5))
     assert(filterPushed(df5))
     assert(limitPushed(df5, 2))
-    assert(df5.collect().length == 2)
+    assert(df5.collect().length <= 2)

     // sample + filter + limit + column pruning
     // sample pushed down, filter/limit not pushed down, column pruned
@@ -365,7 +329,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     assert(!filterPushed(df6))
     assert(!limitPushed(df6, 2))
     assert(columnPruned(df6, "col1"))
-    assert(df6.collect().length == 2)
+    assert(df6.collect().length <= 2)

     // sample + limit
     // Push down order is sample -> filter -> limit
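Note on the relaxed assertions above: TABLESAMPLE draws a probabilistic subset of rows, so once the sample and the limit are pushed down to the database a query may legitimately return fewer rows than the limit; hence `collect().length == 2` becomes `collect().length <= 2`.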

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

+70 −1

@@ -23,6 +23,8 @@ import java.util
 import java.util.Locale
 import java.util.concurrent.TimeUnit

+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import scala.util.control.NonFatal

@@ -38,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1084,6 +1086,73 @@ object JdbcUtils extends Logging {
     }
   }

+  /**
+   * Check if an index exists in a table.
+   */
+  def checkIfIndexExists(
+      conn: Connection,
+      sql: String,
+      options: JDBCOptions): Boolean = {
+    val statement = conn.createStatement
+    try {
+      statement.setQueryTimeout(options.queryTimeout)
+      val rs = statement.executeQuery(sql)
+      rs.next
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieve index info.")
+        false
+    } finally {
+      statement.close()
+    }
+  }
+
+  /**
+   * Process index properties and return a tuple of the index type and the list of the
+   * other index properties.
+   */
+  def processIndexProperties(
+      properties: util.Map[String, String],
+      catalogName: String): (String, Array[String]) = {
+    var indexType = ""
+    val indexPropertyList: ArrayBuffer[String] = ArrayBuffer[String]()
+    val supportedIndexTypeList = getSupportedIndexTypeList(catalogName)
+
+    if (!properties.isEmpty) {
+      properties.asScala.foreach { case (k, v) =>
+        if (k.equals(SupportsIndex.PROP_TYPE)) {
+          if (containsIndexTypeIgnoreCase(supportedIndexTypeList, v)) {
+            indexType = s"USING $v"
+          } else {
+            throw new UnsupportedOperationException(s"Index Type $v is not supported." +
+              s" The supported Index Types are: ${supportedIndexTypeList.mkString(" AND ")}")
+          }
+        } else {
+          indexPropertyList.append(s"$k = $v")
+        }
+      }
+    }
+    (indexType, indexPropertyList.toArray)
+  }
+
+  def containsIndexTypeIgnoreCase(supportedIndexTypeList: Array[String], value: String): Boolean = {
+    if (supportedIndexTypeList.isEmpty) {
+      throw new UnsupportedOperationException(
+        "Cannot specify 'USING index_type' in 'CREATE INDEX'")
+    }
+    for (indexType <- supportedIndexTypeList) {
+      if (value.equalsIgnoreCase(indexType)) return true
+    }
+    false
+  }
+
+  def getSupportedIndexTypeList(catalogName: String): Array[String] = {
+    catalogName match {
+      case "mysql" => Array("BTREE", "HASH")
+      case "postgresql" => Array("BTREE", "HASH", "BRIN")
+      case _ => Array.empty
+    }
+  }
+
   def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
     val statement = conn.createStatement
     try {
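A sketch of how a dialect is expected to consume these helpers, with hypothetical property values; the map mirrors what the SQL layer passes down, keying the index type by `SupportsIndex.PROP_TYPE`:

    // Hypothetical illustration of JdbcUtils.processIndexProperties.
    import java.util.{HashMap => JHashMap}
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex

    val props = new JHashMap[String, String]()
    props.put(SupportsIndex.PROP_TYPE, "BTREE") // rendered as "USING BTREE"
    props.put("FILLFACTOR", "70")               // passed through as "FILLFACTOR = 70"
    val (indexType, otherProps) = JdbcUtils.processIndexProperties(props, "postgresql")
    // indexType == "USING BTREE"; otherProps contains "FILLFACTOR = 70"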

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala

+7 −37

@@ -21,8 +21,6 @@ import java.sql.{Connection, SQLException, Types}
 import java.util
 import java.util.Locale

-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
@@ -115,32 +113,17 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
   override def createIndex(
       indexName: String,
-      indexType: String,
       tableName: String,
       columns: Array[NamedReference],
       columnsProperties: Array[util.Map[NamedReference, util.Properties]],
       properties: util.Properties): String = {
     val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
-    var indexProperties: String = ""
-    val scalaProps = properties.asScala
-    if (!properties.isEmpty) {
-      scalaProps.foreach { case (k, v) =>
-        indexProperties = indexProperties + " " + s"$k $v"
-      }
-    }
-    val iType = if (indexType.isEmpty) {
-      ""
-    } else {
-      if (indexType.length > 1 && !indexType.equalsIgnoreCase("BTREE") &&
-        !indexType.equalsIgnoreCase("HASH")) {
-        throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
-          " The supported Index Types are: BTREE and HASH")
-      }
-      s"USING $indexType"
-    }
+    val (indexType, indexPropertyList) = JdbcUtils.processIndexProperties(properties, "mysql")
+
     // columnsProperties doesn't apply to MySQL so it is ignored
-    s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
-      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
+      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})" +
+      s" ${indexPropertyList.mkString(" ")}"
   }

   // SHOW INDEX syntax
@@ -150,21 +133,8 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
       indexName: String,
       tableName: String,
       options: JDBCOptions): Boolean = {
-    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)}"
-    try {
-      val rs = JdbcUtils.executeQuery(conn, options, sql)
-      while (rs.next()) {
-        val retrievedIndexName = rs.getString("key_name")
-        if (conf.resolver(retrievedIndexName, indexName)) {
-          return true
-        }
-      }
-      false
-    } catch {
-      case _: Exception =>
-        logWarning("Cannot retrieved index info.")
-        false
-    }
+    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)} WHERE key_name = '$indexName'"
+    JdbcUtils.checkIfIndexExists(conn, sql, options)
   }

   override def dropIndex(indexName: String, tableName: String): String = {
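With the refactor, the DDL now comes from the shared helpers. A hypothetical rendering for an index `i1` on table `tbl` created with properties `{TYPE -> BTREE, KEY_BLOCK_SIZE -> 10}` (identifiers backtick-quoted by `quoteIdentifier`):

    // Sketch of the string createIndex would return under the above assumptions:
    //   CREATE INDEX `i1` USING BTREE ON `tbl` (`col1`) KEY_BLOCK_SIZE = 10
    // indexExists now issues a single-row probe and delegates to JdbcUtils:
    //   SHOW INDEXES FROM `tbl` WHERE key_name = 'i1'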
