diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index dc9b41793cf54..774d0df54b48e 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -19,10 +19,9 @@ #pragma once -#include - #include #include +#include #include "arrow/util/bit_util.h" #include "arrow/util/bpacking.h" @@ -153,7 +152,7 @@ class BitReader { /// 'num_bytes'. The value is assumed to be byte-aligned so the stream will /// be advanced to the start of the next byte before 'v' is read. Returns /// false if there are not enough bytes left. - /// Assume the v was stored in buffer_ as a litte-endian format + /// Assume the v was stored in buffer_ as a little-endian format template bool GetAligned(int num_bytes, T* v); @@ -318,7 +317,7 @@ inline bool BitReader::GetValue(int num_bits, T* v) { template inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) { DCHECK(buffer_ != NULL); - DCHECK_LE(num_bits, static_cast(sizeof(T) * 8)); + DCHECK_LE(num_bits, static_cast(sizeof(T) * 8)) << "num_bits: " << num_bits; int bit_offset = bit_offset_; int byte_offset = byte_offset_; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index b9e6619a17796..dc0edb1c8014d 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2377,8 +2377,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_); + // total_values_remaining_ in header ignores of null values + return static_cast(total_values_remaining_); } int Decode(T* buffer, int max_values) override { @@ -2420,7 +2420,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetVlqInt(&mini_blocks_per_block_) || !decoder_->GetVlqInt(&total_value_count_) || !decoder_->GetZigZagVlqInt(&last_value_)) { - ParquetException::EofException(); + ParquetException::EofException("InitHeader EOF"); } if (values_per_block_ == 0) { @@ -2444,42 +2444,52 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); + DCHECK_GT(total_values_remaining_, 0) << "InitBlock called at EOF"; + + if (!decoder_->GetZigZagVlqInt(&min_delta_)) + ParquetException::EofException("InitBlock EOF"); // read the bitwidth of each miniblock uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { if (!decoder_->GetAligned(1, bit_width_data + i)) { - ParquetException::EofException(); - } - if (bit_width_data[i] > kMaxDeltaBitWidth) { - throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) + - " larger than integer bit width " + - std::to_string(kMaxDeltaBitWidth)); + ParquetException::EofException("Decode bit-width EOF"); } + // Note that non-conformant bitwidth entries are allowed by the Parquet spec + // for extraneous miniblocks in the last block (GH-14923), so we check + // the bitwidths when actually using them (see InitMiniBlock()). } + mini_block_idx_ = 0; - delta_bit_width_ = bit_width_data[0]; - values_current_mini_block_ = values_per_mini_block_; - block_initialized_ = true; + first_block_initialized_ = true; + InitMiniBlock(bit_width_data[0]); + } + + void InitMiniBlock(int bit_width) { + if (ARROW_PREDICT_FALSE(bit_width > kMaxDeltaBitWidth)) { + throw ParquetException("delta bit width larger than integer bit width"); + } + delta_bit_width_ = bit_width; + values_remaining_current_mini_block_ = values_per_mini_block_; } int GetInternal(T* buffer, int max_values) { - max_values = static_cast(std::min(max_values, total_value_count_)); + max_values = static_cast(std::min(max_values, total_values_remaining_)); if (max_values == 0) { return 0; } int i = 0; while (i < max_values) { - if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) { - if (ARROW_PREDICT_FALSE(!block_initialized_)) { + if (ARROW_PREDICT_FALSE(values_remaining_current_mini_block_ == 0)) { + if (ARROW_PREDICT_FALSE(!first_block_initialized_)) { buffer[i++] = last_value_; DCHECK_EQ(i, 1); // we're at the beginning of the page if (ARROW_PREDICT_FALSE(i == max_values)) { @@ -2499,16 +2509,15 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderdata()[mini_block_idx_]; - values_current_mini_block_ = values_per_mini_block_; + InitMiniBlock(delta_bit_widths_->data()[mini_block_idx_]); } else { InitBlock(); } } } - int values_decode = - std::min(values_current_mini_block_, static_cast(max_values - i)); + int values_decode = std::min(values_remaining_current_mini_block_, + static_cast(max_values - i)); if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) != values_decode) { ParquetException::EofException(); @@ -2520,19 +2529,19 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(last_value_); last_value_ = buffer[i + j]; } - values_current_mini_block_ -= values_decode; + values_remaining_current_mini_block_ -= values_decode; i += values_decode; } - total_value_count_ -= max_values; + total_values_remaining_ -= max_values; this->num_values_ -= max_values; - if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) { - uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_; + if (ARROW_PREDICT_FALSE(total_values_remaining_ == 0)) { + uint32_t padding_bits = values_remaining_current_mini_block_ * delta_bit_width_; // skip the padding bits if (!decoder_->Advance(padding_bits)) { ParquetException::EofException(); } - values_current_mini_block_ = 0; + values_remaining_current_mini_block_ = 0; } return max_values; } @@ -2542,10 +2551,16 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder delta_bit_widths_; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index b8363d29cdb25..a0e3fe9545d7b 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -35,6 +35,7 @@ #include "arrow/util/bitmap_writer.h" #include "arrow/util/checked_cast.h" #include "arrow/util/endian.h" +#include "arrow/util/string.h" #include "parquet/encoding.h" #include "parquet/platform.h" @@ -1325,29 +1326,32 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { CheckRoundtripSpaced(valid_bits, valid_bits_offset); } - void CheckRoundtrip() override { - auto encoder = - MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + void CheckDecoding() { auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); auto read_batch_sizes = kReadBatchSizes; read_batch_sizes.push_back(num_values_); + // Exercise different batch sizes + for (const int read_batch_size : read_batch_sizes) { + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + + int values_decoded = 0; + while (values_decoded < num_values_) { + values_decoded += decoder->Decode(decode_buf_ + values_decoded, read_batch_size); + } + ASSERT_EQ(num_values_, values_decoded); + ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + } + } + + void CheckRoundtrip() override { + auto encoder = + MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); // Encode a number of times to exercise the flush logic for (size_t i = 0; i < kNumRoundTrips; ++i) { encoder->Put(draws_, num_values_); encode_buffer_ = encoder->FlushValues(); - // Exercise different batch sizes - for (const int read_batch_size : read_batch_sizes) { - decoder->SetData(num_values_, encode_buffer_->data(), - static_cast(encode_buffer_->size())); - - int values_decoded = 0; - while (values_decoded < num_values_) { - values_decoded += - decoder->Decode(decode_buf_ + values_decoded, read_batch_size); - } - ASSERT_EQ(num_values_, values_decoded); - ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); - } + CheckDecoding(); } } @@ -1398,6 +1402,7 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { ASSERT_NO_FATAL_FAILURE( this->Execute((values_per_mini_block * values_per_block) + 1, 10)); ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); + ASSERT_NO_FATAL_FAILURE(this->Execute(65, 1)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_probability*/ 0.1)); @@ -1427,5 +1432,66 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { } } +TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) { + // GH-14923: depending on the number of encoded values, some of the miniblock + // bitwidths are actually padding bytes that may take non-conformant values + // according to the Parquet spec. + + // Same values as in DeltaBitPackEncoder + constexpr int kValuesPerBlock = 128; + constexpr int kMiniBlocksPerBlock = 4; + constexpr int kValuesPerMiniBlock = kValuesPerBlock / kMiniBlocksPerBlock; + + // num_values must be kept small enough for kHeaderLength below + for (const int num_values : {2, 62, 63, 64, 65, 95, 96, 97, 127}) { + ARROW_SCOPED_TRACE("num_values = ", num_values); + + // Generate input data with a small half_range to make the header length + // deterministic (see kHeaderLength). + this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/31); + ASSERT_EQ(this->num_values_, num_values); + + auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, + this->descr_.get()); + encoder->Put(this->draws_, this->num_values_); + auto encoded = encoder->FlushValues(); + const auto encoded_size = encoded->size(); + + // Make mutable copy of encoded buffer + this->encode_buffer_ = AllocateBuffer(default_memory_pool(), encoded_size); + uint8_t* data = this->encode_buffer_->mutable_data(); + memcpy(data, encoded->data(), encoded_size); + + // The number of padding bytes at the end of the miniblock bitwidths array. + // We subtract 1 from num_values because the first data value is encoded + // in the header, thus does not participate in miniblock encoding. + const int num_padding_bytes = + (kValuesPerBlock - num_values + 1) / kValuesPerMiniBlock; + ARROW_SCOPED_TRACE("num_padding_bytes = ", num_padding_bytes); + + // The header length is: + // - 2 bytes for ULEB128-encoded block size (== kValuesPerBlock) + // - 1 byte for ULEB128-encoded miniblocks per block (== kMiniBlocksPerBlock) + // - 1 byte for ULEB128-encoded num_values + // - 1 byte for ULEB128-encoded first value + // (this assumes that num_values and the first value are narrow enough) + constexpr int kHeaderLength = 5; + // After the header, there is a zigzag ULEB128-encoded min delta for the first block, + // then the miniblock bitwidths for the first block. + // Given a narrow enough range, the zigzag ULEB128-encoded min delta is 1 byte long. + uint8_t* mini_block_bitwidths = data + kHeaderLength + 1; + + // Garble padding bytes; decoding should succeed. + for (int i = 0; i < num_padding_bytes; ++i) { + mini_block_bitwidths[kMiniBlocksPerBlock - i - 1] = 0xFFU; + } + ASSERT_NO_THROW(this->CheckDecoding()); + + // Not a padding byte but an actual miniblock bitwidth; decoding should error out. + mini_block_bitwidths[kMiniBlocksPerBlock - num_padding_bytes - 1] = 0xFFU; + EXPECT_THROW(this->CheckDecoding(), ParquetException); + } +} + } // namespace test } // namespace parquet