[SPARK-35122][SQL] Migrate CACHE/UNCACHE TABLE to use AnalysisOnlyCommand

### What changes were proposed in this pull request?

Now that `AnalysisOnlyCommand` is introduced in #32032, `CacheTable` and `UncacheTable` can extend it to simplify the code base. Currently, the logic that ensures the tables in these commands are only analyzed (and never optimized or planned further) is scattered across several places: the analyzer, `CheckAnalysis`, and the data source and Hive relation-resolution rules.
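As a rough mental model, here is a simplified, self-contained sketch of the idea (not the actual Spark trait from #32032; `Plan`, `AnalysisOnly`, `CacheTableCmd`, and `Demo` are made-up stand-ins): an analysis-only command exposes its child plans to the analyzer, then hides them once marked as analyzed, so the optimizer and planner treat the command as a leaf and leave the analyzed plan untouched.

```scala
// Simplified model of the AnalysisOnlyCommand idea from #32032.
sealed trait Plan { def children: Seq[Plan] }
case class Relation(name: String) extends Plan { val children: Seq[Plan] = Nil }

trait AnalysisOnly extends Plan {
  def isAnalyzed: Boolean
  def childrenToAnalyze: Seq[Plan]
  // Children are visible to tree-walking rules only until analysis completes.
  final override def children: Seq[Plan] =
    if (isAnalyzed) Nil else childrenToAnalyze
  def markAsAnalyzed(): Plan
}

case class CacheTableCmd(table: Plan, isAnalyzed: Boolean = false)
    extends AnalysisOnly {
  def childrenToAnalyze: Seq[Plan] = table :: Nil
  def markAsAnalyzed(): Plan = copy(isAnalyzed = true)
}

object Demo extends App {
  val cmd = CacheTableCmd(Relation("t"))
  assert(cmd.children.nonEmpty)     // analyzer rules can still resolve the table
  val analyzed = cmd.markAsAnalyzed()
  assert(analyzed.children.isEmpty) // later phases see a leaf and skip the child
}
```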

### Why are the changes needed?

To simplify the code base that handles these two commands.

### Does this PR introduce _any_ user-facing change?

No, just internal refactoring.

### How was this patch tested?

The existing tests (e.g., `CachedTableSuite`) cover the changes in this PR. For example, reverting `CacheTable`/`UncacheTable` to extend `LeafCommand` causes a few failures in `CachedTableSuite`.
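For reference, the user-visible behavior those suites exercise is unchanged; a typical round trip (assuming a live `spark` session, e.g. in `spark-shell`) looks like:

```scala
spark.range(10).createOrReplaceTempView("v")
spark.sql("CACHE TABLE v")    // the command's table is analyzed but never optimized
assert(spark.catalog.isCached("v"))
spark.sql("UNCACHE TABLE v")
assert(!spark.catalog.isCached("v"))
```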

Closes #32220 from imback82/cache_cmd_analysis_only.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
imback82 authored and cloud-fan committed Apr 19, 2021
1 parent c8d78a7 commit 7a06cdd
Showing 6 changed files with 30 additions and 67 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -836,14 +836,6 @@ class Analyzer(override val catalogManager: CatalogManager)
         lookupAndResolveTempView(ident)
           .map(view => i.copy(table = view))
           .getOrElse(i)
-      case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
-        lookupAndResolveTempView(ident)
-          .map(view => c.copy(table = view))
-          .getOrElse(c)
-      case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
-        lookupAndResolveTempView(ident)
-          .map(view => c.copy(table = view, isTempView = true))
-          .getOrElse(c)
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
@@ -1022,16 +1014,6 @@ class Analyzer(override val catalogManager: CatalogManager)
           .map(v2Relation => i.copy(table = v2Relation))
           .getOrElse(i)
 
-      case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
-        lookupV2Relation(u.multipartIdentifier, u.options, false)
-          .map(v2Relation => c.copy(table = v2Relation))
-          .getOrElse(c)
-
-      case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
-        lookupV2Relation(u.multipartIdentifier, u.options, false)
-          .map(v2Relation => c.copy(table = v2Relation))
-          .getOrElse(c)
-
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
@@ -1129,20 +1111,6 @@ class Analyzer(override val catalogManager: CatalogManager)
           case other => i.copy(table = other)
         }
 
-      case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
-        lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(resolveViews)
-          .map(EliminateSubqueryAliases(_))
-          .map(relation => c.copy(table = relation))
-          .getOrElse(c)
-
-      case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
-        lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(resolveViews)
-          .map(EliminateSubqueryAliases(_))
-          .map(relation => c.copy(table = relation))
-          .getOrElse(c)
-
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -125,12 +125,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
         u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
 
-      case CacheTable(u: UnresolvedRelation, _, _, _) =>
-        u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
-
-      case UncacheTable(u: UnresolvedRelation, _, _) =>
-        u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
-
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
         val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1012,7 +1012,18 @@ case class CacheTable(
     table: LogicalPlan,
     multipartIdentifier: Seq[String],
     isLazy: Boolean,
-    options: Map[String, String]) extends LeafCommand
+    options: Map[String, String],
+    isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): CacheTable = {
+    assert(!isAnalyzed)
+    copy(table = newChildren.head)
+  }
+
+  override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
+
+  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
 
 /**
  * The logical plan of the CACHE TABLE ... AS SELECT command.
@@ -1041,7 +1052,17 @@ case class CacheTableAsSelect(
 case class UncacheTable(
     table: LogicalPlan,
     ifExists: Boolean,
-    isTempView: Boolean = false) extends LeafCommand
+    isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): UncacheTable = {
+    assert(!isAnalyzed)
+    copy(table = newChildren.head)
+  }
+
+  override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
+
+  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
 
 /**
  * The logical plan of the ALTER TABLE ... SET LOCATION command.
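With both commands extending `AnalysisOnlyCommand`, the analyzer resolves whatever is in `childrenToAnalyze` through its normal rules, and a single generic rule then seals the command; the `assert(!isAnalyzed)` above holds because once `markAsAnalyzed()` runs, `children` is empty and `withNewChildrenInternal` can no longer be invoked. Roughly, the analyzer-side handling added in #32032 looks like the following sketch (a paraphrase, not the verbatim rule):

```scala
// Inside the Analyzer (which also provides checkAnalysis): once an
// AnalysisOnlyCommand is fully resolved, verify it and mark it as analyzed
// so its children drop out of all later rule application.
object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case c: AnalysisOnlyCommand if c.resolved =>
      checkAnalysis(c)
      c.markAsAnalyzed()
  }
}
```

This is why the per-command `CacheTable`/`UncacheTable` cases in `Analyzer`, `CheckAnalysis`, and the resolution rules below could all be deleted.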
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -271,20 +271,6 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
-    case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _)
-        if DDLUtils.isDatasourceTable(tableMeta) =>
-      c.copy(table = readDataSourceTable(tableMeta, options))
-
-    case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _) =>
-      c.copy(table = DDLUtils.readHiveTable(tableMeta))
-
-    case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _)
-        if DDLUtils.isDatasourceTable(tableMeta) =>
-      u.copy(table = readDataSourceTable(tableMeta, options))
-
-    case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _) =>
-      u.copy(table = DDLUtils.readHiveTable(tableMeta))
-
     case UnresolvedCatalogRelation(tableMeta, options, false)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta, options)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -415,7 +415,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       CacheTableAsSelectExec(r.tempViewName, r.plan, r.originalText, r.isLazy, r.options) :: Nil
 
     case r: UncacheTable =>
-      UncacheTableExec(r.table, cascade = !r.isTempView) :: Nil
+      def isTempView(table: LogicalPlan): Boolean = table match {
+        case SubqueryAlias(_, v: View) => v.isTempView
+        case _ => false
+      }
+      UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
 
     case SetTableLocation(table: ResolvedTable, partitionSpec, location) =>
       if (partitionSpec.nonEmpty) {
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.execution._
@@ -231,16 +231,6 @@ case class RelationConversions(
         assertNoNullTypeInSchema(query.schema)
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
-
-      // Cache table
-      case c @ CacheTable(relation: HiveTableRelation, _, _, _)
-          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        c.copy(table = metastoreCatalog.convert(relation))
-
-      // Uncache table
-      case u @ UncacheTable(relation: HiveTableRelation, _, _)
-          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        u.copy(table = metastoreCatalog.convert(relation))
     }
   }
 }
