Skip to content

Commit

Permalink
Spark: Add view support to SparkSessionCatalog (#11388)
Browse files Browse the repository at this point in the history
* Spark: Add view support to SparkSessionCatalog

* Don't replace view non-atomically
  • Loading branch information
nastra authored Dec 3, 2024
1 parent af8e3f5 commit 6501d29
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
Expand All @@ -35,6 +38,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
Expand All @@ -61,21 +65,30 @@
@ExtendWith(ParameterizedTestExtension.class)
public class TestViews extends ExtensionsTestBase {
private static final Namespace NAMESPACE = Namespace.of("default");
private static final String SPARK_CATALOG = "spark_catalog";
private final String tableName = "table";

@BeforeEach
@Override
public void before() {
// Per-test setup: make the parameterized catalog the default, ensure the test
// namespace and table exist, then switch into that namespace.
super.before();
spark.conf().set("spark.sql.defaultCatalog", catalogName);
sql("USE %s", catalogName);
sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
// NOTE(review): the next two CREATE TABLE statements look like the pre- and
// post-change versions of the same statement from a rich diff — confirm which
// one belongs in the final file before relying on this span.
sql("CREATE TABLE %s (id INT, data STRING)", tableName);
sql(
// SparkSessionCatalog needs an explicit "USING iceberg" clause; other
// catalogs create Iceberg tables by default.
"CREATE TABLE IF NOT EXISTS %s.%s (id INT, data STRING)%s",
NAMESPACE, tableName, catalogName.equals(SPARK_CATALOG) ? " USING iceberg" : "");
sql("USE %s.%s", catalogName, NAMESPACE);
}

@AfterEach
public void removeTable() {
// Per-test teardown: drop the test table and undo session-catalog configuration
// so one parameterized run cannot leak state into the next.
sql("USE %s", catalogName);
// NOTE(review): the next two DROP TABLE statements look like the pre- and
// post-change versions of the same statement from a rich diff — confirm which
// one belongs in the final file before relying on this span.
sql("DROP TABLE IF EXISTS %s", tableName);
sql("DROP TABLE IF EXISTS %s.%s", NAMESPACE, tableName);

// reset spark session catalog
spark.sessionState().catalogManager().reset();
spark.conf().unset("spark.sql.catalog.spark_catalog");
}

@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
Expand All @@ -85,6 +98,14 @@ public static Object[][] parameters() {
SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName(),
SparkCatalogConfig.SPARK_WITH_VIEWS.implementation(),
SparkCatalogConfig.SPARK_WITH_VIEWS.properties()
},
{
SparkCatalogConfig.SPARK_SESSION_WITH_VIEWS.catalogName(),
SparkCatalogConfig.SPARK_SESSION_WITH_VIEWS.implementation(),
ImmutableMap.builder()
.putAll(SparkCatalogConfig.SPARK_SESSION_WITH_VIEWS.properties())
.put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
.build()
}
};
}
Expand Down Expand Up @@ -502,11 +523,20 @@ public void readFromViewReferencingTempFunction() throws NoSuchTableException {

assertThat(sql(sql)).hasSize(1).containsExactly(row(5.5));

String expectedErrorMsg =
String.format("Cannot load function: %s.%s.%s", catalogName, NAMESPACE, functionName);
if (SPARK_CATALOG.equals(catalogName)) {
// spark session catalog tries to load a V1 function and has a different error msg
expectedErrorMsg =
String.format(
"[ROUTINE_NOT_FOUND] The function `%s`.`%s` cannot be found",
NAMESPACE, functionName);
}

// reading from a view that references a TEMP FUNCTION shouldn't be possible
assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName))
.isInstanceOf(AnalysisException.class)
.hasMessageStartingWith(
String.format("Cannot load function: %s.%s.%s", catalogName, NAMESPACE, functionName));
.hasMessageStartingWith(expectedErrorMsg);
}

