diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 3cb878774f2e9..67e81087f8fb0 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -121,31 +119,22 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     assert(t.schema === expectedSchema)
   }
 
-  override def testIndex(tbl: String): Unit = {
-    val loaded = Catalogs.load("mysql", conf)
-    val jdbcTable = loaded.asInstanceOf[TableCatalog]
-      .loadTable(Identifier.of(Array.empty[String], "new_table"))
-      .asInstanceOf[SupportsIndex]
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
+  override def supportsIndex: Boolean = true
 
+  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
     val properties = new util.Properties();
     properties.put("KEY_BLOCK_SIZE", "10")
     properties.put("COMMENT", "'this is a comment'")
-    jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+    // MySQL doesn't allow setting properties on individual columns, so pass an empty
+    // Array for the column properties
+    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
       Array.empty[util.Map[NamedReference, util.Properties]], properties)
-    jdbcTable.createIndex("i2", "",
-      Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-      Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)
-
-    assert(jdbcTable.indexExists("i1") == true)
-    assert(jdbcTable.indexExists("i2") == true)
-
-    val m = intercept[IndexAlreadyExistsException] {
-      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-        Array.empty[util.Map[NamedReference, util.Properties]], properties)
-    }.getMessage
-    assert(m.contains("Failed to create index: i1 in new_table"))
+    val index = jdbcTable.listIndexes()
+    // The retrieved property map has size 1: even though the index is created with
+    // both "KEY_BLOCK_SIZE" and "COMMENT", MySQL's `SHOW INDEXES` only reports
+    // `COMMENT`.
+    assert(index(0).properties.size == 1)
+    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
  }
 }
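Side note for reviewers: the behavior the comment above describes is easy to confirm against a MySQL instance, since `SHOW INDEXES` simply has no KEY_BLOCK_SIZE column in its result set. A minimal, self-contained sketch (the JDBC URL, credentials and table name are placeholders, not part of this patch):

```scala
import java.sql.DriverManager

object ShowIndexesSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/test", "user", "password")
    try {
      val rs = conn.createStatement().executeQuery("SHOW INDEXES FROM new_table")
      while (rs.next()) {
        // Key_name, Column_name, Index_type and Index_comment are the columns the
        // dialect reads; KEY_BLOCK_SIZE is not part of the result set, so it cannot
        // survive the round trip.
        println(s"${rs.getString("Key_name")} on ${rs.getString("Column_name")}: " +
          s"type=${rs.getString("Index_type")} comment=${rs.getString("Index_comment")}")
      }
    } finally {
      conn.close()
    }
  }
}
```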
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index f176726fd0af0..c7c18dab6d660 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -17,11 +17,17 @@ package org.apache.spark.sql.jdbc.v2
 
+import java.util
+
 import org.apache.log4j.Level
 
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Sample}
+import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
+import org.apache.spark.sql.connector.expressions.aggregate.GeneralAggregateFunc
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
@@ -186,6 +192,96 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     }
   }
 
+  def supportsIndex: Boolean = false
+
+  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
+
+  test("SPARK-36913: Test INDEX") {
+    if (supportsIndex) {
+      withTable(s"$catalogName.new_table") {
+        sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
+          s" col4 INT, col5 INT)")
+        val loaded = Catalogs.load(catalogName, conf)
+        val jdbcTable = loaded.asInstanceOf[TableCatalog]
+          .loadTable(Identifier.of(Array.empty[String], "new_table"))
+          .asInstanceOf[SupportsIndex]
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+
+        val properties = new util.Properties()
+        val indexType = "DUMMY"
+        var m = intercept[UnsupportedOperationException] {
+          jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains(s"Index Type $indexType is not supported." +
+          s" The supported Index Types are: BTREE and HASH"))
+
+        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        jdbcTable.createIndex("i2", "",
+          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        assert(jdbcTable.indexExists("i1") == true)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        m = intercept[IndexAlreadyExistsException] {
+          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains("Failed to create index: i1 in new_table"))
+
+        var index = jdbcTable.listIndexes()
+        assert(index.length == 2)
+
+        assert(index(0).indexName.equals("i1"))
+        assert(index(0).indexType.equals("BTREE"))
+        var cols = index(0).columns
+        assert(cols.length == 1)
+        assert(cols(0).describe().equals("col1"))
+        assert(index(0).properties.size == 0)
+
+        assert(index(1).indexName.equals("i2"))
+        assert(index(1).indexType.equals("BTREE"))
+        cols = index(1).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+        assert(index(1).properties.size == 0)
+
+        jdbcTable.dropIndex("i1")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        index = jdbcTable.listIndexes()
+        assert(index.length == 1)
+
+        assert(index(0).indexName.equals("i2"))
+        assert(index(0).indexType.equals("BTREE"))
+        cols = index(0).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+
+        jdbcTable.dropIndex("i2")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+        index = jdbcTable.listIndexes()
+        assert(index.length == 0)
+
+        m = intercept[NoSuchIndexException] {
+          jdbcTable.dropIndex("i2")
+        }.getMessage
+        assert(m.contains("Failed to drop index: i2"))
+
+        testIndexProperties(jdbcTable)
+      }
+    }
+  }
+
   def supportsTableSample: Boolean = false
 
   private def samplePushed(df: DataFrame): Boolean = {
@@ -219,16 +315,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     scan.schema.names.sameElements(Seq(col))
   }
 
-  def testIndex(tbl: String): Unit = {}
-
-  test("SPARK-36913: Test INDEX") {
-    withTable(s"$catalogName.new_table") {
-      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
-      testIndex(s"$catalogName.new_table")
-    }
-  }
-
   test("SPARK-37038: Test TABLESAMPLE") {
     if (supportsTableSample) {
       withTable(s"$catalogName.new_table") {
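For readers outside the test: this is the pattern the suite uses to get from a catalog name to a `SupportsIndex` handle. A small sketch extracted from the test above (the helper name is ours, not part of the patch):

```scala
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.internal.SQLConf

object LoadIndexTable {
  // Resolve the catalog by name, load the table, and cast to SupportsIndex to
  // reach the index API, exactly as the test does.
  def loadIndexTable(catalogName: String, tableName: String, conf: SQLConf): SupportsIndex = {
    val catalog = Catalogs.load(catalogName, conf).asInstanceOf[TableCatalog]
    catalog
      .loadTable(Identifier.of(Array.empty[String], tableName))
      .asInstanceOf[SupportsIndex]
  }
}
```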
+ + s" The supported Index Types are: BTREE and HASH")) + + jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")), + Array.empty[util.Map[NamedReference, util.Properties]], properties) + + jdbcTable.createIndex("i2", "", + Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")), + Array.empty[util.Map[NamedReference, util.Properties]], properties) + + assert(jdbcTable.indexExists("i1") == true) + assert(jdbcTable.indexExists("i2") == true) + + m = intercept[IndexAlreadyExistsException] { + jdbcTable.createIndex("i1", "", Array(FieldReference("col1")), + Array.empty[util.Map[NamedReference, util.Properties]], properties) + }.getMessage + assert(m.contains("Failed to create index: i1 in new_table")) + + var index = jdbcTable.listIndexes() + assert(index.length == 2) + + assert(index(0).indexName.equals("i1")) + assert(index(0).indexType.equals("BTREE")) + var cols = index(0).columns + assert(cols.length == 1) + assert(cols(0).describe().equals("col1")) + assert(index(0).properties.size == 0) + + assert(index(1).indexName.equals("i2")) + assert(index(1).indexType.equals("BTREE")) + cols = index(1).columns + assert(cols.length == 3) + assert(cols(0).describe().equals("col2")) + assert(cols(1).describe().equals("col3")) + assert(cols(2).describe().equals("col5")) + assert(index(1).properties.size == 0) + + jdbcTable.dropIndex("i1") + assert(jdbcTable.indexExists("i1") == false) + assert(jdbcTable.indexExists("i2") == true) + + index = jdbcTable.listIndexes() + assert(index.length == 1) + + assert(index(0).indexName.equals("i2")) + assert(index(0).indexType.equals("BTREE")) + cols = index(0).columns + assert(cols.length == 3) + assert(cols(0).describe().equals("col2")) + assert(cols(1).describe().equals("col3")) + assert(cols(2).describe().equals("col5")) + + jdbcTable.dropIndex("i2") + assert(jdbcTable.indexExists("i1") == false) + assert(jdbcTable.indexExists("i2") == false) + index = jdbcTable.listIndexes() + assert(index.length == 0) + + m = intercept[NoSuchIndexException] { + jdbcTable.dropIndex("i2") + }.getMessage + assert(m.contains("Failed to drop index: i2")) + + testIndexProperties(jdbcTable) + } + } + } + def supportsTableSample: Boolean = false private def samplePushed(df: DataFrame): Boolean = { @@ -219,16 +313,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu scan.schema.names.sameElements(Seq(col)) } - - def testIndex(tbl: String): Unit = {} - - test("SPARK-36913: Test INDEX") { - withTable(s"$catalogName.new_table") { - sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)") - testIndex(s"$catalogName.new_table") - } - } - test("SPARK-37038: Test TABLESAMPLE") { if (supportsTableSample) { withTable(s"$catalogName.new_table") { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java index 24961e460cc26..4181cf5f25118 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java @@ -42,7 +42,7 @@ public interface SupportsIndex extends Table { * @param columns the columns on which index to be created * @param columnsProperties the properties of the columns on which index to be created * @param properties the properties of the index to be created - * @throws IndexAlreadyExistsException If the index 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
index 99fce806a11b9..977ed8d6c7528 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
@@ -53,27 +53,25 @@ public TableIndex(
   /**
    * @return the Index name.
    */
-  String indexName() { return indexName; }
+  public String indexName() { return indexName; }
 
   /**
    * @return the indexType of this Index.
    */
-  String indexType() { return indexType; }
+  public String indexType() { return indexType; }
 
   /**
    * @return the column(s) this Index is on. Could be multi columns (a multi-column index).
    */
-  NamedReference[] columns() { return columns; }
+  public NamedReference[] columns() { return columns; }
 
   /**
    * @return the map of column and column property map.
    */
-  Map<NamedReference, Properties> columnProperties() { return columnProperties; }
+  public Map<NamedReference, Properties> columnProperties() { return columnProperties; }
 
   /**
    * Returns the index properties.
    */
-  Properties properties() {
-    return properties;
-  }
+  public Properties properties() { return properties; }
 }
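Since the accessors are now public, dialect-produced `TableIndex` values can be inspected outside the package. A small sketch mirroring how MySQLDialect constructs one further down in this patch:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

object TableIndexSketch {
  def main(args: Array[String]): Unit = {
    val props = new util.Properties()
    props.put("COMMENT", "this is a comment")
    val idx = new TableIndex("i1", "BTREE", Array(FieldReference("col1")),
      new util.HashMap[NamedReference, util.Properties](), props)
    // the accessors are now public, so callers outside the package can read them
    assert(idx.indexName() == "i1")
    assert(idx.columns().length == 1)
    println(idx.properties())
  }
}
```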
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 7a9f7b5c6bced..8b0710b2c1f19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -96,5 +96,5 @@ class NoSuchPartitionsException(message: String) extends AnalysisException(messa
 class NoSuchTempFunctionException(func: String)
   extends AnalysisException(s"Temporary function '$func' not found")
 
-class NoSuchIndexException(indexName: String)
-  extends AnalysisException(s"Index '$indexName' not found")
+class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 31c11568e35d5..2e21571939cf8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1050,6 +1051,29 @@ object JdbcUtils extends Logging {
     dialect.indexExists(conn, indexName, tableName, options)
   }
 
+  /**
+   * Drop an index.
+   */
+  def dropIndex(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Unit = {
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+  }
+
+  /**
+   * List all the indexes in a table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.listIndexes(conn, tableName, options)
+  }
+
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
     val statement = conn.createStatement
     try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 957d021963a7f..ba56643f4d980 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -73,11 +73,18 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
     }
   }
 
-  override def dropIndex(indexName: String): Boolean = {
-    throw new UnsupportedOperationException("dropIndex is not supported yet")
+  override def dropIndex(indexName: String): Unit = {
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.classifyException(s"Failed to drop index: $indexName",
+        JdbcDialects.get(jdbcOptions.url)) {
+        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+      }
+    }
   }
 
   override def listIndexes(): Array[TableIndex] = {
-    throw new UnsupportedOperationException("listIndexes is not supported yet")
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index dcf9d0f0cfa52..dbf5e4c037d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Avg, Count, CountStar, Max, Min, Sum}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -327,7 +328,7 @@ abstract class JdbcDialect extends Serializable with Logging{
   }
 
   /**
-   * Creates an index.
+   * Builds a create index SQL statement.
    *
    * @param indexName the name of the index to be created
   * @param indexType the type of the index to be created
@@ -335,6 +336,7 @@ abstract class JdbcDialect extends Serializable with Logging{
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
+   * @return the SQL statement to use for creating the index.
    */
   def createIndex(
       indexName: String,
@@ -363,6 +365,27 @@ abstract class JdbcDialect extends Serializable with Logging{
     throw new UnsupportedOperationException("indexExists is not supported")
   }
 
+  /**
+   * Builds a drop index SQL statement.
+   *
+   * @param indexName the name of the index to be dropped.
+   * @param tableName the name of the table from which the index will be dropped.
+   * @return the SQL statement to use for dropping the index.
+   */
+  def dropIndex(indexName: String, tableName: String): String = {
+    throw new UnsupportedOperationException("dropIndex is not supported")
+  }
+
+  /**
+   * Lists all the indexes in this table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    throw new UnsupportedOperationException("listIndexes is not supported")
+  }
+
   /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param message The error message to be placed to the returned exception.
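Taken together, these two hooks define the dialect contract: `dropIndex` only renders SQL (JdbcUtils executes it), while `listIndexes` owns the metadata round trip itself. A sketch of what a hypothetical third-party dialect would override — `MyDialect`, its URL prefix, and its SQL shape are invented for illustration:

```scala
import java.sql.Connection

import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.JdbcDialect

case object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // dropIndex only builds the SQL string; execution happens in JdbcUtils.
  override def dropIndex(indexName: String, tableName: String): String =
    s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"

  // listIndexes performs its own query against the database's metadata.
  override def listIndexes(
      conn: Connection,
      tableName: String,
      options: JDBCOptions): Array[TableIndex] = {
    // query the engine's index metadata here and map each row to a TableIndex
    Array.empty
  }
}
```

Such a dialect would be registered with `JdbcDialects.registerDialect(MyDialect)` before use.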
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5c16ef6a947ba..7e85b3bbb84e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder}
@@ -127,10 +128,19 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
         indexProperties = indexProperties + " " + s"$k $v"
       }
     }
-
+    val iType = if (indexType.isEmpty) {
+      ""
+    } else {
+      if (!indexType.equalsIgnoreCase("BTREE") &&
+        !indexType.equalsIgnoreCase("HASH")) {
+        throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
+          " The supported Index Types are: BTREE and HASH")
+      }
+      s"USING $indexType"
+    }
     // columnsProperties doesn't apply to MySQL so it is ignored
-    s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
-      s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) $indexProperties"
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
+      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
   }
 
   // SHOW INDEX syntax
@@ -157,17 +167,64 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     }
   }
 
-  override def classifyException(message: String, e: Throwable): AnalysisException = {
-    if (e.isInstanceOf[SQLException]) {
-      // Error codes are from
-      // https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
-      e.asInstanceOf[SQLException].getErrorCode match {
-        // ER_DUP_KEYNAME
-        case 1061 =>
-          throw new IndexAlreadyExistsException(message, cause = Some(e))
-        case _ =>
-      }
-    }
-    super.classifyException(message, e)
+  override def dropIndex(indexName: String, tableName: String): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+  }
+
+  // SHOW INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+  override def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val sql = s"SHOW INDEXES FROM $tableName"
+    var indexMap: Map[String, TableIndex] = Map()
+    try {
+      val rs = JdbcUtils.executeQuery(conn, options, sql)
+      while (rs.next()) {
+        val indexName = rs.getString("key_name")
+        val colName = rs.getString("column_name")
+        val indexType = rs.getString("index_type")
+        val indexComment = rs.getString("Index_comment")
+        if (indexMap.contains(indexName)) {
+          val index = indexMap(indexName)
+          val newIndex = new TableIndex(indexName, indexType,
+            index.columns() :+ FieldReference(colName),
+            index.columnProperties, index.properties)
+          indexMap += (indexName -> newIndex)
+        } else {
+          // The only property we can build here is `COMMENT` because it's the only
+          // one returned by `SHOW INDEXES`.
+          val properties = new util.Properties()
+          if (indexComment.nonEmpty) properties.put("COMMENT", indexComment)
+          val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
+            new util.HashMap[NamedReference, util.Properties](), properties)
+          indexMap += (indexName -> index)
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logWarning("Cannot retrieve index info.", e)
+    }
+    indexMap.values.toArray
+  }
+
+  override def classifyException(message: String, e: Throwable): AnalysisException = {
+    e match {
+      case sqlException: SQLException =>
+        // Error codes are from
+        // https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
+        sqlException.getErrorCode match {
+          // ER_DUP_KEYNAME
+          case 1061 =>
+            throw new IndexAlreadyExistsException(message, cause = Some(e))
+          // ER_CANT_DROP_FIELD_OR_KEY
+          case 1091 =>
+            throw new NoSuchIndexException(message, cause = Some(e))
+          case _ => super.classifyException(message, e)
+        }
+      case unsupported: UnsupportedOperationException => throw unsupported
+      case _ => super.classifyException(message, e)
+    }
   }
 }