diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 7626c339cb420..9a87917d0e468 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1391,4 +1391,58 @@ private[spark] object QueryCompilationErrors {
   def functionUnsupportedInV2CatalogError(): Throwable = {
     new AnalysisException("function is only supported in v1 catalog")
   }
+
+  def cannotOperateOnHiveDataSourceFilesError(operation: String): Throwable = {
+    new AnalysisException("Hive data source can only be used with tables, you can not " +
+      s"$operation files of Hive data source directly.")
+  }
+
+  def setPathOptionAndCallWithPathParameterError(method: String): Throwable = {
+    new AnalysisException(
+      s"""
+         |There is a 'path' option set and $method() is called with a path
+         |parameter. Either remove the path option, or call $method() without the parameter.
+         |To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.
+       """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def userSpecifiedSchemaWithTextFileError(): Throwable = {
+    new AnalysisException("User specified schema not supported with `textFile`")
+  }
+
+  def tempViewNotSupportStreamingWriteError(viewName: String): Throwable = {
+    new AnalysisException(s"Temporary view $viewName doesn't support streaming write")
+  }
+
+  def streamingIntoViewNotSupportedError(viewName: String): Throwable = {
+    new AnalysisException(s"Streaming into views $viewName is not supported.")
+  }
+
+  def inputSourceDiffersFromDataSourceProviderError(
+      source: String, tableName: String, table: CatalogTable): Throwable = {
+    new AnalysisException(s"The input source($source) is different from the table " +
+      s"$tableName's data source provider(${table.provider.get}).")
+  }
+
+  def tableNotSupportStreamingWriteError(tableName: String, t: Table): Throwable = {
+    new AnalysisException(s"Table $tableName doesn't support streaming write - $t")
+  }
+
+  def queryNameNotSpecifiedForMemorySinkError(): Throwable = {
+    new AnalysisException("queryName must be specified for memory sink")
+  }
+
+  def sourceNotSupportedWithContinuousTriggerError(source: String): Throwable = {
+    new AnalysisException(s"'$source' is not supported with continuous trigger")
+  }
+
+  def columnNotFoundInExistingColumnsError(
+      columnType: String, columnName: String, validColumnNames: Seq[String]): Throwable = {
+    new AnalysisException(s"$columnType column $columnName not found in " +
+      s"existing columns (${validColumnNames.mkString(", ")})")
+  }
+
+  def operationNotSupportPartitioningError(operation: String): Throwable = {
+    new AnalysisException(s"'$operation' does not support partitioning")
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 5c60d72db32cb..5c752bf20b169 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -22,6 +22,7 @@ import java.net.URISyntaxException
 import java.sql.{SQLException, SQLFeatureNotSupportedException}
 import java.time.{DateTimeException, LocalDate}
 import java.time.temporal.ChronoField
+import java.util.ConcurrentModificationException
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
@@ -874,4 +875,13 @@ object QueryExecutionErrors {
   def cannotCastUTF8StringToDataTypeError(s: UTF8String, to: DataType): Throwable = {
     new DateTimeException(s"Cannot cast $s to $to.")
   }
+
+  def registeringStreamingQueryListenerError(e: Exception): Throwable = {
+    new SparkException("Exception when registering StreamingQueryListener", e)
+  }
+
+  def concurrentQueryInstanceError(): Throwable = {
+    new ConcurrentModificationException(
+      "Another instance of this query was just started by a concurrent session.")
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f83269e17b86b..1798f6e2c88bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -23,17 +23,17 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
 import org.apache.spark.sql.execution.streaming.StreamingRelation
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -195,8 +195,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 
   private def loadInternal(path: Option[String]): DataFrame = {
     if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
-        "read files of Hive data source directly.")
+      throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
     }
 
     val optionsWithPath = if (path.isEmpty) {
@@ -256,9 +255,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   def load(path: String): DataFrame = {
     if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
         extraOptions.contains("path")) {
-      throw new AnalysisException("There is a 'path' option set and load() is called with a path" +
-        "parameter. Either remove the path option, or call load() without the parameter. " +
-        s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
+      throw QueryCompilationErrors.setPathOptionAndCallWithPathParameterError("load")
     }
     loadInternal(Some(path))
   }
@@ -597,7 +594,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    */
  def textFile(path: String): Dataset[String] = {
    if (userSpecifiedSchema.nonEmpty) {
-      throw new AnalysisException("User specified schema not supported with `textFile`")
+      throw QueryCompilationErrors.userSpecifiedSchemaWithTextFileError()
    }
    text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index abca919038e50..b25aedbeca79e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -33,12 +33,12 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -276,9 +276,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   def start(path: String): StreamingQuery = {
     if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior &&
         extraOptions.contains("path")) {
-      throw new AnalysisException("There is a 'path' option set and start() is called with a " +
-        "path parameter. Either remove the path option, or call start() without the parameter. " +
-        s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
+      throw QueryCompilationErrors.setPathOptionAndCallWithPathParameterError("start")
     }
     startInternal(Some(path))
   }
@@ -332,7 +330,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     // on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
-      throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
+      throw QueryCompilationErrors.tempViewNotSupportStreamingWriteError(tableName)
     }
 
     if (!catalog.asTableCatalog.tableExists(identifier)) {
@@ -361,12 +359,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
     def writeToV1Table(table: CatalogTable): StreamingQuery = {
       if (table.tableType == CatalogTableType.VIEW) {
-        throw new AnalysisException(s"Streaming into views $tableName is not supported.")
+        throw QueryCompilationErrors.streamingIntoViewNotSupportedError(tableName)
       }
       require(table.provider.isDefined)
       if (source != table.provider.get) {
-        throw new AnalysisException(s"The input source($source) is different from the table " +
-          s"$tableName's data source provider(${table.provider.get}).")
+        throw QueryCompilationErrors.inputSourceDiffersFromDataSourceProviderError(
+          source, tableName, table)
       }
       format(table.provider.get)
         .option("path", new Path(table.location).toString).start()
@@ -380,21 +378,19 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         writeToV1Table(t.v1Table)
       case t: V1Table =>
         writeToV1Table(t.v1Table)
-      case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
-        s"write - $t")
+      case t => throw QueryCompilationErrors.tableNotSupportStreamingWriteError(tableName, t)
     }
   }
 
   private def startInternal(path: Option[String]): StreamingQuery = {
     if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
-        "write files of Hive data source directly.")
+      throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("write")
     }
 
     if (source == SOURCE_NAME_MEMORY) {
       assertNotPartitioned(SOURCE_NAME_MEMORY)
       if (extraOptions.get("queryName").isEmpty) {
-        throw new AnalysisException("queryName must be specified for memory sink")
+        throw QueryCompilationErrors.queryNameNotSpecifiedForMemorySinkError()
       }
       val sink = new MemorySink()
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes))
@@ -409,7 +405,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     } else if (source == SOURCE_NAME_FOREACH_BATCH) {
       assertNotPartitioned(SOURCE_NAME_FOREACH_BATCH)
       if (trigger.isInstanceOf[ContinuousTrigger]) {
-        throw new AnalysisException(s"'$source' is not supported with continuous trigger")
+        throw QueryCompilationErrors.sourceNotSupportedWithContinuousTriggerError(source)
       }
       val sink = new ForeachBatchSink[T](foreachBatchWriter, ds.exprEnc)
       startQuery(sink, extraOptions)
@@ -556,13 +552,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private def normalize(columnName: String, columnType: String): String = {
     val validColumnNames = df.logicalPlan.output.map(_.name)
     validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName))
-      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
-        s"existing columns (${validColumnNames.mkString(", ")})"))
+      .getOrElse(throw QueryCompilationErrors.columnNotFoundInExistingColumnsError(
+        columnType, columnName, validColumnNames))
   }
 
   private def assertNotPartitioned(operation: String): Unit = {
     if (partitioningColumns.isDefined) {
-      throw new AnalysisException(s"'$operation' does not support partitioning")
+      throw QueryCompilationErrors.operationNotSupportPartitioningError(operation)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index b07a2f4e5562a..dd5194038a979 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.sql.streaming
 
-import java.util.{ConcurrentModificationException, UUID}
+import java.util.UUID
 import java.util.concurrent.{TimeoutException, TimeUnit}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.SparkException
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
@@ -81,7 +81,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     }
   } catch {
     case e: Exception =>
-      throw new SparkException("Exception when registering StreamingQueryListener", e)
+      throw QueryExecutionErrors.registeringStreamingQueryListenerError(e)
   }
 
   /**
@@ -371,8 +371,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
        query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
      if (oldActiveQuery != null) {
-        throw new ConcurrentModificationException(
-          "Another instance of this query was just started by a concurrent session.")
+        throw QueryExecutionErrors.concurrentQueryInstanceError()
      }
      activeQueries.put(query.id, query)
    }
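The sketch below is a minimal standalone illustration (not Spark source) of the pattern this patch applies: user-facing error construction is centralized in a single errors object, so message wording and exception types live in one place and every call site becomes a one-line throw. The names AnalysisLikeException, CompilationErrors and requireQueryName are hypothetical and chosen only for illustration.

// Hypothetical stand-in for AnalysisException.
class AnalysisLikeException(message: String) extends Exception(message)

// Centralized error helpers: each method returns a Throwable so callers can simply `throw` it,
// mirroring the QueryCompilationErrors/QueryExecutionErrors style in the diff above.
object CompilationErrors {
  def queryNameNotSpecifiedForMemorySinkError(): Throwable =
    new AnalysisLikeException("queryName must be specified for memory sink")

  def cannotOperateOnHiveDataSourceFilesError(operation: String): Throwable =
    new AnalysisLikeException(
      s"Hive data source can only be used with tables, you can not $operation files directly.")
}

object SinkSetup {
  // A call site throws the centralized error instead of assembling the message inline.
  def requireQueryName(queryName: Option[String]): Unit = {
    if (queryName.isEmpty) {
      throw CompilationErrors.queryNameNotSpecifiedForMemorySinkError()
    }
  }
}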