Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed. #32032

Closed
wants to merge 15 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
CleanupAliases),
Batch("HandleAnalysisOnlyCommand", Once,
HandleAnalysisOnlyCommand)
)

/**
Expand Down Expand Up @@ -3637,6 +3639,19 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}

/**
 * Marks every resolved [[AnalysisOnlyCommand]] in the plan as analyzed, which makes the
 * command drop its children so that later phases (e.g. the optimizer) do not touch them.
 * This rule must run after all other analysis rules have run.
 */
object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.resolveOperators {
      case command: AnalysisOnlyCommand if command.resolved =>
        // Validate the fully-analyzed command before its children become invisible.
        checkAnalysis(command)
        command.markAsAnalyzed()
        command
    }
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}

Expand All @@ -37,3 +38,35 @@ trait Command extends LogicalPlan {
/** A [[Command]] with no children (leaf node shape via [[LeafLike]]). */
trait LeafCommand extends Command with LeafLike[LogicalPlan]
/** A [[Command]] with exactly one child (shape via [[UnaryLike]]). */
trait UnaryCommand extends Command with UnaryLike[LogicalPlan]
/** A [[Command]] with exactly two children (shape via [[BinaryLike]]). */
trait BinaryCommand extends Command with BinaryLike[LogicalPlan]

/**
 * A logical node representing a command whose children are only analyzed, but not optimized.
 * Once [[markAsAnalyzed]] is called, `children` returns Nil so that subsequent phases
 * (e.g. the optimizer) no longer visit the analyzed child plans.
 */
trait AnalysisOnlyCommand extends Command {
  // Once flipped to true, the children are hidden from `children` below.
  private var _isAnalyzed: Boolean = false

  /** The child plans that need analysis (but must not be optimized). */
  def childrenToAnalyze: Seq[LogicalPlan]

  override def children: Seq[LogicalPlan] =
    if (_isAnalyzed) Nil else childrenToAnalyze

  /**
   * Marks this command as analyzed. Idempotent; fails if any child to analyze is
   * still unresolved, since the children become invisible after this call.
   */
  def markAsAnalyzed(): Unit = {
    if (!_isAnalyzed) {
      val allResolved = childrenToAnalyze.forall(_.resolved)
      if (!allResolved) {
        throw new AnalysisException(
          "AnalysisOnlyCommand can be marked as analyzed only after all its children " +
          "to analyze are resolved.")
      }
      _isAnalyzed = true
    }
  }

  // The analyzed flag lives outside the constructor arguments, so it must be
  // carried over to clones explicitly.
  override def clone(): LogicalPlan = {
    val copied = super.clone()
    if (_isAnalyzed) {
      copied.asInstanceOf[AnalysisOnlyCommand].markAsAnalyzed()
    }
    copied
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
options: Map[String, String]) extends LeafCommand
options: Map[String, String]) extends AnalysisOnlyCommand {
override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
}

/**
* The logical plan of the UNCACHE TABLE command.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,45 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
tags -= tag
}

// Most recently observed value of `children`, maintained by updateChildrenIfChanged().
// Starts as null and is (re)populated whenever the children are seen to have changed.
private var cachedChildren: Seq[BaseType] = null

/**
 * Returns a Seq of the children of this node.
 * Children are normally immutable (required for the containsChild optimization), but
 * some commands can drop their children after analysis, so the derived caches below
 * re-check for changes via updateChildrenIfChanged().
 */
def children: Seq[BaseType]

lazy val containsChild: Set[TreeNode[_]] = children.toSet
// Cached Set view of the children; rebuilt when the children change.
private var cachedChildrenSet: Set[TreeNode[_]] = null

/** Set of direct children, recomputed only when the children have changed. */
private def containsChild: Set[TreeNode[_]] = {
  val childrenChanged = updateChildrenIfChanged()
  if (childrenChanged || cachedChildrenSet == null) {
    cachedChildrenSet = cachedChildren.toSet
  }
  cachedChildrenSet
}

// Cached union of children and innerChildren; rebuilt when the children change.
@transient private var cachedAllChildren: Set[TreeNode[_]] = null

/** Direct children plus inner children, recomputed only when the children have changed. */
private def allChildren: Set[TreeNode[_]] = {
  val childrenChanged = updateChildrenIfChanged()
  if (childrenChanged || cachedAllChildren == null) {
    cachedAllChildren = (cachedChildren ++ innerChildren).toSet[TreeNode[_]]
  }
  cachedAllChildren
}

// Refreshes `cachedChildren` from `children` and returns true iff a change was detected.
// On change, the derived caches are invalidated as well, so that a caller whose cache
// was built before the change can never serve stale data (each caller also null-checks
// its own cache, so this method is safe to apply independently).
private def updateChildrenIfChanged(): Boolean = {
  // Pairwise reference-fast equality over two child sequences.
  def fastEquals(l: Seq[BaseType], r: Seq[BaseType]): Boolean = {
    l.length == r.length && l.zip(r).forall {
      case (a1, a2) => a1 fastEquals a2
    }
  }
  val curChildren = children
  // Compare against the snapshot taken above rather than re-evaluating `children`,
  // which avoids a redundant (possibly non-trivial) recomputation and guarantees the
  // compared value is exactly the one we cache below.
  val changed = cachedChildren == null || !fastEquals(cachedChildren, curChildren)
  if (changed) {
    cachedChildren = curChildren
    // Invalidate sibling caches: if e.g. allChildren detects the change first, a later
    // containsChild call would otherwise see "no change" and return a stale set.
    cachedChildrenSet = null
    cachedAllChildren = null
  }
  changed
}

// Copied from Scala 2.13.1
// github.com/scala/scala/blob/v2.13.1/src/library/scala/util/hashing/MurmurHash3.scala#L56-L73
Expand Down Expand Up @@ -532,8 +564,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
*/
protected def stringArgs: Iterator[Any] = productIterator

private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]

