Skip to content

Commit

Permalink
GH-3055: Disable column statistics for all columns by configuration (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac authored Nov 13, 2024
1 parent 54335a6 commit 34359c9
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static WriterVersion fromString(String name) {
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int statisticsTruncateLength;
private final boolean statisticsEnabled;

// The expected NDV (number of distinct values) for each columns
private final ColumnProperty<Long> bloomFilterNDVs;
Expand Down Expand Up @@ -141,6 +142,7 @@ private ParquetProperties(Builder builder) {
this.valuesWriterFactory = builder.valuesWriterFactory;
this.columnIndexTruncateLength = builder.columnIndexTruncateLength;
this.statisticsTruncateLength = builder.statisticsTruncateLength;
this.statisticsEnabled = builder.statisticsEnabled;
this.bloomFilterNDVs = builder.bloomFilterNDVs.build();
this.bloomFilterFPPs = builder.bloomFilterFPPs.build();
this.bloomFilterEnabled = builder.bloomFilterEnabled.build();
Expand Down Expand Up @@ -334,7 +336,13 @@ public Map<String, String> getExtraMetaData() {
}

/**
 * Tells whether statistics are enabled for the given column.
 *
 * <p>The column-specific lookup is consulted first; the writer-wide flag is returned only when
 * that lookup yields {@code null}.
 *
 * @param column descriptor of the column being queried
 * @return {@code true} if statistics are enabled for {@code column}
 */
public boolean getStatisticsEnabled(ColumnDescriptor column) {
  final Boolean perColumn = statistics.getValue(column);
  // The boxed value is only unboxed on the branch where it is known to be non-null.
  return perColumn != null ? perColumn : statisticsEnabled;
}

@Override
Expand Down Expand Up @@ -369,6 +377,7 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED;
private final ColumnProperty.Builder<Long> bloomFilterNDVs;
private final ColumnProperty.Builder<Double> bloomFilterFPPs;
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
Expand Down Expand Up @@ -679,6 +688,11 @@ public Builder withStatisticsEnabled(String columnPath, boolean enabled) {
return this;
}

/**
 * Sets the writer-wide statistics flag held by this builder.
 *
 * @param enabled value to record as the global statistics setting
 * @return this builder for method chaining
 */
public Builder withStatisticsEnabled(boolean enabled) {
this.statisticsEnabled = enabled;
return this;
}

public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
Expand Down
16 changes: 16 additions & 0 deletions parquet-hadoop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,19 @@ If `true` then an attempt will be made to dynamically load the relevant classes;
if not found then the library will use the classic non-vectored reads: it is safe to enable this option on older releases.
**Default value:** `false`

---

**Property:** `parquet.column.statistics.enabled`
**Description:** Whether to enable column statistics collection.
If `true`, statistics will be collected for all columns unless explicitly disabled for specific columns.
If `false`, statistics will be disabled for all columns unless explicitly enabled for specific columns.
It is possible to enable or disable statistics for specific columns by appending `#` followed by the column path.
**Default value:** `true`
**Example:**
```java
// Enable statistics for all columns
conf.setBoolean("parquet.column.statistics.enabled", true);
// Disable statistics for 'column.path'
conf.setBoolean("parquet.column.statistics.enabled#column.path", false);
// The final configuration will be: Enable statistics for all columns except 'column.path'
```
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public static enum JobSummaryLevel {
public static final String BLOOM_FILTER_CANDIDATES_NUMBER = "parquet.bloom.filter.candidates.number";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled";

public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
Expand Down Expand Up @@ -388,6 +389,26 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) {
return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}

/**
 * Writes the global column-statistics switch into the configuration of the given job.
 *
 * @param jobContext job whose configuration is updated
 * @param enabled boolean stored under the {@code STATISTICS_ENABLED} key
 */
public static void setStatisticsEnabled(JobContext jobContext, boolean enabled) {
getConfiguration(jobContext).setBoolean(STATISTICS_ENABLED, enabled);
}

/**
 * Reads the global column-statistics switch from the configuration.
 *
 * @param conf configuration to read from
 * @return the boolean stored under the {@code STATISTICS_ENABLED} key, or
 *     {@code ParquetProperties.DEFAULT_STATISTICS_ENABLED} as the fallback
 */
public static boolean getStatisticsEnabled(Configuration conf) {
  final boolean fallback = ParquetProperties.DEFAULT_STATISTICS_ENABLED;
  return conf.getBoolean(STATISTICS_ENABLED, fallback);
}

/**
 * Writes a column-specific statistics switch into the configuration of the given job.
 *
 * <p>The key is the {@code STATISTICS_ENABLED} key followed by {@code '#'} and the column path.
 *
 * @param jobContext job whose configuration is updated
 * @param columnPath path of the column the setting applies to
 * @param enabled boolean stored for that column
 */
public static void setStatisticsEnabled(JobContext jobContext, String columnPath, boolean enabled) {
  final String columnKey = STATISTICS_ENABLED + "#" + columnPath;
  getConfiguration(jobContext).setBoolean(columnKey, enabled);
}

/**
 * Resolves whether statistics are enabled for one column according to the configuration.
 *
 * <p>The column-specific key (the {@code STATISTICS_ENABLED} key followed by {@code '#'} and the
 * column path) is consulted first. When it is present it is parsed with
 * {@code conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED)}, the same way
 * {@code getRecordWriter} parses column-specific keys, so this getter reports the value the
 * writer will actually use. When the column-specific key is absent, the global setting is
 * returned.
 *
 * @param conf configuration to read from
 * @param columnPath path of the column being queried
 * @return {@code true} if statistics are enabled for {@code columnPath}
 */
public static boolean getStatisticsEnabled(Configuration conf, String columnPath) {
  final String columnKey = STATISTICS_ENABLED + "#" + columnPath;
  if (conf.get(columnKey) != null) {
    // Boolean.parseBoolean would map every string other than "true" to false, which can
    // disagree with how the writer reads this key; use the configuration's own parsing.
    return conf.getBoolean(columnKey, ParquetProperties.DEFAULT_STATISTICS_ENABLED);
  }
  return conf.getBoolean(STATISTICS_ENABLED, ParquetProperties.DEFAULT_STATISTICS_ENABLED);
}

private WriteSupport<T> writeSupport;
private ParquetOutputCommitter committer;

Expand Down Expand Up @@ -463,7 +484,8 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
.withBloomFilterEnabled(getBloomFilterEnabled(conf))
.withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
.withStatisticsEnabled(getStatisticsEnabled(conf));
new ColumnConfigParser()
.withColumnConfig(
ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
Expand All @@ -479,6 +501,10 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
BLOOM_FILTER_CANDIDATES_NUMBER,
key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER),
propsBuilder::withBloomFilterCandidatesNumber)
.withColumnConfig(
STATISTICS_ENABLED,
key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED),
propsBuilder::withStatisticsEnabled)
.parseConfig(conf);

ParquetProperties props = propsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,18 @@ public SELF withStatisticsEnabled(String columnPath, boolean enabled) {
return self();
}

/**
 * Sets whether statistics are enabled globally. When disabled, statistics will not be collected
 * for any column unless explicitly enabled for specific columns.
 *
 * <p>The flag is forwarded to the encoding properties builder held by this builder.
 *
 * @param enabled whether to collect statistics globally
 * @return this builder for method chaining
 */
public SELF withStatisticsEnabled(boolean enabled) {
encodingPropsBuilder.withStatisticsEnabled(enabled);
return self();
}

/**
* Build a {@link ParquetWriter} with the accumulated configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,30 @@ public abstract static class WriteContext {
protected final boolean enableValidation;
protected final ParquetProperties.WriterVersion version;
protected final Set<String> disableColumnStatistics;
protected final boolean disableAllStatistics;

/**
 * Creates a write context that records every supplied setting, including the flag that
 * disables statistics for all columns.
 *
 * @param path local file to write
 * @param schema schema of the data to write
 * @param blockSize block size to use
 * @param pageSize page size to use
 * @param enableDictionary whether dictionary encoding is enabled
 * @param enableValidation whether validation is enabled
 * @param version writer version to use
 * @param disableColumnStatistics names of columns whose statistics are disabled
 * @param disableAllStatistics whether statistics are disabled for all columns
 * @throws IOException declared for callers; no statement in this constructor throws it directly
 */
public WriteContext(
    File path,
    MessageType schema,
    int blockSize,
    int pageSize,
    boolean enableDictionary,
    boolean enableValidation,
    ParquetProperties.WriterVersion version,
    Set<String> disableColumnStatistics,
    boolean disableAllStatistics)
    throws IOException {
  // Output location, kept both as a File and as a Path built from its string form.
  this.path = path;
  this.fsPath = new Path(path.toString());

  // Shape of the data and the writer version.
  this.schema = schema;
  this.version = version;

  // Size settings.
  this.blockSize = blockSize;
  this.pageSize = pageSize;

  // Feature switches.
  this.enableDictionary = enableDictionary;
  this.enableValidation = enableValidation;

  // Statistics controls: per-column opt-outs and the global switch.
  this.disableColumnStatistics = disableColumnStatistics;
  this.disableAllStatistics = disableAllStatistics;
}

public WriteContext(
File path,
Expand All @@ -52,7 +76,16 @@ public WriteContext(
boolean enableValidation,
ParquetProperties.WriterVersion version)
throws IOException {
this(path, schema, blockSize, pageSize, enableDictionary, enableValidation, version, ImmutableSet.of());
this(
path,
schema,
blockSize,
pageSize,
enableDictionary,
enableValidation,
version,
ImmutableSet.of(),
false);
}

public WriteContext(
Expand All @@ -65,15 +98,16 @@ public WriteContext(
ParquetProperties.WriterVersion version,
Set<String> disableColumnStatistics)
throws IOException {
this.path = path;
this.fsPath = new Path(path.toString());
this.schema = schema;
this.blockSize = blockSize;
this.pageSize = pageSize;
this.enableDictionary = enableDictionary;
this.enableValidation = enableValidation;
this.version = version;
this.disableColumnStatistics = disableColumnStatistics;
this(
path,
schema,
blockSize,
pageSize,
enableDictionary,
enableValidation,
version,
disableColumnStatistics,
false);
}

public abstract void write(ParquetWriter<Group> writer) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,29 @@ public DataContext(
ParquetProperties.WriterVersion version,
Set<String> disableColumnStatistics)
throws IOException {
super(path, buildSchema(seed), blockSize, pageSize, enableDictionary, true, version);
this(seed, path, blockSize, pageSize, enableDictionary, version, disableColumnStatistics, false);
}

public DataContext(
long seed,
File path,
int blockSize,
int pageSize,
boolean enableDictionary,
ParquetProperties.WriterVersion version,
Set<String> disableColumnStatistics,
boolean disableAllStatistics)
throws IOException {
super(
path,
buildSchema(seed),
blockSize,
pageSize,
enableDictionary,
true,
version,
disableColumnStatistics,
disableAllStatistics);

this.random = new Random(seed);
this.recordCount = random.nextInt(MAX_TOTAL_ROWS);
Expand Down Expand Up @@ -643,4 +665,29 @@ public void testDisableStatistics() throws IOException {
DataGenerationContext.writeAndTest(test);
}
}

/**
 * Writes and verifies a randomly generated file through a {@code DataContext} constructed with
 * an empty per-column disable set and the disable-all-statistics flag set to {@code true}.
 */
@Test
public void testGlobalStatisticsDisabled() throws IOException {
  // Obtain a path inside the temporary folder, then delete the placeholder file it created.
  File target = folder.newFile("test_file_global_stats_disabled.parquet");
  target.delete();

  LOG.info(String.format("RANDOM SEED: %s", RANDOM_SEED));
  Random rng = new Random(RANDOM_SEED);

  // The draw order (block size, page size, data seed) is kept so a given seed reproduces a run.
  int blockBytes = (rng.nextInt(54) + 10) * MEGABYTE;
  int pageBytes = (rng.nextInt(10) + 1) * MEGABYTE;
  long dataSeed = rng.nextLong();

  boolean enableDictionary = true;
  boolean disableAllStatistics = true;

  DataContext globalStatsOff = new DataContext(
      dataSeed,
      target,
      blockBytes,
      pageBytes,
      enableDictionary,
      ParquetProperties.WriterVersion.PARQUET_2_0,
      ImmutableSet.of(), // no column is individually excluded
      disableAllStatistics);

  DataGenerationContext.writeAndTest(globalStatsOff);
}
}

0 comments on commit 34359c9

Please sign in to comment.