Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-262: [C++] Support async I/O prefetch #2048

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
09ad258
support async io prefetch for orc c++ lib
taiyang-li Sep 24, 2024
b0aeb9e
fix failed ci
taiyang-li Oct 9, 2024
bad04f8
fix styles
taiyang-li Oct 9, 2024
8692901
add uts
taiyang-li Oct 9, 2024
a6159e5
fix failed uts
taiyang-li Oct 10, 2024
1808c90
fix building
taiyang-li Oct 10, 2024
4a3fe36
fix format
taiyang-li Oct 10, 2024
2d8f77d
fix failed ci
taiyang-li Oct 10, 2024
90c3539
fix bugs when ranges has the same offsets
taiyang-li Oct 10, 2024
52e02a4
fix bugs when ranges has the same offsets
taiyang-li Oct 10, 2024
a6691bc
fix failed cit
taiyang-li Oct 10, 2024
a2d6723
Update Cache.hh
taiyang-li Oct 16, 2024
fc58f1f
change as request
taiyang-li Oct 25, 2024
07c803d
change as request
taiyang-li Oct 25, 2024
156f4eb
change as request
taiyang-li Oct 25, 2024
bb5b3ae
fix format
taiyang-li Oct 25, 2024
ba57df2
fix style
taiyang-li Nov 4, 2024
3303fe9
fix style
taiyang-li Nov 5, 2024
ac5a188
fix conflicts
taiyang-li Nov 11, 2024
2046a01
fix style
taiyang-li Nov 11, 2024
48ddd50
hide cacheoptions
taiyang-li Nov 15, 2024
f3d76f1
protext read range cache from parallel accessing
taiyang-li Nov 15, 2024
64861ce
hide some private structures
taiyang-li Nov 15, 2024
2bd47b0
fix building
taiyang-li Nov 15, 2024
75dbf47
fix code style
taiyang-li Nov 19, 2024
0e14b04
change as requested
taiyang-li Nov 21, 2024
e7ec8f9
change
taiyang-li Nov 21, 2024
60ca6e9
add metrics
taiyang-li Nov 21, 2024
5e7c4db
revert files
taiyang-li Nov 21, 2024
9c00e51
Update c++/include/orc/OrcFile.hh
taiyang-li Nov 21, 2024
5dc0266
Update c++/include/orc/Reader.hh
taiyang-li Nov 21, 2024
7e40819
Update c++/src/io/Cache.hh
taiyang-li Nov 21, 2024
bed31a5
Update c++/src/io/Cache.hh
taiyang-li Nov 21, 2024
8dd4ca4
Update c++/include/orc/Reader.hh
taiyang-li Nov 21, 2024
ce3d455
Update c++/include/orc/OrcFile.hh
taiyang-li Nov 21, 2024
e20fc4e
Update c++/src/Options.hh
taiyang-li Nov 21, 2024
6b960bc
change as reeust
taiyang-li Nov 27, 2024
5596d74
change as reeust
taiyang-li Nov 27, 2024
0964efd
fix format and building
taiyang-li Nov 27, 2024
d44ff5a
fix style
taiyang-li Nov 27, 2024
e4610e7
fix format
taiyang-li Nov 27, 2024
fbe7945
Update c++/src/io/Cache.hh
taiyang-li Nov 28, 2024
e85f23a
Update c++/include/orc/Reader.hh
taiyang-li Nov 28, 2024
2ef6a41
Update c++/include/orc/Reader.hh
taiyang-li Nov 28, 2024
c822928
Update c++/src/io/Cache.hh
taiyang-li Nov 28, 2024
e277a65
add params for uts
taiyang-li Nov 28, 2024
b8d2221
add some tests
taiyang-li Nov 28, 2024
0d56a7e
fix stuyle
taiyang-li Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion c++/include/orc/OrcFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef ORC_FILE_HH
#define ORC_FILE_HH

#include <future>
#include <string>