/** Returns a string representing the arguments to this node, minus any children */
def argString(maxFields: Int): String = stringArgs.flatMap {
case tn: TreeNode[_] if allChildren.contains(tn) => Nil
Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3370,7 +3370,7 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
child = logicalPlan,
plan = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case CreateViewStatement(
tbl, userSpecifiedColumns, comment, properties,
originalText, child, allowExisting, replace, viewType) if child.resolved =>
originalText, child, allowExisting, replace, viewType) =>

val v1TableName = if (viewType != PersistedView) {
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
Expand All @@ -491,15 +491,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
parseV1Table(tbl, "CREATE VIEW")
}
CreateViewCommand(
v1TableName.asTableIdentifier,
userSpecifiedColumns,
comment,
properties,
originalText,
child,
allowExisting,
replace,
viewType)
name = v1TableName.asTableIdentifier,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
originalText = originalText,
plan = child,
allowExisting = allowExisting,
replace = replace,
viewType = viewType)

case ShowViews(resolved: ResolvedNamespace, pattern, output) =>
resolved match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case r: BaseRunnableCommand => ExecutedCommandExec(r) :: Nil

case MemoryPlan(sink, output) =>
val encoder = RowEncoder(StructType.fromAttributes(output))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LeafCommand, LogicalPlan}
import org.apache.spark.sql.connector.ExternalCommandRunner
import org.apache.spark.sql.execution.{ExplainMode, LeafExecNode, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.SQLMetric
Expand All @@ -34,10 +34,10 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
* A logical command that is executed for its side-effects. `BaseRunnableCommand`s are
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends LeafCommand {
trait BaseRunnableCommand extends Command {

// The map used to record the metrics of running the command. This will be passed to
// `ExecutedCommand` during query planning.
Expand All @@ -47,12 +47,17 @@ trait RunnableCommand extends LeafCommand {
}

/**
 * A runnable command that has no children (a leaf in the logical plan).
 */
trait RunnableCommand extends BaseRunnableCommand with LeafCommand

/**
* A physical operator that executes the run method of a `BaseRunnableCommand` and
* saves the result to prevent multiple executions.
*
* @param cmd the `BaseRunnableCommand` this operator will run.
*/
case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
case class ExecutedCommandExec(cmd: BaseRunnableCommand) extends LeafExecNode {

override lazy val metrics: Map[String, SQLMetric] = cmd.metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
Expand All @@ -48,8 +48,8 @@ import org.apache.spark.sql.util.SchemaUtils
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
* Dataset API.
* @param child the logical plan that represents the view; this is used to generate the logical
* plan for temporary view and the view schema.
* @param plan the logical plan that represents the view; this is used to generate the logical
* plan for temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
Expand All @@ -62,15 +62,17 @@ case class CreateViewCommand(
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
child: LogicalPlan,
plan: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
viewType: ViewType)
extends RunnableCommand {
viewType: ViewType) extends BaseRunnableCommand with AnalysisOnlyCommand {

import ViewHelper._

override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we include it as inner children only when isAnalyzed = true? Maybe we can put it in the base class, as it's better to include the plan in inner children to show it in the UI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan which UI are you referring to? If you are referring to the Spark UI, I see the following and it shows the same even if I use override def innerChildren: Seq[QueryPlan[_]] = if (isAnalyzed) Seq(plan) else Nil:
Screen Shot 2021-04-23 at 9 30 57 PM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about the "optimized plan" in the EXPLAIN result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the following for the both cases. Were you expecting something different?

== Parsed Logical Plan ==
'CreateViewStatement [test], SELECT 1, false, false, PersistedView
+- 'Project [unresolvedalias(1, None)]
   +- OneRowRelation

== Analyzed Logical Plan ==

CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
   +- Project [1 AS 1#7]
      +- OneRowRelation

== Optimized Logical Plan ==
CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
   +- Project [1 AS 1#7]
      +- OneRowRelation

== Physical Plan ==
Execute CreateViewCommand
   +- CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
         +- Project [1 AS 1#7]
            +- OneRowRelation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting. I thought there will be problems if a plan is in both children and innerChildren, but seems we are fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the reason is that when explain runs, CreateViewCommand has already gone through the Analyzer and its children are removed (note the true above, which is isAnalyzed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the "Parsed Logical Plan" should be unresolved and the plan is in both children and innerChildren.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Parsed Logical Plan" is CreateViewStatement, not CreateViewCommand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah now I get it. If there is a place that creates CreateViewCommand directly, we get a problem.

I think it's safer to avoid that to be future-proof. e.g.

override def innerChildren: Seq[QueryPlan[_]] = if (analyzed) Seq(plan) else Nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I will do a follow up PR.


// `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil

if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
Expand All @@ -96,10 +98,10 @@ case class CreateViewCommand(
}

override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sparkSession.sessionState.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
if (children.nonEmpty) {
throw new AnalysisException("The logical plan to represent the view is not analyzed.")
}
val analyzedPlan = plan

if (userSpecifiedColumns.nonEmpty &&
userSpecifiedColumns.length != analyzedPlan.output.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ case class CacheTableAsSelectExec(
comment = None,
properties = Map.empty,
originalText = Some(originalText),
child = query,
plan = query,
allowExisting = false,
replace = false,
viewType = LocalTempView
Expand Down