diff --git a/.github/workflows/api-binary-compatibility.yml b/.github/workflows/api-binary-compatibility.yml
index fd7f6f14c2ef..245da4ecd4d1 100644
--- a/.github/workflows/api-binary-compatibility.yml
+++ b/.github/workflows/api-binary-compatibility.yml
@@ -46,7 +46,7 @@ jobs:
#
# See https://github.com/actions/checkout/issues/124
fetch-depth: 0
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 11
diff --git a/.github/workflows/delta-conversion-ci.yml b/.github/workflows/delta-conversion-ci.yml
index 67f5acdb32b2..6fd97e662ab0 100644
--- a/.github/workflows/delta-conversion-ci.yml
+++ b/.github/workflows/delta-conversion-ci.yml
@@ -62,7 +62,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -74,7 +74,7 @@ jobs:
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+ - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
@@ -91,7 +91,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -103,7 +103,7 @@ jobs:
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+ - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml
index 4ea17af6c0cb..702ae9bc898d 100644
--- a/.github/workflows/flink-ci.yml
+++ b/.github/workflows/flink-ci.yml
@@ -60,12 +60,12 @@ jobs:
strategy:
matrix:
jvm: [8, 11]
- flink: ['1.15', '1.16', '1.17']
+ flink: ['1.16', '1.17', '1.18']
env:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
diff --git a/.github/workflows/hive-ci.yml b/.github/workflows/hive-ci.yml
index d7474a185436..f582e516fcd1 100644
--- a/.github/workflows/hive-ci.yml
+++ b/.github/workflows/hive-ci.yml
@@ -60,7 +60,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -86,7 +86,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
diff --git a/.github/workflows/java-ci.yml b/.github/workflows/java-ci.yml
index 4c8e3a5b8d92..4936e2b6514b 100644
--- a/.github/workflows/java-ci.yml
+++ b/.github/workflows/java-ci.yml
@@ -57,7 +57,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -81,7 +81,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
@@ -91,7 +91,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
diff --git a/.github/workflows/jmh-benchmarks.yml b/.github/workflows/jmh-benchmarks.yml
index f2d6c659f61e..e76590543852 100644
--- a/.github/workflows/jmh-benchmarks.yml
+++ b/.github/workflows/jmh-benchmarks.yml
@@ -79,7 +79,7 @@ jobs:
with:
repository: ${{ github.event.inputs.repo }}
ref: ${{ github.event.inputs.ref }}
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 11
diff --git a/.github/workflows/open-api.yml b/.github/workflows/open-api.yml
index ae18c646e591..397d1bf30e4a 100644
--- a/.github/workflows/open-api.yml
+++ b/.github/workflows/open-api.yml
@@ -40,7 +40,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-python@v4
+ - uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install
diff --git a/.github/workflows/publish-snapshot.yml b/.github/workflows/publish-snapshot.yml
index 92c6b2b09223..c2809cfdcbac 100644
--- a/.github/workflows/publish-snapshot.yml
+++ b/.github/workflows/publish-snapshot.yml
@@ -34,7 +34,7 @@ jobs:
with:
# we need to fetch all tags so that getProjectVersion() in build.gradle correctly determines the next SNAPSHOT version from the newest tag
fetch-depth: 0
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
diff --git a/.github/workflows/recurring-jmh-benchmarks.yml b/.github/workflows/recurring-jmh-benchmarks.yml
index e1f750041890..434b79755069 100644
--- a/.github/workflows/recurring-jmh-benchmarks.yml
+++ b/.github/workflows/recurring-jmh-benchmarks.yml
@@ -49,7 +49,7 @@ jobs:
with:
repository: ${{ github.event.inputs.repo }}
ref: ${{ github.event.inputs.ref }}
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 11
diff --git a/.github/workflows/spark-ci.yml b/.github/workflows/spark-ci.yml
index a05c9aa19e29..3dacdf77c1a8 100644
--- a/.github/workflows/spark-ci.yml
+++ b/.github/workflows/spark-ci.yml
@@ -63,7 +63,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -93,7 +93,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -123,7 +123,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 17
diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
index 566ae2441ea0..3c98621c2beb 100644
--- a/.github/workflows/stale.yml
+++ b/.github/workflows/stale.yml
@@ -31,7 +31,7 @@ jobs:
if: github.repository_owner == 'apache'
runs-on: ubuntu-22.04
steps:
- - uses: actions/stale@v8.0.0
+ - uses: actions/stale@v9.0.0
with:
stale-issue-label: 'stale'
exempt-issue-labels: 'not-stale'
diff --git a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java
new file mode 100644
index 000000000000..01f2f4e79a1f
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.exceptions.ValidationException;
+
+/**
+ * API for appending sequential updates to a table
+ *
+ *
+ * <p>This API accumulates batches of file additions and deletions by order, produces a new {@link
+ * Snapshot} of the changes where each batch is assigned its own data sequence number, and commits
+ * that snapshot as the current one.
+ *
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ * If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
+ * will throw a {@link ValidationException}.
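+ *
+ * <p>A sketch of intended usage (variable names are illustrative):
+ *
+ * <pre>
+ *   table
+ *       .newStreamingUpdate()
+ *       .addFile(dataFileForBatch0, 0)
+ *       .addFile(deleteFileForBatch1, 1)
+ *       .commit();
+ * </pre>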
+ */
+public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {
+ /**
+ * Remove a data file from the current table state.
+ *
+ * <p>This rewrite operation may change the size or layout of the data files. When applicable, it
+ * is also recommended to discard already deleted records while rewriting data files. However, the
+ * set of live data records must never change.
+ *
+ * @param dataFile a rewritten data file
+ * @return this for method chaining
+ */
+ default StreamingUpdate deleteFile(DataFile dataFile) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement deleteFile");
+ }
+
+ /**
+ * Remove a delete file from the table state.
+ *
+ *
+ * <p>This rewrite operation may change the size or layout of the delete files. When applicable,
+ * it is also recommended to discard delete records for files that are no longer part of the table
+ * state. However, the set of applicable delete records must never change.
+ *
+ * @param deleteFile a rewritten delete file
+ * @return this for method chaining
+ */
+ default StreamingUpdate deleteFile(DeleteFile deleteFile) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement deleteFile");
+ }
+
+ /**
+ * Add a new data file to a specific batch. All files in this batch will receive the same data sequence
+ * number.
+ *
+ *
+ * <p>This rewrite operation may change the size or layout of the data files. When applicable, it
+ * is also recommended to discard already deleted records while rewriting data files. However, the
+ * set of live data records must never change.
+ *
+ * @param dataFile a new data file
+ * @param batchOrdinal The batch ordinal to associate with this data file
+ * @return this for method chaining
+ */
+ default StreamingUpdate addFile(DataFile dataFile, int batchOrdinal) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement addFile");
+ }
+
+ /**
+ * Add a new delete file to a specific batch. All files in this batch will receive the same data
+ * sequence number.
+ *
+ *
+ * <p>This rewrite operation may change the size or layout of the delete files. When applicable,
+ * it is also recommended to discard delete records for files that are no longer part of the table
+ * state. However, the set of applicable delete records must never change.
+ *
+ * @param deleteFile a new delete file
+ * @param batchOrdinal The batch ordinal to associate with this delete file
+ * @return this for method chaining
+ */
+ default StreamingUpdate addFile(DeleteFile deleteFile, int batchOrdinal) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement addFile");
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 5ab1def51ca0..7683f1d59d84 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -233,6 +233,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();
+ /**
+ * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+ * table.
+ *
+ * @return a new {@link StreamingUpdate}
+ */
+ default StreamingUpdate newStreamingUpdate() {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+ }
+
/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
* and commit.
diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java
index aeec1f589d06..85d0267bd379 100644
--- a/api/src/main/java/org/apache/iceberg/Transaction.java
+++ b/api/src/main/java/org/apache/iceberg/Transaction.java
@@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();
+ /**
+ * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+ * table.
+ *
+ * @return a new {@link StreamingUpdate}
+ */
+ default StreamingUpdate newStreamingUpdate() {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+ }
+
/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
* table.
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index cefd765a91b5..fcb528caba96 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -360,6 +360,52 @@ public int hashCode() {
}
}
+ // similar to Row but has its own hashCode() and equals() implementations
+ // it is useful for testing custom collections that rely on wrappers
+ public static class CustomRow implements StructLike {
+ public static CustomRow of(Object... values) {
+ return new CustomRow(values);
+ }
+
+ private final Object[] values;
+
+ private CustomRow(Object... values) {
+ this.values = values;
+ }
+
+ @Override
+ public int size() {
+ return values.length;
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ return javaClass.cast(values[pos]);
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ values[pos] = value;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ CustomRow that = (CustomRow) other;
+ return Arrays.equals(values, that.values);
+ }
+
+ @Override
+ public int hashCode() {
+ return 17 * Arrays.hashCode(values);
+ }
+ }
+
public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary {
private final boolean containsNull;
private final Boolean containsNaN;
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignRequestParser.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignRequestParser.java
index a6928183f705..2c5f74ad8064 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignRequestParser.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignRequestParser.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import java.net.URI;
import java.util.Arrays;
+import java.util.Collections;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -122,9 +123,9 @@ public void roundTripSerde() {
"amz-sdk-request",
Arrays.asList("attempt=1", "max=4"),
"Content-Length",
- Arrays.asList("191"),
+ Collections.singletonList("191"),
"Content-Type",
- Arrays.asList("application/json"),
+ Collections.singletonList("application/json"),
"User-Agent",
Arrays.asList("aws-sdk-java/2.20.18", "Linux/5.4.0-126")))
.build();
@@ -158,9 +159,9 @@ public void roundTripSerdeWithProperties() {
"amz-sdk-request",
Arrays.asList("attempt=1", "max=4"),
"Content-Length",
- Arrays.asList("191"),
+ Collections.singletonList("191"),
"Content-Type",
- Arrays.asList("application/json"),
+ Collections.singletonList("application/json"),
"User-Agent",
Arrays.asList("aws-sdk-java/2.20.18", "Linux/5.4.0-126")))
.properties(ImmutableMap.of("k1", "v1"))
@@ -198,9 +199,9 @@ public void roundTripWithBody() {
"amz-sdk-request",
Arrays.asList("attempt=1", "max=4"),
"Content-Length",
- Arrays.asList("191"),
+ Collections.singletonList("191"),
"Content-Type",
- Arrays.asList("application/json"),
+ Collections.singletonList("application/json"),
"User-Agent",
Arrays.asList("aws-sdk-java/2.20.18", "Linux/5.4.0-126")))
.properties(ImmutableMap.of("k1", "v1"))
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignResponseParser.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignResponseParser.java
index d7337b1b1777..d2cf132ba598 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignResponseParser.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3SignResponseParser.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import java.net.URI;
import java.util.Arrays;
+import java.util.Collections;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -70,9 +71,9 @@ public void roundTripSerde() {
"amz-sdk-request",
Arrays.asList("attempt=1", "max=4"),
"Content-Length",
- Arrays.asList("191"),
+ Collections.singletonList("191"),
"Content-Type",
- Arrays.asList("application/json"),
+ Collections.singletonList("application/json"),
"User-Agent",
Arrays.asList("aws-sdk-java/2.20.18", "Linux/5.4.0-126")))
.build();
diff --git a/build.gradle b/build.gradle
index 94996a41a648..3f76cbea02bf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -577,11 +577,11 @@ project(':iceberg-delta-lake') {
exclude group: 'com.google.code.gson', module: 'gson'
}
- // The newest version of delta-core uses Spark 3.3.*. Since its only for test, we do
+ // The newest version of delta-core uses Spark 3.5.*. Since it's only used for tests, we do
// not need to include older version of delta-core
- if (sparkVersions.contains("3.3")) {
- integrationImplementation "io.delta:delta-core_${scalaVersion}:${libs.versions.delta.core.get()}"
- integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.3_${scalaVersion}")
+ if (sparkVersions.contains("3.5")) {
+ integrationImplementation "io.delta:delta-spark_${scalaVersion}:${libs.versions.delta.spark.get()}"
+ integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.5_${scalaVersion}")
integrationImplementation(libs.hadoop2.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
@@ -590,7 +590,7 @@ project(':iceberg-delta-lake') {
}
integrationImplementation project(path: ':iceberg-hive-metastore')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
- integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive33.get()}") {
+ integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
exclude group: 'org.apache.parquet'
@@ -602,9 +602,9 @@ project(':iceberg-delta-lake') {
}
}
- // The newest version of delta-core uses Spark 3.3.*. The integration test should only be built
- // if iceberg-spark-3.3 is available
- if (sparkVersions.contains("3.3")) {
+ // The newest version of delta-core uses Spark 3.5.*. The integration test should only be built
+ // if iceberg-spark-3.5 is available
+ if (sparkVersions.contains("3.5")) {
sourceSets {
integration {
java.srcDir "$projectDir/src/integration/java"
@@ -615,6 +615,7 @@ project(':iceberg-delta-lake') {
}
task integrationTest(type: Test) {
+ useJUnitPlatform()
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath
jvmArgs += project.property('extraJvmArgs')
diff --git a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
index 4acbf2a16396..f4b015c16053 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
@@ -66,6 +66,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table");
}
+ @Override
+ public StreamingUpdate newStreamingUpdate() {
+ throw new UnsupportedOperationException("Cannot update a " + descriptor + " table");
+ }
+
@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 87768e34894a..c70dda2bd6d0 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -22,8 +22,8 @@
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -247,7 +247,7 @@ private void performRewrite(List<ManifestFile> currentManifests) {
rewrittenManifests.add(manifest);
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
- .select(Arrays.asList("*"))) {
+ .select(Collections.singletonList("*"))) {
reader
.liveEntries()
.forEach(
diff --git a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
new file mode 100644
index 000000000000..8ef34afb18ec
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate>
+ implements StreamingUpdate {
+ private final List<Batch> batches = Lists.newArrayList();
+
+ BaseStreamingUpdate(String tableName, TableOperations ops) {
+ super(tableName, ops);
+ }
+
+ @Override
+ protected BaseStreamingUpdate self() {
+ return this;
+ }
+
+ @Override
+ protected String operation() {
+ return DataOperations.OVERWRITE;
+ }
+
+ @Override
+ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
+ long startingSequenceNumber = base.nextSequenceNumber();
+ batches.sort(Comparator.comparingInt(o -> o.ordinal));
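+ // each batch receives its own data sequence number, offset from the base table's
+ // next sequence number by the batch ordinal (later batches get higher numbers)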
+ for (Batch batch : batches) {
+ long dataSequenceNumber = startingSequenceNumber + batch.ordinal + 1;
+ batch.newDataFiles.forEach(f -> add(f, dataSequenceNumber));
+ batch.newDeleteFiles.forEach(f -> add(f, dataSequenceNumber));
+ }
+ return super.apply(base, snapshot);
+ }
+
+ // accumulate files into per-ordinal batches; apply() assigns their data sequence numbers
+ @Override
+ public StreamingUpdate addFile(DataFile dataFile, int batchOrdinal) {
+   batchFor(batchOrdinal).getNewDataFiles().add(dataFile);
+   return this;
+ }
+
+ @Override
+ public StreamingUpdate addFile(DeleteFile deleteFile, int batchOrdinal) {
+   batchFor(batchOrdinal).getNewDeleteFiles().add(deleteFile);
+   return this;
+ }
+
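+ // Note: batchFor is an assumed helper, not named by the interface; it finds the batch
+ // registered for the given ordinal or creates and registers a new one.
+ private Batch batchFor(int batchOrdinal) {
+   for (Batch batch : batches) {
+     if (batch.getOrdinal() == batchOrdinal) {
+       return batch;
+     }
+   }
+
+   Batch batch = new Batch(batchOrdinal);
+   batches.add(batch);
+   return batch;
+ }
+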
+ @Override
+ public BaseStreamingUpdate toBranch(String branch) {
+ targetBranch(branch);
+ return this;
+ }
+
+ private static class Batch {
+ private final List<DataFile> newDataFiles = Lists.newArrayList();
+ private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();
+ private final int ordinal;
+
+ /**
+ * Creates a new set of updates to a specific batch
+ *
+ * @param ordinal the batch ordinal
+ */
+ Batch(int ordinal) {
+ this.ordinal = ordinal;
+ }
+
+ public List<DataFile> getNewDataFiles() {
+ return newDataFiles;
+ }
+
+ public List<DeleteFile> getNewDeleteFiles() {
+ return newDeleteFiles;
+ }
+
+ public int getOrdinal() {
+ return ordinal;
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index d4cf1f4b76f4..ccc018768347 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -190,6 +190,11 @@ public RewriteFiles newRewrite() {
return new BaseRewriteFiles(name, ops).reportWith(reporter);
}
+ @Override
+ public StreamingUpdate newStreamingUpdate() {
+ return new BaseStreamingUpdate(name, ops).reportWith(reporter);
+ }
+
@Override
public RewriteManifests rewriteManifests() {
return new BaseRewriteManifests(ops).reportWith(reporter);
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 018f70eb16fa..cf263b755002 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -45,6 +45,7 @@
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
@@ -192,6 +193,16 @@ public RewriteFiles newRewrite() {
return rewrite;
}
+ @Override
+ public StreamingUpdate newStreamingUpdate() {
+ checkLastOperationCommitted("StreamingUpdate");
+ StreamingUpdate streamingUpdate =
+ new BaseStreamingUpdate(tableName, transactionOps).reportWith(reporter);
+ streamingUpdate.deleteWith(enqueueDelete);
+ updates.add(streamingUpdate);
+ return streamingUpdate;
+ }
+
@Override
public RewriteManifests rewriteManifests() {
checkLastOperationCommitted("RewriteManifests");
@@ -446,16 +457,20 @@ private void commitSimpleTransaction() {
}
Set<String> committedFiles = committedFiles(ops, newSnapshots);
- // delete all of the files that were deleted in the most recent set of operation commits
- Tasks.foreach(deletedFiles)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
- .run(
- path -> {
- if (committedFiles == null || !committedFiles.contains(path)) {
- ops.io().deleteFile(path);
- }
- });
+ if (committedFiles != null) {
+ // delete all of the files that were deleted in the most recent set of operation commits
+ Tasks.foreach(deletedFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+ .run(
+ path -> {
+ if (!committedFiles.contains(path)) {
+ ops.io().deleteFile(path);
+ }
+ });
+ } else {
+ LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
+ }
} catch (RuntimeException e) {
LOG.warn("Failed to load committed metadata, skipping clean-up", e);
@@ -502,9 +517,11 @@ private void applyUpdates(TableOperations underlyingOps) {
}
}
+ // committedFiles returns null whenever the set of committed files
+ // cannot be determined from the provided snapshots
private static Set<String> committedFiles(TableOperations ops, Set<Long> snapshotIds) {
if (snapshotIds.isEmpty()) {
- return null;
+ return ImmutableSet.of();
}
Set<String> committedFiles = Sets.newHashSet();
@@ -696,6 +713,11 @@ public RewriteFiles newRewrite() {
return BaseTransaction.this.newRewrite();
}
+ @Override
+ public StreamingUpdate newStreamingUpdate() {
+ return BaseTransaction.this.newStreamingUpdate();
+ }
+
@Override
public RewriteManifests rewriteManifests() {
return BaseTransaction.this.rewriteManifests();
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 1dcfa6d3d41d..632958e242dd 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
@@ -83,9 +84,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final boolean snapshotIdInheritanceEnabled;
// update data
- private final List<DataFile> newDataFiles = Lists.newArrayList();
+ private final List<FileHolder<DataFile>> newDataFiles = Lists.newArrayList();
private Long newDataFilesDataSequenceNumber;
- private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
+ private final Map<Integer, List<FileHolder<DeleteFile>>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
@@ -158,7 +159,8 @@ protected Expression rowFilter() {
}
protected List<DataFile> addedDataFiles() {
- return ImmutableList.copyOf(newDataFiles);
+ return ImmutableList.copyOf(
+ newDataFiles.stream().map(FileHolder::file).collect(Collectors.toList()));
}
protected void failAnyDelete() {
@@ -228,31 +230,41 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
- setDataSpec(file);
- addedFilesSummary.addedFile(dataSpec(), file);
+ addDataFile(new FileHolder<>(file));
+ }
+
+ /** Add a data file to the new snapshot. */
+ protected void add(DataFile file, long dataSequenceNumber) {
+ Preconditions.checkNotNull(file, "Invalid data file: null");
+ addDataFile(new FileHolder<>(file, dataSequenceNumber));
+ }
+
+ private void addDataFile(FileHolder<DataFile> dataFile) {
+ setDataSpec(dataFile.file());
+ addedFilesSummary.addedFile(dataSpec(), dataFile.file());
hasNewDataFiles = true;
- newDataFiles.add(file);
+ newDataFiles.add(dataFile);
}
/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
- add(new DeleteFileHolder(file));
+ addDelete(new FileHolder<>(file));
}
/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file, long dataSequenceNumber) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
- add(new DeleteFileHolder(file, dataSequenceNumber));
+ addDelete(new FileHolder<>(file, dataSequenceNumber));
}
- private void add(DeleteFileHolder fileHolder) {
- int specId = fileHolder.deleteFile().specId();
+ private void addDelete(FileHolder<DeleteFile> fileHolder) {
+ int specId = fileHolder.file().specId();
PartitionSpec fileSpec = ops.current().spec(specId);
- List<DeleteFileHolder> deleteFiles =
+ List<FileHolder<DeleteFile>> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
deleteFiles.add(fileHolder);
- addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
+ addedFilesSummary.addedFile(fileSpec, fileHolder.file());
hasNewDeleteFiles = true;
}
@@ -960,9 +972,23 @@ private List<ManifestFile> newDataFilesAsManifests() {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
try {
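+ // a per-file data sequence number (if present) takes precedence over the
+ // snapshot-wide newDataFilesDataSequenceNumber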
if (newDataFilesDataSequenceNumber == null) {
- newDataFiles.forEach(writer::add);
+ newDataFiles.forEach(
+ f -> {
+ if (f.dataSequenceNumber() == null) {
+ writer.add(f.file());
+ } else {
+ writer.add(f.file(), f.dataSequenceNumber());
+ }
+ });
} else {
- newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
+ newDataFiles.forEach(
+ f -> {
+ if (f.dataSequenceNumber() == null) {
+ writer.add(f.file(), newDataFilesDataSequenceNumber);
+ } else {
+ writer.add(f.file(), f.dataSequenceNumber());
+ }
+ });
}
} finally {
writer.close();
@@ -1006,9 +1032,9 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
deleteFiles.forEach(
df -> {
if (df.dataSequenceNumber() != null) {
- writer.add(df.deleteFile(), df.dataSequenceNumber());
+ writer.add(df.file(), df.dataSequenceNumber());
} else {
- writer.add(df.deleteFile());
+ writer.add(df.file());
}
});
} finally {
@@ -1132,33 +1158,33 @@ protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
}
}
- private static class DeleteFileHolder {
- private final DeleteFile deleteFile;
+ private static class FileHolder<T extends ContentFile<T>> {
+ private final T file;
private final Long dataSequenceNumber;
/**
* Wrap a delete file for commit with a given data sequence number
*
- * @param deleteFile delete file
+ * @param file content file
* @param dataSequenceNumber data sequence number to apply
*/
- DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
- this.deleteFile = deleteFile;
+ FileHolder(T file, long dataSequenceNumber) {
+ this.file = file;
this.dataSequenceNumber = dataSequenceNumber;
}
/**
* Wrap a delete file for commit with the latest sequence number
*
- * @param deleteFile delete file
+ * @param file the content file
*/
- DeleteFileHolder(DeleteFile deleteFile) {
- this.deleteFile = deleteFile;
+ FileHolder(T file) {
+ this.file = file;
this.dataSequenceNumber = null;
}
- public DeleteFile deleteFile() {
- return deleteFile;
+ public T file() {
+ return file;
}
public Long dataSequenceNumber() {
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 778a993c5144..b1fbed43b626 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -331,6 +331,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException(errorMsg("newRewrite"));
}
+ @Override
+ public StreamingUpdate newStreamingUpdate() {
+ throw new UnsupportedOperationException("newStreamingWrite");
+ }
+
@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirement.java b/core/src/main/java/org/apache/iceberg/UpdateRequirement.java
index 80ecf84efa45..fc1f55311175 100644
--- a/core/src/main/java/org/apache/iceberg/UpdateRequirement.java
+++ b/core/src/main/java/org/apache/iceberg/UpdateRequirement.java
@@ -25,7 +25,10 @@
/** Represents a requirement for a {@link MetadataUpdate} */
public interface UpdateRequirement {
- void validate(TableMetadata base);
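+ // table-level validation has a throwing default so that view-only requirements
+ // (such as the new AssertViewUUID) only need to implement validate(ViewMetadata)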
+ default void validate(TableMetadata base) {
+ throw new ValidationException(
+ "Cannot validate %s against a table", this.getClass().getSimpleName());
+ }
default void validate(ViewMetadata base) {
throw new ValidationException(
@@ -62,12 +65,25 @@ public void validate(TableMetadata base) {
"Requirement failed: UUID does not match: expected %s != %s", base.uuid(), uuid);
}
}
+ }
+
+ class AssertViewUUID implements UpdateRequirement {
+ private final String uuid;
+
+ public AssertViewUUID(String uuid) {
+ Preconditions.checkArgument(uuid != null, "Invalid required UUID: null");
+ this.uuid = uuid;
+ }
+
+ public String uuid() {
+ return uuid;
+ }
@Override
public void validate(ViewMetadata base) {
if (!uuid.equalsIgnoreCase(base.uuid())) {
throw new CommitFailedException(
- "Requirement failed: UUID does not match: expected %s != %s", base.uuid(), uuid);
+ "Requirement failed: view UUID does not match: expected %s != %s", base.uuid(), uuid);
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirementParser.java b/core/src/main/java/org/apache/iceberg/UpdateRequirementParser.java
index 091d4f1fc58c..5c4dc2221290 100644
--- a/core/src/main/java/org/apache/iceberg/UpdateRequirementParser.java
+++ b/core/src/main/java/org/apache/iceberg/UpdateRequirementParser.java
@@ -35,6 +35,7 @@ private UpdateRequirementParser() {}
// assertion types
static final String ASSERT_TABLE_UUID = "assert-table-uuid";
+ static final String ASSERT_VIEW_UUID = "assert-view-uuid";
static final String ASSERT_TABLE_DOES_NOT_EXIST = "assert-create";
static final String ASSERT_REF_SNAPSHOT_ID = "assert-ref-snapshot-id";
static final String ASSERT_LAST_ASSIGNED_FIELD_ID = "assert-last-assigned-field-id";
@@ -68,6 +69,7 @@ private UpdateRequirementParser() {}
private static final Map, String> TYPES =
ImmutableMap., String>builder()
.put(UpdateRequirement.AssertTableUUID.class, ASSERT_TABLE_UUID)
+ .put(UpdateRequirement.AssertViewUUID.class, ASSERT_VIEW_UUID)
.put(UpdateRequirement.AssertTableDoesNotExist.class, ASSERT_TABLE_DOES_NOT_EXIST)
.put(UpdateRequirement.AssertRefSnapshotID.class, ASSERT_REF_SNAPSHOT_ID)
.put(UpdateRequirement.AssertLastAssignedFieldId.class, ASSERT_LAST_ASSIGNED_FIELD_ID)
@@ -101,6 +103,9 @@ public static void toJson(UpdateRequirement updateRequirement, JsonGenerator gen
case ASSERT_TABLE_UUID:
writeAssertTableUUID((UpdateRequirement.AssertTableUUID) updateRequirement, generator);
break;
+ case ASSERT_VIEW_UUID:
+ writeAssertViewUUID((UpdateRequirement.AssertViewUUID) updateRequirement, generator);
+ break;
case ASSERT_REF_SNAPSHOT_ID:
writeAssertRefSnapshotId(
(UpdateRequirement.AssertRefSnapshotID) updateRequirement, generator);
@@ -159,6 +164,8 @@ public static UpdateRequirement fromJson(JsonNode jsonNode) {
return readAssertTableDoesNotExist(jsonNode);
case ASSERT_TABLE_UUID:
return readAssertTableUUID(jsonNode);
+ case ASSERT_VIEW_UUID:
+ return readAssertViewUUID(jsonNode);
case ASSERT_REF_SNAPSHOT_ID:
return readAssertRefSnapshotId(jsonNode);
case ASSERT_LAST_ASSIGNED_FIELD_ID:
@@ -182,6 +189,11 @@ private static void writeAssertTableUUID(
gen.writeStringField(UUID, requirement.uuid());
}
+ private static void writeAssertViewUUID(
+ UpdateRequirement.AssertViewUUID requirement, JsonGenerator gen) throws IOException {
+ gen.writeStringField(UUID, requirement.uuid());
+ }
+
private static void writeAssertRefSnapshotId(
UpdateRequirement.AssertRefSnapshotID requirement, JsonGenerator gen) throws IOException {
gen.writeStringField(NAME, requirement.refName());
@@ -231,6 +243,11 @@ private static UpdateRequirement readAssertTableUUID(JsonNode node) {
return new UpdateRequirement.AssertTableUUID(uuid);
}
+ private static UpdateRequirement readAssertViewUUID(JsonNode node) {
+ String uuid = JsonUtil.getString(UUID, node);
+ return new UpdateRequirement.AssertViewUUID(uuid);
+ }
+
private static UpdateRequirement readAssertRefSnapshotId(JsonNode node) {
String name = JsonUtil.getString(NAME, node);
Long snapshotId = JsonUtil.getLongOrNull(SNAPSHOT_ID, node);
diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
index 8a7a761ff2c1..6a5d07d7813d 100644
--- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
+++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
@@ -23,6 +23,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.view.ViewMetadata;
public class UpdateRequirements {
@@ -56,6 +57,16 @@ public static List forUpdateTable(
return builder.build();
}
+ public static List forReplaceView(
+ ViewMetadata base, List metadataUpdates) {
+ Preconditions.checkArgument(null != base, "Invalid view metadata: null");
+ Preconditions.checkArgument(null != metadataUpdates, "Invalid metadata updates: null");
+ Builder builder = new Builder(null, false);
+ builder.require(new UpdateRequirement.AssertViewUUID(base.uuid()));
+ metadataUpdates.forEach(builder::update);
+ return builder.build();
+ }
+
private static class Builder {
private final TableMetadata base;
private final ImmutableList.Builder requirements = ImmutableList.builder();
diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
index 846820a99d9f..d4083420efa6 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
@@ -202,6 +202,9 @@ public ErrorResponse parseResponse(int code, String json) {
public void accept(ErrorResponse error) {
switch (error.code()) {
case 400:
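+ // when the server reports an IllegalArgumentException, surface it directly
+ // instead of wrapping it in a BadRequestException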
+ if (IllegalArgumentException.class.getSimpleName().equals(error.type())) {
+ throw new IllegalArgumentException(error.message());
+ }
throw new BadRequestException("Malformed request: %s", error.message());
case 401:
throw new NotAuthorizedException("Not authorized: %s", error.message());
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 5a55afbfce22..5f660f0f4fe8 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -55,6 +55,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
@@ -91,6 +92,7 @@
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.BaseView;
@@ -122,6 +124,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private final Function