#include "orc/Reader.hh"
Expand All @@ -36,6 +37,20 @@ namespace orc {
*/
class InputStream {
public:
using Buffer = DataBuffer<char>;
using BufferPtr = std::shared_ptr<Buffer>;

taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Bundles a shared buffer with an offset and a length.
 * A default-constructed slice holds no buffer and has offset 0 and length 0.
 * NOTE(review): presumably offset/length describe the bytes of interest
 * relative to the buffer or the stream — confirm against the users of this
 * type.
 */
struct BufferSlice {
  BufferPtr buffer{nullptr};
  uint64_t offset{0};
  uint64_t length{0};

  BufferSlice() = default;

  BufferSlice(BufferPtr buffer, uint64_t offset, uint64_t length)
      : buffer(std::move(buffer)), offset(offset), length(length) {}
};

virtual ~InputStream();

/**
Expand All @@ -58,6 +73,17 @@ namespace orc {
*/
virtual void read(void* buf, uint64_t length, uint64_t offset) = 0;

/**
 * Read data asynchronously.
 * The default implementation performs no read: it always throws
 * NotImplementedYet, so a stream has to override this method to offer
 * asynchronous reads.
 * @param offset the position in the stream to read from.
 * @param length the number of bytes to read.
 * @param pool memory pool for the read (unused by this default
 *        implementation; presumably used to allocate the returned buffer —
 *        TODO confirm).
 * @return a future that will be set to the buffer when the read is complete.
 * @throws NotImplementedYet always, in this default implementation.
 */
virtual std::future<BufferPtr> readAsync(uint64_t /*offset*/, uint64_t /*length*/,
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
MemoryPool& /*pool*/) {
throw NotImplementedYet("readAsync not supported yet");
}

/**
* Get the name of the stream for error messages.
*/
Expand Down Expand Up @@ -153,4 +179,4 @@ namespace orc {
const WriterOptions& options);
} // namespace orc

#endif
#endif
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
27 changes: 27 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ namespace orc {
// classes that hold data members so we can maintain binary compatibility
struct ReaderOptionsPrivate;
struct RowReaderOptionsPrivate;
struct CacheOptions;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
class InputStream;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved

// Forward declarations of the proto message types that the reader interface
// refers to by pointer only.
namespace proto {
  class Footer;
  class Metadata;
}  // namespace proto

/**
* Expose the reader metrics including the latency and
Expand Down Expand Up @@ -605,6 +612,26 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;

/**
 * Get the input stream for the ORC file.
 * @return a raw pointer to the InputStream.
 * NOTE(review): ownership and lifetime of the returned pointer are not
 * visible here — presumably it stays owned by the reader; confirm before
 * holding on to it.
 */
virtual InputStream* getStream() const = 0;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Get the footer of the ORC file.
 * @return a pointer to the proto::Footer message.
 * NOTE(review): presumably valid only while the reader is alive — confirm.
 */
virtual const proto::Footer* getFooter() const = 0;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Get the metadata of the ORC file.
 * @return a pointer to the proto::Metadata message.
 * NOTE(review): presumably valid only while the reader is alive — confirm.
 */
virtual const proto::Metadata* getMetadata() const = 0;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Request that the data of the given stripes and column types be loaded
 * into the reader's read cache ahead of the actual reads.
 * @param stripes indices of the stripes to prefetch.
 * @param includeTypes type ids selecting the columns to prefetch.
 * @param options settings for the read cache (CacheOptions is only
 *        forward-declared in this header).
 */
virtual void preBuffer(const std::vector<int>& stripes, const std::list<uint64_t>& includeTypes,
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
const CacheOptions& options) = 0;

/**
 * Release prefetched data that lies before the given boundary.
 * @param boundary entries of the read cache before this value are evicted
 *        (presumably a file offset — TODO confirm).
 */
virtual void releaseBuffer(uint64_t boundary) = 0;
};

/**
Expand Down
1 change: 1 addition & 0 deletions c++/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ set(SOURCE_FILES
orc_proto.pb.h
io/InputStream.cc
io/OutputStream.cc
io/Cache.cc
sargs/ExpressionTree.cc
sargs/Literal.cc
sargs/PredicateLeaf.cc
Expand Down
34 changes: 0 additions & 34 deletions c++/src/MemoryPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ namespace orc {
for (uint64_t i = currentSize_; i > newSize; --i) {
(buf_ + i - 1)->~T();
}
} else if (newSize > currentSize_) {
for (uint64_t i = currentSize_; i < newSize; ++i) {
new (buf_ + i) T();
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
}
}
currentSize_ = newSize;
}
Expand Down Expand Up @@ -134,9 +130,6 @@ namespace orc {
/**
 * Resize the char buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<char>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -152,9 +145,6 @@ namespace orc {
/**
 * Resize the char* buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<char*>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(char*);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -170,9 +160,6 @@ namespace orc {
/**
 * Resize the double buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<double>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(double);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -188,9 +175,6 @@ namespace orc {
/**
 * Resize the float buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<float>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(float);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -206,9 +190,6 @@ namespace orc {
/**
 * Resize the int64_t buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<int64_t>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(int64_t);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -224,9 +205,6 @@ namespace orc {
/**
 * Resize the int32_t buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<int32_t>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(int32_t);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -242,9 +220,6 @@ namespace orc {
/**
 * Resize the int16_t buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<int16_t>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(int16_t);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -260,9 +235,6 @@ namespace orc {
/**
 * Resize the int8_t buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<int8_t>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(int8_t);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -278,9 +250,6 @@ namespace orc {
/**
 * Resize the uint64_t buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<uint64_t>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = (newSize - currentSize_) * sizeof(uint64_t);
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand All @@ -296,9 +265,6 @@ namespace orc {
/**
 * Resize the unsigned char buffer to newSize elements.
 * Calls reserve(newSize) first; when the buffer grows, the newly exposed
 * tail [currentSize_, newSize) is zero-filled.
 */
template <>
void DataBuffer<unsigned char>::resize(uint64_t newSize) {
  reserve(newSize);
  const bool grows = currentSize_ < newSize;
  if (grows) {
    const uint64_t addedBytes = newSize - currentSize_;
    memset(buf_ + currentSize_, 0, addedBytes);
  }
  currentSize_ = newSize;
}

Expand Down
79 changes: 76 additions & 3 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ namespace orc {
buildTypeNameIdMap(contents_->schema.get());
}

RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts)
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts,
std::shared_ptr<ReadRangeCache> cachedSource)
: localTimezone_(getLocalTimezone()),
contents_(contents),
throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
Expand All @@ -255,7 +256,8 @@ namespace orc {
firstRowOfStripe_(*contents_->pool, 0),
enableEncodedBlock_(opts.getEnableLazyDecoding()),
readerTimezone_(getTimezoneByName(opts.getTimezoneName())),
schemaEvolution_(opts.getReadType(), contents_->schema.get()) {
schemaEvolution_(opts.getReadType(), contents_->schema.get()),
cachedSource_(std::move(cachedSource)) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer_->stripes_size());
currentStripe_ = numberOfStripes;
Expand Down Expand Up @@ -838,7 +840,7 @@ namespace orc {
// load stripe statistics for PPD
readMetadata();
}
return std::make_unique<RowReaderImpl>(contents_, opts);
return std::make_unique<RowReaderImpl>(contents_, opts, cachedSource_);
}

