Skip to content


[SPARK-36914][SQL] Implement dropIndex and listIndexes in JDBC (MySQL…
Browse files Browse the repository at this point in the history
… dialect)

### What changes were proposed in this pull request?
This PR implements `dropIndex` and `listIndexes` in MySQL dialect

### Why are the changes needed?
As a subtask of the V2 Index support, this PR completes the implementation for JDBC V2 index support.

### Does this PR introduce _any_ user-facing change?
Yes, `dropIndex/listIndexes` in DS V2 JDBC

### How was this patch tested?
new tests

Closes apache#34236 from huaxingao/listIndexJDBC.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
huaxingao authored and chenzhx committed Apr 18, 2022
1 parent 52b36b0 commit ce63110
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
Expand Down Expand Up @@ -121,31 +119,22 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
assert(t.schema === expectedSchema)

override def testIndex(tbl: String): Unit = {
val loaded = Catalogs.load("mysql", conf)
val jdbcTable = loaded.asInstanceOf[TableCatalog]
.loadTable(Identifier.of(Array.empty[String], "new_table"))
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)
override def supportsIndex: Boolean = true

override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
val properties = new util.Properties();
properties.put("KEY_BLOCK_SIZE", "10")
properties.put("COMMENT", "'this is a comment'")
jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
// MySQL doesn't allow property set on individual column, so use empty Array for
// column properties
jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)

jdbcTable.createIndex("i2", "",
Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)

assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)

val m = intercept[IndexAlreadyExistsException] {
jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)
assert(m.contains("Failed to create index: i1 in new_table"))
var index = jdbcTable.listIndexes()
// The index property size is actually 1. Even though the index is created
// with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
// retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
assert(index(0).properties.size == 1)
assert(index(0).properties.get("COMMENT").equals("this is a comment"))
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.spark.sql.jdbc.v2

import java.util

import org.apache.log4j.Level

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Sample}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample}
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.aggregate.GeneralAggregateFunc
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -186,6 +190,96 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def supportsIndex: Boolean = false
def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}

test("SPARK-36913: Test INDEX") {
if (supportsIndex) {
withTable(s"$catalogName.new_table") {
sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
s" col4 INT, col5 INT)")
val loaded = Catalogs.load(catalogName, conf)
val jdbcTable = loaded.asInstanceOf[TableCatalog]
.loadTable(Identifier.of(Array.empty[String], "new_table"))
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)

val properties = new util.Properties();
val indexType = "DUMMY"
var m = intercept[UnsupportedOperationException] {
jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)
assert(m.contains(s"Index Type $indexType is not supported." +
s" The supported Index Types are: BTREE and HASH"))

jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)

jdbcTable.createIndex("i2", "",
Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)

assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)

m = intercept[IndexAlreadyExistsException] {
jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)
assert(m.contains("Failed to create index: i1 in new_table"))

var index = jdbcTable.listIndexes()
assert(index.length == 2)

var cols = index(0).columns
assert(cols.length == 1)
assert(index(0).properties.size == 0)

cols = index(1).columns
assert(cols.length == 3)
assert(index(1).properties.size == 0)

assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == true)

index = jdbcTable.listIndexes()
assert(index.length == 1)

cols = index(0).columns
assert(cols.length == 3)

assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)
index = jdbcTable.listIndexes()
assert(index.length == 0)

