From eba55e090fa03c928b0ec062097a657df222a43d Mon Sep 17 00:00:00 2001 From: "Rossi(Ruoxi) Sun" Date: Thu, 18 Jan 2024 19:44:26 +0800 Subject: [PATCH] GH-39332: [C++] Explicit error in ExecBatchBuilder when appending var length data exceeds offset limit (int32 max) (#39383) ### Rationale for this change When appending var length data in `ExecBatchBuilder`, the offset is potentially to overflow if the batch contains 4GB data or more. This may further result in segmentation fault during the subsequent data content copying. For details, please refer to this comment: https://github.com/apache/arrow/issues/39332#issuecomment-1870690063. The solution is let user to use the "large" counterpart data type to avoid the overflow, but we may need explicit error information when such overflow happens. ### What changes are included in this PR? 1. Detect the offset overflow in appending data in `ExecBatchBuilder` and explicitly throw. 2. Change the offset type from `uint32_t` to `int32_t` in `ExecBatchBuilder` and respects the `BinaryBuilder::memory_limit()` which is `2GB - 2B` as the rest part of the codebase. ### Are these changes tested? UT included. ### Are there any user-facing changes? No. * Closes: #39332 Lead-authored-by: zanmato Co-authored-by: Antoine Pitrou Co-authored-by: Rossi(Ruoxi) Sun Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- cpp/src/arrow/compute/light_array.cc | 47 ++++++++++------- cpp/src/arrow/compute/light_array.h | 2 +- cpp/src/arrow/compute/light_array_test.cc | 64 +++++++++++++++++++++++ cpp/src/arrow/testing/generator.cc | 9 +++- 4 files changed, 100 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 66d8477b029a0..b225e04b05cea 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -20,6 +20,8 @@ #include #include "arrow/util/bitmap_ops.h" +#include "arrow/util/int_util_overflow.h" +#include "arrow/util/macros.h" namespace arrow { namespace compute { @@ -325,11 +327,10 @@ Status ResizableArrayData::ResizeVaryingLengthBuffer() { column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie(); if (!column_metadata.is_fixed_length) { - int min_new_size = static_cast(reinterpret_cast( - buffers_[kFixedLengthBuffer]->data())[num_rows_]); + int64_t min_new_size = buffers_[kFixedLengthBuffer]->data_as()[num_rows_]; ARROW_DCHECK(var_len_buf_size_ > 0); if (var_len_buf_size_ < min_new_size) { - int new_size = var_len_buf_size_; + int64_t new_size = var_len_buf_size_; while (new_size < min_new_size) { new_size *= 2; } @@ -465,12 +466,11 @@ void ExecBatchBuilder::Visit(const std::shared_ptr& column, int num_r if (!metadata.is_fixed_length) { const uint8_t* ptr_base = column->buffers[2]->data(); - const uint32_t* offsets = - reinterpret_cast(column->buffers[1]->data()) + column->offset; + const int32_t* offsets = column->GetValues(1); for (int i = 0; i < num_rows; ++i) { uint16_t row_id = row_ids[i]; const uint8_t* field_ptr = ptr_base + offsets[row_id]; - uint32_t field_length = offsets[row_id + 1] - offsets[row_id]; + int32_t field_length = offsets[row_id + 1] - offsets[row_id]; process_value_fn(i, field_ptr, field_length); } } else { @@ -480,7 +480,7 @@ void ExecBatchBuilder::Visit(const std::shared_ptr& column, int num_r const uint8_t* field_ptr = column->buffers[1]->data() + (column->offset + row_id) * static_cast(metadata.fixed_length); - process_value_fn(i, field_ptr, metadata.fixed_length); + process_value_fn(i, field_ptr, static_cast(metadata.fixed_length)); } } } @@ -511,14 +511,14 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source break; case 1: Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { target->mutable_data(1)[num_rows_before + i] = *ptr; }); break; case 2: Visit( source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = *reinterpret_cast(ptr); }); @@ -526,7 +526,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source case 4: Visit( source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = *reinterpret_cast(ptr); }); @@ -534,7 +534,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source case 8: Visit( source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = *reinterpret_cast(ptr); }); @@ -544,7 +544,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source num_rows_to_append - NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); Visit(source, num_rows_to_process, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast( target->mutable_data(1) + static_cast(num_bytes) * (num_rows_before + i)); @@ -558,7 +558,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source if (num_rows_to_append > num_rows_to_process) { Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast( target->mutable_data(1) + static_cast(num_bytes) * @@ -575,16 +575,23 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source // Step 1: calculate target offsets // - uint32_t* offsets = reinterpret_cast(target->mutable_data(1)); - uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before]; + int32_t* offsets = reinterpret_cast(target->mutable_data(1)); + int32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before]; Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { offsets[num_rows_before + i] = num_bytes; }); for (int i = 0; i < num_rows_to_append; ++i) { - uint32_t length = offsets[num_rows_before + i]; + int32_t length = offsets[num_rows_before + i]; offsets[num_rows_before + i] = sum; - sum += length; + int32_t new_sum_maybe_overflow = 0; + if (ARROW_PREDICT_FALSE( + arrow::internal::AddWithOverflow(sum, length, &new_sum_maybe_overflow))) { + return Status::Invalid("Overflow detected in ExecBatchBuilder when appending ", + num_rows_before + i + 1, "-th element of length ", length, + " bytes to current length ", sum, " bytes"); + } + sum = new_sum_maybe_overflow; } offsets[num_rows_before + num_rows_to_append] = sum; @@ -598,7 +605,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source num_rows_to_append - NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); Visit(source, num_rows_to_process, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast(target->mutable_data(2) + offsets[num_rows_before + i]); const uint64_t* src = reinterpret_cast(ptr); @@ -608,7 +615,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source } }); Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast( target->mutable_data(2) + offsets[num_rows_before + num_rows_to_process + i]); diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 84aa86d64bb62..67de71bf56c92 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -353,7 +353,7 @@ class ARROW_EXPORT ResizableArrayData { MemoryPool* pool_; int num_rows_; int num_rows_allocated_; - int var_len_buf_size_; + int64_t var_len_buf_size_; static constexpr int kMaxBuffers = 3; std::shared_ptr buffers_[kMaxBuffers]; }; diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index d50e9675517c3..ecc5f3ad37931 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -407,6 +407,70 @@ TEST(ExecBatchBuilder, AppendValuesBeyondLimit) { ASSERT_EQ(0, pool->bytes_allocated()); } +TEST(ExecBatchBuilder, AppendVarLengthBeyondLimit) { + // GH-39332: check appending variable-length data past 2GB. + if constexpr (sizeof(void*) == 4) { + GTEST_SKIP() << "Test only works on 64-bit platforms"; + } + + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + constexpr auto eight_mb = 8 * 1024 * 1024; + constexpr auto eight_mb_minus_one = eight_mb - 1; + // String of size 8mb to repetitively fill the heading multiple of 8mbs of an array + // of int32_max bytes. + std::string str_8mb(eight_mb, 'a'); + // String of size (8mb - 1) to be the last element of an array of int32_max bytes. + std::string str_8mb_minus_1(eight_mb_minus_one, 'b'); + std::shared_ptr values_8mb = ConstantArrayGenerator::String(1, str_8mb); + std::shared_ptr values_8mb_minus_1 = + ConstantArrayGenerator::String(1, str_8mb_minus_1); + + ExecBatch batch_8mb({values_8mb}, 1); + ExecBatch batch_8mb_minus_1({values_8mb_minus_1}, 1); + + auto num_rows = std::numeric_limits::max() / eight_mb; + std::vector body_row_ids(num_rows, 0); + std::vector tail_row_id(1, 0); + + { + // Building an array of (int32_max + 1) = (8mb * num_rows + 8mb) bytes should raise an + // error of overflow. + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(), + /*num_cols=*/1)); + std::stringstream ss; + ss << "Invalid: Overflow detected in ExecBatchBuilder when appending " << num_rows + 1 + << "-th element of length " << eight_mb << " bytes to current length " + << eight_mb * num_rows << " bytes"; + ASSERT_RAISES_WITH_MESSAGE( + Invalid, ss.str(), + builder.AppendSelected(pool, batch_8mb, 1, tail_row_id.data(), + /*num_cols=*/1)); + } + + { + // Building an array of int32_max = (8mb * num_rows + 8mb - 1) bytes should succeed. + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(), + /*num_cols=*/1)); + ASSERT_OK(builder.AppendSelected(pool, batch_8mb_minus_1, 1, tail_row_id.data(), + /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + auto datum = built[0]; + ASSERT_TRUE(datum.is_array()); + auto array = datum.array_as(); + ASSERT_EQ(array->length(), num_rows + 1); + for (int i = 0; i < num_rows; ++i) { + ASSERT_EQ(array->GetString(i), str_8mb); + } + ASSERT_EQ(array->GetString(num_rows), str_8mb_minus_1); + ASSERT_NE(0, pool->bytes_allocated()); + } + + ASSERT_EQ(0, pool->bytes_allocated()); +} + TEST(KeyColumnArray, FromExecBatch) { ExecBatch batch = JSONToExecBatch({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc index 36c88c20efe6e..5ea6a541e8922 100644 --- a/cpp/src/arrow/testing/generator.cc +++ b/cpp/src/arrow/testing/generator.cc @@ -38,6 +38,7 @@ #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/string.h" @@ -103,7 +104,13 @@ std::shared_ptr ConstantArrayGenerator::Float64(int64_t size, std::shared_ptr ConstantArrayGenerator::String(int64_t size, std::string value) { - return ConstantArray(size, value); + using BuilderType = typename TypeTraits::BuilderType; + auto type = TypeTraits::type_singleton(); + auto builder_fn = [&](BuilderType* builder) { + DCHECK_OK(builder->Append(std::string_view(value.data()))); + }; + return ArrayFromBuilderVisitor(type, value.size() * size, size, builder_fn) + .ValueOrDie(); } std::shared_ptr ConstantArrayGenerator::Zeroes(