uint64_t maxStreamsForType(const proto::Type& type) {
Expand Down Expand Up @@ -1474,6 +1476,77 @@ namespace orc {
return ret;
}

/**
 * Ask the read-range cache to evict its entries before boundary.
 * Does nothing when no cache has been created yet.
 * @param boundary forwarded unchanged to the cache's evictEntriesBefore().
 */
void ReaderImpl::releaseBuffer(uint64_t boundary) {
  if (!cachedSource_) {
    return;
  }
  cachedSource_->evictEntriesBefore(boundary);
}

void ReaderImpl::preBuffer(const std::vector<int>& stripes,
const std::list<uint64_t>& includeTypes, const CacheOptions& options) {
if (stripes.empty() || includeTypes.empty()) {
return;
}

orc::RowReaderOptions row_reader_options;
row_reader_options.includeTypes(includeTypes);
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
ColumnSelector column_selector(contents_.get());
std::vector<bool> selected_columns;
column_selector.updateSelected(selected_columns, row_reader_options);

std::vector<ReadRange> ranges;
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
ranges.reserve(includeTypes.size());
for (auto stripe : stripes) {
// get stripe information
const auto& stripe_info = footer_->stripes(stripe);
uint64_t stripe_footer_start =
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
stripe_info.offset() + stripe_info.index_length() + stripe_info.data_length();
uint64_t stripe_footer_length = stripe_info.footer_length();

// get stripe footer
std::unique_ptr<SeekableInputStream> pb_stream = createDecompressor(
contents_->compression,
std::make_unique<SeekableFileInputStream>(contents_->stream.get(), stripe_footer_start,
stripe_footer_length, *contents_->pool),
contents_->blockSize, *contents_->pool, contents_->readerMetrics);
proto::StripeFooter stripe_footer;
if (!stripe_footer.ParseFromZeroCopyStream(pb_stream.get())) {
throw ParseError(std::string("bad StripeFooter from ") + pb_stream->getName());
}

// traverse all streams in stripe footer, choose selected streams to prebuffer
uint64_t offset = stripe_info.offset();
for (int i = 0; i < stripe_footer.streams_size(); i++) {
const proto::Stream& stream = stripe_footer.streams(i);
if (offset + stream.length() > stripe_footer_start) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe " << stripe
<< ": streamOffset=" << offset << ", streamLength=" << stream.length()
<< ", stripeOffset=" << stripe_info.offset()
<< ", stripeIndexLength=" << stripe_info.index_length()
<< ", stripeDataLength=" << stripe_info.data_length();
throw ParseError(msg.str());
}

if (stream.has_kind() && selected_columns[stream.column()]) {
const auto& kind = stream.kind();
if (kind == proto::Stream_Kind_DATA || kind == proto::Stream_Kind_DICTIONARY_DATA ||
taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
kind == proto::Stream_Kind_PRESENT || kind == proto::Stream_Kind_LENGTH ||
kind == proto::Stream_Kind_SECONDARY) {
ranges.emplace_back(offset, stream.length());
}
}

offset += stream.length();
}

if (!cachedSource_)
cachedSource_ = std::make_shared<ReadRangeCache>(getStream(), options, contents_->pool);

cachedSource_->cache(std::move(ranges));
}
}

RowReader::~RowReader() {
// PASS
}
Expand Down
Loading
Loading