From b6b53044251b5e5fe9d045e9f4ad0063d429e639 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 1 Apr 2021 17:11:03 -0700 Subject: [PATCH 01/11] initial commit --- .../sql/catalyst/analysis/Analyzer.scala | 14 +++++++++- .../logical/TransformationAfterAnalysis.scala | 28 +++++++++++++++++++ .../catalyst/plans/logical/v2Commands.scala | 9 +++++- .../scala/org/apache/spark/sql/Dataset.scala | 5 ++-- .../analysis/ResolveSessionCatalog.scala | 21 +++++++------- .../spark/sql/execution/command/views.scala | 23 +++++++++------ .../datasources/v2/CacheTableExec.scala | 5 ++-- 7 files changed, 80 insertions(+), 25 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d7bc81cf4d4bf..6800558c82705 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -304,7 +304,9 @@ class Analyzer(override val catalogManager: CatalogManager) Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, - CleanupAliases) + CleanupAliases), + Batch("TransformAfterAnalysis", Once, + TransformAfterAnalysis) ) /** @@ -3789,6 +3791,16 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper { } } +/** + * A rule that calls `TransformationAfterAnalysis.transform` for a plan that needs to be transformed + * after all the analysis rules are run. + */ +object TransformAfterAnalysis extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case t: TransformationAfterAnalysis => t.transform + } +} + /** * Ignore event time watermark in batch query, which is only supported in Structured Streaming. * TODO: add this rule into analyzer rule list. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala new file mode 100644 index 0000000000000..40e941ea203e4 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +/** + * A trait that exposes 'transform' which will be called by the `TransformAfterAnalysis` rule + * after all other analysis rules are called. 
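+ * The rule runs in its own batch, scheduled `Once` after every other batch, so the
+ * transformation is applied exactly one time at the very end of analysis.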
One scenario to use this transformation is to + * remove any children of a logical plan so that they are not optimized; this is useful for + * commands that create a view or a cache because they need to work with analyzed plans. + */ +trait TransformationAfterAnalysis { + def transform: LogicalPlan +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 938c23a51128e..25d6d20654afd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -906,7 +906,14 @@ case class CacheTableAsSelect( plan: LogicalPlan, originalText: String, isLazy: Boolean, - options: Map[String, String]) extends Command + options: Map[String, String], + isPlanAnalyzed: Boolean = false) extends Command with TransformationAfterAnalysis { + // `plan` needs to be analyzed, but shouldn't be optimized. Thus, remove `plan` from + // children once the analysis phase is finished. + override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil + + override def transform: LogicalPlan = copy(isPlanAnalyzed = true) +} /** * The logical plan of the UNCACHE TABLE command. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index fd02d0b131587..9f7023c4f3cc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3370,10 +3370,11 @@ class Dataset[T] private[sql]( comment = None, properties = Map.empty, originalText = None, - child = logicalPlan, + plan = logicalPlan, allowExisting = false, replace = replace, - viewType = viewType) + viewType = viewType, + isPlanAnalyzed = true) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index f9b9e5acb7fe8..347ba448b0afa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -482,7 +482,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case CreateViewStatement( tbl, userSpecifiedColumns, comment, properties, - originalText, child, allowExisting, replace, viewType) if child.resolved => + originalText, child, allowExisting, replace, viewType) => val v1TableName = if (viewType != PersistedView) { // temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name. 
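To make the new hook concrete, here is a minimal sketch of the pattern a command opts into; `MyCommand` is hypothetical and not part of this patch, but it mirrors what `CacheTableAsSelect` above does:

    import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, TransformationAfterAnalysis}

    // Hypothetical command whose input plan must be analyzed but never optimized.
    case class MyCommand(
        plan: LogicalPlan,
        isPlanAnalyzed: Boolean = false) extends Command with TransformationAfterAnalysis {

      // Expose `plan` to the analyzer only while it still needs analysis.
      override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil

      // Called once by the TransformAfterAnalysis batch; the returned copy has no
      // children, so the optimizer never visits `plan`.
      override def transform: LogicalPlan = copy(isPlanAnalyzed = true)
    }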
@@ -491,15 +491,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) parseV1Table(tbl, "CREATE VIEW") } CreateViewCommand( - v1TableName.asTableIdentifier, - userSpecifiedColumns, - comment, - properties, - originalText, - child, - allowExisting, - replace, - viewType) + name = v1TableName.asTableIdentifier, + userSpecifiedColumns = userSpecifiedColumns, + comment = comment, + properties = properties, + originalText = originalText, + plan = child, + allowExisting = allowExisting, + replace = replace, + viewType = viewType, + isPlanAnalyzed = false) case ShowViews(resolved: ResolvedNamespace, pattern, output) => resolved match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index b302b268b3afc..0ae869775909a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, TransformationAfterAnalysis, View} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -62,15 +62,22 @@ case class CreateViewCommand( comment: Option[String], properties: Map[String, String], originalText: Option[String], - child: LogicalPlan, + plan: LogicalPlan, allowExisting: Boolean, replace: Boolean, - viewType: ViewType) - extends RunnableCommand { + viewType: ViewType, + isPlanAnalyzed: Boolean) + extends RunnableCommand with TransformationAfterAnalysis { import ViewHelper._ - override def innerChildren: Seq[QueryPlan[_]] = Seq(child) + override def innerChildren: Seq[QueryPlan[_]] = Seq(plan) + + // `plan` needs to be analyzed, but shouldn't be optimized. Thus, remove `plan` from + // children once the analysis phase is finished. + override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil + + override def transform: LogicalPlan = copy(isPlanAnalyzed = true) if (viewType == PersistedView) { require(originalText.isDefined, "'originalText' must be provided to create permanent view") @@ -96,10 +103,8 @@ case class CreateViewCommand( } override def run(sparkSession: SparkSession): Seq[Row] = { - // If the plan cannot be analyzed, throw an exception and don't proceed. 
- val qe = sparkSession.sessionState.executePlan(child) - qe.assertAnalyzed() - val analyzedPlan = qe.analyzed + assert(isPlanAnalyzed) + val analyzedPlan = plan if (userSpecifiedColumns.nonEmpty && userSpecifiedColumns.length != analyzedPlan.output.length) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala index 56e008d1d95f8..3191ff608c081 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala @@ -101,10 +101,11 @@ case class CacheTableAsSelectExec( comment = None, properties = Map.empty, originalText = Some(originalText), - child = query, + plan = query, allowExisting = false, replace = false, - viewType = LocalTempView + viewType = LocalTempView, + isPlanAnalyzed = true ) ) dataFrameForCachedPlan.logicalPlan From 43f70b2319790e6746c53c6ab5255971468cc2b7 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 1 Apr 2021 18:56:35 -0700 Subject: [PATCH 02/11] fix compilation --- .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 2 +- .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 92ad6f1d90c8d..235249aebccf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -867,7 +867,7 @@ case class CacheTableAsSelect( originalText: String, isLazy: Boolean, options: Map[String, String], - isPlanAnalyzed: Boolean = false) extends Command with TransformationAfterAnalysis { + isPlanAnalyzed: Boolean = false) extends LeafCommand with TransformationAfterAnalysis { // `plan` needs to be analyzed, but shouldn't be optimized. Thus, remove `plan` from // children once the analysis phase is finished. 
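   // Catalyst rules, including every optimizer batch, recurse into a plan only
   // through `children`, so once the override below returns Nil the analyzed
   // `plan` is frozen in place.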
override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 06bb7baed9ce5..e190b1766fb7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -830,7 +830,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } trait LeafLike[T <: TreeNode[T]] { self: TreeNode[T] => - override final def children: Seq[T] = Nil + override def children: Seq[T] = Nil } trait UnaryLike[T <: TreeNode[T]] { self: TreeNode[T] => From 12fdbe9aea3775bd57b8fe04ecf9a944eadc7c8b Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 1 Apr 2021 21:23:56 -0700 Subject: [PATCH 03/11] fix test --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5e00298931536..7f809e9c627a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3709,7 +3709,9 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper { */ object TransformAfterAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case t: TransformationAfterAnalysis => t.transform + // Transform only if the plan is resolved so that unresolved plan will + // fail in checkAnalysis. + case t: TransformationAfterAnalysis if t.resolved => t.transform } } From e6e9061ea7da904007ce120cf6e0f209b9d56ca2 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 1 Apr 2021 23:42:31 -0700 Subject: [PATCH 04/11] Fix tests --- .../sql/catalyst/analysis/Analyzer.scala | 24 +++++++++---------- .../sql-tests/results/explain-aqe.sql.out | 2 +- .../sql-tests/results/explain.sql.out | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7f809e9c627a2..c4431004af276 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3639,6 +3639,18 @@ class Analyzer(override val catalogManager: CatalogManager) } } } + + /** + * A rule that calls `TransformationAfterAnalysis.transform` for a plan that needs to be + * transformed after all the analysis rules are run. + */ + object TransformAfterAnalysis extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case t: TransformationAfterAnalysis if t.resolved => + checkAnalysis(t) + t.transform + } + } } /** @@ -3703,18 +3715,6 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper { } } -/** - * A rule that calls `TransformationAfterAnalysis.transform` for a plan that needs to be transformed - * after all the analysis rules are run. 
- */ -object TransformAfterAnalysis extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - // Transform only if the plan is resolved so that unresolved plan will - // fail in checkAnalysis. - case t: TransformationAfterAnalysis if t.resolved => t.transform - } -} - /** * Ignore event time watermark in batch query, which is only supported in Structured Streaming. * TODO: add this rule into analyzer rule list. diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index ddfab99b481c2..357445a806da4 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -913,7 +913,7 @@ Execute CreateViewCommand (1) Output: [] (2) CreateViewCommand -Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView +Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true (3) LogicalRelation Arguments: parquet, [key#x, val#x], CatalogTable( diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 1f7f8f6615727..3d00872028fcb 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -858,7 +858,7 @@ Execute CreateViewCommand (1) Output: [] (2) CreateViewCommand -Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView +Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true (3) LogicalRelation Arguments: parquet, [key#x, val#x], CatalogTable( From ffceb113d32d272101e4f18c61e4d096d3736190 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 6 Apr 2021 21:36:42 -0700 Subject: [PATCH 05/11] address PR comments --- .../sql/catalyst/analysis/Analyzer.scala | 17 ++++---- .../sql/catalyst/plans/logical/Command.scala | 33 +++++++++++++++ .../catalyst/plans/logical/v2Commands.scala | 9 +---- .../spark/sql/catalyst/trees/TreeNode.scala | 40 ++++++++++++++++--- .../scala/org/apache/spark/sql/Dataset.scala | 3 +- .../analysis/ResolveSessionCatalog.scala | 3 +- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../sql/execution/command/commands.scala | 15 ++++--- .../spark/sql/execution/command/views.scala | 21 +++++----- .../datasources/v2/CacheTableExec.scala | 3 +- .../sql-tests/results/explain-aqe.sql.out | 2 +- .../sql-tests/results/explain.sql.out | 2 +- 12 files changed, 104 insertions(+), 46 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c4431004af276..9259d17457ea6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -305,8 +305,8 @@ class Analyzer(override val catalogManager: CatalogManager) UpdateOuterReferences), Batch("Cleanup", fixedPoint, CleanupAliases), - Batch("TransformAfterAnalysis", Once, - TransformAfterAnalysis) + Batch("HandleAnalysisOnlyCommand", Once, + HandleAnalysisOnlyCommand) ) /** @@ -3641,14 +3641,15 @@ class Analyzer(override val catalogManager: CatalogManager) } /** - * A rule that calls 
`TransformationAfterAnalysis.transform` for a plan that needs to be - * transformed after all the analysis rules are run. + * A rule that marks a command as analyzed so that its children are removed to avoid + * being optimized. This rule should run after all other analysis rules are run. */ - object TransformAfterAnalysis extends Rule[LogicalPlan] { + object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case t: TransformationAfterAnalysis if t.resolved => - checkAnalysis(t) - t.transform + case c: AnalysisOnlyCommand if c.resolved => + checkAnalysis(c) + c.markAsAnalyzed() + c } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala index 94ead5e3edee9..47ff80dd0eba6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike} @@ -37,3 +38,35 @@ trait Command extends LogicalPlan { trait LeafCommand extends Command with LeafLike[LogicalPlan] trait UnaryCommand extends Command with UnaryLike[LogicalPlan] trait BinaryCommand extends Command with BinaryLike[LogicalPlan] + +/** + * A logical node that represents a command whose children are only analyzed, but not optimized. + */ +trait AnalysisOnlyCommand extends Command { + private var _isAnalyzed: Boolean = false + + def childrenToAnalyze: Seq[LogicalPlan] + + override def children: Seq[LogicalPlan] = if (_isAnalyzed) Nil else childrenToAnalyze + + def markAsAnalyzed(): Unit = { + if (!_isAnalyzed) { + // Since children will be removed once _isAnalyzed is set, ensure that there are no + // unresolved children before removing them. + if (childrenToAnalyze.exists(!_.resolved)) { + throw new AnalysisException( + "AnalysisOnlyCommand can be marked as analyzed only after all its children " + + "to analyze are resolved.") + } + _isAnalyzed = true + } + } + + override def clone(): LogicalPlan = { + val cloned = super.clone() + if (_isAnalyzed) { + cloned.asInstanceOf[AnalysisOnlyCommand].markAsAnalyzed() + } + cloned + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 235249aebccf9..c421ce2ce0316 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -866,13 +866,8 @@ case class CacheTableAsSelect( plan: LogicalPlan, originalText: String, isLazy: Boolean, - options: Map[String, String], - isPlanAnalyzed: Boolean = false) extends LeafCommand with TransformationAfterAnalysis { - // `plan` needs to be analyzed, but shouldn't be optimized. Thus, remove `plan` from - // children once the analysis phase is finished. 
- override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil - - override def transform: LogicalPlan = copy(isPlanAnalyzed = true) + options: Map[String, String]) extends AnalysisOnlyCommand { + override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index e190b1766fb7a..6cfaded3adcfb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -111,13 +111,45 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { tags -= tag } + private var cachedChildren: Seq[BaseType] = null + /** * Returns a Seq of the children of this node. - * Children should not change. Immutability required for containsChild optimization */ def children: Seq[BaseType] - lazy val containsChild: Set[TreeNode[_]] = children.toSet + private var cachedChildrenSet: Set[TreeNode[_]] = null + + private def containsChild: Set[TreeNode[_]] = { + if (updateChildrenIfChanged() || cachedChildrenSet == null) { + cachedChildrenSet = cachedChildren.toSet + } + cachedChildrenSet + } + + @transient private var cachedAllChildren: Set[TreeNode[_]] = null + + private def allChildren: Set[TreeNode[_]] = { + if (updateChildrenIfChanged() || cachedAllChildren == null) { + cachedAllChildren = (cachedChildren ++ innerChildren).toSet[TreeNode[_]] + } + cachedAllChildren + } + + // Returns true if children have changed after updating the cached children. + private def updateChildrenIfChanged(): Boolean = { + def fastEquals(l: Seq[BaseType], r: Seq[BaseType]): Boolean = { + l.length == r.length && l.zip(r).forall { + case (a1, a2) => a1 fastEquals a2 + } + } + val curChildren = children + val changed = cachedChildren == null || !fastEquals(cachedChildren, children) + if (changed) { + cachedChildren = curChildren + } + changed + } // Copied from Scala 2.13.1 // github.com/scala/scala/blob/v2.13.1/src/library/scala/util/hashing/MurmurHash3.scala#L56-L73 @@ -532,8 +564,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { */ protected def stringArgs: Iterator[Any] = productIterator - private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]] - /** Returns a string representing the arguments to this node, minus any children */ def argString(maxFields: Int): String = stringArgs.flatMap { case tn: TreeNode[_] if allChildren.contains(tn) => Nil @@ -830,7 +860,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } trait LeafLike[T <: TreeNode[T]] { self: TreeNode[T] => - override def children: Seq[T] = Nil + override final def children: Seq[T] = Nil } trait UnaryLike[T <: TreeNode[T]] { self: TreeNode[T] => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 9f7023c4f3cc7..6a3dbfeab2dd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3373,8 +3373,7 @@ class Dataset[T] private[sql]( plan = logicalPlan, allowExisting = false, replace = replace, - viewType = viewType, - isPlanAnalyzed = true) + viewType = viewType) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 347ba448b0afa..cbce2ca95d503 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -499,8 +499,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) plan = child, allowExisting = allowExisting, replace = replace, - viewType = viewType, - isPlanAnalyzed = false) + viewType = viewType) case ShowViews(resolved: ResolvedNamespace, pattern, output) => resolved match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ea83d590f21b9..ad285d830f855 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -610,7 +610,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil - case r: RunnableCommand => ExecutedCommandExec(r) :: Nil + case r: BaseRunnableCommand => ExecutedCommandExec(r) :: Nil case MemoryPlan(sink, output) => val encoder = RowEncoder(StructType.fromAttributes(output)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index ac6e2ba9eba4f..851d8de3c7223 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Command, LeafCommand, LogicalPlan} import org.apache.spark.sql.connector.ExternalCommandRunner import org.apache.spark.sql.execution.{ExplainMode, LeafExecNode, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.SQLMetric @@ -34,10 +34,10 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap /** - * A logical command that is executed for its side-effects. `RunnableCommand`s are + * A logical command that is executed for its side-effects. `BaseRunnableCommand`s are * wrapped in `ExecutedCommand` during execution. */ -trait RunnableCommand extends LeafCommand { +trait BaseRunnableCommand extends Command { // The map used to record the metrics of running the command. This will be passed to // `ExecutedCommand` during query planning. @@ -47,12 +47,17 @@ trait RunnableCommand extends LeafCommand { } /** - * A physical operator that executes the run method of a `RunnableCommand` and + * A runnable command that has no children. + */ +trait RunnableCommand extends BaseRunnableCommand with LeafCommand + +/** + * A physical operator that executes the run method of a `BaseRunnableCommand` and * saves the result to prevent multiple executions. * * @param cmd the `RunnableCommand` this operator will run. 
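 *            With this refactoring it may be any `BaseRunnableCommand`: commands
 *            that keep children through analysis, such as `CreateViewCommand`,
 *            extend it directly, while leaf commands keep using `RunnableCommand`.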
*/ -case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { +case class ExecutedCommandExec(cmd: BaseRunnableCommand) extends LeafExecNode { override lazy val metrics: Map[String, SQLMetric] = cmd.metrics diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 0ae869775909a..05fdcfe195a1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, TransformationAfterAnalysis, View} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -48,8 +48,8 @@ import org.apache.spark.sql.util.SchemaUtils * @param properties the properties of this view. * @param originalText the original SQL text of this view, can be None if this view is created via * Dataset API. - * @param child the logical plan that represents the view; this is used to generate the logical - * plan for temporary view and the view schema. + * @param plan the logical plan that represents the view; this is used to generate the logical + * plan for temporary view and the view schema. * @param allowExisting if true, and if the view already exists, noop; if false, and if the view * already exists, throws analysis exception. * @param replace if true, and if the view already exists, updates it; if false, and if the view @@ -65,19 +65,14 @@ case class CreateViewCommand( plan: LogicalPlan, allowExisting: Boolean, replace: Boolean, - viewType: ViewType, - isPlanAnalyzed: Boolean) - extends RunnableCommand with TransformationAfterAnalysis { + viewType: ViewType) extends BaseRunnableCommand with AnalysisOnlyCommand { import ViewHelper._ override def innerChildren: Seq[QueryPlan[_]] = Seq(plan) - // `plan` needs to be analyzed, but shouldn't be optimized. Thus, remove `plan` from - // children once the analysis phase is finished. - override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil - - override def transform: LogicalPlan = copy(isPlanAnalyzed = true) + // `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly. 
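+  // The HandleAnalysisOnlyCommand batch marks this command as analyzed once all
+  // other analysis rules have run; from then on `plan` is no longer reachable
+  // through `children`, so the optimizer skips it.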
+ override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil if (viewType == PersistedView) { require(originalText.isDefined, "'originalText' must be provided to create permanent view") @@ -103,7 +98,9 @@ case class CreateViewCommand( } override def run(sparkSession: SparkSession): Seq[Row] = { - assert(isPlanAnalyzed) + if (children.nonEmpty) { + throw new AnalysisException("The logical plan to represent the view is not analyzed.") + } val analyzedPlan = plan if (userSpecifiedColumns.nonEmpty && diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala index 3191ff608c081..9a9f1126fc1c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala @@ -104,8 +104,7 @@ case class CacheTableAsSelectExec( plan = query, allowExisting = false, replace = false, - viewType = LocalTempView, - isPlanAnalyzed = true + viewType = LocalTempView ) ) dataFrameForCachedPlan.logicalPlan diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 357445a806da4..ddfab99b481c2 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -913,7 +913,7 @@ Execute CreateViewCommand (1) Output: [] (2) CreateViewCommand -Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true +Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView (3) LogicalRelation Arguments: parquet, [key#x, val#x], CatalogTable( diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 3d00872028fcb..1f7f8f6615727 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -858,7 +858,7 @@ Execute CreateViewCommand (1) Output: [] (2) CreateViewCommand -Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true +Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView (3) LogicalRelation Arguments: parquet, [key#x, val#x], CatalogTable( From acb74a115972fa5e45e9f212760b09e1c18bd462 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 6 Apr 2021 21:48:02 -0700 Subject: [PATCH 06/11] remove unused file --- .../logical/TransformationAfterAnalysis.scala | 28 ------------------- 1 file changed, 28 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala deleted file mode 100644 index 40e941ea203e4..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformationAfterAnalysis.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.plans.logical - -/** - * A trait that exposes 'transform' which will be called by the `TransformAfterAnalysis` rule - * after all other analysis rules are called. One scenario to use this transformation is to - * remove any children of a logical plan so that they are not optimized; this is useful for - * commands that create a view or a cache because they need to work with analyzed plans. - */ -trait TransformationAfterAnalysis { - def transform: LogicalPlan -} From b78cfdb896d8ae0d6da8e96631749ad47fa35623 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 7 Apr 2021 19:14:03 -0700 Subject: [PATCH 07/11] address PR comment --- .../sql/catalyst/analysis/Analyzer.scala | 1 - .../sql/catalyst/plans/logical/Command.scala | 32 +++------------- .../catalyst/plans/logical/v2Commands.scala | 5 ++- .../spark/sql/catalyst/trees/TreeNode.scala | 38 ++----------------- .../spark/sql/execution/command/views.scala | 10 +++-- .../datasources/v2/CacheTableExec.scala | 26 ++++++------- .../sql-tests/results/explain-aqe.sql.out | 2 +- .../sql-tests/results/explain.sql.out | 2 +- 8 files changed, 35 insertions(+), 81 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9259d17457ea6..789c84d1b5d1c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3649,7 +3649,6 @@ class Analyzer(override val catalogManager: CatalogManager) case c: AnalysisOnlyCommand if c.resolved => checkAnalysis(c) c.markAsAnalyzed() - c } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala index 47ff80dd0eba6..81ad92b3f9ecb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike} @@ -40,33 +39,12 @@ trait UnaryCommand extends Command with UnaryLike[LogicalPlan] trait BinaryCommand extends Command with BinaryLike[LogicalPlan] /** - * A logical node that represents a command whose children are only analyzed, but not optimized. + * A logical node that can be used for a command that requires its children to be only analyzed, + * but not optimized. 
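+ * Implementations carry `isAnalyzed` as a constructor parameter and return a copy
+ * of themselves from `markAsAnalyzed()`, so the transition is an immutable tree
+ * transformation rather than a mutation of the node.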
*/ trait AnalysisOnlyCommand extends Command { - private var _isAnalyzed: Boolean = false - + val isAnalyzed: Boolean def childrenToAnalyze: Seq[LogicalPlan] - - override def children: Seq[LogicalPlan] = if (_isAnalyzed) Nil else childrenToAnalyze - - def markAsAnalyzed(): Unit = { - if (!_isAnalyzed) { - // Since children will be removed once _isAnalyzed is set, ensure that there are no - // unresolved children before removing them. - if (childrenToAnalyze.exists(!_.resolved)) { - throw new AnalysisException( - "AnalysisOnlyCommand can be marked as analyzed only after all its children " + - "to analyze are resolved.") - } - _isAnalyzed = true - } - } - - override def clone(): LogicalPlan = { - val cloned = super.clone() - if (_isAnalyzed) { - cloned.asInstanceOf[AnalysisOnlyCommand].markAsAnalyzed() - } - cloned - } + override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze + def markAsAnalyzed(): LogicalPlan } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index c421ce2ce0316..13def0b7b746c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -866,8 +866,11 @@ case class CacheTableAsSelect( plan: LogicalPlan, originalText: String, isLazy: Boolean, - options: Map[String, String]) extends AnalysisOnlyCommand { + options: Map[String, String], + isAnalyzed: Boolean = false) extends AnalysisOnlyCommand { override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil + + override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 6cfaded3adcfb..06bb7baed9ce5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -111,45 +111,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { tags -= tag } - private var cachedChildren: Seq[BaseType] = null - /** * Returns a Seq of the children of this node. + * Children should not change. Immutability required for containsChild optimization */ def children: Seq[BaseType] - private var cachedChildrenSet: Set[TreeNode[_]] = null - - private def containsChild: Set[TreeNode[_]] = { - if (updateChildrenIfChanged() || cachedChildrenSet == null) { - cachedChildrenSet = cachedChildren.toSet - } - cachedChildrenSet - } - - @transient private var cachedAllChildren: Set[TreeNode[_]] = null - - private def allChildren: Set[TreeNode[_]] = { - if (updateChildrenIfChanged() || cachedAllChildren == null) { - cachedAllChildren = (cachedChildren ++ innerChildren).toSet[TreeNode[_]] - } - cachedAllChildren - } - - // Returns true if children have changed after updating the cached children. 
- private def updateChildrenIfChanged(): Boolean = { - def fastEquals(l: Seq[BaseType], r: Seq[BaseType]): Boolean = { - l.length == r.length && l.zip(r).forall { - case (a1, a2) => a1 fastEquals a2 - } - } - val curChildren = children - val changed = cachedChildren == null || !fastEquals(cachedChildren, children) - if (changed) { - cachedChildren = curChildren - } - changed - } + lazy val containsChild: Set[TreeNode[_]] = children.toSet // Copied from Scala 2.13.1 // github.com/scala/scala/blob/v2.13.1/src/library/scala/util/hashing/MurmurHash3.scala#L56-L73 @@ -564,6 +532,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { */ protected def stringArgs: Iterator[Any] = productIterator + private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]] + /** Returns a string representing the arguments to this node, minus any children */ def argString(maxFields: Int): String = stringArgs.flatMap { case tn: TreeNode[_] if allChildren.contains(tn) => Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 05fdcfe195a1c..c9c6abc350976 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -55,6 +55,7 @@ import org.apache.spark.sql.util.SchemaUtils * @param replace if true, and if the view already exists, updates it; if false, and if the view * already exists, throws analysis exception. * @param viewType the expected view type to be created with this command. + * @param isAnalyzed whether this command is analyzed or not. */ case class CreateViewCommand( name: TableIdentifier, @@ -65,7 +66,8 @@ case class CreateViewCommand( plan: LogicalPlan, allowExisting: Boolean, replace: Boolean, - viewType: ViewType) extends BaseRunnableCommand with AnalysisOnlyCommand { + viewType: ViewType, + isAnalyzed: Boolean = false) extends BaseRunnableCommand with AnalysisOnlyCommand { import ViewHelper._ @@ -74,6 +76,8 @@ case class CreateViewCommand( // `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly. 
override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil + def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true) + if (viewType == PersistedView) { require(originalText.isDefined, "'originalText' must be provided to create permanent view") } @@ -98,8 +102,8 @@ case class CreateViewCommand( } override def run(sparkSession: SparkSession): Seq[Row] = { - if (children.nonEmpty) { - throw new AnalysisException("The logical plan to represent the view is not analyzed.") + if (!isAnalyzed) { + throw new AnalysisException("The logical plan that represents the view is not analyzed.") } val analyzedPlan = plan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala index 9a9f1126fc1c3..b8c8e0406519f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala @@ -94,19 +94,19 @@ case class CacheTableAsSelectExec( override lazy val relationName: String = tempViewName override lazy val planToCache: LogicalPlan = { - Dataset.ofRows(sparkSession, - CreateViewCommand( - name = TableIdentifier(tempViewName), - userSpecifiedColumns = Nil, - comment = None, - properties = Map.empty, - originalText = Some(originalText), - plan = query, - allowExisting = false, - replace = false, - viewType = LocalTempView - ) - ) + CreateViewCommand( + name = TableIdentifier(tempViewName), + userSpecifiedColumns = Nil, + comment = None, + properties = Map.empty, + originalText = Some(originalText), + plan = query, + allowExisting = false, + replace = false, + viewType = LocalTempView, + isAnalyzed = true + ).run(sparkSession) + dataFrameForCachedPlan.logicalPlan } diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index ddfab99b481c2..357445a806da4 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -913,7 +913,7 @@ Execute CreateViewCommand (1) Output: [] (2) CreateViewCommand -Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView +Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true (3) LogicalRelation Arguments: parquet, [key#x, val#x], CatalogTable( diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 1f7f8f6615727..3d00872028fcb 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -858,7 +858,7 @@ Execute CreateViewCommand (1) Output: [] (2) CreateViewCommand -Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView +Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true (3) LogicalRelation Arguments: parquet, [key#x, val#x], CatalogTable( From 594981a643a2e3c183b56a44f00c52ad4267c517 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 7 Apr 2021 21:16:21 -0700 Subject: [PATCH 08/11] merge changes --- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 +- .../scala/org/apache/spark/sql/execution/command/views.scala | 2 +- 2 files changed, 2 
insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 57620aaefb9e7..8aff1f918593c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -610,7 +610,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil - case r: BaseRunnableCommand => ExecutedCommandExec(r) :: Nil + case r: RunnableCommand => ExecutedCommandExec(r) :: Nil case MemoryPlan(sink, output) => val encoder = RowEncoder(StructType.fromAttributes(output)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index c9c6abc350976..70ed50aa13ef1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -67,7 +67,7 @@ case class CreateViewCommand( allowExisting: Boolean, replace: Boolean, viewType: ViewType, - isAnalyzed: Boolean = false) extends BaseRunnableCommand with AnalysisOnlyCommand { + isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand { import ViewHelper._ From 0ab36530339428b947160b6f17184bbf54d0a8bd Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 9 Apr 2021 08:33:32 -0700 Subject: [PATCH 09/11] address PR comment --- .../spark/sql/catalyst/plans/logical/v2Commands.scala | 5 +---- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 3 ++- .../sql/catalyst/analysis/ResolveSessionCatalog.scala | 2 +- .../org/apache/spark/sql/execution/command/views.scala | 7 ++++++- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index c838ef2ae9265..a8d5a610fd401 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -820,10 +820,7 @@ case class RepairTable( case class AlterViewAs( child: LogicalPlan, originalText: String, - query: LogicalPlan) extends BinaryCommand { - override def left: LogicalPlan = child - override def right: LogicalPlan = query -} + query: LogicalPlan) extends UnaryCommand /** * The logical plan of the ALTER VIEW ... SET TBLPROPERTIES command. 
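With `AlterViewAs` now a `UnaryCommand`, only the view reference stays a child of the statement; `query` is instead analyzed through `AlterViewAsCommand`, which adopts `AnalysisOnlyCommand` in the hunks that follow. For reference, here is a sketch of the unary helper consistent with the `LeafLike` definition shown earlier in the series (the exact definition in TreeNode.scala may differ, e.g. it may cache `children`):

    trait UnaryLike[T <: TreeNode[T]] { self: TreeNode[T] =>
      def child: T
      override final def children: Seq[T] = child :: Nil
    }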
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6a3dbfeab2dd9..540115b919a44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3373,7 +3373,8 @@ class Dataset[T] private[sql]( plan = logicalPlan, allowExisting = false, replace = replace, - viewType = viewType) + viewType = viewType, + isAnalyzed = true) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index cbce2ca95d503..7f3d0c621147e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) => AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location) - case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved => + case AlterViewAs(ResolvedView(ident, _), originalText, query) => AlterViewAsCommand( ident.asTableIdentifier, originalText, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 70ed50aa13ef1..2308ed7555476 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -239,12 +239,17 @@ case class CreateViewCommand( case class AlterViewAsCommand( name: TableIdentifier, originalText: String, - query: LogicalPlan) extends RunnableCommand { + query: LogicalPlan, + isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand { import ViewHelper._ override def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil + + def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true) + override def run(session: SparkSession): Seq[Row] = { if (session.sessionState.catalog.isTempView(name)) { alterTemporaryView(session, query) From 782304402a75b29bac27f6cc167f002e8374ec55 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 9 Apr 2021 08:46:56 -0700 Subject: [PATCH 10/11] revert --- .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index a8d5a610fd401..c838ef2ae9265 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -820,7 +820,10 @@ case class RepairTable( case class AlterViewAs( child: LogicalPlan, originalText: String, - query: LogicalPlan) extends UnaryCommand + query: LogicalPlan) extends BinaryCommand { + override def left: LogicalPlan = child + override def right: LogicalPlan = query +} /** * The logical plan of the ALTER VIEW ... SET TBLPROPERTIES command. 
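Before the final patch, the contract that survived review is worth recapping; `SomeCommand` below is illustrative only, mirroring what `CacheTableAsSelect`, `CreateViewCommand`, and `AlterViewAsCommand` now all do:

    // Hypothetical command following the settled AnalysisOnlyCommand contract.
    case class SomeCommand(
        plan: LogicalPlan,
        isAnalyzed: Boolean = false) extends Command with AnalysisOnlyCommand {

      // Attached (and therefore analyzed) only until the command is marked analyzed.
      override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil

      // A pure copy: no mutable state, so repeated analyzer runs stay idempotent.
      override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
    }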
From 111ef8be8bc03a62389aed4f0871b958714eb789 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 13 Apr 2021 20:18:34 -0700
Subject: [PATCH 11/11] add asserts

---
 .../spark/sql/catalyst/plans/logical/v2Commands.scala  | 4 +++-
 .../org/apache/spark/sql/execution/command/views.scala | 8 ++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 84f3aacbf6d2a..bbc2b62880c62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1025,8 +1025,10 @@ case class CacheTableAsSelect(
     options: Map[String, String],
     isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
   override protected def withNewChildrenInternal(
-    newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect =
+    newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
+    assert(!isAnalyzed)
     copy(plan = newChildren.head)
+  }
 
   override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 745d463e960b9..10ec4be7d9df3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -72,8 +72,10 @@ case class CreateViewCommand(
   import ViewHelper._
 
   override protected def withNewChildrenInternal(
-    newChildren: IndexedSeq[LogicalPlan]): CreateViewCommand =
+    newChildren: IndexedSeq[LogicalPlan]): CreateViewCommand = {
+    assert(!isAnalyzed)
     copy(plan = newChildren.head)
+  }
 
   override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)
 
@@ -249,8 +251,10 @@ case class AlterViewAsCommand(
   import ViewHelper._
 
   override protected def withNewChildrenInternal(
-    newChildren: IndexedSeq[LogicalPlan]): AlterViewAsCommand =
+    newChildren: IndexedSeq[LogicalPlan]): AlterViewAsCommand = {
+    assert(!isAnalyzed)
     copy(query = newChildren.head)
+  }
 
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
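Taken together, the series leaves CACHE TABLE ... AS SELECT flowing as follows; this is a condensed walk-through of the assumed final state, not code from the patches:

    // CACHE TABLE t AS SELECT ... under the final design (condensed sketch).
    spark.sql("CACHE TABLE t AS SELECT * FROM src")
    // 1. The parser produces CacheTableAsSelect(plan = <unresolved SELECT>,
    //    isAnalyzed = false); `plan` is reachable through `children`, so the
    //    analyzer resolves it like any other query.
    // 2. HandleAnalysisOnlyCommand runs last: checkAnalysis validates the resolved
    //    children, then markAsAnalyzed() returns a copy with isAnalyzed = true,
    //    turning the command into a leaf.
    // 3. The optimizer and planner never touch `plan`; CacheTableAsSelectExec later
    //    runs CreateViewCommand(..., isAnalyzed = true) on the already-analyzed plan
    //    and caches the resulting temp view.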