Skip to content

Commit

Permalink
ORC-1767: [C++] Improve writing performance of encoded string column …
Browse files Browse the repository at this point in the history
…and support EncodedStringVectorBatch for StringColumnWriter

### What changes were proposed in this pull request?

Improve the write performance of dictionary-encoded string columns and add EncodedStringVectorBatch support to StringColumnWriter.
Performance was measured in ClickHouse#15.

### Why are the changes needed?

### How was this patch tested?

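Existing tests, plus the new `DictionaryEncoding.decodeDictionary` unit test added in c++/test/TestDictionaryEncoding.cc.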

### Was this patch authored or co-authored using generative AI tooling?

Closes #2010 from taiyang-li/apache_improve_dict_write.

Lead-authored-by: taiyang-li <[email protected]>
Co-authored-by: 李扬 <[email protected]>
Signed-off-by: ffacs <[email protected]>
  • Loading branch information
taiyang-li authored and ffacs committed Sep 3, 2024
1 parent 3e07b3f commit 97a3797
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 24 deletions.
26 changes: 26 additions & 0 deletions c++/include/orc/Vector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ namespace orc {
bool hasNulls;
// whether the vector batch is encoded
bool isEncoded;
// whether the dictionary is decoded into vector batch
bool dictionaryDecoded;

// custom memory pool
MemoryPool& memoryPool;
Expand Down Expand Up @@ -88,6 +90,14 @@ namespace orc {
*/
virtual bool hasVariableLength();

/**
* Decode possible dictionary into vector batch.
*/
void decodeDictionary();

protected:
/**
 * Hook invoked by decodeDictionary() to do the actual decoding work.
 * The default does nothing; batch types that hold dictionary-encoded
 * data, or child batches that might, override it.
 */
virtual void decodeDictionaryImpl() {}

private:
ColumnVectorBatch(const ColumnVectorBatch&);
ColumnVectorBatch& operator=(const ColumnVectorBatch&);
Expand Down Expand Up @@ -248,6 +258,10 @@ namespace orc {
~EncodedStringVectorBatch() override;
std::string toString() const override;
void resize(uint64_t capacity) override;

// Calculate data and length in StringVectorBatch from dictionary and index
void decodeDictionaryImpl() override;

std::shared_ptr<StringDictionary> dictionary;

// index for dictionary entry
Expand All @@ -264,6 +278,9 @@ namespace orc {
bool hasVariableLength() override;

std::vector<ColumnVectorBatch*> fields;

protected:
void decodeDictionaryImpl() override;
};

struct ListVectorBatch : public ColumnVectorBatch {
Expand All @@ -283,6 +300,9 @@ namespace orc {

// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;

protected:
void decodeDictionaryImpl() override;
};

struct MapVectorBatch : public ColumnVectorBatch {
Expand All @@ -304,6 +324,9 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> keys;
// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;

protected:
void decodeDictionaryImpl() override;
};

struct UnionVectorBatch : public ColumnVectorBatch {
Expand All @@ -327,6 +350,9 @@ namespace orc {

// the sub-columns
std::vector<ColumnVectorBatch*> children;

protected:
void decodeDictionaryImpl() override;
};

struct Decimal {
Expand Down
60 changes: 36 additions & 24 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,10 +889,17 @@ namespace orc {
size_t length;
};

// A dictionary entry tagged with the position at which it was inserted,
// so a collection of entries can be re-sorted (by value or by insertion
// order) without losing track of the original order.
struct DictEntryWithIndex {
  // the string bytes and their length
  // NOTE(review): presumably DictEntry keeps `str` as a non-owning
  // pointer; confirm the pointed-to storage outlives this entry.
  DictEntry entry;
  // insertion order of the entry
  size_t index;

  DictEntryWithIndex(const char* str, size_t len, size_t index)
      : entry(str, len), index(index) {}
};

SortedStringDictionary() : totalLength_(0) {}

// insert a new string into dictionary, return its insertion order
size_t insert(const char* data, size_t len);
size_t insert(const char* str, size_t len);

// write dictionary data & length to output buffer
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;
Expand All @@ -913,7 +920,9 @@ namespace orc {

private:
struct LessThan {
bool operator()(const DictEntry& left, const DictEntry& right) const {
bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& r) {
const auto& left = l.entry;
const auto& right = r.entry;
int ret = memcmp(left.data, right.data, std::min(left.length, right.length));
if (ret != 0) {
return ret < 0;
Expand All @@ -922,8 +931,8 @@ namespace orc {
}
};

std::map<DictEntry, size_t, LessThan> dict_;
std::vector<std::vector<char>> data_;
mutable std::vector<DictEntryWithIndex> flatDict_;
std::unordered_map<std::string, size_t> keyToIndex_;
uint64_t totalLength_;

// use friend class here to avoid being bothered by const function calls
Expand All @@ -936,14 +945,10 @@ namespace orc {

// insert a new string into dictionary, return its insertion order
size_t SortedStringDictionary::insert(const char* str, size_t len) {
auto ret = dict_.insert({DictEntry(str, len), dict_.size()});
size_t index = flatDict_.size();
auto ret = keyToIndex_.emplace(std::string(str, len), index);
if (ret.second) {
// make a copy to internal storage
data_.push_back(std::vector<char>(len));
memcpy(data_.back().data(), str, len);
// update dictionary entry to link pointer to internal storage
DictEntry* entry = const_cast<DictEntry*>(&(ret.first->first));
entry->data = data_.back().data();
flatDict_.emplace_back(ret.first->first.data(), ret.first->first.size(), index);
totalLength_ += len;
}
return ret.first->second;
Expand All @@ -952,9 +957,12 @@ namespace orc {
// write dictionary data & length to output buffer
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
RleEncoder* lengthEncoder) const {
for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
dataStream->write(it->first.data, it->first.length);
lengthEncoder->write(static_cast<int64_t>(it->first.length));
std::sort(flatDict_.begin(), flatDict_.end(), LessThan());

for (const auto& entryWithIndex : flatDict_) {
const auto& entry = entryWithIndex.entry;
dataStream->write(entry.data, entry.length);
lengthEncoder->write(static_cast<int64_t>(entry.length));
}
}

Expand All @@ -970,10 +978,9 @@ namespace orc {
*/
void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
// iterate the dictionary to get mapping from insertion order to value order
std::vector<size_t> mapping(dict_.size());
size_t dictIdx = 0;
for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
mapping[it->second] = dictIdx++;
std::vector<size_t> mapping(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
mapping[flatDict_[i].index] = i;
}

// do the transformation
Expand All @@ -985,15 +992,20 @@ namespace orc {
// get dict entries in insertion order
void SortedStringDictionary::getEntriesInInsertionOrder(
std::vector<const DictEntry*>& entries) const {
entries.resize(dict_.size());
for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
entries[it->second] = &(it->first);
std::sort(flatDict_.begin(), flatDict_.end(),
[](const DictEntryWithIndex& left, const DictEntryWithIndex& right) {
return left.index < right.index;
});

entries.resize(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
entries[i] = &(flatDict_[i].entry);
}
}

// return count of entries
size_t SortedStringDictionary::size() const {
return dict_.size();
return flatDict_.size();
}

// return total length of strings in the dictionary
Expand All @@ -1003,8 +1015,8 @@ namespace orc {

void SortedStringDictionary::clear() {
totalLength_ = 0;
data_.clear();
dict_.clear();
keyToIndex_.clear();
flatDict_.clear();
}

class StringColumnWriter : public ColumnWriter {
Expand Down
45 changes: 45 additions & 0 deletions c++/src/Vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace orc {
notNull(pool, cap),
hasNulls(false),
isEncoded(false),
dictionaryDecoded(false),
memoryPool(pool) {
std::memset(notNull.data(), 1, capacity);
}
Expand Down Expand Up @@ -61,6 +62,13 @@ namespace orc {
return false;
}

// Decode any dictionary-encoded data in this batch, at most once.
// The work itself is delegated to decodeDictionaryImpl(); afterwards the
// batch is marked as decoded so that repeated calls are no-ops.
// NOTE(review): nothing here clears dictionaryDecoded again; confirm that
// code which refills a reused batch resets the flag.
void ColumnVectorBatch::decodeDictionary() {
  if (!dictionaryDecoded) {
    decodeDictionaryImpl();
    // only reached when the implementation returned normally
    dictionaryDecoded = true;
  }
}

StringDictionary::StringDictionary(MemoryPool& pool)
: dictionaryBlob(pool), dictionaryOffset(pool) {
// PASS
Expand Down Expand Up @@ -88,6 +96,17 @@ namespace orc {
}
}

void EncodedStringVectorBatch::decodeDictionaryImpl() {
size_t n = index.size();
resize(n);

for (size_t i = 0; i < n; ++i) {
if (!hasNulls || notNull[i]) {
dictionary->getValueByIndex(index[i], data[i], length[i]);
}
}
}

StringVectorBatch::StringVectorBatch(uint64_t capacity, MemoryPool& pool)
: ColumnVectorBatch(capacity, pool),
data(pool, capacity),
Expand Down Expand Up @@ -174,6 +193,12 @@ namespace orc {
return false;
}

void StructVectorBatch::decodeDictionaryImpl() {
for (const auto& field : fields) {
field->decodeDictionary();
}
}

ListVectorBatch::ListVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
offsets.zeroOut();
Expand Down Expand Up @@ -211,6 +236,10 @@ namespace orc {
return true;
}

// A list batch has no encoded data of its own: forward the request to the
// batch holding the concatenated elements.
void ListVectorBatch::decodeDictionaryImpl() {
  // `elements` is a unique_ptr; dereferencing it while empty is undefined
  // behaviour, so guard it the same way MapVectorBatch::decodeDictionaryImpl
  // guards its child batches.
  if (elements) {
    elements->decodeDictionary();
  }
}

MapVectorBatch::MapVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
offsets.zeroOut();
Expand Down Expand Up @@ -251,6 +280,16 @@ namespace orc {
return true;
}

void MapVectorBatch::decodeDictionaryImpl() {
if (keys) {
keys->decodeDictionary();
}

if (elements) {
elements->decodeDictionary();
}
}

UnionVectorBatch::UnionVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), tags(pool, cap), offsets(pool, cap) {
tags.zeroOut();
Expand Down Expand Up @@ -310,6 +349,12 @@ namespace orc {
return false;
}

void UnionVectorBatch::decodeDictionaryImpl() {
for (const auto& child : children) {
child->decodeDictionary();
}
}

Decimal64VectorBatch::Decimal64VectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool),
precision(0),
Expand Down
53 changes: 53 additions & 0 deletions c++/test/TestDictionaryEncoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,4 +434,57 @@ namespace orc {
testDictionaryMultipleStripes(DICT_THRESHOLD, false);
testDictionaryMultipleStripes(FALLBACK_THRESHOLD, false);
}

// Builds an encoded string batch by hand, decodes it, and checks that every
// non-null row ends up with the pointer and length the dictionary reports
// for that row's index.
TEST(DictionaryEncoding, decodeDictionary) {
  size_t rowCount = 8192;
  size_t dictionarySize = 100;
  auto& pool = *getDefaultPool();

  auto batch = std::make_shared<EncodedStringVectorBatch>(rowCount, pool);
  // a freshly constructed batch must not claim to be decoded
  EXPECT_FALSE(batch->dictionaryDecoded);
  batch->dictionary = std::make_shared<StringDictionary>(pool);
  batch->isEncoded = true;
  batch->hasNulls = true;
  batch->numElements = rowCount;

  // Dictionary of three-character entries "000", "001", ..., packed back to
  // back in the blob; offset[k] marks where entry k starts.
  auto& dictionary = *batch->dictionary;
  dictionary.dictionaryBlob.resize(3 * dictionarySize);
  dictionary.dictionaryOffset.resize(dictionarySize + 1);
  dictionary.dictionaryOffset[0] = 0;
  for (uint64_t entry = 0; entry < dictionarySize; ++entry) {
    std::ostringstream text;
    text << std::setw(3) << std::setfill('0') << entry;

    const auto padded = text.str();
    memcpy(&dictionary.dictionaryBlob[3 * entry], padded.data(), padded.size());
    dictionary.dictionaryOffset[entry + 1] = 3 * (entry + 1);
  }

  // Every tenth row is null (and keeps index 0); all other rows cycle
  // through the dictionary.
  for (uint64_t row = 0; row < rowCount; ++row) {
    if (row % 10 != 0) {
      batch->notNull[row] = 1;
      batch->index[row] = row % dictionarySize;
    } else {
      batch->notNull[row] = 0;
      batch->index[row] = 0;
    }
  }

  batch->decodeDictionary();
  EXPECT_TRUE(batch->dictionaryDecoded);
  // decoding is expected to leave the batch's own blob empty
  EXPECT_EQ(0, batch->blob.size());

  for (uint64_t row = 0; row < rowCount; ++row) {
    if (!batch->notNull[row]) {
      continue;  // null rows are not checked
    }

    auto index = batch->index[row];
    char* expectedData = nullptr;
    int64_t expectedLength = 0;
    dictionary.getValueByIndex(index, expectedData, expectedLength);

    // EXPECT_EQ on char* compares addresses, not contents: the decoded row
    // must hold the very pointer the dictionary hands out.
    EXPECT_EQ(expectedData, batch->data[row]);
    EXPECT_EQ(expectedLength, batch->length[row]);
  }
}

} // namespace orc

0 comments on commit 97a3797

Please sign in to comment.