m = intercept[NoSuchIndexException] {
assert(m.contains("Failed to drop index: i2"))


def supportsTableSample: Boolean = false

private def samplePushed(df: DataFrame): Boolean = {
Expand Down Expand Up @@ -219,16 +313,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def testIndex(tbl: String): Unit = {}

test("SPARK-36913: Test INDEX") {
withTable(s"$catalogName.new_table") {
sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")

test("SPARK-37038: Test TABLESAMPLE") {
if (supportsTableSample) {
withTable(s"$catalogName.new_table") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface SupportsIndex extends Table {
* @param columns the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to be created
* @param properties the properties of the index to be created
* @throws IndexAlreadyExistsException If the index already exists (optional)
* @throws IndexAlreadyExistsException If the index already exists.
void createIndex(String indexName,
String indexType,
Expand All @@ -55,10 +55,9 @@ void createIndex(String indexName,
* Drops the index with the given name.
* @param indexName the name of the index to be dropped.
* @return true if the index is dropped
* @throws NoSuchIndexException If the index does not exist (optional)
* @throws NoSuchIndexException If the index does not exist.
boolean dropIndex(String indexName) throws NoSuchIndexException;
void dropIndex(String indexName) throws NoSuchIndexException;

* Checks whether an index exists in this table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,25 @@ public TableIndex(
* @return the Index name.
String indexName() { return indexName; }
public String indexName() { return indexName; }

* @return the indexType of this Index.
String indexType() { return indexType; }
public String indexType() { return indexType; }

* @return the column(s) this Index is on. Could be multi columns (a multi-column index).
NamedReference[] columns() { return columns; }
public NamedReference[] columns() { return columns; }

* @return the map of column and column property map.
Map<NamedReference, Properties> columnProperties() { return columnProperties; }
public Map<NamedReference, Properties> columnProperties() { return columnProperties; }

* Returns the index properties.
Properties properties() {
return properties;
public Properties properties() { return properties; }
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,5 @@ class NoSuchPartitionsException(message: String) extends AnalysisException(messa
class NoSuchTempFunctionException(func: String)
extends AnalysisException(s"Temporary function '$func' not found")

class NoSuchIndexException(indexName: String)
extends AnalysisException(s"Index '$indexName' not found")
class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
extends AnalysisException(message, cause = cause)
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
Expand Down Expand Up @@ -1050,6 +1051,29 @@ object JdbcUtils extends Logging {
dialect.indexExists(conn, indexName, tableName, options)

* Drop an index.
def dropIndex(
conn: Connection,
indexName: String,
tableName: String,
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options, dialect.dropIndex(indexName, tableName))

* List all the indexes in a table.
def listIndexes(
conn: Connection,
tableName: String,
options: JDBCOptions): Array[TableIndex] = {
val dialect = JdbcDialects.get(options.url)
dialect.listIndexes(conn, tableName, options)

private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
val statement = conn.createStatement
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,18 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt

override def dropIndex(indexName: String): Boolean = {
throw new UnsupportedOperationException("dropIndex is not supported yet")
override def dropIndex(indexName: String): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
JdbcUtils.classifyException(s"Failed to drop index: $indexName",
JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)

override def listIndexes(): Array[TableIndex] = {
throw new UnsupportedOperationException("listIndexes is not supported yet")
JdbcUtils.withConnection(jdbcOptions) { conn =>
JdbcUtils.listIndexes(conn, name, jdbcOptions)
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Avg, Count, CountStar, Max, Min, Sum}
import org.apache.spark.sql.errors.QueryCompilationErrors
Expand Down Expand Up @@ -327,14 +328,15 @@ abstract class JdbcDialect extends Serializable with Logging{

* Creates an index.
* Build a create index SQL statement.
* @param indexName the name of the index to be created
* @param indexType the type of the index to be created
* @param tableName the table on which index to be created
* @param columns the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to be created
* @param properties the properties of the index to be created
* @return the SQL statement to use for creating the index.
def createIndex(
indexName: String,
Expand Down Expand Up @@ -363,6 +365,27 @@ abstract class JdbcDialect extends Serializable with Logging{
throw new UnsupportedOperationException("indexExists is not supported")

* Build a drop index SQL statement.
* @param indexName the name of the index to be dropped.
* @param tableName the table name on which index to be dropped.
* @return the SQL statement to use for dropping the index.
def dropIndex(indexName: String, tableName: String): String = {
throw new UnsupportedOperationException("dropIndex is not supported")

* Lists all the indexes in this table.
def listIndexes(
conn: Connection,
tableName: String,
options: JDBCOptions): Array[TableIndex] = {
throw new UnsupportedOperationException("listIndexes is not supported")

* Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
* @param message The error message to be placed to the returned exception.
Expand Down

0 comments on commit ce63110

Please sign in to comment.