From 10ee51630ff4925b655c0379f7929faea08c7329 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner <etudenhoefner@gmail.com>
Date: Mon, 29 Jan 2024 20:39:20 +0100
Subject: [PATCH] Spark 3.4: Support creating views via SQL (#9580)

---
 .../IcebergSparkSessionExtensions.scala       |   2 +
 .../sql/catalyst/analysis/CheckViews.scala    |  56 ++++
 .../sql/catalyst/analysis/ResolveViews.scala  |  28 +-
 .../analysis/RewriteViewCommands.scala        |  59 ++++
 .../logical/views/CreateIcebergView.scala     |  44 +++
 .../datasources/v2/CreateV2ViewExec.scala     |  97 +++++++
 .../v2/ExtendedDataSourceV2Strategy.scala     |  16 ++
 .../iceberg/spark/extensions/TestViews.java   | 263 ++++++++++++++++++
 .../apache/iceberg/spark/SparkCatalog.java    |  31 +++
 .../iceberg/spark/source/SparkView.java       |   7 +-
 10 files changed, 600 insertions(+), 3 deletions(-)
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala

diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
index 48c0c1373e17..af6a64911ecb 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.analysis.AlignedRowLevelIcebergCommandCheck
 import org.apache.spark.sql.catalyst.analysis.AlignRowLevelCommandAssignments
 import org.apache.spark.sql.catalyst.analysis.CheckMergeIntoTableConditions
+import org.apache.spark.sql.catalyst.analysis.CheckViews
 import org.apache.spark.sql.catalyst.analysis.MergeIntoIcebergTableResolutionCheck
 import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
 import org.apache.spark.sql.catalyst.analysis.ResolveMergeIntoTableReferences
@@ -55,6 +56,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule { _ => AlignRowLevelCommandAssignments }
     extensions.injectResolutionRule { _ => RewriteUpdateTable }
     extensions.injectResolutionRule { _ => RewriteMergeIntoTable }
+    extensions.injectCheckRule { _ => CheckViews }
     extensions.injectCheckRule { _ => MergeIntoIcebergTableResolutionCheck }
     extensions.injectCheckRule { _ => AlignedRowLevelIcebergCommandCheck }
 
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
new file mode 100644
index 000000000000..8274c02bd16d
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SchemaUtils
+
+object CheckViews extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = {
+    plan foreach {
+      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
+      _, _, _, _, _, _) =>
+        verifyColumnCount(ident, columnAliases, query)
+        SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
+
+      case _ => // OK
+    }
+  }
+
+  private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
+    if (columns.nonEmpty) {
+      if (columns.length > query.output.length) {
+        throw new AnalysisException(String.format("Cannot create view %s, the reason is not enough data columns:\n" +
+          "View columns: %s\n" +
+          "Data columns: %s", ident.toString, columns.mkString(", "), query.output.map(c => c.name).mkString(", ")))
+      } else if (columns.length < query.output.length) {
+        throw new AnalysisException(String.format("Cannot create view %s, the reason is too many data columns:\n" +
+          "View columns: %s\n" +
+          "Data columns: %s", ident.toString, columns.mkString(", "), query.output.map(c => c.name).mkString(", ")))
+      }
+    }
+  }
+}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 169327042969..29ead971ac1a 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.MetadataBuilder
 
 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
 
@@ -59,6 +61,30 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
+
+    case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
+      if query.resolved && !c.rewritten =>
+      val aliased = aliasColumns(query, columnAliases, columnComments)
+      c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
+  }
+
+  private def aliasColumns(
+    plan: LogicalPlan,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]]): LogicalPlan = {
+    if (columnAliases.isEmpty || columnAliases.length != plan.output.length) {
+      plan
+    } else {
+      val projectList = plan.output.zipWithIndex.map { case (attr, pos) =>
+        if (columnComments.apply(pos).isDefined) {
+          val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
+          Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
+        } else {
+          Alias(attr, columnAliases.apply(pos))()
+        }
+      }
+      Project(projectList, plan)
+    }
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -150,7 +176,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
     catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
   }
 
