Skip to content

Commit

Permalink
ORC-1264: [C++] Add a writer option to align compression block with r…
Browse files Browse the repository at this point in the history
…ow group

## What changes were proposed in this pull request?
Add support for the ORC writer to ensure that the compression block is aligned with the row group boundary。

## Why are the changes needed?
To reduce unnecessary I/O and decompression when PPD is in effect, we can enforce the compression block to be aligned with the row group boundary. For more detail, see [link](https://issues.apache.org/jira/projects/ORC/issues/ORC-1264?filter=allopenissues)

## How was this patch tested?
Uts in TestWriter.cc can convert this patch.

## Was this patch authored or co-authored using generative AI tooling?
NO

Closes #2005 from luffy-zh/ORC-1264.

Lead-authored-by: luffy-zh <[email protected]>
Co-authored-by: Hao Zou <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
2 people authored and dongjoon-hyun committed Nov 7, 2024
1 parent b0ee122 commit 08c3480
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 49 deletions.
19 changes: 19 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ namespace orc {
};
ReaderMetrics* getDefaultReaderMetrics();

// Row group index of a single column in a stripe.
struct RowGroupIndex {
// Positions are represented as a two-dimensional array where the first
// dimension is row group index and the second dimension is the position
// list of the row group. The size of the second dimension should be equal
// among all row groups.
std::vector<std::vector<uint64_t>> positions;
};

/**
* Options for creating a Reader.
*/
Expand Down Expand Up @@ -605,6 +614,16 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;

/**
* Get row group index of all selected columns in the specified stripe
* @param stripeIndex index of the stripe to be read for row group index.
* @param included index of selected columns to return (if not specified,
* all columns will be returned).
* @return map of row group index keyed by its column index.
*/
virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 0;
};

/**
Expand Down
13 changes: 13 additions & 0 deletions c++/include/orc/Writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,19 @@ namespace orc {
* @return if not set, return default value which is 64 KB.
*/
uint64_t getMemoryBlockSize() const;

/**
* Set whether the compression block should be aligned to row group boundary.
* The boolean type may not be aligned to row group boundary due to the
* requirement of the Boolean RLE encoder to pack input bits into bytes
*/
WriterOptions& setAlignBlockBoundToRowGroup(bool alignBlockBoundToRowGroup);

/**
* Get if the compression block should be aligned to row group boundary.
* @return if not set, return default value which is false.
*/
bool getAlignBlockBoundToRowGroup() const;
};

class Writer {
Expand Down
111 changes: 111 additions & 0 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ namespace orc {
// PASS
}

void ColumnWriter::finishStreams() {
notNullEncoder->finishEncode();
}

class StructColumnWriter : public ColumnWriter {
public:
StructColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -283,6 +287,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::vector<std::unique_ptr<ColumnWriter>> children_;
};
Expand Down Expand Up @@ -416,6 +422,13 @@ namespace orc {
}
}

void StructColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

template <typename BatchType>
class IntegerColumnWriter : public ColumnWriter {
public:
Expand All @@ -433,6 +446,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> rleEncoder;

Expand Down Expand Up @@ -528,6 +543,12 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void IntegerColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder->finishEncode();
}

template <typename BatchType>
class ByteColumnWriter : public ColumnWriter {
public:
Expand All @@ -544,6 +565,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> byteRleEncoder_;
};
Expand Down Expand Up @@ -637,6 +660,12 @@ namespace orc {
byteRleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void ByteColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
byteRleEncoder_->finishEncode();
}

template <typename BatchType>
class BooleanColumnWriter : public ColumnWriter {
public:
Expand All @@ -654,6 +683,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
};
Expand Down Expand Up @@ -750,6 +781,12 @@ namespace orc {
rleEncoder_->recordPosition(rowIndexPosition.get());
}

template <typename BatchType>
void BooleanColumnWriter<BatchType>::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
}

