From 52bfbdc0b03bafb610efde8486a6633be2b9cc4f Mon Sep 17 00:00:00 2001
From: swapna marru
Date: Tue, 28 Jan 2025 09:34:45 -0800
Subject: [PATCH] Flink 1.19: undo 'connector'='iceberg' table support changes

---
 .../apache/iceberg/flink/FlinkCatalog.java    | 28 ++++------
 .../iceberg/flink/FlinkCatalogFactory.java    |  1 -
 .../flink/FlinkCreateTableOptions.java        | 51 -------------------
 .../flink/FlinkDynamicTableFactory.java       | 45 ++++++++++++----
 .../flink/source/IcebergTableSource.java      | 12 +----
 .../iceberg/flink/TestFlinkCatalogTable.java  | 27 ++--------
 .../iceberg/flink/TestIcebergConnector.java   | 37 ++++++++++++++
 7 files changed, 86 insertions(+), 115 deletions(-)
 delete mode 100644 flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 081f81cb8450..86295d78cc13 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -97,7 +97,6 @@ public class FlinkCatalog extends AbstractCatalog {
   private final Namespace baseNamespace;
   private final SupportsNamespaces asNamespaceCatalog;
   private final Closeable closeable;
-  private final Map<String, String> catalogProps;
   private final boolean cacheEnabled;
 
   public FlinkCatalog(
@@ -105,12 +104,10 @@ public FlinkCatalog(
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
-      Map<String, String> catalogProps,
       boolean cacheEnabled,
       long cacheExpirationIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
-    this.catalogProps = catalogProps;
     this.baseNamespace = baseNamespace;
     this.cacheEnabled = cacheEnabled;
 
@@ -335,15 +332,7 @@ public List<String> listTables(String databaseName)
   public CatalogTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
     Table table = loadIcebergTable(tablePath);
-    Map<String, String> catalogAndTableProps = Maps.newHashMap(catalogProps);
-    catalogAndTableProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), getName());
-    catalogAndTableProps.put(
-        FlinkCreateTableOptions.CATALOG_DATABASE.key(), tablePath.getDatabaseName());
-    catalogAndTableProps.put(
-        FlinkCreateTableOptions.CATALOG_TABLE.key(), tablePath.getObjectName());
-    catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
-    catalogAndTableProps.putAll(table.properties());
-    return toCatalogTableWithProps(table, catalogAndTableProps);
+    return toCatalogTable(table);
   }
 
   private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
@@ -395,6 +384,13 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
       throws CatalogException, TableAlreadyExistException {
+    if (Objects.equals(
+        table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+      throw new IllegalArgumentException(
+          "Cannot create the table with 'connector'='iceberg' table property in "
+              + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+              + "create table without 'connector'='iceberg' related properties in an iceberg table.");
+    }
     Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
     createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
   }
@@ -629,7 +625,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
     return partitionKeysBuilder.build();
   }
 
-  static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
+  static CatalogTable toCatalogTable(Table table) {
     TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
 
@@ -638,11 +634,7 @@ static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
     // CatalogTableImpl to copy a new catalog table.
     // Let's re-loading table from Iceberg catalog when creating source/sink operators.
     // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
-    return new CatalogTableImpl(schema, partitionKeys, props, null);
-  }
-
-  static CatalogTable toCatalogTable(Table table) {
-    return toCatalogTableWithProps(table, table.properties());
+    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
   }
 
   @Override
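Note: this restores the earlier behavior where an Iceberg catalog rejects
tables declared with 'connector'='iceberg', and getTable() no longer injects
catalog/connector options into the returned CatalogTable. A minimal sketch of
the two CREATE TABLE paths the restored error message points at; the catalog
name, warehouse path, and schema are illustrative assumptions, not values
taken from this patch:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateTablePaths {
      public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Path 1: a plain table inside an Iceberg catalog -- no 'connector' option.
        env.executeSql(
            "CREATE CATALOG iceberg_cat WITH ("
                + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/wh')");
        env.executeSql("CREATE TABLE iceberg_cat.`default`.t1 (id BIGINT)");

        // Path 2: a 'connector'='iceberg' table registered in a non-Iceberg
        // catalog (here the default in-memory catalog), resolved by name via
        // the catalog-name/catalog-database/catalog-table options.
        env.executeSql(
            "CREATE TABLE default_catalog.default_database.t1_link (id BIGINT) WITH ("
                + "'connector'='iceberg', 'catalog-type'='hadoop', 'catalog-name'='iceberg_cat', "
                + "'warehouse'='file:///tmp/wh', 'catalog-table'='t1')");
      }
    }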
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index dd065617bd88..fe4008a13ce5 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -168,7 +168,6 @@ protected Catalog createCatalog(
         defaultDatabase,
         baseNamespace,
         catalogLoader,
-        properties,
         cacheEnabled,
         cacheExpirationIntervalMs);
   }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
deleted file mode 100644
index f0df076abe31..000000000000
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-public class FlinkCreateTableOptions {
-
-  private FlinkCreateTableOptions() {}
-
-  public static final ConfigOption<String> CATALOG_NAME =
-      ConfigOptions.key("catalog-name")
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Catalog name");
-
-  public static final ConfigOption<String> CATALOG_TYPE =
-      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
-
-  public static final ConfigOption<String> CATALOG_DATABASE =
-      ConfigOptions.key("catalog-database")
-          .stringType()
-          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
-          .withDescription("Database name managed in the iceberg catalog.");
-
-  public static final ConfigOption<String> CATALOG_TABLE =
-      ConfigOptions.key("catalog-table")
-          .stringType()
-          .noDefaultValue()
-          .withDescription("Table name managed in the underlying iceberg catalog and database.");
-}
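Note: the shared FlinkCreateTableOptions class is removed; the same four
options ('catalog-name', 'catalog-type', 'catalog-database', 'catalog-table')
reappear below as private constants of FlinkDynamicTableFactory. A small
sketch of the ConfigOption pattern those constants use, showing how a
declared default (here for 'catalog-database') is served when the option is
unset; the "analytics" value is an illustrative assumption:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class ConfigOptionSketch {
      static final ConfigOption<String> CATALOG_DATABASE =
          ConfigOptions.key("catalog-database")
              .stringType()
              .defaultValue("default")
              .withDescription("Database name managed in the iceberg catalog.");

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Unset: getString(option) falls back to the declared default.
        System.out.println(conf.getString(CATALOG_DATABASE)); // prints "default"
        conf.setString(CATALOG_DATABASE.key(), "analytics");
        System.out.println(conf.getString(CATALOG_DATABASE)); // prints "analytics"
      }
    }

Options built with noDefaultValue() and returned from requiredOptions() are
expected in the WITH clause, while defaultValue() options stay optional.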
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index f49ab9c646c2..b7f1be4b93fb 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -44,6 +45,31 @@ public class FlinkDynamicTableFactory
     implements DynamicTableSinkFactory, DynamicTableSourceFactory {
   static final String FACTORY_IDENTIFIER = "iceberg";
 
+  private static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  private static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  private static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  private static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
   private final FlinkCatalog catalog;
 
   public FlinkDynamicTableFactory() {
@@ -101,16 +127,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(FlinkCreateTableOptions.CATALOG_TYPE);
-    options.add(FlinkCreateTableOptions.CATALOG_NAME);
+    options.add(CATALOG_TYPE);
+    options.add(CATALOG_NAME);
     return options;
   }
 
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
-    options.add(FlinkCreateTableOptions.CATALOG_TABLE);
+    options.add(CATALOG_DATABASE);
+    options.add(CATALOG_TABLE);
     return options;
   }
 
@@ -127,17 +153,14 @@ private static TableLoader createTableLoader(
     Configuration flinkConf = new Configuration();
     tableProps.forEach(flinkConf::setString);
 
-    String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
+    String catalogName = flinkConf.getString(CATALOG_NAME);
     Preconditions.checkNotNull(
-        catalogName,
-        "Table property '%s' cannot be null",
-        FlinkCreateTableOptions.CATALOG_NAME.key());
+        catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
 
-    String catalogDatabase =
-        flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
+    String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
     Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
 
-    String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
+    String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
     Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
 
     org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
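Note: with the shared class gone, createTableLoader() resolves everything
from the factory's own constants. An explicit option in the table properties
wins, otherwise the database/table name from the Flink object path is used,
and a missing 'catalog-name' fails fast on the Preconditions check. A minimal
sketch of that fallback, with hypothetical helper and parameter names:

    import java.util.Map;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class FallbackSketch {
      private static final ConfigOption<String> CATALOG_TABLE =
          ConfigOptions.key("catalog-table").stringType().noDefaultValue().withDescription("");

      // An explicit 'catalog-table' in the WITH clause wins; otherwise fall
      // back to the table name taken from the Flink catalog path.
      static String resolveTable(Map<String, String> tableProps, String pathTableName) {
        Configuration flinkConf = new Configuration();
        tableProps.forEach(flinkConf::setString);
        return flinkConf.getString(CATALOG_TABLE, pathTableName);
      }
    }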
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 1325b8d8dd70..65adce77d9f9 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -35,7 +35,6 @@
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
@@ -54,8 +53,7 @@ public class IcebergTableSource
     implements ScanTableSource,
         SupportsProjectionPushDown,
         SupportsFilterPushDown,
-        SupportsLimitPushDown,
-        SupportsSourceWatermark {
+        SupportsLimitPushDown {
 
   private int[] projectedFields;
   private Long limit;
@@ -177,14 +175,6 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
     return Result.of(acceptedFilters, flinkFilters);
   }
 
-  @Override
-  public void applySourceWatermark() {
-    if (!readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) {
-      throw new UnsupportedOperationException(
-          "Source watermarks are supported only in flip-27 iceberg source implementation");
-    }
-  }
-
   @Override
   public boolean supportsNestedProjection() {
     // TODO: support nested projection
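Note: dropping SupportsSourceWatermark means the 1.19 source no longer
advertises SOURCE_WATERMARK() support to the planner, so a DDL like the
sketch below (table, column, and option values are illustrative assumptions)
should now be rejected at planning time rather than gated on the FLIP-27
source setting:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SourceWatermarkSketch {
      public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // WATERMARK ... AS SOURCE_WATERMARK() requires the connector's source
        // to implement SupportsSourceWatermark.
        env.executeSql(
            "CREATE TABLE events ("
                + "  ts TIMESTAMP_LTZ(3),"
                + "  WATERMARK FOR ts AS SOURCE_WATERMARK()"
                + ") WITH ("
                + "  'connector'='iceberg',"
                + "  'catalog-type'='hadoop',"
                + "  'catalog-name'='iceberg_cat',"
                + "  'warehouse'='file:///tmp/wh',"
                + "  'catalog-table'='events')");
      }
    }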
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index be5aa53dff75..04d7b8da6b9c 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -188,23 +188,6 @@ public void testCreateTableLike() throws TableNotExistException {
         .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
   }
 
-  @TestTemplate
-  public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
-    sql("CREATE TABLE tl(id BIGINT)");
-
-    sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl");
-
-    CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
-    assertThat(catalogTable.getSchema())
-        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
-
-    Map<String, String> options = catalogTable.getOptions();
-    assertThat(options.entrySet().containsAll(config.entrySet())).isTrue();
-    assertThat(options.get(FlinkCreateTableOptions.CATALOG_NAME.key())).isEqualTo(catalogName);
-    assertThat(options.get(FlinkCreateTableOptions.CATALOG_DATABASE.key())).isEqualTo(DATABASE);
-    assertThat(options.get(FlinkCreateTableOptions.CATALOG_TABLE.key())).isEqualTo("tl");
-  }
-
   @TestTemplate
   public void testCreateTableLocation() {
     assumeThat(isHadoopCatalog)
@@ -677,12 +660,10 @@ private Table table(String name) {
   }
 
   private CatalogTable catalogTable(String name) throws TableNotExistException {
-    return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name);
-  }
-
-  private CatalogTable catalogTable(String catalog, String database, String table)
-      throws TableNotExistException {
     return (CatalogTable)
-        getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table));
+        getTableEnv()
+            .getCatalog(getTableEnv().getCurrentCatalog())
+            .get()
+            .getTable(new ObjectPath(DATABASE, name));
   }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 525df0e3d136..47f5485df879 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -256,6 +256,43 @@ public void testCatalogDatabaseConflictWithFlinkDatabase() {
         .hasMessageStartingWith("Could not execute CreateTable in path");
   }
 
+  @TestTemplate
+  public void testConnectorTableInIcebergCatalog() {
+    // Create the catalog properties
+    Map<String, String> catalogProps = Maps.newHashMap();
+    catalogProps.put("type", "iceberg");
+    if (isHiveCatalog()) {
+      catalogProps.put("catalog-type", "hive");
+      catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
+    } else {
+      catalogProps.put("catalog-type", "hadoop");
+    }
+    catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse());
+
+    // Create the table properties
+    Map<String, String> tableProps = createTableProps();
+
+    // Create a connector table in an iceberg catalog.
+    sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
+    try {
+      assertThatThrownBy(
+              () ->
+                  sql(
+                      "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
+                      FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
+                      TABLE_NAME,
+                      toWithClause(tableProps)))
+          .cause()
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessage(
+              "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, "
+                  + "Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+                  + "create table without 'connector'='iceberg' related properties in an iceberg table.");
+    } finally {
+      sql("DROP CATALOG IF EXISTS `test_catalog`");
+    }
+  }
+
   private Map<String, String> createTableProps() {
     Map<String, String> tableProps = Maps.newHashMap(properties);
     tableProps.put("catalog-name", catalogName);