-  implicit class ViewHelper(plugin: CatalogPlugin) {
+  implicit class IcebergViewHelper(plugin: CatalogPlugin) {
     def asViewCatalog: ViewCatalog = plugin match {
       case viewCatalog: ViewCatalog =>
         viewCatalog
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 2b35db33c0c5..e630933b6fab 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -19,13 +19,19 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.logical.CreateView
 import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.View
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 
@@ -40,6 +46,20 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropView(ResolvedView(resolved), ifExists) =>
       DropIcebergView(resolved, ifExists)
+
+    case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
+    Some(queryText), query, allowExisting, replace) =>
+      val q = CTESubstitution.apply(query)
+      verifyTemporaryObjectsDontExist(resolved.identifier, q)
+      CreateIcebergView(child = resolved,
+        queryText = queryText,
+        query = q,
+        columnAliases = userSpecifiedColumns.map(_._1),
+        columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
@@ -62,4 +82,43 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
         None
     }
   }
+
+  /**
+   * Permanent views are not allowed to reference temp objects
+   */
+  private def verifyTemporaryObjectsDontExist(
+    name: Identifier,
+    child: LogicalPlan): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    val tempViews = collectTemporaryViews(child)
+    tempViews.foreach { nameParts =>
+      throw new AnalysisException(String.format("Cannot create the persistent object %s" +
+        " of the type VIEW because it references to the temporary object %s of" +
+        " the type VIEW. Please make the temporary object %s" +
+        " persistent, or make the persistent object %s temporary",
+        name.name(), nameParts.quoted, nameParts.quoted, name.name()))
+    };
+
+    // TODO: check for temp function names
+  }
+
+  /**
+   * Collect all temporary views and return the identifiers separately
+   */
+  private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
+    def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
+      child.flatMap {
+        case unresolved: UnresolvedRelation if isTempView(unresolved.multipartIdentifier) =>
+          Seq(unresolved.multipartIdentifier)
+        case view: View if view.isTempView => Seq(view.desc.identifier.nameParts)
+        case plan => plan.expressions.flatMap(_.flatMap {
+          case e: SubqueryExpression => collectTempViews(e.plan)
+          case _ => Seq.empty
+        })
+      }.distinct
+    }
+
+    collectTempViews(child)
+  }
 }
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
new file mode 100644
index 000000000000..9366d5efe163
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+case class CreateIcebergView(
+  child: LogicalPlan,
+  queryText: String,
+  query: LogicalPlan,
+  columnAliases: Seq[String],
+  columnComments: Seq[Option[String]],
+  queryColumnNames: Seq[String] = Seq.empty,
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean,
+  rewritten: Boolean = false) extends BinaryCommand {
+  override def left: LogicalPlan = child
+
+  override def right: LogicalPlan = query
+
+  override protected def withNewChildrenInternal(
+    newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
+    copy(child = newLeft, query = newRight)
+}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
new file mode 100644
index 000000000000..e326bb87bbf5
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  queryText: String,
+  viewSchema: StructType,
+  columnAliases: Seq[String],
+  columnComments: Seq[Option[String]],
+  queryColumnNames: Seq[String],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
+    val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
+    val currentNamespace = session.sessionState.catalogManager.currentNamespace
+
+    val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
+    val newProperties = properties ++
+      comment.map(ViewCatalog.PROP_COMMENT -> _) +
+      (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
+        ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)
+
+    if (replace) {
+      // CREATE OR REPLACE VIEW
+      if (catalog.viewExists(ident)) {
+        catalog.dropView(ident)
+      }
+      // FIXME: replaceView API doesn't exist in Spark 3.5
+      catalog.createView(
+        ident,
+        queryText,
+        currentCatalog,
+        currentNamespace,
+        viewSchema,
+        queryColumnNames.toArray,
+        columnAliases.toArray,
+        columnComments.map(c => c.orNull).toArray,
+        newProperties.asJava)
+    } else {
+      try {
+        // CREATE VIEW [IF NOT EXISTS]
+        catalog.createView(
+          ident,
+          queryText,
+          currentCatalog,
+          currentNamespace,
+          viewSchema,
+          queryColumnNames.toArray,
+          columnAliases.toArray,
+          columnComments.map(c => c.orNull).toArray,
+          newProperties.asJava)
+      } catch {
+        case _: ViewAlreadyExistsException if allowExisting => // Ignore
+      }
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateV2ViewExec: ${ident}"
+  }
+}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index ea228be0b80e..97ac2b87fffa 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -134,6 +135,21 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
+    case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
+    columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
+      CreateV2ViewExec(
+        catalog = viewCatalog,
+        ident = ident,
+        queryText = queryText,
+        columnAliases = columnAliases,
+        columnComments = columnComments,
+        queryColumnNames = queryColumnNames,
+        viewSchema = query.schema,
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace) :: Nil
+
     case _ => Nil
   }
 
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 6725c9497499..3cdc99be45b7 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.View;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -886,6 +887,268 @@ private String viewName(String viewName) {
     return viewName + new Random().nextInt(1000000);
   }
 