@TestTemplate
Expand Down Expand Up @@ -534,6 +564,9 @@ public void readFromViewWithCTE() throws NoSuchTableException {

@TestTemplate
public void rewriteFunctionIdentifier() {
assumeThat(catalogName)
.as("system namespace doesn't exist in SparkSessionCatalog")
.isNotEqualTo(SPARK_CATALOG);
String viewName = viewName("rewriteFunctionIdentifier");
String sql = "SELECT iceberg_version() AS version";

Expand Down Expand Up @@ -579,6 +612,9 @@ public void builtinFunctionIdentifierNotRewritten() {

@TestTemplate
public void rewriteFunctionIdentifierWithNamespace() {
assumeThat(catalogName)
.as("system namespace doesn't exist in SparkSessionCatalog")
.isNotEqualTo(SPARK_CATALOG);
String viewName = viewName("rewriteFunctionIdentifierWithNamespace");
String sql = "SELECT system.bucket(100, 'a') AS bucket_result, 'a' AS value";

Expand All @@ -596,8 +632,7 @@ public void rewriteFunctionIdentifierWithNamespace() {

assertThatThrownBy(() -> sql(sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Cannot resolve function")
.hasMessageContaining("`system`.`bucket`");
.hasMessageContaining("Cannot resolve function `system`.`bucket`");

assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName))
.hasSize(1)
Expand All @@ -606,6 +641,9 @@ public void rewriteFunctionIdentifierWithNamespace() {

@TestTemplate
public void fullFunctionIdentifier() {
assumeThat(catalogName)
.as("system namespace doesn't exist in SparkSessionCatalog")
.isNotEqualTo(SPARK_CATALOG);
String viewName = viewName("fullFunctionIdentifier");
String sql =
String.format(
Expand Down Expand Up @@ -754,10 +792,19 @@ public void renameViewToDifferentTargetCatalog() {
.withSchema(schema(sql))
.create();

assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO spark_catalog.%s", viewName, renamedView))
String targetCatalog =
catalogName.equals(SPARK_CATALOG)
? SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName()
: SPARK_CATALOG;

assertThatThrownBy(
() ->
sql(
"ALTER VIEW %s RENAME TO %s.%s.%s",
viewName, targetCatalog, NAMESPACE, renamedView))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
"Cannot move view between catalogs: from=spark_with_views and to=spark_catalog");
"Cannot move view between catalogs: from=%s and to=%s", catalogName, targetCatalog);
}

@TestTemplate
Expand Down Expand Up @@ -813,7 +860,9 @@ public void renameViewTargetAlreadyExistsAsTable() {
.withSchema(schema(sql))
.create();

sql("CREATE TABLE %s (id INT, data STRING)", target);
sql(
"CREATE TABLE %s.%s.%s (id INT, data STRING)%s",
catalogName, NAMESPACE, target, catalogName.equals(SPARK_CATALOG) ? " USING iceberg" : "");
assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
Expand Down Expand Up @@ -895,30 +944,6 @@ public void dropTempView() {
assertThat(v1SessionCatalog().getTempView(tempView).isDefined()).isFalse();
}

/** Verifies that dropping a plain (non-Iceberg) session-catalog view still works as expected. */
@TestTemplate
public void dropV1View() {
  String viewToDrop = viewName("v1ViewToBeDropped");
  org.apache.spark.sql.catalyst.TableIdentifier v1Identifier =
      new org.apache.spark.sql.catalyst.TableIdentifier(viewToDrop);

  // Create the backing table and the V1 view through the built-in session catalog.
  sql("USE spark_catalog");
  sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
  sql("CREATE TABLE %s (id INT, data STRING)", tableName);
  sql("CREATE VIEW %s AS SELECT id FROM %s", viewToDrop, tableName);

  // Switch back to the catalog under test; the view must be visible via the V1 API.
  sql("USE %s", catalogName);
  assertThat(v1SessionCatalog().tableExists(v1Identifier)).isTrue();

  // Dropping by fully-qualified name must remove it from the session catalog.
  sql("DROP VIEW spark_catalog.%s.%s", NAMESPACE, viewToDrop);
  assertThat(v1SessionCatalog().tableExists(v1Identifier)).isFalse();

  // Clean up the backing table.
  sql("USE spark_catalog");
  sql("DROP TABLE IF EXISTS %s", tableName);
}

/** Returns Spark's built-in V1 session catalog, used to observe non-Iceberg views/tables. */
private SessionCatalog v1SessionCatalog() {
  SessionCatalog builtinCatalog = spark.sessionState().catalogManager().v1SessionCatalog();
  return builtinCatalog;
}
Expand Down Expand Up @@ -1316,11 +1341,13 @@ public void createViewWithSubqueryExpressionInFilterThatIsRewritten()

assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(5));

sql("USE spark_catalog");
if (!catalogName.equals(SPARK_CATALOG)) {
sql("USE spark_catalog");

assertThatThrownBy(() -> sql(sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
assertThatThrownBy(() -> sql(sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
}

// the underlying SQL in the View should be rewritten to have catalog & namespace
assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName))
Expand All @@ -1341,11 +1368,13 @@ public void createViewWithSubqueryExpressionInQueryThatIsRewritten() throws NoSu
.hasSize(3)
.containsExactly(row(3), row(3), row(3));

sql("USE spark_catalog");
if (!catalogName.equals(SPARK_CATALOG)) {
sql("USE spark_catalog");

assertThatThrownBy(() -> sql(sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
assertThatThrownBy(() -> sql(sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
}

// the underlying SQL in the View should be rewritten to have catalog & namespace
assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName))
Expand All @@ -1370,6 +1399,7 @@ public void describeExtendedView() {
sql(
"CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') COMMENT 'view comment' AS %s",
viewName, sql);
String location = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).location();
assertThat(sql("DESCRIBE EXTENDED %s", viewName))
.contains(
row("new_id", "int", "ID"),
Expand All @@ -1382,8 +1412,8 @@ public void describeExtendedView() {
row(
"View Properties",
String.format(
"['format-version' = '1', 'location' = '/%s/%s', 'provider' = 'iceberg']",
NAMESPACE, viewName),
"['format-version' = '1', 'location' = '%s', 'provider' = 'iceberg']",
location),
""));
}

Expand Down Expand Up @@ -1441,12 +1471,15 @@ public void showViews() throws NoSuchTableException {
row(NAMESPACE.toString(), v1, false),
tempView);

assertThat(sql("SHOW VIEWS IN %s", catalogName))
.contains(
row(NAMESPACE.toString(), prefixV2, false),
row(NAMESPACE.toString(), prefixV3, false),
row(NAMESPACE.toString(), v1, false),
tempView);
if (!"rest".equals(catalogConfig.get(CatalogUtil.ICEBERG_CATALOG_TYPE))) {
// REST catalog requires a namespace
assertThat(sql("SHOW VIEWS IN %s", catalogName))
.contains(
row(NAMESPACE.toString(), prefixV2, false),
row(NAMESPACE.toString(), prefixV3, false),
row(NAMESPACE.toString(), v1, false),
tempView);
}

assertThat(sql("SHOW VIEWS IN %s.%s", catalogName, NAMESPACE))
.contains(
Expand All @@ -1461,7 +1494,10 @@ public void showViews() throws NoSuchTableException {

assertThat(sql("SHOW VIEWS LIKE 'non-existing'")).isEmpty();

assertThat(sql("SHOW VIEWS IN spark_catalog.default")).contains(tempView);
if (!catalogName.equals(SPARK_CATALOG)) {
sql("CREATE NAMESPACE IF NOT EXISTS spark_catalog.%s", NAMESPACE);
assertThat(sql("SHOW VIEWS IN spark_catalog.%s", NAMESPACE)).contains(tempView);
}

assertThat(sql("SHOW VIEWS IN global_temp"))
.contains(
Expand Down Expand Up @@ -1512,17 +1548,18 @@ public void showCreateSimpleView() {

sql("CREATE VIEW %s AS %s", viewName, sql);

String location = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).location();
String expected =
String.format(
"CREATE VIEW %s.%s.%s (\n"
+ " id,\n"
+ " data)\n"
+ "TBLPROPERTIES (\n"
+ " 'format-version' = '1',\n"
+ " 'location' = '/%s/%s',\n"
+ " 'location' = '%s',\n"
+ " 'provider' = 'iceberg')\n"
+ "AS\n%s\n",
catalogName, NAMESPACE, viewName, NAMESPACE, viewName, sql);
catalogName, NAMESPACE, viewName, location, sql);
assertThat(sql("SHOW CREATE TABLE %s", viewName)).containsExactly(row(expected));
}

Expand All @@ -1536,6 +1573,7 @@ public void showCreateComplexView() {
+ "COMMENT 'view comment' TBLPROPERTIES ('key1'='val1', 'key2'='val2') AS %s",
viewName, sql);

String location = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).location();
String expected =
String.format(
"CREATE VIEW %s.%s.%s (\n"
Expand All @@ -1546,10 +1584,10 @@ public void showCreateComplexView() {
+ " 'format-version' = '1',\n"
+ " 'key1' = 'val1',\n"
+ " 'key2' = 'val2',\n"
+ " 'location' = '/%s/%s',\n"
+ " 'location' = '%s',\n"
+ " 'provider' = 'iceberg')\n"
+ "AS\n%s\n",
catalogName, NAMESPACE, viewName, NAMESPACE, viewName, sql);
catalogName, NAMESPACE, viewName, location, sql);
assertThat(sql("SHOW CREATE TABLE %s", viewName)).containsExactly(row(expected));
}

Expand Down Expand Up @@ -1897,6 +1935,7 @@ public void createViewWithRecursiveCycle() {

@TestTemplate
public void createViewWithRecursiveCycleToV1View() {
assumeThat(catalogName).isNotEqualTo(SPARK_CATALOG);
String viewOne = viewName("view_one");
String viewTwo = viewName("view_two");

Expand All @@ -1908,7 +1947,7 @@ public void createViewWithRecursiveCycleToV1View() {
sql("USE %s", catalogName);
// viewOne points to viewTwo points to viewOne, creating a recursive cycle
String view1 = String.format("%s.%s.%s", catalogName, NAMESPACE, viewOne);
String view2 = String.format("%s.%s.%s", "spark_catalog", NAMESPACE, viewTwo);
String view2 = String.format("%s.%s.%s", SPARK_CATALOG, NAMESPACE, viewTwo);
String cycle = String.format("%s -> %s -> %s", view1, view2, view1);
assertThatThrownBy(() -> sql("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s", viewOne, view2))
.isInstanceOf(AnalysisException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.ViewCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
Expand All @@ -35,7 +36,9 @@ abstract class BaseCatalog
ProcedureCatalog,
SupportsNamespaces,
HasIcebergCatalog,
SupportsFunctions {
SupportsFunctions,
ViewCatalog,
SupportsReplaceView {
private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema";
private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@
*
* <p>
*/
public class SparkCatalog extends BaseCatalog
implements org.apache.spark.sql.connector.catalog.ViewCatalog, SupportsReplaceView {
public class SparkCatalog extends BaseCatalog {
private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
private static final Splitter COMMA = Splitter.on(",");
private static final Joiner COMMA_JOINER = Joiner.on(",");
Expand Down
Loading

0 comments on commit 6501d29

Please sign in to comment.