Skip to content

Commit

Permalink
PARQUET-2451: Add BYTE_STREAM_SPLIT support for FIXED_LEN_BYTE_ARRAY,…
Browse files Browse the repository at this point in the history
… INT32 and INT64 (#1291)
  • Loading branch information
pitrou authored Apr 26, 2024
1 parent 1ae7da3 commit 09445b5
Show file tree
Hide file tree
Showing 18 changed files with 1,128 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
Expand Down Expand Up @@ -129,6 +132,12 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
return new ByteStreamSplitValuesReaderForFloat();
case DOUBLE:
return new ByteStreamSplitValuesReaderForDouble();
case INT32:
return new ByteStreamSplitValuesReaderForInteger();
case INT64:
return new ByteStreamSplitValuesReaderForLong();
case FIXED_LEN_BYTE_ARRAY:
return new ByteStreamSplitValuesReaderForFLBA(descriptor.getTypeLength());
default:
throw new ParquetDecodingException("no byte stream split reader for type " + descriptor.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public class ParquetProperties {

private static final int MIN_SLAB_SIZE = 64;

// Internal tri-state controlling BYTE_STREAM_SPLIT encoding for a column.
private enum ByteStreamSplitMode {
// Encoding disabled for all types.
NONE,
// Enabled only for FLOAT and DOUBLE columns (legacy boolean behavior).
FLOATING_POINT,
// Enabled additionally for INT32, INT64 and FIXED_LEN_BYTE_ARRAY columns.
EXTENDED
}

public enum WriterVersion {
PARQUET_1_0("v1"),
PARQUET_2_0("v2");
Expand Down Expand Up @@ -114,7 +120,7 @@ public static WriterVersion fromString(String name) {
private final ColumnProperty<Integer> numBloomFilterCandidates;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final boolean enableByteStreamSplit;
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
private final Map<String, String> extraMetaData;

private ParquetProperties(Builder builder) {
Expand All @@ -141,10 +147,18 @@ private ParquetProperties(Builder builder) {
this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.enableByteStreamSplit = builder.enableByteStreamSplit;
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
this.extraMetaData = builder.extraMetaData;
}

/** @return a new {@link Builder} initialized with the default property values. */
public static Builder builder() {
return new Builder();
}

/**
 * @param toCopy the properties to copy the settings from
 * @return a new {@link Builder} pre-populated with the settings of {@code toCopy}.
 */
public static Builder copy(ParquetProperties toCopy) {
return new Builder(toCopy);
}

/**
 * Creates a values writer for the repetition levels of the given column,
 * sized by the column's maximum repetition level.
 */
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
return newColumnDescriptorValuesWriter(path.getMaxRepetitionLevel());
}
Expand Down Expand Up @@ -208,8 +222,23 @@ public boolean isDictionaryEnabled(ColumnDescriptor column) {
return dictionaryEnabled.getValue(column);
}

/**
 * Returns whether BYTE_STREAM_SPLIT encoding is enabled by the default (non per-column) setting.
 *
 * @return true if the default byte-stream-split mode is anything other than NONE
 * @deprecated use {@link #isByteStreamSplitEnabled(ColumnDescriptor)} to get the
 *             effective per-column answer instead.
 */
@Deprecated
public boolean isByteStreamSplitEnabled() {
    return byteStreamSplitEnabled.getDefaultValue() != ByteStreamSplitMode.NONE;
}

/**
 * Returns whether BYTE_STREAM_SPLIT encoding should be used for the given column.
 *
 * <p>FLOAT and DOUBLE columns qualify under both FLOATING_POINT and EXTENDED modes;
 * INT32, INT64 and FIXED_LEN_BYTE_ARRAY columns qualify only under EXTENDED mode;
 * all other physical types never use this encoding.
 */
public boolean isByteStreamSplitEnabled(ColumnDescriptor column) {
    switch (column.getPrimitiveType().getPrimitiveTypeName()) {
        case FLOAT:
        case DOUBLE:
            // Floating-point types: any non-NONE mode enables the encoding.
            return byteStreamSplitEnabled.getValue(column) != ByteStreamSplitMode.NONE;
        case INT32:
        case INT64:
        case FIXED_LEN_BYTE_ARRAY:
            // Integer / fixed-width binary types: only the extended mode enables it.
            return byteStreamSplitEnabled.getValue(column) == ByteStreamSplitMode.EXTENDED;
        default:
            return false;
    }
}

public ByteBufferAllocator getAllocator() {
Expand Down Expand Up @@ -301,14 +330,6 @@ public Map<String, String> getExtraMetaData() {
return extraMetaData;
}

public static Builder builder() {
return new Builder();
}

public static Builder copy(ParquetProperties toCopy) {
return new Builder(toCopy);
}

@Override
public String toString() {
return "Parquet page size to " + getPageSizeThreshold() + '\n'
Expand Down Expand Up @@ -349,11 +370,16 @@ public static class Builder {
private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
private Map<String, String> extraMetaData = new HashMap<>();

private Builder() {
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
byteStreamSplitEnabled = ColumnProperty.<ByteStreamSplitMode>builder()
.withDefaultValue(
DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED
? ByteStreamSplitMode.FLOATING_POINT
: ByteStreamSplitMode.NONE);
bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
Expand All @@ -365,7 +391,7 @@ private Builder() {

private Builder(ParquetProperties toCopy) {
this.pageSize = toCopy.pageSizeThreshold;
this.enableDict = ColumnProperty.<Boolean>builder(toCopy.dictionaryEnabled);
this.enableDict = ColumnProperty.builder(toCopy.dictionaryEnabled);
this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
this.writerVersion = toCopy.writerVersion;
this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
Expand All @@ -375,13 +401,13 @@ private Builder(ParquetProperties toCopy) {
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
this.bloomFilterNDVs = ColumnProperty.<Long>builder(toCopy.bloomFilterNDVs);
this.bloomFilterFPPs = ColumnProperty.<Double>builder(toCopy.bloomFilterFPPs);
this.bloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.bloomFilterEnabled);
this.adaptiveBloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.adaptiveBloomFilterEnabled);
this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs);
this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs);
this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled);
this.adaptiveBloomFilterEnabled = ColumnProperty.builder(toCopy.adaptiveBloomFilterEnabled);
this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled);
this.extraMetaData = toCopy.extraMetaData;
}

Expand Down Expand Up @@ -420,8 +446,40 @@ public Builder withDictionaryEncoding(String columnPath, boolean enableDictionar
return this;
}

public Builder withByteStreamSplitEncoding(boolean enableByteStreamSplit) {
this.enableByteStreamSplit = enableByteStreamSplit;
/**
 * Globally enables or disables BYTE_STREAM_SPLIT encoding for floating-point columns.
 *
 * <p>When enabled, FLOAT and DOUBLE columns may be written with BYTE_STREAM_SPLIT;
 * other physical types are unaffected (see {@link #withExtendedByteStreamSplitEncoding(boolean)}).
 *
 * @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
 * @return this builder for method chaining.
 */
public Builder withByteStreamSplitEncoding(boolean enable) {
    final ByteStreamSplitMode mode;
    if (enable) {
        mode = ByteStreamSplitMode.FLOATING_POINT;
    } else {
        mode = ByteStreamSplitMode.NONE;
    }
    this.byteStreamSplitEnabled.withDefaultValue(mode);
    return this;
}

/**
 * Enables or disables BYTE_STREAM_SPLIT encoding for a single column.
 *
 * <p>Note that enabling a column this way turns the encoding on for every supported
 * physical type of that column (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY),
 * i.e. it sets the extended mode for the column.
 *
 * @param columnPath the path of the column (dot-string)
 * @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
 * @return this builder for method chaining.
 */
public Builder withByteStreamSplitEncoding(String columnPath, boolean enable) {
    final ByteStreamSplitMode mode = enable ? ByteStreamSplitMode.EXTENDED : ByteStreamSplitMode.NONE;
    this.byteStreamSplitEnabled.withValue(columnPath, mode);
    return this;
}

/**
 * Globally enables or disables BYTE_STREAM_SPLIT encoding for all supported physical
 * types: FLOAT, DOUBLE, INT32, INT64 and FIXED_LEN_BYTE_ARRAY.
 *
 * @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
 * @return this builder for method chaining.
 */
public Builder withExtendedByteStreamSplitEncoding(boolean enable) {
    final ByteStreamSplitMode mode;
    if (enable) {
        mode = ByteStreamSplitMode.EXTENDED;
    } else {
        mode = ByteStreamSplitMode.NONE;
    }
    this.byteStreamSplitEnabled.withDefaultValue(mode);
    return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
package org.apache.parquet.column.values.bytestreamsplit;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ByteStreamSplitValuesReader extends ValuesReader {

private static final Logger LOG = LoggerFactory.getLogger(ByteStreamSplitValuesReader.class);
// Fixed size in bytes of one decoded element (e.g. 4 for FLOAT/INT32, 8 for
// DOUBLE/INT64, the declared type length for FIXED_LEN_BYTE_ARRAY).
protected final int elementSizeInBytes;
// Page data after undoing the byte-stream-split transposition; wrapped
// little-endian in initFromPage so primitive getters decode values directly.
protected ByteBuffer decodedDataBuffer;
// Index of the next element to hand out.
private int indexInStream;
// Number of values in the current page.
private int valuesCount;

Expand All @@ -39,17 +40,27 @@ protected ByteStreamSplitValuesReader(int elementSizeInBytes) {
this.valuesCount = 0;
}

protected void gatherElementDataFromStreams(byte[] gatheredData) throws ParquetDecodingException {
if (gatheredData.length != elementSizeInBytes) {
throw new ParquetDecodingException("gatherData buffer is not of the expected size.");
}
protected int nextElementByteOffset() {
if (indexInStream >= valuesCount) {
throw new ParquetDecodingException("Byte-stream data was already exhausted.");
}
for (int i = 0; i < elementSizeInBytes; ++i) {
gatheredData[i] = byteStreamData[i * valuesCount + indexInStream];
}
int offset = indexInStream * elementSizeInBytes;
++indexInStream;
return offset;
}

/**
 * Undoes the byte-stream-split transposition for an entire data page.
 *
 * <p>The encoded layout stores byte {@code s} of value {@code v} at position
 * {@code s * valuesCount + v}; the decoded layout stores each value's bytes
 * contiguously in element order.
 *
 * @param encoded the raw page payload, exactly valuesCount * elementSizeInBytes bytes
 * @param valuesCount the number of values in the page
 * @return the decoded bytes, one element after another
 */
private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
    assert encoded.limit() == valuesCount * elementSizeInBytes;
    final byte[] decoded = new byte[encoded.limit()];
    // Walk the source sequentially, one stream at a time, scattering each byte
    // to its element-major position in the output.
    for (int stream = 0; stream < elementSizeInBytes; ++stream) {
        final int streamBase = stream * valuesCount;
        for (int value = 0; value < valuesCount; ++value) {
            decoded[value * elementSizeInBytes + stream] = encoded.get(streamBase + value);
        }
    }
    return decoded;
}

@Override
Expand All @@ -76,18 +87,12 @@ public void initFromPage(int valuesCount, ByteBufferInputStream stream)
throw new ParquetDecodingException(errorMessage);
}

// Allocate buffer for all of the byte stream data.
// Eagerly read and decode the data. This allows returning stable
// Binary views into the internal decode buffer for FIXED_LEN_BYTE_ARRAY.
final int totalSizeInBytes = stream.available();
byteStreamData = new byte[totalSizeInBytes];

// Eagerly read the data for each stream.
final int numRead = stream.read(byteStreamData, 0, totalSizeInBytes);
if (numRead != totalSizeInBytes) {
String errorMessage = String.format(
"Failed to read requested number of bytes. Expected: %d. Read %d.", totalSizeInBytes, numRead);
throw new ParquetDecodingException(errorMessage);
}

final ByteBuffer encodedData = stream.slice(totalSizeInBytes).slice(); // possibly zero-copy
final byte[] decodedData = decodeData(encodedData, this.valuesCount);
decodedDataBuffer = ByteBuffer.wrap(decodedData).order(ByteOrder.LITTLE_ENDIAN);
indexInStream = 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
*/
package org.apache.parquet.column.values.bytestreamsplit;

import org.apache.parquet.bytes.BytesUtils;

/**
 * BYTE_STREAM_SPLIT decoder for DOUBLE (8-byte) values.
 */
public class ByteStreamSplitValuesReaderForDouble extends ByteStreamSplitValuesReader {

    public ByteStreamSplitValuesReaderForDouble() {
        super(Double.BYTES);
    }

    @Override
    public double readDouble() {
        // decodedDataBuffer was set to little-endian in initFromPage, so getDouble
        // reassembles the value from the gathered bytes directly.
        return decodedDataBuffer.getDouble(nextElementByteOffset());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.bytestreamsplit;

import org.apache.parquet.io.api.Binary;

/**
 * BYTE_STREAM_SPLIT decoder for FIXED_LEN_BYTE_ARRAY values of a fixed type length.
 */
public class ByteStreamSplitValuesReaderForFLBA extends ByteStreamSplitValuesReader {

    public ByteStreamSplitValuesReaderForFLBA(int length) {
        // The element size is simply the declared FIXED_LEN_BYTE_ARRAY type length.
        super(length);
    }

    @Override
    public Binary readBytes() {
        final int offset = nextElementByteOffset();
        // Handing out a constant view is safe: the eagerly decoded buffer stays
        // stable for the lifetime of the page (see initFromPage in the base class).
        return Binary.fromConstantByteBuffer(decodedDataBuffer, offset, elementSizeInBytes);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
*/
package org.apache.parquet.column.values.bytestreamsplit;

import org.apache.parquet.bytes.BytesUtils;

/**
 * BYTE_STREAM_SPLIT decoder for FLOAT (4-byte) values.
 */
public class ByteStreamSplitValuesReaderForFloat extends ByteStreamSplitValuesReader {

    public ByteStreamSplitValuesReaderForFloat() {
        super(Float.BYTES);
    }

    @Override
    public float readFloat() {
        // decodedDataBuffer was set to little-endian in initFromPage, so getFloat
        // reassembles the value from the gathered bytes directly.
        return decodedDataBuffer.getFloat(nextElementByteOffset());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.bytestreamsplit;

/**
 * BYTE_STREAM_SPLIT decoder for INT32 (4-byte) values.
 */
public class ByteStreamSplitValuesReaderForInteger extends ByteStreamSplitValuesReader {
    public ByteStreamSplitValuesReaderForInteger() {
        // Integer.BYTES instead of the magic constant 4, for consistency with the
        // Float/Double readers which use Float.BYTES / Double.BYTES.
        super(Integer.BYTES);
    }

    @Override
    public int readInteger() {
        return decodedDataBuffer.getInt(nextElementByteOffset());
    }
}
Loading

0 comments on commit 09445b5

Please sign in to comment.