Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34152][SQL] Make CreateViewStatement.child to be LogicalPlan's children so that it's resolved in analyze phase #31273

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
db4dfaf
initial commit
imback82 Jan 19, 2021
71c01e8
fix tests
imback82 Jan 21, 2021
8dc1961
wrap with View
imback82 Jan 23, 2021
7becf96
Merge branch 'master' into spark-34152
imback82 Jan 25, 2021
4b3f184
trying with View and optional CatalogTable
imback82 Jan 25, 2021
9085d17
test update
imback82 Jan 26, 2021
f2b86a9
Merge remote-tracking branch 'upstream/master' into spark-34152
imback82 Jan 28, 2021
bfabe9f
test fix
imback82 Jan 28, 2021
1f2e4c7
Fix explain-aqe.sql output
imback82 Jan 29, 2021
0b75fdc
fix test
imback82 Jan 29, 2021
ac663aa
Merge branch 'master' into spark-34152
imback82 Jan 31, 2021
30e750c
fix explain.sql
imback82 Jan 31, 2021
0fbf1cf
Fix tests
imback82 Jan 31, 2021
a8581c6
revert RemoveRedundantProjectsSuite.scala test
imback82 Jan 31, 2021
4e77301
rework on the udf
imback82 Jan 31, 2021
60cf521
clean up
imback82 Feb 1, 2021
a0ba508
clean up
imback82 Feb 1, 2021
1949cbc
Merge branch 'master' into spark-34152
imback82 Feb 6, 2021
5e38e2b
Merge branch 'master' into spark-34152
imback82 Feb 12, 2021
388ec16
Fix build issue
imback82 Feb 12, 2021
7e92eeb
fix
imback82 Feb 12, 2021
fbdcf3b
fix test
imback82 Feb 13, 2021
2a3e5b7
Fix test
imback82 Feb 13, 2021
e92cbe1
refinement
imback82 Feb 13, 2021
8422ae8
revert AnalysisContext change
imback82 Feb 18, 2021
bace91a
clean up
imback82 Feb 18, 2021
9432fec
Merge remote-tracking branch 'upstream/master' into spark-34152
imback82 Feb 19, 2021
737307c
Address PR comments
imback82 Feb 19, 2021
d906e19
Merge remote-tracking branch 'upstream/master' into spark-34152
imback82 Feb 19, 2021
4e55a1f
reduce churn
imback82 Feb 19, 2021
e9cd2f3
revert removing def
imback82 Feb 19, 2021
9da3b9c
fix tests
imback82 Feb 20, 2021
e36fc34
Revert View signature change
imback82 Feb 23, 2021
b54df8a
reduce churn
imback82 Feb 23, 2021
c5651b2
reduce churn
imback82 Feb 23, 2021
d4e958a
address PR comments
imback82 Feb 24, 2021
84d817c
Merge remote-tracking branch 'upstream/master' into spark-34152
imback82 Feb 24, 2021
82d58ba
Address PR comments
imback82 Feb 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,20 @@ class Analyzer(override val catalogManager: CatalogManager)
}

/**
 * Whether the analyzer is currently resolving the body of a view, signalled by the current
 * [[AnalysisContext]] carrying a non-empty `catalogAndNamespace`.
 */
private def isResolvingView: Boolean = {
  val currentContext = AnalysisContext.get
  currentContext.catalogAndNamespace.nonEmpty
}
private def referredTempViewNames: Seq[Seq[String]] = AnalysisContext.get.referredTempViewNames
/**
 * Checks whether `nameParts` matches one of the temp view names recorded in the current
 * [[AnalysisContext]].
 *
 * Two multi-part names match when they have the same number of parts and every pair of
 * aligned parts is accepted by `resolver`.
 *
 * @param nameParts the multi-part identifier to look for
 * @return true if any referred temp view name matches `nameParts`
 */
private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
  val referredNames = AnalysisContext.get.referredTempViewNames
  // `corresponds` holds only if both sequences have equal length and each aligned pair
  // satisfies the predicate, which is the length check plus zip/forall in one call.
  referredNames.exists(_.corresponds(nameParts)(resolver))
}

/**
 * Applies `EliminateSubqueryAliases` to `plan` and, if the result is a [[View]] flagged as a
 * dataframe temp view, returns that view's child. Any other result is returned as is.
 *
 * @param plan the relation plan to unwrap
 * @return the view's child for a dataframe temp view, otherwise the alias-eliminated plan
 */
private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
  val withoutAliases = EliminateSubqueryAliases(plan)
  withoutAliases match {
    case view: View if view.isDataFrameTempView => view.child
    case _ => withoutAliases
  }
}

