Skip to content

Commit

Permalink
GH-14923: [C++][Parquet] Fix DELTA_BINARY_PACKED problem on reading t…
Browse files Browse the repository at this point in the history
…he last block with malformed bit-width (#15241)

Problem is mentioned here: #14923

This patch fixes that issue. And the code is a bit complex.
- Rename variables, since original name is confusing for me. 
  - `block_initialized_` -> `first_block_initialized_`, because it is a flag that indicates whether the first block in the page has been initialized.
  - `total_value_count_` -> `total_values_remaining_`, because it is not the total number of values within a page; it is the number of values remaining to be decoded within a page.
  - `values_count_current_mini_block_` -> `values_remaining_current_mini_block_`, ditto
- Add variables
  - `total_value_count_`: the total number of values within a page.
- Change Syntax
  - Split `InitBlock()` into `InitBlock()` and `InitMiniBlock()`
- Implementation: most of the logic is in `InitBlock()` and `InitMiniBlock()`
- Testing. Thanks @ rok. I use a page with 65 values and bit widths `32 32 165 165`.

And personally, I use the code here for testing:

```c++
   for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) {
-    bit_width_data[i] = 0;
+    //    bit_width_data[i] = 0;
+    bit_width_data[i] = static_cast<uint8_t>(random());
   }
```

The code works well in both debug and release mode.

* Closes: #14923

Lead-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
mapleFU and pitrou authored Jan 18, 2023
1 parent 4e439f6 commit e837f73
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 49 deletions.
7 changes: 3 additions & 4 deletions cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

#pragma once

#include <string.h>

#include <algorithm>
#include <cstdint>
#include <cstring>

#include "arrow/util/bit_util.h"
#include "arrow/util/bpacking.h"
Expand Down Expand Up @@ -153,7 +152,7 @@ class BitReader {
/// 'num_bytes'. The value is assumed to be byte-aligned so the stream will
/// be advanced to the start of the next byte before 'v' is read. Returns
/// false if there are not enough bytes left.
/// Assume the v was stored in buffer_ as a litte-endian format
/// Assume the v was stored in buffer_ as a little-endian format
template <typename T>
bool GetAligned(int num_bytes, T* v);

Expand Down Expand Up @@ -318,7 +317,7 @@ inline bool BitReader::GetValue(int num_bits, T* v) {
template <typename T>
inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
DCHECK(buffer_ != NULL);
DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8));
DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8)) << "num_bits: " << num_bits;

int bit_offset = bit_offset_;
int byte_offset = byte_offset_;
Expand Down
73 changes: 44 additions & 29 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2377,8 +2377,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
}

int ValidValuesCount() {
// total_value_count_ in header ignores of null values
return static_cast<int>(total_value_count_);
// total_values_remaining_ comes from the header count, which ignores null values
return static_cast<int>(total_values_remaining_);
}

int Decode(T* buffer, int max_values) override {
Expand Down Expand Up @@ -2420,7 +2420,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
!decoder_->GetVlqInt(&mini_blocks_per_block_) ||
!decoder_->GetVlqInt(&total_value_count_) ||
!decoder_->GetZigZagVlqInt(&last_value_)) {
ParquetException::EofException();
ParquetException::EofException("InitHeader EOF");
}

if (values_per_block_ == 0) {
Expand All @@ -2444,42 +2444,52 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
std::to_string(values_per_mini_block_));
}

total_values_remaining_ = total_value_count_;
delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_);
block_initialized_ = false;
values_current_mini_block_ = 0;
first_block_initialized_ = false;
values_remaining_current_mini_block_ = 0;
}

void InitBlock() {
if (!decoder_->GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
DCHECK_GT(total_values_remaining_, 0) << "InitBlock called at EOF";

if (!decoder_->GetZigZagVlqInt(&min_delta_))
ParquetException::EofException("InitBlock EOF");

// read the bitwidth of each miniblock
uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
if (!decoder_->GetAligned<uint8_t>(1, bit_width_data + i)) {
ParquetException::EofException();
}
if (bit_width_data[i] > kMaxDeltaBitWidth) {
throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) +
" larger than integer bit width " +
std::to_string(kMaxDeltaBitWidth));
ParquetException::EofException("Decode bit-width EOF");
}
// Note that non-conformant bitwidth entries are allowed by the Parquet spec
// for extraneous miniblocks in the last block (GH-14923), so we check
// the bitwidths when actually using them (see InitMiniBlock()).
}

mini_block_idx_ = 0;
delta_bit_width_ = bit_width_data[0];
values_current_mini_block_ = values_per_mini_block_;
block_initialized_ = true;
first_block_initialized_ = true;
InitMiniBlock(bit_width_data[0]);
}

