undo 1.19 changes

swapna267 committed Jan 28, 2025
1 parent 38df8d8 commit 52bfbdc

Showing 7 changed files with 86 additions and 115 deletions.
@@ -97,20 +97,17 @@ public class FlinkCatalog extends AbstractCatalog {
   private final Namespace baseNamespace;
   private final SupportsNamespaces asNamespaceCatalog;
   private final Closeable closeable;
-  private final Map<String, String> catalogProps;
   private final boolean cacheEnabled;
 
   public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
-      Map<String, String> catalogProps,
       boolean cacheEnabled,
       long cacheExpirationIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
-    this.catalogProps = catalogProps;
     this.baseNamespace = baseNamespace;
     this.cacheEnabled = cacheEnabled;
 
@@ -335,15 +332,7 @@ public List<String> listTables(String databaseName)
   public CatalogTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
     Table table = loadIcebergTable(tablePath);
-    Map<String, String> catalogAndTableProps = Maps.newHashMap(catalogProps);
-    catalogAndTableProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), getName());
-    catalogAndTableProps.put(
-        FlinkCreateTableOptions.CATALOG_DATABASE.key(), tablePath.getDatabaseName());
-    catalogAndTableProps.put(
-        FlinkCreateTableOptions.CATALOG_TABLE.key(), tablePath.getObjectName());
-    catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
-    catalogAndTableProps.putAll(table.properties());
-    return toCatalogTableWithProps(table, catalogAndTableProps);
+    return toCatalogTable(table);
   }
 
   private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
@@ -395,6 +384,13 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
       throws CatalogException, TableAlreadyExistException {
+    if (Objects.equals(
+        table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+      throw new IllegalArgumentException(
+          "Cannot create the table with 'connector'='iceberg' table property in "
+              + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+              + "create table without 'connector'='iceberg' related properties in an iceberg table.");
+    }
     Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
     createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
   }
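For context, a minimal sketch of the DDL this guard rejects. The catalog name and warehouse path are illustrative, and the snippet assumes the iceberg-flink runtime is on the classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConnectorGuardExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // An Iceberg catalog backed by a local Hadoop warehouse (illustrative path).
    tEnv.executeSql(
        "CREATE CATALOG my_iceberg WITH ("
            + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");
    // Rejected: FlinkCatalog#createTable throws the IllegalArgumentException above,
    // which surfaces as the cause of Flink's "Could not execute CreateTable" wrapper.
    tEnv.executeSql(
        "CREATE TABLE my_iceberg.`default`.t (id BIGINT) WITH ('connector'='iceberg')");
  }
}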
@@ -629,7 +625,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
     return partitionKeysBuilder.build();
   }
 
-  static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
+  static CatalogTable toCatalogTable(Table table) {
     TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
 
@@ -638,11 +634,7 @@ static CatalogTable toCatalogTable(Table table) {
     // CatalogTableImpl to copy a new catalog table.
     // Let's re-loading table from Iceberg catalog when creating source/sink operators.
     // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
-    return new CatalogTableImpl(schema, partitionKeys, props, null);
-  }
-
-  static CatalogTable toCatalogTable(Table table) {
-    return toCatalogTableWithProps(table, table.properties());
+    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
   }
 
   @Override
@@ -168,7 +168,6 @@ protected Catalog createCatalog(
         defaultDatabase,
         baseNamespace,
         catalogLoader,
-        properties,
         cacheEnabled,
         cacheExpirationIntervalMs);
   }

This file was deleted.

@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -44,6 +45,31 @@
 public class FlinkDynamicTableFactory
     implements DynamicTableSinkFactory, DynamicTableSourceFactory {
   static final String FACTORY_IDENTIFIER = "iceberg";
+
+  private static final ConfigOption<String> CATALOG_NAME =
+      ConfigOptions.key("catalog-name")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog name");
+
+  private static final ConfigOption<String> CATALOG_TYPE =
+      ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+  private static final ConfigOption<String> CATALOG_DATABASE =
+      ConfigOptions.key("catalog-database")
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+          .withDescription("Database name managed in the iceberg catalog.");
+
+  private static final ConfigOption<String> CATALOG_TABLE =
+      ConfigOptions.key("catalog-table")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
   private final FlinkCatalog catalog;
 
   public FlinkDynamicTableFactory() {
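For reference, a hedged usage sketch of the options restored above: declaring an Iceberg-backed table in a non-Iceberg Flink catalog such as default_catalog. All names and the warehouse path are illustrative, not taken from the commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConnectorOptionsExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // 'catalog-type' and 'catalog-name' match requiredOptions(); 'catalog-database'
    // and 'catalog-table' are optional and fall back to the DDL's own names.
    tEnv.executeSql(
        "CREATE TABLE default_catalog.default_database.sample (id BIGINT, data STRING) "
            + "WITH ("
            + "  'connector'='iceberg',"
            + "  'catalog-type'='hadoop',"
            + "  'catalog-name'='hadoop_prod',"
            + "  'catalog-database'='db',"
            + "  'catalog-table'='sample',"
            + "  'warehouse'='file:///tmp/warehouse')");
  }
}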
@@ -101,16 +127,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(FlinkCreateTableOptions.CATALOG_TYPE);
-    options.add(FlinkCreateTableOptions.CATALOG_NAME);
+    options.add(CATALOG_TYPE);
+    options.add(CATALOG_NAME);
     return options;
   }
 
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
     Set<ConfigOption<?>> options = Sets.newHashSet();
-    options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
-    options.add(FlinkCreateTableOptions.CATALOG_TABLE);
+    options.add(CATALOG_DATABASE);
+    options.add(CATALOG_TABLE);
     return options;
   }
 
@@ -127,17 +153,14 @@ private static TableLoader createTableLoader(
     Configuration flinkConf = new Configuration();
     tableProps.forEach(flinkConf::setString);
 
-    String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
+    String catalogName = flinkConf.getString(CATALOG_NAME);
     Preconditions.checkNotNull(
-        catalogName,
-        "Table property '%s' cannot be null",
-        FlinkCreateTableOptions.CATALOG_NAME.key());
+        catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
 
-    String catalogDatabase =
-        flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
+    String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
     Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
 
-    String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
+    String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
     Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
 
     org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
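A small sketch of the fallback behavior in createTableLoader above (the option and values here are illustrative): when the table options omit a key, Configuration#getString returns the supplied override, which in the factory is the database or table name from the DDL's object path:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class OptionFallbackExample {
  public static void main(String[] args) {
    ConfigOption<String> catalogTable =
        ConfigOptions.key("catalog-table").stringType().noDefaultValue();
    Configuration flinkConf = new Configuration();
    // No 'catalog-table' was set in the table options, so the override default
    // (here standing in for the table name from the Flink object path) is returned.
    String table = flinkConf.getString(catalogTable, "sample");
    System.out.println(table); // prints: sample
  }
}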
@@ -35,7 +35,6 @@
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
@@ -54,8 +53,7 @@ public class IcebergTableSource
     implements ScanTableSource,
         SupportsProjectionPushDown,
         SupportsFilterPushDown,
-        SupportsLimitPushDown,
-        SupportsSourceWatermark {
+        SupportsLimitPushDown {
 
   private int[] projectedFields;
   private Long limit;
@@ -177,14 +175,6 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
     return Result.of(acceptedFilters, flinkFilters);
   }
 
-  @Override
-  public void applySourceWatermark() {
-    if (!readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) {
-      throw new UnsupportedOperationException(
-          "Source watermarks are supported only in flip-27 iceberg source implementation");
-    }
-  }
-
   @Override
   public boolean supportsNestedProjection() {
     // TODO: support nested projection
@@ -188,23 +188,6 @@ public void testCreateTableLike() throws TableNotExistException {
         .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
   }
 
-  @TestTemplate
-  public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
-    sql("CREATE TABLE tl(id BIGINT)");
-
-    sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl");
-
-    CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
-    assertThat(catalogTable.getSchema())
-        .isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
-
-    Map<String, String> options = catalogTable.getOptions();
-    assertThat(options.entrySet().containsAll(config.entrySet())).isTrue();
-    assertThat(options.get(FlinkCreateTableOptions.CATALOG_NAME.key())).isEqualTo(catalogName);
-    assertThat(options.get(FlinkCreateTableOptions.CATALOG_DATABASE.key())).isEqualTo(DATABASE);
-    assertThat(options.get(FlinkCreateTableOptions.CATALOG_TABLE.key())).isEqualTo("tl");
-  }
-
   @TestTemplate
   public void testCreateTableLocation() {
     assumeThat(isHadoopCatalog)
@@ -677,12 +660,10 @@ private Table table(String name) {
   }
 
   private CatalogTable catalogTable(String name) throws TableNotExistException {
-    return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name);
-  }
-
-  private CatalogTable catalogTable(String catalog, String database, String table)
-      throws TableNotExistException {
     return (CatalogTable)
-        getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table));
+        getTableEnv()
+            .getCatalog(getTableEnv().getCurrentCatalog())
+            .get()
+            .getTable(new ObjectPath(DATABASE, name));
   }
 }
@@ -256,6 +256,43 @@ public void testCatalogDatabaseConflictWithFlinkDatabase() {
         .hasMessageStartingWith("Could not execute CreateTable in path");
   }
 
+  @TestTemplate
+  public void testConnectorTableInIcebergCatalog() {
+    // Create the catalog properties
+    Map<String, String> catalogProps = Maps.newHashMap();
+    catalogProps.put("type", "iceberg");
+    if (isHiveCatalog()) {
+      catalogProps.put("catalog-type", "hive");
+      catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
+    } else {
+      catalogProps.put("catalog-type", "hadoop");
+    }
+    catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse());
+
+    // Create the table properties
+    Map<String, String> tableProps = createTableProps();
+
+    // Create a connector table in an iceberg catalog.
+    sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
+    try {
+      assertThatThrownBy(
+              () ->
+                  sql(
+                      "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
+                      FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
+                      TABLE_NAME,
+                      toWithClause(tableProps)))
+          .cause()
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessage(
+              "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, "
+                  + "Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+                  + "create table without 'connector'='iceberg' related properties in an iceberg table.");
+    } finally {
+      sql("DROP CATALOG IF EXISTS `test_catalog`");
+    }
+  }
+
   private Map<String, String> createTableProps() {
     Map<String, String> tableProps = Maps.newHashMap(properties);
     tableProps.put("catalog-name", catalogName);
