
Iceberg/Comet integration POC #9841

Merged: 32 commits, merged on Jan 31, 2025. Changes shown are from 26 commits.

Commits (32)
e7a9d8f  Iceberg/Comet integration (Mar 1, 2024)
5660306  address comments (Apr 18, 2024)
11bb4b7  address comments (Apr 21, 2024)
4b01161  remove unnecessary code (Apr 21, 2024)
e73dc0a  address comments (Apr 26, 2024)
d055ca2  address comments (May 1, 2024)
4f06c97  remove unnecessary public (May 1, 2024)
d96183d  address comments (May 3, 2024)
a2a3707  address comments (May 16, 2024)
193a85b  minor changes (May 17, 2024)
e68bbb5  update to use comet 0.3.0 (huaxingao, Oct 21, 2024)
f2fbb3c  use the new Comet Utils.getColumnReader method (huaxingao, Oct 21, 2024)
fa0ee52  change PARQUET_READER_TYPE_DEFAULT to Comet to test CometReader (huaxingao, Oct 21, 2024)
5514f86  Ignore SmokeTest#testGettingStarted for now (huaxingao, Oct 21, 2024)
1eb40e5  rebase (huaxingao, Dec 4, 2024)
83196f6  add setRowGroupInfo(PageReadStore pageStore, Map<ColumnPath, ColumnCh… (huaxingao, Dec 4, 2024)
d1c6a14  formatting (huaxingao, Dec 4, 2024)
0eb4ce7  ignore a few tests for now (huaxingao, Dec 4, 2024)
b9ca9f3  remove comet dependency in build.gradle (huaxingao, Dec 26, 2024)
d552d4a  Trigger Build (huaxingao, Dec 26, 2024)
46d0170  add ColumnarBatchUtil (huaxingao, Dec 29, 2024)
9db707d  rebase (huaxingao, Jan 26, 2025)
d61325b  rebase (huaxingao, Jan 28, 2025)
e173dd3  convert constant value to Spark format (huaxingao, Jan 28, 2025)
a6b15d3  check type before casting (huaxingao, Jan 28, 2025)
77775a3  address comments (huaxingao, Jan 29, 2025)
4bf5cbf  address comments (huaxingao, Jan 29, 2025)
8f34742  remove un-intended change in test (huaxingao, Jan 29, 2025)
0d9e974  address comments (huaxingao, Jan 29, 2025)
46dd439  address comments (huaxingao, Jan 30, 2025)
10901b0  close importer in reset (huaxingao, Jan 30, 2025)
dae79ad  revert to iceberg reader (huaxingao, Jan 31, 2025)
3 changes: 3 additions & 0 deletions .baseline/checkstyle/checkstyle-suppressions.xml
@@ -48,4 +48,7 @@

<!-- Referencing guava classes should be allowed in classes within bundled-guava module -->
<suppress files="org.apache.iceberg.GuavaClasses" id="BanUnrelocatedGuavaClasses"/>

Contributor:

I see there are imports of shaded classes in CometColumnReader. Are those Comet classes? Can you explain a bit what exactly is shaded?

Contributor Author:

Comet shades Arrow, Protobuf, and Guava.

import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.RootAllocator;

RootAllocator is an Arrow class; CometSchemaImporter is a Comet class.

Contributor:

I wish Comet would offer an API that wraps around shaded dependencies, so that we don’t have to reference shaded classes directly. It is a bit odd. I always considered shading an internal detail rather than something that would leak into a public API.

Thoughts, @RussellSpitzer @Fokko @nastra @amogh-jahagirdar @danielcweeks?

Comment:

The shaded imports can be removed. Comet has an API used here that requires these classes but we can change the API (only this integration uses that API).

Comment:

@huaxingao can you log an issue in Comet to address this? CometSchemaImporter is a Comet class, but it lives in the org.apache.arrow.c package to overcome access restrictions (Arrow's SchemaImporter is package-private). We can create a wrapper class to access the schema importer.
Also, we should ideally use the allocator from BatchReader, but that too can live in the wrapper class, I think. There is no issue with using a new allocator for each column, but the Arrow allocator has powerful memory-accounting features that we can take advantage of down the road.

Contributor Author:

I have created apache/datafusion-comet#1352 for this issue. Will fix this in the next minor release.

Comment:

Thank you!

<!-- Suppress checks for CometColumnReader -->
<suppress files="org.apache.iceberg.spark.data.vectorized.CometColumnReader" checks="IllegalImport"/>
</suppressions>
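As a rough illustration of the wrapper API proposed in the thread above, Comet could expose an entry point that keeps the shaded Arrow types internal, so Iceberg's CometColumnReader never imports org.apache.comet.shaded.* directly. This is only a sketch; every name below (CometSchemaImporterFactory, Handle, newSchemaImporter) is hypothetical and not an existing Comet API.

// Hypothetical sketch only: none of these names exist in Comet today. The point is
// that the shaded CometSchemaImporter and RootAllocator would stay behind a
// Comet-owned interface instead of leaking into Iceberg code.
package org.apache.comet.parquet;

public interface CometSchemaImporterFactory {

  /** Opaque handle around the shaded importer; callers never see shaded Arrow types. */
  interface Handle extends AutoCloseable {
    @Override
    void close();
  }

  /** Creates an importer backed by a Comet-managed (shaded) Arrow allocator. */
  Handle newSchemaImporter();
}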
5 changes: 5 additions & 0 deletions spark/v3.4/build.gradle
@@ -52,6 +52,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
annotationProcessor libs.immutables.value
compileOnly libs.immutables.value
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-data')
@@ -77,6 +79,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}

compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.3.0"

implementation libs.parquet.column
implementation libs.parquet.hadoop

@@ -189,6 +193,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
testImplementation libs.junit.vintage.engine
testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.3.0"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
SmokeTest.java
@@ -28,6 +28,7 @@
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class SmokeTest extends SparkExtensionsTestBase {
@@ -44,7 +45,7 @@ public void dropTable() {
// Run through our Doc's Getting Started Example
// TODO Update doc example so that it can actually be run, modifications were required for this
// test suite to run
@Test
@Ignore
Contributor:

Is this needed?

Contributor Author:

This is not needed.
Currently I set the Comet reader as the default to make sure all the tests pass with the Comet reader, so I have to skip this test for now. Once I change the default back to the Iceberg reader, I will remove this change.

Contributor:

Yeah, we will have to use the built-in reader by default.

public void testGettingStarted() throws IOException {
// Creating a table
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
OrcBatchReadConf.java (new file)
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface OrcBatchReadConf extends Serializable {
int batchSize();
}
ParquetBatchReadConf.java (new file)
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface ParquetBatchReadConf extends Serializable {
int batchSize();

ParquetReaderType readerType();
}
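Because ParquetBatchReadConf (and OrcBatchReadConf above) are Immutables value interfaces, the annotationProcessor added in build.gradle generates immutable implementations with builders. A minimal usage sketch, assuming the conventionally generated ImmutableParquetBatchReadConf class (not shown in this diff) and an arbitrary batch size:

// Assumes the Immutables processor generates ImmutableParquetBatchReadConf from the
// @Value.Immutable interface above; 4096 is an illustrative batch size.
ParquetBatchReadConf conf =
    ImmutableParquetBatchReadConf.builder()
        .batchSize(4096)
        .readerType(ParquetReaderType.COMET)
        .build();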
ParquetReaderType.java (new file)
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** Enumerates the types of Parquet readers. */
public enum ParquetReaderType {
/** ICEBERG type utilizes the built-in Parquet reader. */
ICEBERG("iceberg"),

/**
* COMET type changes the Parquet reader to the Apache DataFusion Comet Parquet reader. The Comet
* Parquet reader performs I/O and decompression in the JVM but decodes in native code to improve
* performance. Additionally, Comet will convert Spark's physical plan into a native physical plan
* and execute that plan natively.
*
* <p>TODO: Implement {@link org.apache.comet.parquet.SupportsComet} in SparkScan to convert Spark
Contributor:

It seems SupportsComet is an empty marker interface. Does Comet native execution have a dependency on Iceberg code? How does it perform the conversion?

Contributor Author:

Comet checks SupportsComet.isCometEnabled() and wraps BatchScanExec with CometBatchScanExec if isCometEnabled() returns true. I will make SparkScan implement SupportsComet and return true for isCometEnabled().

Contributor:

So this marker interface avoids the need to depend on Iceberg classes? Okay, that makes sense.

Contributor (@aokolnychyi, Jan 29, 2025):

Do we have to include this TODO here, however? It doesn't seem to belong.

Contributor Author:

I want to have a TODO note somewhere so that people will know that native execution is not yet supported. There are some additional steps we need to take to make native execution work. Otherwise people may think native execution is enabled by this PR. Let me see if there is a better place for this.

* physical plan to native physical plan for native execution.
*/
COMET("comet");

private final String parquetReaderType;

ParquetReaderType(String readerType) {
this.parquetReaderType = readerType;
}

public static ParquetReaderType fromName(String parquetReaderType) {
Preconditions.checkArgument(parquetReaderType != null, "Parquet reader type is null");

if (ICEBERG.parquetReaderType().equalsIgnoreCase(parquetReaderType)) {
return ICEBERG;

} else if (COMET.parquetReaderType().equalsIgnoreCase(parquetReaderType)) {
return COMET;

} else {
throw new IllegalArgumentException("Unknown parquet reader type: " + parquetReaderType);
}
}

public String parquetReaderType() {
return parquetReaderType;
}
}
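To make the TODO above concrete: per the review thread, Comet wraps BatchScanExec with CometBatchScanExec when SupportsComet.isCometEnabled() returns true. Below is a minimal sketch of what implementing the marker interface in SparkScan could look like; the field and constructor are illustrative and not the PR's actual code.

// Illustrative only: assumes org.apache.comet.parquet.SupportsComet exposes
// isCometEnabled(), as described in the review discussion above.
import org.apache.comet.parquet.SupportsComet;

class SparkScan implements SupportsComet {
  // Hypothetical: would be derived from the configured ParquetReaderType.
  private final boolean cometEnabled;

  SparkScan(boolean cometEnabled) {
    this.cometEnabled = cometEnabled;
  }

  @Override
  public boolean isCometEnabled() {
    // When true, Comet can replace BatchScanExec with CometBatchScanExec.
    return cometEnabled;
  }
}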
@@ -359,4 +359,12 @@ public boolean reportColumnStats() {
.defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
.parse();
}

public ParquetReaderType parquetReaderType() {
return confParser
.enumConf(ParquetReaderType::fromName)
.sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
.defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
.parse();
}
}
SparkSQLProperties.java
@@ -27,6 +27,10 @@ private SparkSQLProperties() {}
// Controls whether vectorized reads are enabled
public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";

// Controls which Parquet reader implementation to use
public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET;

// Controls whether reading/writing timestamps without timezones is allowed
@Deprecated
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
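Putting the two pieces above together: a user selects the reader through the new session property, and parquetReaderType() in the read conf parses the value case-insensitively via ParquetReaderType.fromName. A small usage sketch, assuming an existing SparkSession named spark:

// Property name comes from SparkSQLProperties above; "comet" or "COMET" both work
// because fromName uses equalsIgnoreCase.
spark.conf().set("spark.sql.iceberg.parquet.reader-type", "comet");

// The same parsing the enumConf above performs on the session value.
ParquetReaderType type = ParquetReaderType.fromName("comet"); // returns ParquetReaderType.COMET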
ColumnVectorWithFilter.java
@@ -18,78 +18,133 @@
*/
package org.apache.iceberg.spark.data.vectorized;

import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

public class ColumnVectorWithFilter extends IcebergArrowColumnVector {
/**
* A column vector implementation that applies row-level filtering.
*
* <p>This class wraps an existing column vector and uses a row ID mapping array to remap row
* indices during data access. Each method that retrieves data for a specific row translates the
* provided row index using the mapping array, effectively filtering the original data to only
* expose the live subset of rows. This approach allows efficient row-level filtering without
* modifying the underlying data.
*/
public class ColumnVectorWithFilter extends ColumnVector {
private final ColumnVector delegate;
private final int[] rowIdMapping;
private volatile ColumnVectorWithFilter[] children = null;

public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) {
super(holder);
public ColumnVectorWithFilter(ColumnVector delegate, int[] rowIdMapping) {
super(delegate.dataType());
this.delegate = delegate;
this.rowIdMapping = rowIdMapping;
}

@Override
public void close() {
delegate.close();
}

@Override
public boolean hasNull() {
return delegate.hasNull();
}

@Override
public int numNulls() {
// computing the actual number of nulls with rowIdMapping is expensive
// it is OK to overestimate and return the number of nulls in the original vector
return delegate.numNulls();
}

@Override
public boolean isNullAt(int rowId) {
return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1;
return delegate.isNullAt(rowIdMapping[rowId]);
}

@Override
public boolean getBoolean(int rowId) {
return accessor().getBoolean(rowIdMapping[rowId]);
return delegate.getBoolean(rowIdMapping[rowId]);
}

@Override
public byte getByte(int rowId) {
return delegate.getByte(rowIdMapping[rowId]);
}

@Override
public short getShort(int rowId) {
return delegate.getShort(rowIdMapping[rowId]);
}

@Override
public int getInt(int rowId) {
return accessor().getInt(rowIdMapping[rowId]);
return delegate.getInt(rowIdMapping[rowId]);
}

@Override
public long getLong(int rowId) {
return accessor().getLong(rowIdMapping[rowId]);
return delegate.getLong(rowIdMapping[rowId]);
}

@Override
public float getFloat(int rowId) {
return accessor().getFloat(rowIdMapping[rowId]);
return delegate.getFloat(rowIdMapping[rowId]);
}

@Override
public double getDouble(int rowId) {
return accessor().getDouble(rowIdMapping[rowId]);
return delegate.getDouble(rowIdMapping[rowId]);
}

@Override
public ColumnarArray getArray(int rowId) {
if (isNullAt(rowId)) {
return null;
}
return accessor().getArray(rowIdMapping[rowId]);
return delegate.getArray(rowIdMapping[rowId]);
}

@Override
public ColumnarMap getMap(int rowId) {
return delegate.getMap(rowIdMapping[rowId]);
}

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
if (isNullAt(rowId)) {
return null;
}
return accessor().getDecimal(rowIdMapping[rowId], precision, scale);
return delegate.getDecimal(rowIdMapping[rowId], precision, scale);
}

@Override
public UTF8String getUTF8String(int rowId) {
if (isNullAt(rowId)) {
return null;
}
return accessor().getUTF8String(rowIdMapping[rowId]);
return delegate.getUTF8String(rowIdMapping[rowId]);
}

@Override
public byte[] getBinary(int rowId) {
if (isNullAt(rowId)) {
return null;
}
return accessor().getBinary(rowIdMapping[rowId]);
return delegate.getBinary(rowIdMapping[rowId]);
}

@Override
public ColumnVector getChild(int ordinal) {
if (children == null) {
synchronized (this) {
if (children == null) {
if (dataType() instanceof StructType) {
StructType structType = (StructType) dataType();
this.children = new ColumnVectorWithFilter[structType.length()];
for (int index = 0; index < structType.length(); index++) {
children[index] = new ColumnVectorWithFilter(delegate.getChild(index), rowIdMapping);
}
} else {
throw new UnsupportedOperationException("Unsupported nested type: " + dataType());
}
}
}
}

return children[ordinal];
}
}
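A small usage sketch (not part of the PR) of the row-ID mapping described in the class Javadoc: wrap a vector of five values and expose only the rows that survived filtering. OnHeapColumnVector is Spark's heap-backed ColumnVector implementation; the values and mapping are illustrative.

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;

// Source vector with values 0, 10, 20, 30, 40.
OnHeapColumnVector data = new OnHeapColumnVector(5, DataTypes.IntegerType);
for (int i = 0; i < 5; i++) {
  data.putInt(i, i * 10);
}

// Only rows 0, 2, and 4 are "live" after filtering.
int[] rowIdMapping = new int[] {0, 2, 4};
ColumnVectorWithFilter filtered = new ColumnVectorWithFilter(data, rowIdMapping);

filtered.getInt(1); // reads source row 2 -> 20
filtered.getInt(2); // reads source row 4 -> 40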