diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index 0c6b2701c92b0..4f56f1f4ea1e7 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -17,21 +17,31 @@ package org.apache.spark.sql.jdbc.v2 +import java.util +import java.util.Collections + import scala.collection.JavaConverters._ import org.apache.logging.log4j.Level import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.connector.catalog.NamespaceChange +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException +import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange} import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.tags.DockerTest @DockerTest private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerIntegrationFunSuite { val catalog = new JDBCTableCatalog() + private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] + private val schema: StructType = new StructType() + .add("id", IntegerType) + .add("data", StringType) + def builtinNamespaces: Array[Array[String]] test("listNamespaces: basic behavior") { @@ -60,4 +70,26 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte }.getMessage assert(msg.contains("Namespace 'foo' not found")) } + + test("Drop namespace") { + val ident1 = Identifier.of(Array("foo"), "tab") + // Drop empty namespace without cascade + catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava) + assert(catalog.namespaceExists(Array("foo")) === true) + catalog.dropNamespace(Array("foo"), cascade = false) + assert(catalog.namespaceExists(Array("foo")) === false) + + // Drop non empty namespace without cascade + catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava) + assert(catalog.namespaceExists(Array("foo")) === true) + catalog.createTable(ident1, schema, Array.empty, emptyProps) + intercept[NonEmptyNamespaceException] { + catalog.dropNamespace(Array("foo"), cascade = false) + } + + // Drop non empty namespace with cascade + assert(catalog.namespaceExists(Array("foo")) === true) + catalog.dropNamespace(Array("foo"), cascade = true) + assert(catalog.namespaceExists(Array("foo")) === false) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 7f68a73f8950a..cc40d19693b4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1014,9 +1014,15 @@ object JdbcUtils extends Logging with SQLConfHelper { /** * Drops a namespace from the JDBC database. */ - def dropNamespace(conn: Connection, options: JDBCOptions, namespace: String): Unit = { + def dropNamespace( + conn: Connection, options: JDBCOptions, namespace: String, cascade: Boolean): Unit = { val dialect = JdbcDialects.get(options.url) - executeStatement(conn, options, s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}") + val dropCmd = if (cascade) { + s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)} CASCADE" + } else { + s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}" + } + executeStatement(conn, options, dropCmd) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 1658f0dce7fbe..d06a28d952b38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -282,12 +282,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging namespace: Array[String], cascade: Boolean): Boolean = namespace match { case Array(db) if namespaceExists(namespace) => - if (listTables(Array(db)).nonEmpty) { - throw QueryExecutionErrors.namespaceNotEmptyError(namespace) - } JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) { - JdbcUtils.dropNamespace(conn, options, db) + JdbcUtils.dropNamespace(conn, options, db, cascade) true } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 356cb4ddbd008..fc33a57da7282 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -23,7 +23,7 @@ import java.util.Locale import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} +import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException} import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo @@ -215,6 +215,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { // https://www.postgresql.org/docs/14/errcodes-appendix.html case "42P07" => throw new IndexAlreadyExistsException(message, cause = Some(e)) case "42704" => throw new NoSuchIndexException(message, cause = Some(e)) + case "2BP01" => throw NonEmptyNamespaceException(message, cause = Some(e)) case _ => super.classifyException(message, e) } case unsupported: UnsupportedOperationException => throw unsupported