Skip to content

Commit

Permalink
Optimize Presto Serialization
Browse files Browse the repository at this point in the history
Uses serialization API that takes rows instead or ranges in
PartitionedOutput. 1.5 - 2x more efficient CPU-wise.
  • Loading branch information
Orri Erling committed Dec 13, 2023
1 parent 085c753 commit 188ce94
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 57 deletions.
52 changes: 21 additions & 31 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,27 @@ BlockingReason Destination::advance(
OutputBufferManager& bufferManager,
const std::function<void()>& bufferReleaseFn,
bool* atEnd,
ContinueFuture* future) {
if (rangeIdx_ >= ranges_.size()) {
ContinueFuture* future,
Scratch& scratch) {
if (rowIdx_ >= rows_.size()) {
*atEnd = true;
return BlockingReason::kNotBlocked;
}

const auto firstRow = rowIdx_;
const uint32_t adjustedMaxBytes = (maxBytes * targetSizePct_) / 100;
if (bytesInCurrent_ >= adjustedMaxBytes) {
return flush(bufferManager, bufferReleaseFn, future);
}

// Collect ranges to serialize.
rangesToSerialize_.clear();
// Collect rows to serialize.
bool shouldFlush = false;
while (rangeIdx_ < ranges_.size() && !shouldFlush) {
auto& currRange = ranges_[rangeIdx_];
auto startRow = rowsInCurrentRange_;
for (; rowsInCurrentRange_ < currRange.size && !shouldFlush;
rowsInCurrentRange_++) {
++rowsInCurrent_;
bytesInCurrent_ += sizes[currRange.begin + rowsInCurrentRange_];
shouldFlush = bytesInCurrent_ >= adjustedMaxBytes ||
rowsInCurrent_ >= targetNumRows_;
}
rangesToSerialize_.push_back(
{currRange.begin + startRow, rowsInCurrentRange_ - startRow});
if (rowsInCurrentRange_ == currRange.size) {
rowsInCurrentRange_ = 0;
rangeIdx_++;
}
while (rowIdx_ < rows_.size() && !shouldFlush) {
bytesInCurrent_ += sizes[rowIdx_];
++rowIdx_;
++rowsInCurrent_;
shouldFlush =
bytesInCurrent_ >= adjustedMaxBytes || rowsInCurrent_ >= targetNumRows_;
}

// Serialize
Expand All @@ -67,9 +58,9 @@ BlockingReason Destination::advance(
current_->createStreamTree(rowType, rowsInCurrent_);
}
current_->append(
output, folly::Range(&rangesToSerialize_[0], rangesToSerialize_.size()));
output, folly::Range(&rows_[firstRow], rowIdx_ - firstRow), scratch);
// Update output state variable.
if (rangeIdx_ == ranges_.size()) {
if (rowIdx_ == rows_.size()) {
*atEnd = true;
}
if (shouldFlush || (eagerFlush_ && rowsInCurrent_ > 0)) {
Expand Down Expand Up @@ -189,12 +180,7 @@ void PartitionedOutput::initializeDestinations() {

void PartitionedOutput::initializeSizeBuffers() {
auto numInput = input_->size();
if (numInput > topLevelRanges_.size()) {
vector_size_t numOld = topLevelRanges_.size();
topLevelRanges_.resize(numInput);
for (auto i = numOld; i < numInput; ++i) {
topLevelRanges_[i] = IndexRange{i, 1};
}
if (numInput > rowSize_.size()) {
rowSize_.resize(numInput);
sizePointers_.resize(numInput);
// Set all the size pointers since 'rowSize_' may have been reallocated.
Expand All @@ -207,11 +193,14 @@ void PartitionedOutput::initializeSizeBuffers() {
void PartitionedOutput::estimateRowSizes() {
auto numInput = input_->size();
std::fill(rowSize_.begin(), rowSize_.end(), 0);
raw_vector<vector_size_t> storage;
auto numbers = iota(numInput, storage);
for (int i = 0; i < output_->childrenSize(); ++i) {
VectorStreamGroup::estimateSerializedSize(
output_->childAt(i),
folly::Range(topLevelRanges_.data(), numInput),
sizePointers_.data());
folly::Range(numbers, numInput),
sizePointers_.data(),
scratch_);
}
}

Expand Down Expand Up @@ -332,7 +321,8 @@ RowVectorPtr PartitionedOutput::getOutput() {
*bufferManager,
bufferReleaseFn_,
&atEnd,
&future_);
&future_,
scratch_);
if (blockingReason_ != BlockingReason::kNotBlocked) {
blockedDestination = destination.get();
workLeft = false;
Expand Down
36 changes: 14 additions & 22 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ class Destination {

// Resets the destination before starting a new batch.
void beginBatch() {
ranges_.clear();
rangesToSerialize_.clear();
rangeIdx_ = 0;
rowsInCurrentRange_ = 0;
rows_.clear();
rowIdx_ = 0;
}

void addRow(vector_size_t row) {
ranges_.push_back(IndexRange{row, 1});
rows_.push_back(row);
}

void addRows(const IndexRange& rows) {
ranges_.push_back(rows);
for (auto i = 0; i < rows.size; ++i) {
rows_.push_back(rows.begin + i);
}
}

// Serializes row from 'output' till either 'maxBytes' have been serialized or
Expand All @@ -61,7 +61,8 @@ class Destination {
OutputBufferManager& bufferManager,
const std::function<void()>& bufferReleaseFn,
bool* atEnd,
ContinueFuture* future);
ContinueFuture* future,
Scratch& scratch);

BlockingReason flush(
OutputBufferManager& bufferManager,
Expand Down Expand Up @@ -103,17 +104,11 @@ class Destination {
uint64_t bytesInCurrent_{0};
// Number of rows serialized in 'current_'
vector_size_t rowsInCurrent_{0};
std::vector<IndexRange> ranges_;
// List of ranges to be serialized. This is only used by
// Destination::advance() and defined as a member variable to reuse allocated
// capacity between calls.
std::vector<IndexRange> rangesToSerialize_;

// First range index of 'ranges_' that is not appended to 'current_'.
vector_size_t rangeIdx_{0};
// Number of rows serialized in the current range pointed to by 'rangeIdx_'.
// This is non-zero if the current range was partially serialized.
vector_size_t rowsInCurrentRange_{0};
raw_vector<vector_size_t> rows_;

// First index of 'rows_' that is not appended to 'current_'.
vector_size_t rowIdx_{0};

// The current stream where the input is serialized to. This is cleared on
// every flush() call.
std::unique_ptr<VectorStreamGroup> current_;
Expand Down Expand Up @@ -208,10 +203,6 @@ class PartitionedOutput : public Operator {
BlockingReason blockingReason_{BlockingReason::kNotBlocked};
ContinueFuture future_;
bool finished_{false};
// top-level row numbers used as input to
// VectorStreamGroup::estimateSerializedSize member variable is used to avoid
// re-allocating memory
std::vector<IndexRange> topLevelRanges_;
std::vector<vector_size_t*> sizePointers_;
std::vector<vector_size_t> rowSize_;
std::vector<std::unique_ptr<detail::Destination>> destinations_;
Expand All @@ -223,6 +214,7 @@ class PartitionedOutput : public Operator {
SelectivityVector nullRows_;
std::vector<uint32_t> partitions_;
std::vector<DecodedVector> decodedVectors_;
Scratch scratch_;
};

} // namespace facebook::velox::exec
5 changes: 4 additions & 1 deletion velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ class OutputBufferManagerTest : public testing::Test {
auto range = IndexRange{0, size};
data->createStreamTree(
std::dynamic_pointer_cast<const RowType>(vector->type()), size);
Scratch scratch;
data->append(
std::dynamic_pointer_cast<RowVector>(vector), folly::Range(&range, 1));
std::dynamic_pointer_cast<RowVector>(vector),
folly::Range(&range, 1),
scratch);
auto listener = bufferManager_->newListener();
IOBufOutputStream stream(*pool_, listener.get(), data->size());
data->flush(&stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,12 @@ class MaxSizeForStatsAggregate
return ++i < numToProcess;
});

Scratch scratch;
getVectorSerde()->estimateSerializedSize(
vector,
folly::Range(elementIndices_.data(), elementIndices_.size()),
elementSizePtrs_.data());
elementSizePtrs_.data(),
scratch);
}

void doUpdateSingleGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,12 @@ class SumDataSizeForStatsAggregate
return ++i < numRowsToProcess;
});

Scratch scratch;
getVectorSerde()->estimateSerializedSize(
vector,
folly::Range(rowIndices_.data(), rowIndices_.size()),
rowSizePtrs_.data());
rowSizePtrs_.data(),
scratch);
}

void doUpdateSingleGroup(
Expand Down
3 changes: 2 additions & 1 deletion velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2522,7 +2522,8 @@ void estimateFlatSerializedSizeVarcharOrVarbinary(
bytes += sizeof(int32_t) + rawValues[offset].size();
}
}
*(sizes[i]) += bytes + bits::nbytes(numNulls) + 4 * numNulls;
*(sizes[i]) +=
bytes + bits::nbytes(ranges[i].size) + 4 * (ranges[i].size - numNulls);
}
}

Expand Down
123 changes: 123 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,129 @@ TEST_P(PrestoSerializerTest, emptyArrayOfRowVector) {
testRoundTrip(arrayOfRow);
}

struct SerializeCase {
int32_t nullPct;
int32_t numSelected;
int32_t bits;
int32_t vectorSize;
uint64_t irTime{0};
uint64_t rrTime{0};

std::string toString() {
return fmt::format(
"{} of {} {} bit {}%null: {} ir / {} rr",
numSelected,
vectorSize,
bits,
nullPct,
irTime,
rrTime);
}
};

TEST_P(PrestoSerializerTest, timeFlat) {
// Serialize different fractions of a 10K vector of int32_t and int64_t with
// IndexRange and row range variants with and without nulls.
constexpr int32_t kPad = 8;
std::vector<int32_t> numSelectedValues = {
3
#if ALL_CASES
,
30,
300,
10000
#endif
};
std::vector<std::vector<IndexRange>> indexRanges;
std::vector<std::vector<vector_size_t>> rowSets;
std::vector<int32_t> nullPctValues = {0, 1, 10, 90};
std::vector<int32_t> bitsValues = {32, 64};
const int32_t vectorSize = 10000;
for (auto numSelected : numSelectedValues) {
std::vector<IndexRange> ir;
std::vector<vector_size_t> rr;
int32_t step = vectorSize / numSelected;
for (auto r = 0; r < vectorSize; r += step) {
ir.push_back(IndexRange{r, 1});
rr.push_back(r);
}
std::cout << rr.size();
indexRanges.push_back(std::move(ir));
rr.resize(rr.size() + kPad, 999999999);
rowSets.push_back(std::move(rr));
}
VectorMaker vm(pool_.get());
std::vector<VectorPtr> v32s;
std::vector<VectorPtr> v64s;
for (auto nullPct : nullPctValues) {
auto v32 = vm.flatVector<int32_t>(
vectorSize,
[&](auto row) { return row; },
[&](auto row) { return nullPct == 0 ? false : row % 100 < nullPct; });
auto v64 = vm.flatVector<int64_t>(
vectorSize,
[&](auto row) { return row; },
[&](auto row) { return nullPct == 0 ? false : row % 100 < nullPct; });
v32s.push_back(std::move(v32));
v64s.push_back(std::move(v64));
}
std::vector<SerializeCase> cases;
Scratch scratch;

auto runCase = [&](int32_t nullIdx, int32_t selIdx, int32_t bits) {
SerializeCase item;
item.vectorSize = vectorSize;
item.nullPct = nullPctValues[nullIdx];
item.numSelected = numSelectedValues[selIdx];
item.bits = bits;
int32_t numRepeat = 500 * vectorSize / indexRanges[selIdx].size();

VectorPtr vector = bits == 32 ? v32s[nullIdx] : v64s[nullIdx];
auto rowType = ROW({vector->type()});
auto rowVector = vm.rowVector({vector});
{
#if ALL_CASES
MicrosecondTimer t(&item.irTime);
auto group = std::make_unique<VectorStreamGroup>(pool_.get());
group->createStreamTree(rowType, rowSets[selIdx].size() - kPad);
for (auto repeat = 0; repeat < numRepeat; ++repeat) {
group->append(
rowVector,
folly::Range(
indexRanges[selIdx].data(), indexRanges[selIdx].size()),
scratch);
}
#endif
}

{
MicrosecondTimer t(&item.rrTime);
auto group = std::make_unique<VectorStreamGroup>(pool_.get());
group->createStreamTree(rowType, rowSets[selIdx].size());

for (auto repeat = 0; repeat < numRepeat; ++repeat) {
group->append(
rowVector,
folly::Range(rowSets[selIdx].data(), rowSets[selIdx].size() - kPad),
scratch);
}
}
return item;
};

for (auto bits : bitsValues) {
for (auto nullIdx = 0; nullIdx < nullPctValues.size(); ++nullIdx) {
for (auto selIdx = 0; selIdx < numSelectedValues.size(); ++selIdx) {
int32_t numRepeat = 10 / numSelectedValues[selIdx];
cases.push_back(runCase(nullIdx, selIdx, bits));
}
}
}
for (auto& item : cases) {
std::cout << item.toString() << std::endl;
}
}

TEST_P(PrestoSerializerTest, typeMismatch) {
auto data = makeRowVector({
makeFlatVector<int64_t>({1, 2, 3}),
Expand Down

0 comments on commit 188ce94

Please sign in to comment.