/**
* Resolve relations to temp views. This is not an actual rule, and is called by
Expand All @@ -887,7 +900,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case write: V2WriteCommand =>
write.table match {
case UnresolvedRelation(ident, _, false) =>
lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map {
lookupTempView(ident, performCheck = true).map(unwrapRelationPlan).map {
case r: DataSourceV2Relation => write.withNewTable(r)
case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
}.getOrElse(write)
Expand Down Expand Up @@ -924,7 +937,7 @@ class Analyzer(override val catalogManager: CatalogManager)
isStreaming: Boolean = false,
performCheck: Boolean = false): Option[LogicalPlan] = {
// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView && !referredTempViewNames.contains(identifier)) return None
if (isResolvingView && !isReferredTempViewName(identifier)) return None

val tmpView = identifier match {
case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
Expand All @@ -942,7 +955,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// If we are resolving relations inside views, we need to expand single-part relation names with
// the current catalog and namespace of when the view was created.
private def expandRelationName(nameParts: Seq[String]): Seq[String] = {
if (!isResolvingView || referredTempViewNames.contains(nameParts)) return nameParts
if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts

if (nameParts.length == 1) {
AnalysisContext.get.catalogAndNamespace :+ nameParts.head
Expand Down Expand Up @@ -1139,7 +1152,10 @@ class Analyzer(override val catalogManager: CatalogManager)
case other => other
}

EliminateSubqueryAliases(relation) match {
// Inserting into a file-based temporary view is allowed.
// (e.g., spark.read.parquet("path").createOrReplaceTempView("t")).
// Thus, we need to look at the raw plan if `relation` is a temporary view.
unwrapRelationPlan(relation) match {
case v: View =>
throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table)
case other => i.copy(table = other)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ object UnsupportedOperationChecker extends Logging {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
_: TypedFilter) =>
case v: View if v.isDataFrameTempView =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,7 @@ class SessionCatalog(
}

/**
* Generate a [[View]] operator from the view description if the view stores sql text,
* otherwise, it is same to `getRawTempView`
* Generate a [[View]] operator from the temporary view stored.
*/
def getTempView(name: String): Option[LogicalPlan] = synchronized {
getRawTempView(name).map(getTempViewPlan)
Expand All @@ -641,8 +640,7 @@ class SessionCatalog(
}

/**
* Generate a [[View]] operator from the view description if the view stores sql text,
* otherwise, it is same to `getRawGlobalTempView`
* Generate a [[View]] operator from the global temporary view stored.
*/
def getGlobalTempView(name: String): Option[LogicalPlan] = {
getRawGlobalTempView(name).map(getTempViewPlan)
Expand Down Expand Up @@ -683,7 +681,7 @@ class SessionCatalog(
val table = formatTableName(name.table)
if (name.database.isEmpty) {
tempViews.get(table).map {
case TemporaryViewRelation(metadata) => metadata
case TemporaryViewRelation(metadata, _) => metadata
case plan =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CatalogTable(
identifier = TableIdentifier(table),
Expand All @@ -693,7 +691,7 @@ class SessionCatalog(
}.getOrElse(getTableMetadata(name))
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.get(table).map {
case TemporaryViewRelation(metadata) => metadata
case TemporaryViewRelation(metadata, _) => metadata
case plan =>
CatalogTable(
identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
Expand Down Expand Up @@ -838,9 +836,11 @@ class SessionCatalog(

private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
plan match {
case viewInfo: TemporaryViewRelation =>
fromCatalogTable(viewInfo.tableMeta, isTempView = true)
case v => v
case TemporaryViewRelation(tableMeta, None) =>
fromCatalogTable(tableMeta, isTempView = true)
case TemporaryViewRelation(tableMeta, Some(plan)) =>
View(desc = tableMeta, isTempView = true, child = plan)
case other => other
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the current PR's approach is fine, we can remove this case other => other and change createTempView to use tableDefinition: TemporaryViewRelation instead of tableDefinition: LogicalPlan once we migrate ALTER VIEW AS and CREATE TEMP VIEW USING.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
Expand Down Expand Up @@ -467,6 +468,8 @@ object CatalogTable {
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame"

def splitLargeTableProp(
key: String,
value: String,
Expand Down Expand Up @@ -779,9 +782,15 @@ case class UnresolvedCatalogRelation(

/**
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
* and will be transformed to `View` during analysis
* and will be transformed to `View` during analysis. If the temporary view was
* created from a dataframe, `plan` is set to the analyzed plan for the view.
*/
case class TemporaryViewRelation(tableMeta: CatalogTable) extends LeafNode {
case class TemporaryViewRelation(
    tableMeta: CatalogTable,
    plan: Option[LogicalPlan] = None) extends LeafNode {
  // When a plan is supplied it must already be resolved, and the view metadata must carry the
  // VIEW_CREATED_FROM_DATAFRAME property. With no plan the check passes trivially.
  require(plan.forall { analyzedPlan =>
    analyzedPlan.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
  })

  // This node always reports itself as unresolved.
  override lazy val resolved: Boolean = false

  // The wrapper itself exposes no output attributes.
  override def output: Seq[Attribute] = Nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -443,21 +444,25 @@ case class InsertIntoDir(
}

/**
* A container for holding the view description(CatalogTable), and the output of the view. The
* child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
* if the `viewText` is not defined.
* A container for holding the view description(CatalogTable) and info whether the view is temporary
* or not. If it's a SQL (temp) view, the child should be a logical plan parsed from the
* `CatalogTable.viewText`. Otherwise, the view is a temporary one created from a dataframe and the
* view description should contain a `VIEW_CREATED_FROM_DATAFRAME` property; in this case, the child
* must be already resolved.
*
* This operator will be removed at the end of analysis stage.
*
* @param desc A view description(CatalogTable) that provides necessary information to resolve the
* view.
* we are able to decouple the output from the underlying structure.
* @param child The logical plan of a view operator, it should be a logical plan parsed from the
* `CatalogTable.viewText`, should throw an error if the `viewText` is not defined.
* @param isTempView A flag to indicate whether the view is temporary or not.
* @param child The logical plan of a view operator. If the view description is available, it should
* be a logical plan parsed from the `CatalogTable.viewText`.
*/
case class View(
desc: CatalogTable,
isTempView: Boolean,
child: LogicalPlan) extends UnaryNode {
require(!isDataFrameTempView || child.resolved)

override def output: Seq[Attribute] = child.output

Expand All @@ -470,6 +475,9 @@ case class View(
case _ => child.canonicalized
}

/**
 * True when this view is temporary and its description carries the
 * `VIEW_CREATED_FROM_DATAFRAME` property.
 */
def isDataFrameTempView: Boolean = {
  if (isTempView) {
    desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
  } else {
    false
  }
}

// When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
// output schema doesn't change even if the table referenced by the view is changed after view
// creation. We should remove this extra Project during canonicalize if it does nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ case class CreateViewStatement(
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
viewType: ViewType) extends ParsedStatement
viewType: ViewType) extends ParsedStatement {

override def children: Seq[LogicalPlan] = Seq(child)
}

/**
* A REPLACE TABLE command, as parsed from SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
Project(Seq(UnresolvedAttribute("a")), testRelation),
Project(testRelation.output, testRelation))

checkAnalysis(
checkAnalysisWithoutViewWrapper(
Project(Seq(UnresolvedAttribute("TbL.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Project(testRelation.output, testRelation))
Expand All @@ -105,13 +105,13 @@ class AnalysisSuite extends AnalysisTest with Matchers {
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Seq("cannot resolve"))

checkAnalysis(
checkAnalysisWithoutViewWrapper(
Project(Seq(UnresolvedAttribute("TbL.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Project(testRelation.output, testRelation),
caseSensitive = false)

checkAnalysis(
checkAnalysisWithoutViewWrapper(
Project(Seq(UnresolvedAttribute("tBl.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Project(testRelation.output, testRelation),
Expand Down Expand Up @@ -203,10 +203,10 @@ class AnalysisSuite extends AnalysisTest with Matchers {

test("resolve relations") {
assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe")), Seq())
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE")), testRelation)
checkAnalysis(
checkAnalysisWithoutViewWrapper(UnresolvedRelation(TableIdentifier("TaBlE")), testRelation)
checkAnalysisWithoutViewWrapper(
UnresolvedRelation(TableIdentifier("tAbLe")), testRelation, caseSensitive = false)
checkAnalysis(
checkAnalysisWithoutViewWrapper(
UnresolvedRelation(TableIdentifier("TaBlE")), testRelation, caseSensitive = false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ trait AnalysisTest extends PlanTest {
}
}

/**
 * Analyzes `inputPlan` and compares the result with `expectedPlan`, after replacing every
 * [[View]] node flagged as a dataframe temp view with its child.
 *
 * @param inputPlan the plan to analyze
 * @param expectedPlan the plan the unwrapped analysis result is compared against
 * @param caseSensitive value set for `SQLConf.CASE_SENSITIVE` while the check runs
 */
protected def checkAnalysisWithoutViewWrapper(
    inputPlan: LogicalPlan,
    expectedPlan: LogicalPlan,
    caseSensitive: Boolean = true): Unit = {
  val caseSensitivity = SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString
  withSQLConf(caseSensitivity) {
    val analyzed = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
    // Replace dataframe temp view wrappers with their children so the expected plan does not
    // need to contain them.
    val unwrapped = analyzed.transformUp {
      case view: View if view.isDataFrameTempView => view.child
    }
    comparePlans(unwrapped, expectedPlan)
  }
}

protected override def comparePlans(
plan1: LogicalPlan,
plan2: LogicalPlan,
Expand Down
Loading