Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1264: [C++] Add a writer option to align compression block with row group #2005

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,13 @@ namespace orc {
};
ReaderMetrics* getDefaultReaderMetrics();

struct RowGroupPositions {
uint64_t columnId;
std::vector<int32_t> positions;
// Row group index of a single column in a stripe.
struct RowGroupIndex {
// Positions are stored as a two-dimensional array: the outer vector has one
// element per row group and the inner vector is that row group's position
// list, i.e. positions[rowGroup][i] is the i-th position of that row group.
// Every row group is expected to have the same number of positions.
std::vector<std::vector<uint64_t>> positions;
};

/**
Expand Down Expand Up @@ -610,6 +614,16 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;

/**
* Get the row group index of the selected columns in the specified stripe.
* @param stripeIndex index of the stripe to be read for row group index.
* @param included ids of the selected columns to return; an empty set
* (the default) selects all columns.
* @return map of row group index keyed by its column index.
* @note default arguments are bound to the static type of the call, so the
* default for `included` only applies when calling through `Reader`.
*/
virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 0;
};

/**
Expand Down Expand Up @@ -663,11 +677,6 @@ namespace orc {
*/
virtual void seekToRow(uint64_t rowNumber) = 0;

/**
* Get the row group positions of the specified column in the current stripe.
* @param columnId id of the column whose position entries are requested.
* @return the position entries of the specified column, one element per
* row index entry.
*/
virtual std::vector<RowGroupPositions> getPositionEntries(int columnId) = 0;
};
} // namespace orc

Expand Down
85 changes: 57 additions & 28 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1241,23 +1241,6 @@ namespace orc {
useTightNumericVector_);
}

std::vector<RowGroupPositions> RowReaderImpl::getPositionEntries(int columnId) {
loadStripeIndex();
std::vector<RowGroupPositions> result;
auto rowIndex = rowIndexes_[columnId];
RowGroupPositions rgPositions;
rgPositions.columnId = columnId;
for (auto rowIndexEntry : rowIndex.entry()) {
auto posVector = rgPositions.positions;
for (auto position : rowIndexEntry.positions()) {
posVector.push_back(position);
}
result.push_back(rgPositions);
}

return result;
}

void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
const std::string MAGIC("ORC");
const uint64_t magicLength = MAGIC.length();
Expand Down Expand Up @@ -1443,17 +1426,10 @@ namespace orc {
uint32_t stripeIndex, const std::set<uint32_t>& included) const {
std::map<uint32_t, BloomFilterIndex> ret;

// find stripe info
if (stripeIndex >= static_cast<uint32_t>(footer_->stripes_size())) {
throw std::logic_error("Illegal stripe index: " +
to_string(static_cast<int64_t>(stripeIndex)));
}
const proto::StripeInformation currentStripeInfo =
footer_->stripes(static_cast<int>(stripeIndex));
const proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents_);
uint64_t offset;
auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset);

// iterate stripe footer to get stream of bloom_filter
uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
uint32_t column = static_cast<uint32_t>(stream.column());
Expand Down Expand Up @@ -1491,6 +1467,59 @@ namespace orc {
return ret;
}

// Validates `stripeIndex` against the file footer, reports where the stripe
// starts and returns its stripe footer.
//
// @param stripeIndex index of the stripe in the file footer.
// @param offset      output: set to the stripe's offset from the file footer.
// @return the stripe footer produced by getStripeFooter().
// @throws std::logic_error if `stripeIndex` is not a valid stripe index.
proto::StripeFooter ReaderImpl::loadCurrentStripeFooter(uint32_t stripeIndex,
                                                        uint64_t& offset) const {
  const uint32_t stripeCount = static_cast<uint32_t>(footer_->stripes_size());
  if (!(stripeIndex < stripeCount)) {
    throw std::logic_error("Illegal stripe index: " +
                           to_string(static_cast<int64_t>(stripeIndex)));
  }

  // Look up the stripe's metadata, publish its start offset, then read the footer.
  const proto::StripeInformation stripeInfo = footer_->stripes(static_cast<int>(stripeIndex));
  offset = static_cast<uint64_t>(stripeInfo.offset());
  return getStripeFooter(stripeInfo, *contents_);
}

std::map<uint32_t, RowGroupIndex> ReaderImpl::getRowGroupIndex(
uint32_t stripeIndex, const std::set<uint32_t>& included) const {
std::map<uint32_t, RowGroupIndex> ret;
uint64_t offset;
auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset);

// iterate stripe footer to get stream of row_index
for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
uint32_t column = static_cast<uint32_t>(stream.column());
uint64_t length = static_cast<uint64_t>(stream.length());
RowGroupIndex rowGroupIndex = ret[column];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RowGroupIndex rowGroupIndex = ret[column];
RowGroupIndex& rowGroupIndex = ret[column];

This should break the test, or we need to add at least a test case to cover this new API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the RowGroupIndex size check in TestWriter.cc:97.


if (stream.kind() == proto::Stream_Kind_ROW_INDEX &&
(included.empty() || included.find(column) != included.end())) {
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents_->compression,
std::make_unique<SeekableFileInputStream>(
contents_->stream.get(), offset, length, *contents_->pool),
contents_->blockSize, *(contents_->pool), contents_->readerMetrics);

proto::RowIndex pbRowIndex;
if (!pbRowIndex.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse RowIndex");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: provide more debug info, like stripeIndex and columnId?

}

// add rowGroupIndex to result for one column
for (auto& rowIndexEntry : pbRowIndex.entry()) {
std::vector<uint64_t> posVector;
for (auto& position : rowIndexEntry.positions()) {
posVector.push_back(position);
}
rowGroupIndex.positions.push_back(posVector);
}
}
offset += length;
}
return ret;
}

// Out-of-line destructor definition; there is nothing to release here.
RowReader::~RowReader() = default;
Expand All @@ -1499,8 +1528,8 @@ namespace orc {
// PASS
}

InputStream::~InputStream(){
// PASS
InputStream::~InputStream() {
// PASS
};

} // namespace orc
6 changes: 4 additions & 2 deletions c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,6 @@ namespace orc {
// Returns a non-owning pointer to this object's schemaEvolution_ member.
const SchemaEvolution* getSchemaEvolution() const {
return &schemaEvolution_;
}

std::vector<RowGroupPositions> getPositionEntries(int columnId) override;
};

class ReaderImpl : public Reader {
Expand All @@ -270,6 +268,7 @@ namespace orc {
void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;
// Validates stripeIndex (throws std::logic_error when it is illegal), stores
// the stripe's starting offset in `offset` and returns the stripe footer.
proto::StripeFooter loadCurrentStripeFooter(uint32_t stripeIndex, uint64_t& offset) const;

// metadata
mutable bool isMetadataLoaded_;
Expand Down Expand Up @@ -375,6 +374,9 @@ namespace orc {

std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const override;

// Returns the row group index of the columns in `included` (all columns when
// the set is empty) for the given stripe, keyed by column id. No default is
// declared for `included` here, so direct callers must pass it explicitly.
std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
uint32_t stripeIndex, const std::set<uint32_t>& included) const override;
};
} // namespace orc

Expand Down
Loading
Loading