From ff7fabb55812cfdb5cfd596cc100cef6391f1381 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 22 Mar 2021 11:15:52 -0700
Subject: [PATCH 1/6] initial commit

---
 .../org/apache/spark/sql/execution/command/views.scala | 9 ++-------
 .../sql/execution/datasources/v2/CacheTableExec.scala  | 7 ++++++-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index b302b268b3afc..19e7a8b0862d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -62,7 +62,7 @@ case class CreateViewCommand(
     comment: Option[String],
     properties: Map[String, String],
     originalText: Option[String],
-    child: LogicalPlan,
+    analyzedPlan: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
     viewType: ViewType)
@@ -70,7 +70,7 @@ case class CreateViewCommand(
 
   import ViewHelper._
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(analyzedPlan)
 
   if (viewType == PersistedView) {
     require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -96,11 +96,6 @@ case class CreateViewCommand(
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = sparkSession.sessionState.executePlan(child)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
-
     if (userSpecifiedColumns.nonEmpty &&
         userSpecifiedColumns.length != analyzedPlan.output.length) {
       throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 56e008d1d95f8..4c23cb22aed25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -94,6 +94,11 @@ case class CacheTableAsSelectExec(
   override lazy val relationName: String = tempViewName
 
   override lazy val planToCache: LogicalPlan = {
+    // If the plan cannot be analyzed, throw an exception and don't proceed.
+    val qe = sparkSession.sessionState.executePlan(query)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
+
     Dataset.ofRows(sparkSession,
       CreateViewCommand(
         name = TableIdentifier(tempViewName),
@@ -101,7 +106,7 @@ case class CacheTableAsSelectExec(
         comment = None,
         properties = Map.empty,
         originalText = Some(originalText),
-        child = query,
+        child = analyzedPlan,
         allowExisting = false,
         replace = false,
         viewType = LocalTempView

From 5829e484f8538bd1529ac4386fff1848634c2997 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 22 Mar 2021 16:39:14 -0700
Subject: [PATCH 2/6] fix compilation

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala  | 2 +-
 .../spark/sql/execution/datasources/v2/CacheTableExec.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fd02d0b131587..edbf584fea2df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3370,7 +3370,7 @@ class Dataset[T] private[sql](
       comment = None,
       properties = Map.empty,
       originalText = None,
-      child = logicalPlan,
+      analyzedPlan = logicalPlan,
       allowExisting = false,
       replace = replace,
       viewType = viewType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 4c23cb22aed25..e30dea93b22ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -106,7 +106,7 @@ case class CacheTableAsSelectExec(
         comment = None,
         properties = Map.empty,
         originalText = Some(originalText),
-        child = analyzedPlan,
+        analyzedPlan = analyzedPlan,
         allowExisting = false,
         replace = false,
         viewType = LocalTempView

From c0e972355af70e26c70e17bc4e6cf1aaadb83e49 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 22 Mar 2021 21:07:15 -0700
Subject: [PATCH 3/6] fix test

---
 .../sql/catalyst/analysis/CheckAnalysis.scala |  3 +++
 .../sql/catalyst/plans/logical/Command.scala  |  1 +
 .../spark/sql/execution/command/views.scala   |  6 ++++--
 .../apache/spark/sql/DataFrameJoinSuite.scala |  6 ++++++
 .../spark/sql/execution/SQLViewSuite.scala    | 18 ++++++++++++++++++
 5 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 389bbb828da6f..f4383990248e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -167,6 +167,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case _: ShowTableExtended =>
         throw new AnalysisException("SHOW TABLE EXTENDED is not supported for v2 tables.")
 
+      case c: Command =>
+        c.plansToCheckAnalysis.foreach(checkAnalysis)
+
       case operator: LogicalPlan =>
         // Check argument data types of higher-order functions downwards first.
        // If the arguments of the higher-order functions are resolved but the type check fails,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 89bd865391b5a..b7d8a00205d77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -28,6 +28,7 @@ trait Command extends LogicalPlan {
   override def output: Seq[Attribute] = Seq.empty
   override def producedAttributes: AttributeSet = outputSet
   override def children: Seq[LogicalPlan] = Seq.empty
+  def plansToCheckAnalysis: Seq[LogicalPlan] = Seq.empty
   // Commands are eagerly executed. They will be converted to LocalRelation after the DataFrame
   // is created. That said, the statistics of a command is useless. Here we just return a dummy
   // statistics to avoid unnecessary statistics calculation of command's children.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 19e7a8b0862d4..0afc5de48b6fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -48,8 +48,8 @@ import org.apache.spark.sql.util.SchemaUtils
 * @param properties the properties of this view.
 * @param originalText the original SQL text of this view, can be None if this view is created via
 *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate the logical
- *              plan for temporary view and the view schema.
+ * @param analyzedPlan the logical plan that represents the view; this is used to generate the
+ *                     logical plan for a temporary view and the view schema.
 * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
 *                already exists, throws analysis exception.
 * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -70,6 +70,8 @@ case class CreateViewCommand(
 
   import ViewHelper._
 
+  override def plansToCheckAnalysis: Seq[LogicalPlan] = Seq(analyzedPlan)
+
   override def innerChildren: Seq[QueryPlan[_]] = Seq(analyzedPlan)
 
   if (viewType == PersistedView) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index c2555a1991414..35bbb9dc0a64b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -39,6 +39,12 @@ class DataFrameJoinSuite extends QueryTest with AdaptiveSparkPlanHelper {
 
   import testImplicits._
 
+  test("terry") {
+    val df = Seq(1).toDF("`x.y`")
+    df.select("`x.y`").show
+    df.resolve("``x.y``")
+  }
+
   test("join - join using") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
     val df2 = Seq(1, 2, 3).map(i => (i, (i + 1).toString)).toDF("int", "str")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index adb212d653ce9..486919b3b424c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -47,6 +47,24 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("terry") {
+    withTable("view_base_table") {
+      withView("key_dependent_view") {
+        sql("CREATE TABLE view_base_table (key int, data varchar(20)) USING PARQUET")
+        sql("CREATE VIEW key_dependent_view AS SELECT * FROM view_base_table GROUP BY key")
+      }
+    }
+  }
+
+  test("terry2") {
+    withTempView("tv") {
+      Seq(1).toDF.createTempView("tv")
+      withView("v1_temp") {
+        sql("CREATE VIEW v1_temp AS SELECT * FROM tv")
+      }
+    }
+  }
+
   test("create a permanent view on a permanent view") {
     withView("jtv1", "jtv2") {
       sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")

From 6fdd9e0edc924679420d3c64472b72e83bb6006f Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 22 Mar 2021 21:08:31 -0700
Subject: [PATCH 4/6] update

---
 .../apache/spark/sql/DataFrameJoinSuite.scala |  6 ------
 .../spark/sql/execution/SQLViewSuite.scala    | 18 ------------------
 2 files changed, 24 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 35bbb9dc0a64b..c2555a1991414 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -39,12 +39,6 @@ class DataFrameJoinSuite extends QueryTest with AdaptiveSparkPlanHelper {
 
   import testImplicits._
 
-  test("terry") {
-    val df = Seq(1).toDF("`x.y`")
-    df.select("`x.y`").show
-    df.resolve("``x.y``")
-  }
-
   test("join - join using") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
     val df2 = Seq(1, 2, 3).map(i => (i, (i + 1).toString)).toDF("int", "str")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 486919b3b424c..adb212d653ce9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -47,24 +47,6 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("terry") {
-    withTable("view_base_table") {
-      withView("key_dependent_view") {
-        sql("CREATE TABLE view_base_table (key int, data varchar(20)) USING PARQUET")
-        sql("CREATE VIEW key_dependent_view AS SELECT * FROM view_base_table GROUP BY key")
-      }
-    }
-  }
-
-  test("terry2") {
-    withTempView("tv") {
-      Seq(1).toDF.createTempView("tv")
-      withView("v1_temp") {
-        sql("CREATE VIEW v1_temp AS SELECT * FROM tv")
-      }
-    }
-  }
-
   test("create a permanent view on a permanent view") {
     withView("jtv1", "jtv2") {
       sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")

From e04fbdda9e088173c956e24d4214f17bca640e29 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 23 Mar 2021 10:20:13 -0700
Subject: [PATCH 5/6] address comments

---
 .../sql/catalyst/analysis/CheckAnalysis.scala     | 15 ++++++++++++---
 .../sql/catalyst/plans/logical/Command.scala      |  1 -
 .../spark/sql/execution/command/views.scala       |  2 --
 .../execution/datasources/v2/CacheTableExec.scala |  4 +++-
 4 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index f4383990248e8..d5dc6e7ca201a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -167,9 +167,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case _: ShowTableExtended =>
         throw new AnalysisException("SHOW TABLE EXTENDED is not supported for v2 tables.")
 
-      case c: Command =>
-        c.plansToCheckAnalysis.foreach(checkAnalysis)
-
       case operator: LogicalPlan =>
         // Check argument data types of higher-order functions downwards first.
         // If the arguments of the higher-order functions are resolved but the type check fails,
@@ -686,6 +683,18 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case _ => // Analysis successful!
     }
   }
+
+    // Check analysis on internal nodes.
+    plan match {
+      case c: Command =>
+        c.innerChildren.foreach {
+          case l: LogicalPlan => checkAnalysis(l)
+          case _ =>
+        }
+
+      case _ => // Analysis successful!
+    }
+
   checkCollectedMetrics(plan)
   extendedCheckRules.foreach(_(plan))
   plan.foreachUp {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index b7d8a00205d77..89bd865391b5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -28,7 +28,6 @@ trait Command extends LogicalPlan {
   override def output: Seq[Attribute] = Seq.empty
   override def producedAttributes: AttributeSet = outputSet
   override def children: Seq[LogicalPlan] = Seq.empty
-  def plansToCheckAnalysis: Seq[LogicalPlan] = Seq.empty
   // Commands are eagerly executed. They will be converted to LocalRelation after the DataFrame
   // is created. That said, the statistics of a command is useless. Here we just return a dummy
   // statistics to avoid unnecessary statistics calculation of command's children.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 0afc5de48b6fd..fc376ed0fc2d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -70,8 +70,6 @@ case class CreateViewCommand(
 
   import ViewHelper._
 
-  override def plansToCheckAnalysis: Seq[LogicalPlan] = Seq(analyzedPlan)
-
   override def innerChildren: Seq[QueryPlan[_]] = Seq(analyzedPlan)
 
   if (viewType == PersistedView) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index e30dea93b22ba..0c0a0227f99b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -94,7 +94,9 @@ case class CacheTableAsSelectExec(
   override lazy val relationName: String = tempViewName
 
   override lazy val planToCache: LogicalPlan = {
-    // If the plan cannot be analyzed, throw an exception and don't proceed.
+    // CacheTableAsSelectExec.query is deliberately not a child of CacheTableAsSelect, so the
+    // optimizer skips it; as a result it is still unresolved here. CreateViewCommand, however,
+    // needs an analyzed plan for the cache to work correctly, so the query is analyzed first.
     val qe = sparkSession.sessionState.executePlan(query)
     qe.assertAnalyzed()
     val analyzedPlan = qe.analyzed

From a14b3b921a778d1b4236945d64a1e62648ba1430 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Wed, 24 Mar 2021 08:44:52 -0700
Subject: [PATCH 6/6] address PR comment

---
 .../sql/catalyst/analysis/CheckAnalysis.scala | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index d5dc6e7ca201a..c699942ab55ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -680,21 +680,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             |Invalid expressions: [${invalidExprSqls.mkString(", ")}]""".stripMargin
         )
 
-      case _ => // Analysis successful!
-    }
-  }
-
-    // Check analysis on internal nodes.
-    plan match {
-      case c: Command =>
-        c.innerChildren.foreach {
-          case l: LogicalPlan => checkAnalysis(l)
-          case _ =>
-        }
-
+      case c: Command =>
+        c.innerChildren.foreach {
+          case l: LogicalPlan => checkAnalysis(l)
+          case _ =>
+        }
+
       case _ => // Analysis successful!
     }
 
   checkCollectedMetrics(plan)
   extendedCheckRules.foreach(_(plan))
   plan.foreachUp {
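
The net effect of the series: CreateViewCommand now takes an already-analyzed
plan, CacheTableAsSelectExec performs that analysis up front, and CheckAnalysis
recurses into a command's innerChildren rather than a dedicated hook. Below is
a minimal, self-contained Scala sketch of that pattern; it is illustrative only,
not Spark source, and every type and name in it apart from innerChildren and
checkAnalysis is invented for the example.

// Sketch only: simplified stand-ins for Spark's logical plans and commands.
sealed trait Plan {
  def children: Seq[Plan] = Seq.empty      // traversed by analyzer/optimizer rules
  def innerChildren: Seq[Plan] = Seq.empty // shown in EXPLAIN, skipped by rules
}

case class Scan(table: String) extends Plan
case class Unresolved(name: String) extends Plan

// Like CreateViewCommand after this series: the (already analyzed) view query
// is an inner child, not a child, so plan transformations do not descend into it.
case class CreateView(name: String, query: Plan) extends Plan {
  override def innerChildren: Seq[Plan] = Seq(query)
}

object Checker {
  // Mirrors the final CheckAnalysis shape: commands are checked by recursing
  // into their inner children, so a bad view query still fails the check even
  // though it is hidden from the ordinary children traversal.
  def checkAnalysis(plan: Plan): Unit = plan match {
    case u: Unresolved =>
      throw new IllegalStateException(s"unresolved relation: ${u.name}")
    case c: CreateView =>
      c.innerChildren.foreach(checkAnalysis)
    case other =>
      other.children.foreach(checkAnalysis) // Analysis successful!
  }
}

object Demo extends App {
  Checker.checkAnalysis(CreateView("v", Scan("t")))             // passes
  Checker.checkAnalysis(CreateView("v", Unresolved("missing"))) // throws
}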