Skip to content

Commit

Permalink
Add more options to ColumnWriterOptions
Browse files Browse the repository at this point in the history
Integer dictionary encoding enabled and String statistics limit
are converted to ColumnWriterParameters. This avoids additional
parameters from being passed around.
  • Loading branch information
Arunachalam Thirupathi authored and highker committed May 4, 2021
1 parent 969c847 commit 30cd883
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.OptionalInt;

import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_COMPRESSION_BUFFER_SIZE;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_STRING_STATISTICS_LIMIT;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

Expand All @@ -27,13 +28,22 @@ public class ColumnWriterOptions
private final CompressionKind compressionKind;
private final OptionalInt compressionLevel;
private final int compressionMaxBufferSize;
private final DataSize stringStatisticsLimit;
private final boolean integerDictionaryEncodingEnabled;

public ColumnWriterOptions(CompressionKind compressionKind, OptionalInt compressionLevel, DataSize compressionMaxBufferSize)
public ColumnWriterOptions(
CompressionKind compressionKind,
OptionalInt compressionLevel,
DataSize compressionMaxBufferSize,
DataSize stringStatisticsLimit,
boolean integerDictionaryEncodingEnabled)
{
this.compressionKind = requireNonNull(compressionKind, "compressionKind is null");
this.compressionLevel = requireNonNull(compressionLevel, "compressionLevel is null");
requireNonNull(compressionMaxBufferSize, "compressionMaxBufferSize is null");
this.compressionMaxBufferSize = toIntExact(compressionMaxBufferSize.toBytes());
this.stringStatisticsLimit = requireNonNull(stringStatisticsLimit, "stringStatisticsLimit is null");
this.integerDictionaryEncodingEnabled = integerDictionaryEncodingEnabled;
}

public CompressionKind getCompressionKind()
Expand All @@ -51,6 +61,16 @@ public int getCompressionMaxBufferSize()
return compressionMaxBufferSize;
}

public DataSize getStringStatisticsLimit()
{
return stringStatisticsLimit;
}

public boolean isIntegerDictionaryEncodingEnabled()
{
return integerDictionaryEncodingEnabled;
}

public static Builder builder()
{
return new Builder();
Expand All @@ -61,6 +81,8 @@ public static class Builder
private CompressionKind compressionKind;
private OptionalInt compressionLevel = OptionalInt.empty();
private DataSize compressionMaxBufferSize = DEFAULT_MAX_COMPRESSION_BUFFER_SIZE;
private DataSize stringStatisticsLimit = DEFAULT_MAX_STRING_STATISTICS_LIMIT;
private boolean integerDictionaryEncodingEnabled;

private Builder() {}

Expand All @@ -82,9 +104,21 @@ public Builder setCompressionMaxBufferSize(DataSize compressionMaxBufferSize)
return this;
}

public Builder setStringStatisticsLimit(DataSize stringStatisticsLimit)
{
this.stringStatisticsLimit = stringStatisticsLimit;
return this;
}

public Builder setIntegerDictionaryEncodingEnabled(boolean integerDictionaryEncodingEnabled)
{
this.integerDictionaryEncodingEnabled = integerDictionaryEncodingEnabled;
return this;
}