// Prepares decoding of the next miniblock using the given delta bit width.
//
// The bit width is validated here, at the point of use, rather than when the
// block's bit-width bytes are read: entries for extraneous miniblocks in the
// last block may hold non-conformant values (GH-14923) and must not be
// rejected unless they are actually used.
//
// Throws ParquetException if `bit_width` exceeds kMaxDeltaBitWidth.
void InitMiniBlock(int bit_width) {
  if (ARROW_PREDICT_FALSE(bit_width > kMaxDeltaBitWidth)) {
    // Report both the offending width and the limit to help diagnose corrupt pages.
    throw ParquetException("delta bit width " + std::to_string(bit_width) +
                           " larger than integer bit width " +
                           std::to_string(kMaxDeltaBitWidth));
  }
  delta_bit_width_ = bit_width;
  // Reset the per-miniblock counter to the nominal miniblock size; for the last
  // miniblock of a page this may exceed the number of values actually remaining.
  values_remaining_current_mini_block_ = values_per_mini_block_;
}

int GetInternal(T* buffer, int max_values) {
max_values = static_cast<int>(std::min<int64_t>(max_values, total_value_count_));
max_values = static_cast<int>(std::min<int64_t>(max_values, total_values_remaining_));
if (max_values == 0) {
return 0;
}

int i = 0;
while (i < max_values) {
if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
if (ARROW_PREDICT_FALSE(!block_initialized_)) {
if (ARROW_PREDICT_FALSE(values_remaining_current_mini_block_ == 0)) {
if (ARROW_PREDICT_FALSE(!first_block_initialized_)) {
buffer[i++] = last_value_;
DCHECK_EQ(i, 1); // we're at the beginning of the page
if (ARROW_PREDICT_FALSE(i == max_values)) {
Expand All @@ -2499,16 +2509,15 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
} else {
++mini_block_idx_;
if (mini_block_idx_ < mini_blocks_per_block_) {
delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_];
values_current_mini_block_ = values_per_mini_block_;
InitMiniBlock(delta_bit_widths_->data()[mini_block_idx_]);
} else {
InitBlock();
}
}
}

int values_decode =
std::min(values_current_mini_block_, static_cast<uint32_t>(max_values - i));
int values_decode = std::min(values_remaining_current_mini_block_,
static_cast<uint32_t>(max_values - i));
if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) !=
values_decode) {
ParquetException::EofException();
Expand All @@ -2520,19 +2529,19 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
static_cast<UT>(last_value_);
last_value_ = buffer[i + j];
}
values_current_mini_block_ -= values_decode;
values_remaining_current_mini_block_ -= values_decode;
i += values_decode;
}
total_value_count_ -= max_values;
total_values_remaining_ -= max_values;
this->num_values_ -= max_values;

if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) {
uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_;
if (ARROW_PREDICT_FALSE(total_values_remaining_ == 0)) {
uint32_t padding_bits = values_remaining_current_mini_block_ * delta_bit_width_;
// skip the padding bits
if (!decoder_->Advance(padding_bits)) {
ParquetException::EofException();
}
values_current_mini_block_ = 0;
values_remaining_current_mini_block_ = 0;
}
return max_values;
}
Expand All @@ -2542,10 +2551,16 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
uint32_t values_per_block_;
uint32_t mini_blocks_per_block_;
uint32_t values_per_mini_block_;
uint32_t values_current_mini_block_;
uint32_t total_value_count_;

bool block_initialized_;
uint32_t total_values_remaining_;
// Remaining values in the current mini block. If the current mini block is the last
// one, values_remaining_current_mini_block_ may be greater than total_values_remaining_.
uint32_t values_remaining_current_mini_block_;

// If the page doesn't contain any block, `first_block_initialized_` will
// always be false. Otherwise, it will be true when first block initialized.
bool first_block_initialized_;
T min_delta_;
uint32_t mini_block_idx_;
std::shared_ptr<ResizableBuffer> delta_bit_widths_;
Expand Down
98 changes: 82 additions & 16 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/endian.h"
#include "arrow/util/string.h"

#include "parquet/encoding.h"
#include "parquet/platform.h"
Expand Down Expand Up @@ -1325,29 +1326,32 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
CheckRoundtripSpaced(valid_bits, valid_bits_offset);
}

void CheckRoundtrip() override {
auto encoder =
MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
void CheckDecoding() {
auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
auto read_batch_sizes = kReadBatchSizes;
read_batch_sizes.push_back(num_values_);
// Exercise different batch sizes
for (const int read_batch_size : read_batch_sizes) {
decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));

int values_decoded = 0;
while (values_decoded < num_values_) {
values_decoded += decoder->Decode(decode_buf_ + values_decoded, read_batch_size);
}
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
}
}

