
Commit c01a5f1

[SPARK-36914][SQL] Implement dropIndex and listIndexes in JDBC (MySQL dialect)

1 parent 07ecbc4 commit c01a5f1

File tree: 9 files changed, +212 -45 lines changed

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala (+11 -22)

@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -121,31 +119,22 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     assert(t.schema === expectedSchema)
   }
 
-  override def testIndex(tbl: String): Unit = {
-    val loaded = Catalogs.load("mysql", conf)
-    val jdbcTable = loaded.asInstanceOf[TableCatalog]
-      .loadTable(Identifier.of(Array.empty[String], "new_table"))
-      .asInstanceOf[SupportsIndex]
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
+  override def supportsIndex: Boolean = true
 
+  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
     val properties = new util.Properties();
     properties.put("KEY_BLOCK_SIZE", "10")
     properties.put("COMMENT", "'this is a comment'")
-    jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+    // MySQL doesn't allow property set on individual column, so use empty Array for
+    // column properties
+    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
       Array.empty[util.Map[NamedReference, util.Properties]], properties)
 
-    jdbcTable.createIndex("i2", "",
-      Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-      Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)
-
-    assert(jdbcTable.indexExists("i1") == true)
-    assert(jdbcTable.indexExists("i2") == true)
-
-    val m = intercept[IndexAlreadyExistsException] {
-      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-        Array.empty[util.Map[NamedReference, util.Properties]], properties)
-    }.getMessage
-    assert(m.contains("Failed to create index: i1 in new_table"))
+    var index = jdbcTable.listIndexes()
+    // The index property size is actually 1. Even though the index is created
+    // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
+    // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
+    assert(index(0).properties.size == 1)
+    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
   }
 }
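
A note on the assertions above: when the index metadata is read back, MySQL's `SHOW INDEXES` output carries the index comment but not `KEY_BLOCK_SIZE`, so only one property survives the round trip. As a hedged illustration (the actual SQL is generated by the MySQL dialect, which is among this commit's files but not shown in this excerpt), the `createIndex("i1", "BTREE", ...)` call plausibly maps to a statement along these lines:

// Sketch only: the exact string is built by the MySQL dialect, not shown here.
val createI1Sql =
  "CREATE INDEX i1 USING BTREE ON new_table (col1)" +
    " KEY_BLOCK_SIZE = 10 COMMENT 'this is a comment'"
// Reading back with "SHOW INDEXES FROM new_table" surfaces only the comment,
// which is why the test asserts properties.size == 1.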

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala (+83 -4)

@@ -17,9 +17,15 @@
 
 package org.apache.spark.sql.jdbc.v2
 
+import java.util
+
 import org.apache.log4j.Level
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -181,12 +187,85 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     }
   }
 
-  def testIndex(tbl: String): Unit = {}
+  def supportsIndex: Boolean = false
+  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
 
   test("SPARK-36913: Test INDEX") {
-    withTable(s"$catalogName.new_table") {
-      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
-      testIndex(s"$catalogName.new_table")
+    if (supportsIndex) {
+      withTable(s"$catalogName.new_table") {
+        sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
+          s" col4 INT, col5 INT)")
+        val loaded = Catalogs.load(catalogName, conf)
+        val jdbcTable = loaded.asInstanceOf[TableCatalog]
+          .loadTable(Identifier.of(Array.empty[String], "new_table"))
+          .asInstanceOf[SupportsIndex]
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+
+        val properties = new util.Properties();
+        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        jdbcTable.createIndex("i2", "",
+          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        assert(jdbcTable.indexExists("i1") == true)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        var m = intercept[IndexAlreadyExistsException] {
+          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains("Failed to create index: i1 in new_table"))
+
+        var index = jdbcTable.listIndexes()
+        assert(index.length == 2)
+
+        assert(index(0).indexName.equals("i1"))
+        assert(index(0).indexType.equals("BTREE"))
+        var cols = index(0).columns
+        assert(cols.length == 1)
+        assert(cols(0).describe().equals("col1"))
+        assert(index(0).properties.size == 0)
+
+        assert(index(1).indexName.equals("i2"))
+        assert(index(1).indexType.equals("BTREE"))
+        cols = index(1).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+        assert(index(1).properties.size == 0)
+
+        jdbcTable.dropIndex("i1")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        index = jdbcTable.listIndexes()
+        assert(index.length == 1)
+
+        assert(index(0).indexName.equals("i2"))
+        assert(index(0).indexType.equals("BTREE"))
+        cols = index(0).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+
+        jdbcTable.dropIndex("i2")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+        index = jdbcTable.listIndexes()
+        assert(index.length == 0)
+
+        m = intercept[NoSuchIndexException] {
+          jdbcTable.dropIndex("i2")
+        }.getMessage
+        assert(m.contains("Failed to drop index: i2"))
+
+        testIndexProperties(jdbcTable)
+      }
     }
   }
 }
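
The shared trait now exposes two hooks instead of a per-dialect test body: `supportsIndex` gates the whole test, and `testIndexProperties` holds only the dialect-specific property assertions. A minimal sketch of how another dialect suite would opt in (the suite name is illustrative, and the docker setup members every `DockerJDBCIntegrationSuite` must define are elided):

// Hypothetical opt-in; only the two V2JDBCTest overrides are shown.
class SomeDialectIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
  override def supportsIndex: Boolean = true

  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
    // Assert on whatever index properties this dialect's metadata query
    // reports back; see the MySQL version at the top of this diff.
  }
}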

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java (+1 -2)

@@ -55,10 +55,9 @@ void createIndex(String indexName,
    * Drops the index with the given name.
    *
    * @param indexName the name of the index to be dropped.
-   * @return true if the index is dropped
    * @throws NoSuchIndexException If the index does not exist (optional)
    */
-  boolean dropIndex(String indexName) throws NoSuchIndexException;
+  void dropIndex(String indexName) throws NoSuchIndexException;
 
   /**
    * Checks whether an index exists in this table.
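
With the `boolean` return gone, success is signalled by the absence of an exception rather than a return value. A sketch of the resulting calling pattern, assuming `table` is any table implementation that supports `SupportsIndex`:

import org.apache.spark.sql.catalyst.analysis.NoSuchIndexException

try {
  table.dropIndex("i1")  // void now; completing normally means the index is gone
} catch {
  case _: NoSuchIndexException =>
    // the index did not exist; the caller decides whether that is an error
}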

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java (+10 -7)

@@ -53,27 +53,30 @@ public TableIndex(
   /**
    * @return the Index name.
    */
-  String indexName() { return indexName; }
+  public String indexName() { return indexName; }
 
   /**
    * @return the indexType of this Index.
    */
-  String indexType() { return indexType; }
+  public String indexType() { return indexType; }
 
   /**
    * @return the column(s) this Index is on. Could be multi columns (a multi-column index).
    */
-  NamedReference[] columns() { return columns; }
+  public NamedReference[] columns() { return columns; }
+
+  /**
+   * set columns using the passed in param columns
+   */
+  public void columns_(NamedReference[] columns) { this.columns = columns; }
 
   /**
    * @return the map of column and column property map.
    */
-  Map<NamedReference, Properties> columnProperties() { return columnProperties; }
+  public Map<NamedReference, Properties> columnProperties() { return columnProperties; }
 
   /**
    * Returns the index properties.
    */
-  Properties properties() {
-    return properties;
-  }
+  public Properties properties() { return properties; }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala (+2 -2)

@@ -108,5 +108,5 @@ case class NoSuchPartitionsException(override val message: String)
 case class NoSuchTempFunctionException(func: String)
   extends AnalysisException(s"Temporary function '$func' not found")
 
-class NoSuchIndexException(indexName: String)
-  extends AnalysisException(s"Index '$indexName' not found")
+class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
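
The widened constructor lets the exception carry an arbitrary message plus the underlying driver error, which is what the `classifyException` path needs when it converts a dialect's `SQLException` into a Spark analysis error. A sketch of the new usage (hedged; the actual classification code lives in the dialect implementations, not shown here):

// Wrap a driver failure while preserving the original cause.
def asNoSuchIndex(message: String, e: java.sql.SQLException): Throwable =
  new NoSuchIndexException(message, cause = Some(e))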

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala (+24 -0)

@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1040,6 +1041,29 @@ object JdbcUtils extends Logging with SQLConfHelper {
     dialect.indexExists(conn, indexName, tableName, options)
   }
 
+  /**
+   * Drop an index.
+   */
+  def dropIndex(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Unit = {
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+  }
+
+  /**
+   * List all the indexes in a table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.listIndexes(conn, tableName, options)
+  }
+
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
     val statement = conn.createStatement
     try {
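
Note the asymmetry between the two helpers: for `dropIndex` the dialect only builds a SQL string, which `executeStatement` runs, while for `listIndexes` the dialect receives the live `Connection` and runs and parses its own metadata query. A usage sketch, assuming an open `java.sql.Connection` named `conn` and a configured `JDBCOptions` value named `options`:

// Both helpers resolve the dialect from options.url internally.
JdbcUtils.dropIndex(conn, "i1", "new_table", options)
val indexes = JdbcUtils.listIndexes(conn, "new_table", options)
indexes.foreach(idx => println(s"${idx.indexName()} (${idx.indexType()})"))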

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala (+10 -3)

@@ -73,11 +73,18 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
     }
   }
 
-  override def dropIndex(indexName: String): Boolean = {
-    throw new UnsupportedOperationException("dropIndex is not supported yet")
+  override def dropIndex(indexName: String): Unit = {
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.classifyException(s"Failed to drop index: $indexName",
+        JdbcDialects.get(jdbcOptions.url)) {
+        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+      }
+    }
   }
 
   override def listIndexes(): Array[TableIndex] = {
-    throw new UnsupportedOperationException("listIndexes is not supported yet")
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+    }
   }
 }
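
End to end, this is the path the integration test drives: load the JDBC catalog, cast the loaded table to `SupportsIndex`, and the calls route through the `JdbcUtils` helpers above inside a managed connection. A condensed sketch mirroring the test:

val catalog = Catalogs.load("mysql", conf).asInstanceOf[TableCatalog]
val table = catalog
  .loadTable(Identifier.of(Array.empty[String], "new_table"))
  .asInstanceOf[SupportsIndex]

table.dropIndex("i1")               // executes the dialect-built DROP INDEX statement
val remaining = table.listIndexes() // reads index metadata through the dialect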

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala (+24 -1)

@@ -31,6 +31,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -290,14 +291,15 @@ abstract class JdbcDialect extends Serializable with Logging{
   }
 
   /**
-   * Creates an index.
+   * Build a create index SQL statement.
    *
    * @param indexName the name of the index to be created
    * @param indexType the type of the index to be created
    * @param tableName the table on which index to be created
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
+   * @return the SQL statement to use for creating the index.
    */
   def createIndex(
       indexName: String,
@@ -326,6 +328,27 @@
     throw new UnsupportedOperationException("indexExists is not supported")
   }
 
+  /**
+   * Build a drop index SQL statement.
+   *
+   * @param indexName the name of the index to be dropped.
+   * @param tableName the table name on which index to be dropped.
+   * @return the SQL statement to use for dropping the index.
+   */
+  def dropIndex(indexName: String, tableName: String): String = {
+    throw new UnsupportedOperationException("dropIndex is not supported")
+  }
+
+  /**
+   * Lists all the indexes in this table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    throw new UnsupportedOperationException("listIndexes is not supported")
+  }
+
   /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param message The error message to be placed to the returned exception.