public ColumnWriterOptions build()
{
return new ColumnWriterOptions(compressionKind, compressionLevel, compressionMaxBufferSize);
return new ColumnWriterOptions(compressionKind, compressionLevel, compressionMaxBufferSize, stringStatisticsLimit, integerDictionaryEncodingEnabled);
}
}
}
12 changes: 8 additions & 4 deletions presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,13 @@ public OrcWriter(
this.orcEncoding = requireNonNull(orcEncoding, "orcEncoding is null");

requireNonNull(compressionKind, "compressionKind is null");
this.columnWriterOptions = ColumnWriterOptions.builder().setCompressionKind(compressionKind).setCompressionLevel(options.getCompressionLevel()).setCompressionMaxBufferSize(options.getMaxCompressionBufferSize()).build();
this.columnWriterOptions = ColumnWriterOptions.builder()
.setCompressionKind(compressionKind)
.setCompressionLevel(options.getCompressionLevel())
.setCompressionMaxBufferSize(options.getMaxCompressionBufferSize())
.setStringStatisticsLimit(options.getMaxStringStatisticsLimit())
.setIntegerDictionaryEncodingEnabled(options.isIntegerDictionaryEncodingEnabled())
.build();
recordValidation(validation -> validation.setCompression(compressionKind));

requireNonNull(options, "options is null");
Expand Down Expand Up @@ -239,10 +245,8 @@ public OrcWriter(
columnWriterOptions,
orcEncoding,
hiveStorageTimeZone,
options.getMaxStringStatisticsLimit(),
dwrfEncryptionInfo,
orcEncoding.createMetadataWriter(),
options.isIntegerDictionaryEncodingEnabled());
orcEncoding.createMetadataWriter());
columnWriters.add(columnWriter);

if (columnWriter instanceof DictionaryColumnWriter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.facebook.presto.orc.metadata.statistics.DateStatisticsBuilder;
import com.facebook.presto.orc.metadata.statistics.IntegerStatisticsBuilder;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.joda.time.DateTimeZone;

import java.util.List;
Expand All @@ -45,10 +44,8 @@ public static ColumnWriter createColumnWriter(
ColumnWriterOptions columnWriterOptions,
OrcEncoding orcEncoding,
DateTimeZone hiveStorageTimeZone,
DataSize stringStatisticsLimit,
DwrfEncryptionInfo dwrfEncryptors,
MetadataWriter metadataWriter,
boolean integerDictionaryEncodingEnabled)
MetadataWriter metadataWriter)
{
requireNonNull(type, "type is null");
OrcType orcType = orcTypes.get(columnIndex);
Expand All @@ -74,7 +71,7 @@ public static ColumnWriter createColumnWriter(
return new LongColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, IntegerStatisticsBuilder::new, metadataWriter);
case INT:
case LONG:
if (integerDictionaryEncodingEnabled && orcEncoding == DWRF) {
if (columnWriterOptions.isIntegerDictionaryEncodingEnabled() && orcEncoding == DWRF) {
// ORC V1 does not support Integer Dictionary encoding. DWRF supports Integer dictionary encoding.
return new LongDictionaryColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter);
}
Expand All @@ -95,7 +92,7 @@ public static ColumnWriter createColumnWriter(
// fall through
case VARCHAR:
case STRING:
return new SliceDictionaryColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, stringStatisticsLimit, metadataWriter);
return new SliceDictionaryColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter);

case LIST: {
int fieldColumnIndex = orcType.getFieldTypeIndex(0);
Expand All @@ -107,10 +104,8 @@ public static ColumnWriter createColumnWriter(
columnWriterOptions,
orcEncoding,
hiveStorageTimeZone,
stringStatisticsLimit,
dwrfEncryptors,
metadataWriter,
integerDictionaryEncodingEnabled);
metadataWriter);
return new ListColumnWriter(columnIndex, columnWriterOptions, dwrfEncryptor, orcEncoding, elementWriter, metadataWriter);
}

Expand All @@ -122,21 +117,17 @@ public static ColumnWriter createColumnWriter(
columnWriterOptions,
orcEncoding,
hiveStorageTimeZone,
stringStatisticsLimit,
dwrfEncryptors,
metadataWriter,
integerDictionaryEncodingEnabled);
metadataWriter);
ColumnWriter valueWriter = createColumnWriter(
orcType.getFieldTypeIndex(1),
orcTypes,
type.getTypeParameters().get(1),
columnWriterOptions,
orcEncoding,
hiveStorageTimeZone,
stringStatisticsLimit,
dwrfEncryptors,
metadataWriter,
integerDictionaryEncodingEnabled);
metadataWriter);
return new MapColumnWriter(columnIndex, columnWriterOptions, dwrfEncryptor, orcEncoding, keyWriter, valueWriter, metadataWriter);
}