+  @Test
+  public void createViewIfNotExists() {
+    String viewName = "viewThatAlreadyExists";
+    sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot create view %s.%s because it already exists", NAMESPACE, viewName));
+
+    // using IF NOT EXISTS should work
+    assertThatNoException()
+        .isThrownBy(
+            () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName));
+  }
+
+  @Test
+  public void createViewWithInvalidSQL() {
+    assertThatThrownBy(() -> sql("CREATE VIEW simpleViewWithInvalidSQL AS invalid SQL"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Syntax error");
+  }
+
+  @Test
+  public void createViewReferencingTempView() throws NoSuchTableException {
+    insertRows(10);
+    String tempView = "temporaryViewBeingReferencedInAnotherView";
+    String viewReferencingTempView = "viewReferencingTemporaryView";
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
+
+    // creating a view that references a TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempView);
+  }
+
+  @Test
+  public void createViewReferencingGlobalTempView() throws NoSuchTableException {
+    insertRows(10);
+    String globalTempView = "globalTemporaryViewBeingReferenced";
+    String viewReferencingTempView = "viewReferencingGlobalTemporaryView";
+
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
+        globalTempView, tableName);
+
+    // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW %s AS SELECT id FROM global_temp.%s",
+                    viewReferencingTempView, globalTempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(globalTempView);
+  }
+
+  @Test
+  public void createViewUsingNonExistingTable() {
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW viewWithNonExistingTable AS SELECT id FROM non_existing"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The table or view `non_existing` cannot be found");
+  }
+
+  @Test
+  public void createViewWithMismatchedColumnCounts() {
+    String viewName = "viewWithMismatchedColumnCounts";
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view %s.%s", NAMESPACE, viewName))
+        .hasMessageContaining("not enough data columns")
+        .hasMessageContaining("View columns: id, data")
+        .hasMessageContaining("Data columns: id");
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id) AS SELECT id, data FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view %s.%s", NAMESPACE, viewName))
+        .hasMessageContaining("too many data columns")
+        .hasMessageContaining("View columns: id")
+        .hasMessageContaining("Data columns: id, data");
+  }
+
+  @Test
+  public void createViewWithColumnAliases() throws NoSuchTableException {
+    insertRows(6);
+    String viewName = "viewWithColumnAliases";
+
+    sql(
+        "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
+    assertThat(view.properties()).containsEntry("queryColumnNames", "id,data");
+
+    assertThat(view.schema().columns()).hasSize(2);
+    Types.NestedField first = view.schema().columns().get(0);
+    assertThat(first.name()).isEqualTo("new_id");
+    assertThat(first.doc()).isEqualTo("ID");
+
+    Types.NestedField second = view.schema().columns().get(1);
+    assertThat(second.name()).isEqualTo("new_data");
+    assertThat(second.doc()).isEqualTo("DATA");
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+
+    sql("DROP VIEW %s", viewName);
+
+    sql(
+        "CREATE VIEW %s (new_data, new_id) AS SELECT data, id FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+  }
+
+  @Test
+  public void createViewWithDuplicateColumnNames() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW viewWithDuplicateColumnNames (new_id, new_id) AS SELECT id, id FROM %s WHERE id <= 3",
+                    tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `new_id` already exists");
+  }
+
+  @Test
+  public void createViewWithDuplicateQueryColumnNames() throws NoSuchTableException {
+    insertRows(3);
+    String viewName = "viewWithDuplicateQueryColumnNames";
+    String sql = String.format("SELECT id, id FROM %s WHERE id <= 3", tableName);
+
+    // not specifying column aliases in the view should fail
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `id` already exists");
+
+    sql("CREATE VIEW %s (id_one, id_two) AS %s", viewName, sql);
+
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1, 1), row(2, 2), row(3, 3));
+  }
+
+  @Test
+  public void createViewWithCTE() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "simpleViewWithCTE";
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tableName);
+
+    sql("CREATE VIEW %s AS %s", viewName, sql);
+
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithConflictingNamesForCTEAndTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "viewWithConflictingNamesForCTEAndTempView";
+    String cteName = "cteName";
+    String sql =
+        String.format(
+            "WITH %s AS (SELECT max(id) as max FROM %s) "
+                + "(SELECT max, count(1) AS count FROM %s GROUP BY max)",
+            cteName, tableName, cteName);
+
+    // create a CTE and a TEMP VIEW with the same name
+    sql("CREATE TEMPORARY VIEW %s AS SELECT * from %s", cteName, tableName);
+    sql("CREATE VIEW %s AS %s", viewName, sql);
+
+    // CTE should take precedence over the TEMP VIEW when data is read
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithCTEReferencingTempView() {
+    String viewName = "viewWithCTEReferencingTempView";
+    String tempViewInCTE = "tempViewInCTE";
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tempViewInCTE);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewName)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempViewInCTE);
+  }
+
+  @Test
+  public void createViewWithNonExistingQueryColumn() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW viewWithNonExistingQueryColumn AS SELECT non_existing FROM %s WHERE id <= 3",
+                    tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "A column or function parameter with name `non_existing` cannot be resolved");
+  }
+
+  @Test
+  public void createViewWithSubqueryExpressionUsingTempView() {
+    String viewName = "viewWithSubqueryExpression";
+    String tempView = "simpleTempView";
+    String sql =
+        String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
+        .hasMessageContaining(
+            String.format("because it references to the temporary object %s", tempView));
+  }
+
+  @Test
+  public void createViewWithSubqueryExpressionUsingGlobalTempView() {
+    String viewName = "simpleViewWithSubqueryExpression";
+    String globalTempView = "simpleGlobalTempView";
+    String sql =
+        String.format(
+            "SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)",
+            tableName, globalTempView);
+
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5",
+        globalTempView, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
+        .hasMessageContaining(
+            String.format(
+                "because it references to the temporary object global_temp.%s", globalTempView));
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 23a62858f488..5abf048e87dd 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -568,6 +569,36 @@ public View createView(
       String[] columnComments,
       Map<String, String> properties)
       throws ViewAlreadyExistsException, NoSuchNamespaceException {
+    if (null != asViewCatalog) {
+      Schema icebergSchema = SparkSchemaUtil.convert(schema);
+
+      StringJoiner joiner = new StringJoiner(",");
+      Arrays.stream(queryColumnNames).forEach(joiner::add);
+
+      try {
+        Map<String, String> props =
+            ImmutableMap.<String, String>builder()
+                .putAll(Spark3Util.rebuildCreateProperties(properties))
+                .put("queryColumnNames", joiner.toString())
+                .build();
+        org.apache.iceberg.view.View view =
+            asViewCatalog
+                .buildView(buildIdentifier(ident))
+                .withDefaultCatalog(currentCatalog)
+                .withDefaultNamespace(Namespace.of(currentNamespace))
+                .withQuery("spark", sql)
+                .withSchema(icebergSchema)
+                .withLocation(properties.get("location"))
+                .withProperties(props)
+                .create();
+        return new SparkView(catalogName, view);
+      } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+        throw new NoSuchNamespaceException(currentNamespace);
+      } catch (AlreadyExistsException e) {
+        throw new ViewAlreadyExistsException(ident);
+      }
+    }
+
     throw new UnsupportedOperationException(
         "Creating a view is not supported by catalog: " + catalogName);
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index 424519623e4d..5391d75476ce 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -35,8 +35,9 @@
 
 public class SparkView implements org.apache.spark.sql.connector.catalog.View {
 
+  private static final String QUERY_COLUMN_NAMES = "queryColumnNames";
   private static final Set<String> RESERVED_PROPERTIES =
-      ImmutableSet.of("provider", "location", FORMAT_VERSION);
+      ImmutableSet.of("provider", "location", FORMAT_VERSION, QUERY_COLUMN_NAMES);
 
   private final View icebergView;
   private final String catalogName;
@@ -86,7 +87,9 @@ public StructType schema() {
 
   @Override
   public String[] queryColumnNames() {
-    return new String[0];
+    return icebergView.properties().containsKey(QUERY_COLUMN_NAMES)
+        ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(",")
+        : new String[0];
   }
 
   @Override