Pass-through IcebergIO catalog properties #31726

Merged · 9 commits · Jul 8, 2024
3 changes: 2 additions & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,3 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -67,10 +67,12 @@
## New Features / Improvements

* Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-store fashion ([#31726](https://github.com/apache/beam/pull/31726))

## Deprecations

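To make the breaking change concrete, here is a minimal sketch (not from the PR) of constructing the reworked IcebergCatalogConfig. The catalog name, bucket path, and the choice of a Hadoop-type catalog are illustrative assumptions; "type" and "warehouse" are standard Iceberg catalog property keys.

import java.util.Properties;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;

public class IcebergCatalogConfigExample {
  // Builds a catalog config in the new key-value style. Each entry is an Iceberg
  // catalog property; there are no longer dedicated builder methods per setting.
  public static IcebergCatalogConfig hadoopCatalogConfig() {
    Properties props = new Properties();
    props.setProperty("type", "hadoop");                             // Iceberg catalog type key
    props.setProperty("warehouse", "gs://example-bucket/warehouse"); // illustrative path

    return IcebergCatalogConfig.builder()
        .setCatalogName("my_catalog")
        .setProperties(props)
        .build();
  }
}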
2 changes: 1 addition & 1 deletion sdks/java/core/build.gradle
@@ -98,7 +98,7 @@ dependencies {
permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
provided library.java.json_org
implementation library.java.everit_json_schema
implementation library.java.snake_yaml
shadow library.java.snake_yaml
Review comment (Member): I'm afraid of this getting changed again. Is it necessarily part of this change? Is it safe now?

shadowTest library.java.everit_json_schema
provided library.java.junit
testImplementation "com.github.stefanbirkner:system-rules:1.19.0"
11 changes: 7 additions & 4 deletions sdks/java/io/iceberg/build.gradle
@@ -23,6 +23,8 @@ import java.util.stream.Collectors
plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.iceberg',
shadowClosure: {},
validateShadowJar: false,
Review comment (Member): Why false? I think we want to validate it. Otherwise we end up having duplicate classes in multiple jars. Basically we want everything in the jar to be in our namespace, not any other namespace.

)

description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg"
@@ -54,11 +56,12 @@ dependencies {
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common
// Hadoop GCS filesystem dependencies
runtimeOnly library.java.hadoop_client
Review comment (Member): I think I'm OK with this... it isn't very Java-like to put optional dependencies as mandatory dependencies though. Typically these should also be resolvable via transitive dependencies if we depend on e.g. GCS core which provides the GCS filesystem.

runtimeOnly library.java.bigdataoss_gcsio
runtimeOnly library.java.bigdataoss_gcs_connector
runtimeOnly library.java.bigdataoss_util_hadoop

testImplementation library.java.hadoop_client
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_gcs_connector
testImplementation library.java.bigdataoss_util_hadoop
testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,214 +19,35 @@

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import java.util.Properties;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
public abstract class IcebergCatalogConfig implements Serializable {

@Pure
public abstract String getName();

/* Core Properties */
@Pure
public abstract @Nullable String getIcebergCatalogType();

@Pure
public abstract @Nullable String getCatalogImplementation();

@Pure
public abstract @Nullable String getFileIOImplementation();

@Pure
public abstract @Nullable String getWarehouseLocation();

@Pure
public abstract @Nullable String getMetricsReporterImplementation();

/* Caching */
@Pure
public abstract boolean getCacheEnabled();

@Pure
public abstract boolean getCacheCaseSensitive();

@Pure
public abstract long getCacheExpirationIntervalMillis();

@Pure
public abstract boolean getIOManifestCacheEnabled();

@Pure
public abstract long getIOManifestCacheExpirationIntervalMillis();

@Pure
public abstract long getIOManifestCacheMaxTotalBytes();

@Pure
public abstract long getIOManifestCacheMaxContentLength();

@Pure
public abstract @Nullable String getUri();

@Pure
public abstract int getClientPoolSize();

@Pure
public abstract long getClientPoolEvictionIntervalMs();

@Pure
public abstract @Nullable String getClientPoolCacheKeys();

@Pure
public abstract @Nullable String getLockImplementation();

@Pure
public abstract long getLockHeartbeatIntervalMillis();

@Pure
public abstract long getLockHeartbeatTimeoutMillis();

@Pure
public abstract int getLockHeartbeatThreads();

@Pure
public abstract long getLockAcquireIntervalMillis();

@Pure
public abstract long getLockAcquireTimeoutMillis();

@Pure
public abstract @Nullable String getAppIdentifier();

@Pure
public abstract @Nullable String getUser();

@Pure
public abstract long getAuthSessionTimeoutMillis();
public abstract String getCatalogName();

@Pure
public abstract @Nullable Configuration getConfiguration();
public abstract Properties getProperties();

@Pure
public static Builder builder() {
return new AutoValue_IcebergCatalogConfig.Builder()
.setIcebergCatalogType(null)
.setCatalogImplementation(null)
.setFileIOImplementation(null)
.setWarehouseLocation(null)
.setMetricsReporterImplementation(null) // TODO: Set this to our implementation
.setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT)
.setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT)
.setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT)
.setIOManifestCacheExpirationIntervalMillis(
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setIOManifestCacheMaxTotalBytes(
CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
.setIOManifestCacheMaxContentLength(
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)
.setUri(null)
.setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT)
.setClientPoolEvictionIntervalMs(
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)
.setClientPoolCacheKeys(null)
.setLockImplementation(null)
.setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT)
.setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
.setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT)
.setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT)
.setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
.setAppIdentifier(null)
.setUser(null)
.setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT)
.setConfiguration(null);
}

@Pure
public ImmutableMap<String, String> properties() {
return new PropertyBuilder()
.put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType())
.put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation())
.put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation())
.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation())
.put(CatalogProperties.METRICS_REPORTER_IMPL, getMetricsReporterImplementation())
.put(CatalogProperties.CACHE_ENABLED, getCacheEnabled())
.put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive())
.put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, getCacheExpirationIntervalMillis())
.build();
return new AutoValue_IcebergCatalogConfig.Builder();
}

public org.apache.iceberg.catalog.Catalog catalog() {
Configuration conf = getConfiguration();
if (conf == null) {
conf = new Configuration();
}
return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf);
return CatalogUtil.buildIcebergCatalog(
getCatalogName(), Maps.fromProperties(getProperties()), new Configuration());
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setCatalogName(String catalogName);

/* Core Properties */
public abstract Builder setName(String name);

public abstract Builder setIcebergCatalogType(@Nullable String icebergType);

public abstract Builder setCatalogImplementation(@Nullable String catalogImpl);

public abstract Builder setFileIOImplementation(@Nullable String fileIOImpl);

public abstract Builder setWarehouseLocation(@Nullable String warehouse);

public abstract Builder setMetricsReporterImplementation(@Nullable String metricsImpl);

/* Caching */
public abstract Builder setCacheEnabled(boolean cacheEnabled);

public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive);

public abstract Builder setCacheExpirationIntervalMillis(long expiration);

public abstract Builder setIOManifestCacheEnabled(boolean enabled);

public abstract Builder setIOManifestCacheExpirationIntervalMillis(long expiration);

public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes);

public abstract Builder setIOManifestCacheMaxContentLength(long length);

public abstract Builder setUri(@Nullable String uri);

public abstract Builder setClientPoolSize(int size);

public abstract Builder setClientPoolEvictionIntervalMs(long interval);

public abstract Builder setClientPoolCacheKeys(@Nullable String keys);

public abstract Builder setLockImplementation(@Nullable String lockImplementation);

public abstract Builder setLockHeartbeatIntervalMillis(long interval);

public abstract Builder setLockHeartbeatTimeoutMillis(long timeout);

public abstract Builder setLockHeartbeatThreads(int threads);

public abstract Builder setLockAcquireIntervalMillis(long interval);

public abstract Builder setLockAcquireTimeoutMillis(long timeout);

public abstract Builder setAppIdentifier(@Nullable String id);

public abstract Builder setUser(@Nullable String user);

public abstract Builder setAuthSessionTimeoutMillis(long timeout);

public abstract Builder setConfiguration(@Nullable Configuration conf);
public abstract Builder setProperties(Properties props);

public abstract IcebergCatalogConfig build();
}
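As a rough illustration of the pass-through behavior after this change: a minimal sketch assuming a REST catalog whose endpoint, bucket, and catalog name are invented for the example. "catalog-impl", "uri", and "warehouse" are standard Iceberg property keys, and the REST catalog implementation is assumed to be available on the classpath.

import java.util.Properties;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.iceberg.catalog.Catalog;

public class CatalogPassThroughExample {
  public static Catalog buildRestCatalog() {
    // Every property is forwarded verbatim to Iceberg; nothing is filtered out,
    // so implementation-specific keys work without IcebergIO knowing about them.
    Properties props = new Properties();
    props.setProperty("catalog-impl", "org.apache.iceberg.rest.RESTCatalog");
    props.setProperty("uri", "https://rest-catalog.example.com");    // illustrative endpoint
    props.setProperty("warehouse", "s3://example-bucket/warehouse"); // illustrative path

    IcebergCatalogConfig config =
        IcebergCatalogConfig.builder()
            .setCatalogName("rest_catalog")
            .setProperties(props)
            .build();

    // catalog() converts the Properties to a Map (Maps.fromProperties) and hands all
    // entries to CatalogUtil.buildIcebergCatalog along with a new Hadoop Configuration.
    return config.catalog();
  }
}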
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -21,19 +21,21 @@
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.iceberg.catalog.TableIdentifier;

/**
@@ -47,7 +49,6 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv

@Override
protected SchemaTransform from(Config configuration) {
configuration.validate();
return new IcebergReadSchemaTransform(configuration);
}

@@ -68,21 +69,24 @@ public static Builder builder() {
return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
}

@SchemaFieldDescription("Identifier of the Iceberg table to write to.")
public abstract String getTable();

public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
@SchemaFieldDescription("Name of the catalog containing the table.")
public abstract String getCatalogName();

@SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.")
public abstract Map<String, String> getCatalogProperties();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String tables);
public abstract Builder setTable(String table);

public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
public abstract Builder setCatalogName(String catalogName);

public abstract Config build();
}
public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);

public void validate() {
getCatalogConfig().validate();
public abstract Config build();
}
}

Expand All @@ -109,17 +113,13 @@ Row getConfigurationRow() {

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
Properties properties = new Properties();
properties.putAll(configuration.getCatalogProperties());

IcebergCatalogConfig.Builder catalogBuilder =
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());

if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
}
if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
}
IcebergCatalogConfig.builder()
.setCatalogName(configuration.getCatalogName())
.setProperties(properties);

PCollection<Row> output =
input
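A minimal sketch of how the reworked read Config might be populated, assuming the nested Config class and its builder are accessible to the caller; the table identifier, catalog name, and property values are illustrative.

import java.util.Map;
import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class IcebergReadConfigExample {
  public static IcebergReadSchemaTransformProvider.Config readConfig() {
    // Catalog settings are supplied as a plain string map; in expand() the map is
    // copied into a java.util.Properties and passed to IcebergCatalogConfig unchanged.
    Map<String, String> catalogProperties =
        ImmutableMap.of(
            "type", "hadoop",                              // Iceberg catalog type key
            "warehouse", "gs://example-bucket/warehouse"); // illustrative path

    return IcebergReadSchemaTransformProvider.Config.builder()
        .setTable("db.my_table") // illustrative table identifier
        .setCatalogName("my_catalog")
        .setCatalogProperties(catalogProperties)
        .build();
  }
}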