Expand All @@ -152,10 +143,8 @@ public static ColumnWriter createColumnWriter(
columnWriterOptions,
orcEncoding,
hiveStorageTimeZone,
stringStatisticsLimit,
dwrfEncryptors,
metadataWriter,
integerDictionaryEncodingEnabled));
metadataWriter));
}
return new StructColumnWriter(columnIndex, columnWriterOptions, dwrfEncryptor, fieldWriters.build(), metadataWriter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class SliceDictionaryColumnWriter
extends DictionaryColumnWriter
Expand All @@ -67,13 +66,12 @@ public SliceDictionaryColumnWriter(
ColumnWriterOptions columnWriterOptions,
Optional<DwrfDataEncryptor> dwrfEncryptor,
OrcEncoding orcEncoding,
DataSize stringStatisticsLimit,
MetadataWriter metadataWriter)
{
super(column, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter);
this.dictionaryDataStream = new ByteArrayOutputStream(columnWriterOptions, dwrfEncryptor, Stream.StreamKind.DICTIONARY_DATA);
this.dictionaryLengthStream = createLengthOutputStream(columnWriterOptions, dwrfEncryptor, orcEncoding);
this.stringStatisticsLimitInBytes = toIntExact(requireNonNull(stringStatisticsLimit, "stringStatisticsLimit is null").toBytes());
this.stringStatisticsLimitInBytes = toIntExact(columnWriterOptions.getStringStatisticsLimit().toBytes());
this.statisticsBuilder = newStringStatisticsBuilder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,7 @@ public void writeDirect(BenchmarkData data)
@Benchmark
public void writeDictionary(BenchmarkData data)
{
ColumnWriter columnWriter;
Type type = data.getType();
if (type.equals(VARCHAR)) {
columnWriter = new SliceDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DEFAULT_MAX_STRING_STATISTICS_LIMIT, DWRF.createMetadataWriter());
}
else {
columnWriter = new LongDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter());
}
ColumnWriter columnWriter = getDictionaryColumnWriter(data);
for (Block block : data.getBlocks()) {
columnWriter.beginRowGroup();
columnWriter.writeBlock(block);
Expand All @@ -133,14 +126,7 @@ public void writeDictionary(BenchmarkData data)
@Benchmark
public void writeDictionaryAndConvert(BenchmarkData data)
{
DictionaryColumnWriter columnWriter;
Type type = data.getType();
if (type.equals(VARCHAR)) {
columnWriter = new SliceDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DEFAULT_MAX_STRING_STATISTICS_LIMIT, DWRF.createMetadataWriter());
}
else {
columnWriter = new LongDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter());
}
DictionaryColumnWriter columnWriter = getDictionaryColumnWriter(data);
for (Block block : data.getBlocks()) {
columnWriter.beginRowGroup();
columnWriter.writeBlock(block);
Expand All @@ -153,6 +139,19 @@ public void writeDictionaryAndConvert(BenchmarkData data)
columnWriter.reset();
}

private DictionaryColumnWriter getDictionaryColumnWriter(BenchmarkData data)
{
DictionaryColumnWriter columnWriter;
Type type = data.getType();
if (type.equals(VARCHAR)) {
columnWriter = new SliceDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter());
}
else {
columnWriter = new LongDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter());
}
return columnWriter;
}

@State(Scope.Thread)
public static class BenchmarkData
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE;
import static com.facebook.presto.orc.OrcTester.createCustomOrcSelectiveRecordReader;
import static com.facebook.presto.orc.OrcTester.createOrcWriter;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_STRING_STATISTICS_LIMIT;
import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DICTIONARY;
import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DICTIONARY_V2;
import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT;
Expand Down Expand Up @@ -95,7 +94,6 @@ public void testStringDirectConversion()
columnWriterOptions,
Optional.empty(),
ORC,
DEFAULT_MAX_STRING_STATISTICS_LIMIT,
ORC.createMetadataWriter());

// a single row group exceeds 2G after direct conversion
Expand Down

0 comments on commit 30cd883

Please sign in to comment.