Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
huaxingao committed Jan 29, 2025
1 parent 77775a3 commit 4bf5cbf
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/** Enumerates the types of Parquet readers. */
public enum ParquetReaderType {
/** ICEBERG type utilizes the built-in Parquet reader. */
ICEBERG("iceberg"),
ICEBERG,

/**
* COMET type changes the Parquet reader to the Apache DataFusion Comet Parquet reader. Comet
Expand All @@ -34,29 +34,14 @@ public enum ParquetReaderType {
* <p>TODO: Implement {@link org.apache.comet.parquet.SupportsComet} in SparkScan to convert Spark
* physical plan to native physical plan for native execution.
*/
COMET("comet");

private final String parquetReaderType;

ParquetReaderType(String readerType) {
this.parquetReaderType = readerType;
}

public static ParquetReaderType fromName(String parquetReaderType) {
Preconditions.checkArgument(parquetReaderType != null, "Parquet reader type is null");

if (ICEBERG.parquetReaderType().equalsIgnoreCase(parquetReaderType)) {
return ICEBERG;

} else if (COMET.parquetReaderType().equalsIgnoreCase(parquetReaderType)) {
return COMET;

} else {
throw new IllegalArgumentException("Unknown parquet reader type: " + parquetReaderType);
COMET;

public static ParquetReaderType fromString(String typeAsString) {
Preconditions.checkArgument(typeAsString != null, "Parquet reader type is null");
try {
return ParquetReaderType.valueOf(typeAsString.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown parquet reader type: " + typeAsString);
}
}

public String parquetReaderType() {
return parquetReaderType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public boolean reportColumnStats() {

public ParquetReaderType parquetReaderType() {
return confParser
.enumConf(ParquetReaderType::fromName)
.enumConf(ParquetReaderType::fromString)
.sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
.defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
.parse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,16 @@
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;

@SuppressWarnings("checkstyle:VisibilityModifier")
class CometColumnReader implements VectorizedReader<CometVector> {
public static final int DEFAULT_BATCH_SIZE = 5000;

private final DataType sparkType;
// the delegated column reader from Comet side
protected AbstractColumnReader delegate;
private AbstractColumnReader delegate;
private final CometVector vector;
private final ColumnDescriptor descriptor;
protected boolean initialized = false;
protected int batchSize = DEFAULT_BATCH_SIZE;
private boolean initialized = false;
private int batchSize = DEFAULT_BATCH_SIZE;

CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
this.sparkType = sparkType;
Expand All @@ -65,10 +64,22 @@ class CometColumnReader implements VectorizedReader<CometVector> {
this.vector = new CometVector(sparkType, false);
}

public AbstractColumnReader getDelegate() {
public AbstractColumnReader delegate() {
return delegate;
}

void setDelegate(AbstractColumnReader delegate) {
this.delegate = delegate;
}

void setInitialized(boolean initialized) {
this.initialized = initialized;
}

public int batchSize() {
return batchSize;
}

/**
* This method is to initialized/reset the CometColumnReader. This needs to be called for each row
* group after readNextRowGroup, so a new dictionary encoding can be set for each of the new row
Expand All @@ -81,8 +92,8 @@ public void reset() {

CometSchemaImporter importer = new CometSchemaImporter(new RootAllocator());

delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
initialized = true;
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
this.initialized = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {

private final CometColumnReader[] readers;
private final boolean hasIsDeletedColumn;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
// The delegated batch reader on Comet side
private final BatchReader delegate;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;

CometColumnarBatchReader(List<VectorizedReader<?>> readers, Schema schema) {
this.readers =
Expand All @@ -59,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);

AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
delegate = new BatchReader(abstractColumnReaders);
this.delegate = new BatchReader(abstractColumnReaders);
delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
}

Expand All @@ -86,7 +86,7 @@ public void setRowGroupInfo(
}

for (int i = 0; i < readers.length; i++) {
delegate.getColumnReaders()[i] = this.readers[i].getDelegate();
delegate.getColumnReaders()[i] = this.readers[i].delegate();
}

this.rowStartPosInBatch =
Expand Down Expand Up @@ -178,7 +178,7 @@ ColumnVector[] readDataToColumnVectors() {
for (int i = 0; i < readers.length; i++) {
columnVectors[i] = readers[i].vector();
columnVectors[i].resetRowIdMapping();
org.apache.comet.vector.CometVector vector = readers[i].getDelegate().currentBatch();
org.apache.comet.vector.CometVector vector = readers[i].delegate().currentBatch();
columnVectors[i].setDelegate(vector);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ class CometConstantColumnReader<T> extends CometColumnReader {
CometConstantColumnReader(T value, Types.NestedField field) {
super(field);
// use delegate to set constant value on the native side to be consumed by native execution.
delegate =
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false);
setDelegate(
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
}

@Override
public void setBatchSize(int batchSize) {
delegate.setBatchSize(batchSize);
this.batchSize = batchSize;
initialized = true;
delegate().setBatchSize(batchSize);
setBatchSize(batchSize);
setInitialized(true);
}

private Object convertToSparkValue(T value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@
class CometDeleteColumnReader<T> extends CometColumnReader {
CometDeleteColumnReader(Types.NestedField field) {
super(field);
delegate = new DeleteColumnReader();
setDelegate(new DeleteColumnReader());
}

CometDeleteColumnReader(boolean[] isDeleted) {
super(MetadataColumns.IS_DELETED);
delegate = new DeleteColumnReader(isDeleted);
setDelegate(new DeleteColumnReader(isDeleted));
}

@Override
public void setBatchSize(int batchSize) {
delegate.setBatchSize(batchSize);
this.batchSize = batchSize;
initialized = true;
delegate().setBatchSize(batchSize);
setBatchSize(batchSize);
setInitialized(true);
}

private static class DeleteColumnReader extends MetadataColumnReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
class CometPositionColumnReader extends CometColumnReader {
CometPositionColumnReader(Types.NestedField field) {
super(field);
delegate = new PositionColumnReader(descriptor());
setDelegate(new PositionColumnReader(descriptor()));
}

@Override
public void setBatchSize(int batchSize) {
delegate.setBatchSize(batchSize);
this.batchSize = batchSize;
initialized = true;
delegate().setBatchSize(batchSize);
setBatchSize(batchSize);
setInitialized(true);
}

private static class PositionColumnReader extends MetadataColumnReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.spark.source;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import static org.apache.iceberg.spark.SparkSQLProperties.PARQUET_READER_TYPE;
import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -109,12 +110,12 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
@Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
// new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
new Object[] {FileFormat.PARQUET, 2, true, PlanningMode.LOCAL},
new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
// new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
// new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
// new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
// new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
};
}

Expand All @@ -131,6 +132,7 @@ public static void startMetastoreAndSpark() {
.config("spark.ui.liveUpdate.period", 0)
.config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
// .config(PARQUET_READER_TYPE, "iceberg")
.enableHiveSupport()
.getOrCreate();

Expand Down

0 comments on commit 4bf5cbf

Please sign in to comment.