void CheckRoundtrip() override {
auto encoder =
MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
// Encode a number of times to exercise the flush logic
for (size_t i = 0; i < kNumRoundTrips; ++i) {
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();
// Exercise different batch sizes
for (const int read_batch_size : read_batch_sizes) {
decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));

int values_decoded = 0;
while (values_decoded < num_values_) {
values_decoded +=
decoder->Decode(decode_buf_ + values_decoded, read_batch_size);
}
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
}
CheckDecoding();
}
}

Expand Down Expand Up @@ -1398,6 +1402,7 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
ASSERT_NO_FATAL_FAILURE(
this->Execute((values_per_mini_block * values_per_block) + 1, 10));
ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
ASSERT_NO_FATAL_FAILURE(this->Execute(65, 1));
ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
/*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
/*null_probability*/ 0.1));
Expand Down Expand Up @@ -1427,5 +1432,66 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
}
}

// Checks that garbage in the unused trailing miniblock bit-width bytes of a
// block is tolerated by the decoder, while an out-of-range bit width for a
// miniblock that is actually used makes decoding throw ParquetException.
TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) {
// GH-14923: depending on the number of encoded values, some of the miniblock
// bitwidths are actually padding bytes that may take non-conformant values
// according to the Parquet spec.

// Same values as in DeltaBitPackEncoder
constexpr int kValuesPerBlock = 128;
constexpr int kMiniBlocksPerBlock = 4;
constexpr int kValuesPerMiniBlock = kValuesPerBlock / kMiniBlocksPerBlock;

// num_values must be kept small enough for kHeaderLength below
// NOTE(review): the sizes below presumably were picked to land on either side
// of miniblock boundaries -- confirm before changing the list.
for (const int num_values : {2, 62, 63, 64, 65, 95, 96, 97, 127}) {
ARROW_SCOPED_TRACE("num_values = ", num_values);

// Generate input data with a small half_range to make the header length
// deterministic (see kHeaderLength).
this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/31);
ASSERT_EQ(this->num_values_, num_values);

auto encoder = MakeTypedEncoder<TypeParam>(Encoding::DELTA_BINARY_PACKED, false,
this->descr_.get());
encoder->Put(this->draws_, this->num_values_);
auto encoded = encoder->FlushValues();
const auto encoded_size = encoded->size();

// Make mutable copy of encoded buffer
// (CheckDecoding() reads from this->encode_buffer_, so the garbled copy is
// what gets decoded below.)
this->encode_buffer_ = AllocateBuffer(default_memory_pool(), encoded_size);
uint8_t* data = this->encode_buffer_->mutable_data();
memcpy(data, encoded->data(), encoded_size);

// The number of padding bytes at the end of the miniblock bitwidths array.
// We subtract 1 from num_values because the first data value is encoded
// in the header, thus does not participate in miniblock encoding.
const int num_padding_bytes =
(kValuesPerBlock - num_values + 1) / kValuesPerMiniBlock;
ARROW_SCOPED_TRACE("num_padding_bytes = ", num_padding_bytes);

// The header length is:
// - 2 bytes for ULEB128-encoded block size (== kValuesPerBlock)
// - 1 byte for ULEB128-encoded miniblocks per block (== kMiniBlocksPerBlock)
// - 1 byte for ULEB128-encoded num_values
// - 1 byte for ULEB128-encoded first value
// (this assumes that num_values and the first value are narrow enough)
constexpr int kHeaderLength = 5;
// After the header, there is a zigzag ULEB128-encoded min delta for the first block,
// then the miniblock bitwidths for the first block.
// Given a narrow enough range, the zigzag ULEB128-encoded min delta is 1 byte long.
uint8_t* mini_block_bitwidths = data + kHeaderLength + 1;

// Garble padding bytes; decoding should succeed.
// The padding entries are the last num_padding_bytes bytes of the array,
// hence the backwards indexing from kMiniBlocksPerBlock - 1.
for (int i = 0; i < num_padding_bytes; ++i) {
mini_block_bitwidths[kMiniBlocksPerBlock - i - 1] = 0xFFU;
}
ASSERT_NO_THROW(this->CheckDecoding());

// Not a padding byte but an actual miniblock bitwidth; decoding should error out.
// This index is the last bit-width entry that precedes the padding bytes.
mini_block_bitwidths[kMiniBlocksPerBlock - num_padding_bytes - 1] = 0xFFU;
EXPECT_THROW(this->CheckDecoding(), ParquetException);
}
}

} // namespace test
} // namespace parquet

0 comments on commit e837f73

Please sign in to comment.