Commit

Merge branch 'master' into addTestUtils
stefankandic committed Feb 18, 2025
2 parents 849d9c5 + f2b17b9 commit dd7d642
Showing 69 changed files with 2,544 additions and 870 deletions.
64 changes: 64 additions & 0 deletions .github/workflows/iceberg_test.yaml
@@ -0,0 +1,64 @@
name: "Delta Iceberg Latest"
on: [push, pull_request]
jobs:
test:
name: "DIL: Scala ${{ matrix.scala }}"
runs-on: ubuntu-20.04
strategy:
matrix:
# These Scala versions must match those in the build.sbt
scala: [2.12.18, 2.13.13]
env:
SCALA_VERSION: ${{ matrix.scala }}
steps:
- uses: actions/checkout@v3
# TODO we can make this more selective
- uses: technote-space/get-diff-action@v4
id: git-diff
with:
PATTERNS: |
**
.github/workflows/**
!kernel/**
!connectors/**
- name: install java
uses: actions/setup-java@v3
with:
distribution: "zulu"
java-version: "8"
- name: Cache Scala, SBT
uses: actions/cache@v3
with:
path: |
~/.sbt
~/.ivy2
~/.cache/coursier
          # Change the key if dependencies are changed. For each key, GitHub Actions will cache
          # the above directories when we use the key for the first time. After that, each run
          # will just use the cache. The cache is immutable, so we need to use a new key when
          # trying to cache new stuff.
          key: delta-sbt-cache-spark3.2-scala${{ matrix.scala }}
      - name: Install Job dependencies
        run: |
          sudo apt-get update
          sudo apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl git
          sudo apt install libedit-dev
          curl -LO https://github.com/bufbuild/buf/releases/download/v1.28.1/buf-Linux-x86_64.tar.gz
          mkdir -p ~/buf
          tar -xvzf buf-Linux-x86_64.tar.gz -C ~/buf --strip-components 1
          rm buf-Linux-x86_64.tar.gz
          sudo apt install python3-pip --fix-missing
          sudo pip3 install pipenv==2021.5.29
          curl https://pyenv.run | bash
          export PATH="~/.pyenv/bin:$PATH"
          eval "$(pyenv init -)"
          eval "$(pyenv virtualenv-init -)"
          pyenv install 3.8.18
          pyenv global system 3.8.18
          pipenv --python 3.8 install
        if: steps.git-diff.outputs.diff
      - name: Run Scala/Java and Python tests
        # when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_master_test.yaml
        run: |
          TEST_PARALLELISM_COUNT=4 pipenv run python run-tests.py --group iceberg
        if: steps.git-diff.outputs.diff
94 changes: 94 additions & 0 deletions PROTOCOL.md
@@ -62,6 +62,11 @@
- [Reader Requirements for Vacuum Protocol Check](#reader-requirements-for-vacuum-protocol-check)
- [Clustered Table](#clustered-table)
- [Writer Requirements for Clustered Table](#writer-requirements-for-clustered-table)
- [Variant Data Type](#variant-data-type)
- [Variant data in Parquet](#variant-data-in-parquet)
- [Writer Requirements for Variant Type](#writer-requirements-for-variant-type)
- [Reader Requirements for Variant Data Type](#reader-requirements-for-variant-data-type)
- [Compatibility with other Delta Features](#compatibility-with-other-delta-features)
- [Requirements for Writers](#requirements-for-writers)
- [Creation of New Log Entries](#creation-of-new-log-entries)
- [Consistency Between Table Metadata and Data Files](#consistency-between-table-metadata-and-data-files)
@@ -100,6 +105,7 @@
- [Struct Field](#struct-field)
- [Array Type](#array-type)
- [Map Type](#map-type)
- [Variant Type](#variant-type)
- [Column Metadata](#column-metadata)
- [Example](#example)
- [Checkpoint Schema](#checkpoint-schema)
@@ -1353,6 +1359,86 @@ The example above converts `configuration` field into JSON format, including esc
}
```


# Variant Data Type

This feature enables support for the `variant` data type, which stores semi-structured data.
The schema serialization method is described in [Schema Serialization Format](#schema-serialization-format).

To support this feature:
- The table must be on Reader Version 3 and Writer Version 7.
- The feature `variantType` must exist in the table `protocol`'s `readerFeatures` and `writerFeatures` (an example `protocol` action is shown below).
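
As an illustration only (not itself part of the requirements above), a `protocol` action for a table supporting Variant might look like the following; a real table's feature lists may contain additional features:

```
{
  "protocol": {
    "minReaderVersion": 3,
    "minWriterVersion": 7,
    "readerFeatures": ["variantType"],
    "writerFeatures": ["variantType"]
  }
}
```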

## Example JSON-Encoded Delta Table Schema with Variant types

```
{
  "type" : "struct",
  "fields" : [ {
    "name" : "raw_data",
    "type" : "variant",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "variant_array",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "variant"
      },
      "containsNull" : false
    },
    "nullable" : false,
    "metadata" : { }
  } ]
}
```

## Variant data in Parquet

The Variant data type is represented as two binary-encoded values, according to the [Spark Variant binary encoding specification](https://github.com/apache/spark/blob/master/common/variant/README.md).
The two binary values are named `value` and `metadata`.

When writing Variant data to Parquet files, the data is written as a single Parquet struct with the following fields:

Struct field name | Parquet primitive type | Description
-|-|-
value | binary | The binary-encoded Variant value, as described in [Variant binary encoding](https://github.com/apache/spark/blob/master/common/variant/README.md)
metadata | binary | The binary-encoded Variant metadata, as described in [Variant binary encoding](https://github.com/apache/spark/blob/master/common/variant/README.md)

The Parquet struct must include the two struct fields `value` and `metadata`.
Supported writers must write the two binary fields, and supported readers must read the two binary fields (an example physical schema is shown below).
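
For example, assuming a table whose only column is the `raw_data` Variant column from the schema example above, the physical Parquet schema could look like the following sketch (Parquet message-schema notation; the schema name and the `optional`/`required` repetition levels are illustrative and depend on the writer and on column nullability):

```
message spark_schema {
  optional group raw_data {
    required binary value;
    required binary metadata;
  }
}
```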

[Variant shredding](https://github.com/apache/parquet-format/blob/master/VariantShredding.md) will be introduced later, as a separate `variantShredding` table feature.

## Writer Requirements for Variant Data Type

When Variant type is supported (`writerFeatures` field of a table's `protocol` action contains `variantType`), writers:
- must write a column of type `variant` to Parquet as a struct containing the fields `value` and `metadata`, storing values that conform to the [Variant binary encoding specification](https://github.com/apache/spark/blob/master/common/variant/README.md)
- must not write a Parquet struct field named `typed_value`, to avoid confusion with the field of the same name required by [Variant shredding](https://github.com/apache/parquet-format/blob/master/VariantShredding.md)

## Reader Requirements for Variant Data Type

When Variant type is supported (`readerFeatures` field of a table's `protocol` action contains `variantType`), readers:
- must recognize and tolerate a `variant` data type in a Delta schema
- must use the correct physical schema (struct-of-binary, with fields `value` and `metadata`) when reading a Variant data type from a file
- must make the column available to the engine:
- [Recommended] Expose and interpret the struct-of-binary as a single Variant field in accordance with the [Spark Variant binary encoding specification](https://github.com/apache/spark/blob/master/common/variant/README.md).
- [Alternate] Expose the raw physical struct-of-binary, e.g. if the engine does not support Variant.
- [Alternate] Convert the struct-of-binary to a string, and expose the string representation, e.g. if the engine does not support Variant.

## Compatibility with other Delta Features

Feature | Support for Variant Data Type
-|-
Partition Columns | **Supported:** A Variant column is allowed to be a non-partitioned column of a partitioned table. <br/> **Unsupported:** Variant is not a comparable data type, so it cannot be included in a partition column.
Clustered Tables | **Supported:** A Variant column is allowed to be a non-clustering column of a clustered table. <br/> **Unsupported:** Variant is not a comparable data type, so it cannot be included in a clustering column.
Delta Column Statistics | **Supported:** A Variant column supports the `nullCount` statistic. <br/> **Unsupported:** Variant is not a comparable data type, so a Variant column does not support the `minValues` and `maxValues` statistics (an example `stats` value is shown after this table).
Generated Columns | **Supported:** A Variant column is allowed to be used as a source in a generated column expression, as long as the Variant type is not the result type of the generated column expression. <br/> **Unsupported:** The Variant data type is not allowed to be the result type of a generated column expression.
Delta CHECK Constraints | **Supported:** A Variant column is allowed to be used for a CHECK constraint expression.
Default Column Values | **Supported:** A Variant column is allowed to have a default column value.
Change Data Feed | **Supported:** A table using the Variant data type is allowed to enable the Delta Change Data Feed.
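
For instance (illustrative values only), the per-file `stats` for a table with columns `id` (a long) and `raw_data` (a Variant) would record `nullCount` for the Variant column but omit it from `minValues` and `maxValues`:

```
{
  "numRecords": 100,
  "minValues": { "id": 1 },
  "maxValues": { "id": 100 },
  "nullCount": { "id": 0, "raw_data": 4 }
}
```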

# In-Commit Timestamps

The In-Commit Timestamps writer feature strongly associates a monotonically increasing timestamp with each commit by storing it in the commit's metadata.
@@ -1965,6 +2051,14 @@ type| Always the string "map".
keyType| The type of element used for the key of this map, represented as a string containing the name of a primitive type, a struct definition, an array definition or a map definition
valueType| The type of element used for the value of this map, represented as a string containing the name of a primitive type, a struct definition, an array definition or a map definition

### Variant Type

Variant data uses the Delta type name `variant` for Delta schema serialization.

Field Name | Description
-|-
type | Always the string "variant".

### Column Metadata
Column metadata stores various information about the column.
For example, it MAY contain keys like [`delta.columnMapping`](#column-mapping), [`delta.generationExpression`](#generated-columns), or [`CURRENT_DEFAULT`](#default-columns).
15 changes: 10 additions & 5 deletions build.sbt
@@ -178,10 +178,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
Antlr4 / antlr4Version := "4.9.3",
Test / javaOptions ++= Seq("-Dlog4j.configurationFile=log4j2.properties"),

// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

@@ -1570,7 +1566,16 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir")

// Don't use these groups for any other projects
lazy val sparkGroup = project
.aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing, hudi)
.aggregate(spark, contribs, storage, storageS3DynamoDB, sharing, hudi)
.settings(
// crossScalaVersions must be set to Nil on the aggregating project
crossScalaVersions := Nil,
publishArtifact := false,
publish / skip := false,
)

lazy val icebergGroup = project
.aggregate(iceberg, testDeltaIcebergJar)
.settings(
// crossScalaVersions must be set to Nil on the aggregating project
crossScalaVersions := Nil,
@@ -65,7 +65,10 @@ class IcebergTable(
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED)

private val bucketPartitionEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_BUCKET_PARTITION_ENABLED)
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_BUCKET_PARTITION_ENABLED) ||
deltaSnapshot.exists(s =>
DeltaConfigs.IGNORE_ICEBERG_BUCKET_PARTITION.fromMetaData(s.metadata)
)

// When a table is CLONED/federated with the session conf ON, it will have the table property
// set and will continue to support CAST TIME TYPE even when the session conf is later turned OFF.
@@ -101,8 +104,15 @@
} else {
None
}
val bucketPartitionToNonPartition = if (bucketPartitionEnabled) {
Some((DeltaConfigs.IGNORE_ICEBERG_BUCKET_PARTITION.key -> "true"))
} else {
None
}
icebergTable.properties().asScala.toMap + (DeltaConfigs.COLUMN_MAPPING_MODE.key -> "id") +
(DeltaConfigs.LOG_RETENTION.key -> s"$maxSnapshotAgeMs millisecond") ++ castTimeTypeConf
(DeltaConfigs.LOG_RETENTION.key -> s"$maxSnapshotAgeMs millisecond") ++
castTimeTypeConf ++
bucketPartitionToNonPartition
}

override val partitionSchema: StructType = {
@@ -140,6 +140,14 @@ public static KernelException invalidVersionRange(long startVersion, long endVer
}

/* ------------------------ PROTOCOL EXCEPTIONS ----------------------------- */
public static KernelException unsupportedTableFeature(String feature) {
String message =
String.format(
"Unsupported Delta table feature: table requires feature \"%s\" "
+ "which is unsupported by this version of Delta Kernel.",
feature);
return new KernelException(message);
}

public static KernelException unsupportedReaderProtocol(
String tablePath, int tableReaderVersion) {
@@ -34,6 +34,77 @@ public class TableConfig<T> {
// TableConfigs //
//////////////////

/**
* Whether this Delta table is append-only. Files can't be deleted, and values can't be updated.
*/
public static final TableConfig<Boolean> APPEND_ONLY_ENABLED =
new TableConfig<>(
"delta.appendOnly",
"false",
Boolean::valueOf,
value -> true,
"needs to be a boolean.",
true);

/**
* Enable change data feed output. When enabled, DELETE, UPDATE, and MERGE INTO operations will
* need to do additional work to output their change data in an efficiently readable format.
*/
public static final TableConfig<Boolean> CHANGE_DATA_FEED_ENABLED =
new TableConfig<>(
"delta.enableChangeDataFeed",
"false",
Boolean::valueOf,
value -> true,
"needs to be a boolean.",
true);

public static final TableConfig<String> CHECKPOINT_POLICY =
new TableConfig<>(
"delta.checkpointPolicy",
"classic",
v -> v,
value -> value.equals("classic") || value.equals("v2"),
"needs to be a string and one of 'classic' or 'v2'.",
true);

/** Whether commands modifying this Delta table are allowed to create new deletion vectors. */
public static final TableConfig<Boolean> DELETION_VECTORS_CREATION_ENABLED =
new TableConfig<>(
"delta.enableDeletionVectors",
"false",
Boolean::valueOf,
value -> true,
"needs to be a boolean.",
true);

/**
* Whether widening the type of an existing column or field is allowed, either manually using
* ALTER TABLE CHANGE COLUMN or automatically if automatic schema evolution is enabled.
*/
public static final TableConfig<Boolean> TYPE_WIDENING_ENABLED =
new TableConfig<>(
"delta.enableTypeWidening",
"false",
Boolean::valueOf,
value -> true,
"needs to be a boolean.",
true);

/**
* Indicates whether Row Tracking is enabled on the table. When this flag is turned on, all rows
* are guaranteed to have Row IDs and Row Commit Versions assigned to them, and writers are
* expected to preserve them by materializing them to hidden columns in the data files.
*/
public static final TableConfig<Boolean> ROW_TRACKING_ENABLED =
new TableConfig<>(
"delta.enableRowTracking",
"false",
Boolean::valueOf,
value -> true,
"needs to be a boolean.",
true);

/**
* The shortest duration we have to keep logically deleted data files around before deleting them
* physically.
@@ -175,6 +246,17 @@
Collections.unmodifiableMap(
new HashMap<String, TableConfig<?>>() {
{
addConfig(this, APPEND_ONLY_ENABLED);
addConfig(this, CHANGE_DATA_FEED_ENABLED);
addConfig(this, CHECKPOINT_POLICY);
addConfig(this, DELETION_VECTORS_CREATION_ENABLED);
addConfig(this, TYPE_WIDENING_ENABLED);
addConfig(this, ROW_TRACKING_ENABLED);
addConfig(this, LOG_RETENTION);
addConfig(this, EXPIRED_LOG_CLEANUP_ENABLED);
addConfig(this, TOMBSTONE_RETENTION);
addConfig(this, CHECKPOINT_INTERVAL);
addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED);
addConfig(this, TOMBSTONE_RETENTION);
addConfig(this, CHECKPOINT_INTERVAL);
addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED);
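
To make the constructor pattern above concrete, here is a small self-contained sketch of how an entry defined by a key, default value, parser, validator, and error message can be resolved against a table's property map. The class and method names (`ConfigEntry`, `fromProperties`) are hypothetical and not part of the Delta Kernel API:

```
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Illustrative re-creation of the TableConfig pattern; not the Delta Kernel implementation.
final class ConfigEntry<T> {
  private final String key;
  private final String defaultValue;
  private final Function<String, T> fromString;
  private final Predicate<T> validator;
  private final String errorMessage;

  ConfigEntry(
      String key,
      String defaultValue,
      Function<String, T> fromString,
      Predicate<T> validator,
      String errorMessage) {
    this.key = key;
    this.defaultValue = defaultValue;
    this.fromString = fromString;
    this.validator = validator;
    this.errorMessage = errorMessage;
  }

  // Resolve the value from table properties, falling back to the default and validating it.
  T fromProperties(Map<String, String> properties) {
    T value = fromString.apply(properties.getOrDefault(key, defaultValue));
    if (!validator.test(value)) {
      throw new IllegalArgumentException(key + " " + errorMessage);
    }
    return value;
  }
}

class ConfigEntryExample {
  public static void main(String[] args) {
    ConfigEntry<Boolean> rowTracking =
        new ConfigEntry<>(
            "delta.enableRowTracking", "false", Boolean::valueOf, v -> true,
            "needs to be a boolean.");
    // Prints "true": the explicit table property overrides the "false" default.
    System.out.println(rowTracking.fromProperties(Map.of("delta.enableRowTracking", "true")));
  }
}
```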