template <typename ValueType, typename BatchType>
class FloatingColumnWriter : public ColumnWriter {
public:
Expand All @@ -767,6 +804,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

private:
bool isFloat_;
std::unique_ptr<AppendOnlyBufferedStream> dataStream_;
Expand Down Expand Up @@ -878,6 +917,12 @@ namespace orc {
dataStream_->recordPosition(rowIndexPosition.get());
}

template <typename ValueType, typename BatchType>
void FloatingColumnWriter<ValueType, BatchType>::finishStreams() {
ColumnWriter::finishStreams();
dataStream_->finishStream();
}

/**
* Implementation of increasing sorted string dictionary
*/
Expand Down Expand Up @@ -1041,6 +1086,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
/**
* dictionary related functions
Expand Down Expand Up @@ -1234,6 +1281,14 @@ namespace orc {
}
}

void StringColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
if (!useDictionary) {
directDataStream->finishStream();
directLengthEncoder->finishEncode();
}
}

bool StringColumnWriter::checkDictionaryKeyRatio() {
if (!doneDictionaryCheck) {
useDictionary = dictionary.size() <=
Expand Down Expand Up @@ -1583,6 +1638,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;

Expand Down Expand Up @@ -1723,6 +1780,12 @@ namespace orc {
nanoRleEncoder->recordPosition(rowIndexPosition.get());
}

void TimestampColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
secRleEncoder->finishEncode();
nanoRleEncoder->finishEncode();
}

class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
public:
DateColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -1792,6 +1855,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
RleVersion rleVersion;
uint64_t precision;
Expand Down Expand Up @@ -1910,6 +1975,12 @@ namespace orc {
scaleEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
valueStream->finishStream();
scaleEncoder->finishEncode();
}

class Decimal64ColumnWriterV2 : public ColumnWriter {
public:
Decimal64ColumnWriterV2(const Type& type, const StreamsFactory& factory,
Expand All @@ -1926,6 +1997,8 @@ namespace orc {

virtual void recordPosition() const override;

virtual void finishStreams() override;

protected:
uint64_t precision;
uint64_t scale;
Expand Down Expand Up @@ -2016,6 +2089,11 @@ namespace orc {
valueEncoder->recordPosition(rowIndexPosition.get());
}

void Decimal64ColumnWriterV2::finishStreams() {
ColumnWriter::finishStreams();
valueEncoder->finishEncode();
}

class Decimal128ColumnWriter : public Decimal64ColumnWriter {
public:
Decimal128ColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2131,6 +2209,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<RleEncoder> lengthEncoder_;
RleVersion rleVersion_;
Expand Down Expand Up @@ -2307,6 +2387,14 @@ namespace orc {
}
}

void ListColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (child_) {
child_->finishStreams();
}
}

class MapColumnWriter : public ColumnWriter {
public:
MapColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
Expand Down Expand Up @@ -2339,6 +2427,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ColumnWriter> keyWriter_;
std::unique_ptr<ColumnWriter> elemWriter_;
Expand Down Expand Up @@ -2557,6 +2647,17 @@ namespace orc {
}
}

void MapColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
lengthEncoder_->finishEncode();
if (keyWriter_) {
keyWriter_->finishStreams();
}
if (elemWriter_) {
elemWriter_->finishStreams();
}
}

class UnionColumnWriter : public ColumnWriter {
public:
UnionColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -2589,6 +2690,8 @@ namespace orc {

virtual void reset() override;

virtual void finishStreams() override;

private:
std::unique_ptr<ByteRleEncoder> rleEncoder_;
std::vector<std::unique_ptr<ColumnWriter>> children_;
Expand Down Expand Up @@ -2760,6 +2863,14 @@ namespace orc {
}
}

void UnionColumnWriter::finishStreams() {
ColumnWriter::finishStreams();
rleEncoder_->finishEncode();
for (uint32_t i = 0; i < children_.size(); ++i) {
children_[i]->finishStreams();
}
}

std::unique_ptr<ColumnWriter> buildWriter(const Type& type, const StreamsFactory& factory,
const WriterOptions& options) {
switch (static_cast<int64_t>(type.getKind())) {
Expand Down
12 changes: 12 additions & 0 deletions c++/src/ColumnWriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ namespace orc {
*/
virtual void writeDictionary();

/**
* Finalize the encoding and compressing process. This function should be
* called after all data required for encoding has been added. It ensures
* that any remaining data is processed and the final state of the streams
* is set.
* Note: boolean type cannot cut off the current byte if it is not filled
* with 8 bits, otherwise Boolean RLE may incorrectly read the unfilled
* trailing bits. In this case, the last byte will be the head of the next
* compression block.
*/
virtual void finishStreams();

protected:
/**
* Utility function to translate ColumnStatistics into protobuf form and
Expand Down
17 changes: 9 additions & 8 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ namespace orc {
}
virtual void finishStream() override {
compressInternal();
BufferedOutputStream::finishStream();
}

protected:
Expand Down Expand Up @@ -982,13 +983,7 @@ namespace orc {
}

uint64_t BlockCompressionStream::flush() {
void* data;
int size;
if (!Next(&data, &size)) {
throw CompressionError("Failed to flush compression buffer.");
}
BufferedOutputStream::BackUp(outputSize - outputPosition);
bufferSize = outputSize = outputPosition = 0;
finishStream();
return BufferedOutputStream::flush();
}

Expand Down Expand Up @@ -1031,7 +1026,13 @@ namespace orc {
}

void BlockCompressionStream::finishStream() {
doBlockCompression();
void* data;
int size;
if (!Next(&data, &size)) {
throw CompressionError("Failed to flush compression buffer.");
}
BufferedOutputStream::BackUp(outputSize - outputPosition);
bufferSize = outputSize = outputPosition = 0;
}

/**
Expand Down
Loading

0 comments on commit 08c3480

Please sign in to comment.