diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 66d8477b029a0..b225e04b05cea 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -20,6 +20,8 @@ #include #include "arrow/util/bitmap_ops.h" +#include "arrow/util/int_util_overflow.h" +#include "arrow/util/macros.h" namespace arrow { namespace compute { @@ -325,11 +327,10 @@ Status ResizableArrayData::ResizeVaryingLengthBuffer() { column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie(); if (!column_metadata.is_fixed_length) { - int min_new_size = static_cast(reinterpret_cast( - buffers_[kFixedLengthBuffer]->data())[num_rows_]); + int64_t min_new_size = buffers_[kFixedLengthBuffer]->data_as()[num_rows_]; ARROW_DCHECK(var_len_buf_size_ > 0); if (var_len_buf_size_ < min_new_size) { - int new_size = var_len_buf_size_; + int64_t new_size = var_len_buf_size_; while (new_size < min_new_size) { new_size *= 2; } @@ -465,12 +466,11 @@ void ExecBatchBuilder::Visit(const std::shared_ptr& column, int num_r if (!metadata.is_fixed_length) { const uint8_t* ptr_base = column->buffers[2]->data(); - const uint32_t* offsets = - reinterpret_cast(column->buffers[1]->data()) + column->offset; + const int32_t* offsets = column->GetValues(1); for (int i = 0; i < num_rows; ++i) { uint16_t row_id = row_ids[i]; const uint8_t* field_ptr = ptr_base + offsets[row_id]; - uint32_t field_length = offsets[row_id + 1] - offsets[row_id]; + int32_t field_length = offsets[row_id + 1] - offsets[row_id]; process_value_fn(i, field_ptr, field_length); } } else { @@ -480,7 +480,7 @@ void ExecBatchBuilder::Visit(const std::shared_ptr& column, int num_r const uint8_t* field_ptr = column->buffers[1]->data() + (column->offset + row_id) * static_cast(metadata.fixed_length); - process_value_fn(i, field_ptr, metadata.fixed_length); + process_value_fn(i, field_ptr, static_cast(metadata.fixed_length)); } } } @@ -511,14 +511,14 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source break; case 1: Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { target->mutable_data(1)[num_rows_before + i] = *ptr; }); break; case 2: Visit( source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = *reinterpret_cast(ptr); }); @@ -526,7 +526,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source case 4: Visit( source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = *reinterpret_cast(ptr); }); @@ -534,7 +534,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source case 8: Visit( source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = *reinterpret_cast(ptr); }); @@ -544,7 +544,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source num_rows_to_append - NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); Visit(source, num_rows_to_process, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast( target->mutable_data(1) + static_cast(num_bytes) * (num_rows_before + i)); @@ -558,7 +558,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source if (num_rows_to_append > num_rows_to_process) { Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast( target->mutable_data(1) + static_cast(num_bytes) * @@ -575,16 +575,23 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source // Step 1: calculate target offsets // - uint32_t* offsets = reinterpret_cast(target->mutable_data(1)); - uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before]; + int32_t* offsets = reinterpret_cast(target->mutable_data(1)); + int32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before]; Visit(source, num_rows_to_append, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { offsets[num_rows_before + i] = num_bytes; }); for (int i = 0; i < num_rows_to_append; ++i) { - uint32_t length = offsets[num_rows_before + i]; + int32_t length = offsets[num_rows_before + i]; offsets[num_rows_before + i] = sum; - sum += length; + int32_t new_sum_maybe_overflow = 0; + if (ARROW_PREDICT_FALSE( + arrow::internal::AddWithOverflow(sum, length, &new_sum_maybe_overflow))) { + return Status::Invalid("Overflow detected in ExecBatchBuilder when appending ", + num_rows_before + i + 1, "-th element of length ", length, + " bytes to current length ", sum, " bytes"); + } + sum = new_sum_maybe_overflow; } offsets[num_rows_before + num_rows_to_append] = sum; @@ -598,7 +605,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source num_rows_to_append - NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); Visit(source, num_rows_to_process, row_ids, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast(target->mutable_data(2) + offsets[num_rows_before + i]); const uint64_t* src = reinterpret_cast(ptr); @@ -608,7 +615,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source } }); Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process, - [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + [&](int i, const uint8_t* ptr, int32_t num_bytes) { uint64_t* dst = reinterpret_cast( target->mutable_data(2) + offsets[num_rows_before + num_rows_to_process + i]); diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 84aa86d64bb62..67de71bf56c92 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -353,7 +353,7 @@ class ARROW_EXPORT ResizableArrayData { MemoryPool* pool_; int num_rows_; int num_rows_allocated_; - int var_len_buf_size_; + int64_t var_len_buf_size_; static constexpr int kMaxBuffers = 3; std::shared_ptr buffers_[kMaxBuffers]; }; diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index d50e9675517c3..ecc5f3ad37931 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -407,6 +407,70 @@ TEST(ExecBatchBuilder, AppendValuesBeyondLimit) { ASSERT_EQ(0, pool->bytes_allocated()); } +TEST(ExecBatchBuilder, AppendVarLengthBeyondLimit) { + // GH-39332: check appending variable-length data past 2GB. + if constexpr (sizeof(void*) == 4) { + GTEST_SKIP() << "Test only works on 64-bit platforms"; + } + + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + constexpr auto eight_mb = 8 * 1024 * 1024; + constexpr auto eight_mb_minus_one = eight_mb - 1; + // String of size 8mb to repetitively fill the heading multiple of 8mbs of an array + // of int32_max bytes. + std::string str_8mb(eight_mb, 'a'); + // String of size (8mb - 1) to be the last element of an array of int32_max bytes. + std::string str_8mb_minus_1(eight_mb_minus_one, 'b'); + std::shared_ptr values_8mb = ConstantArrayGenerator::String(1, str_8mb); + std::shared_ptr values_8mb_minus_1 = + ConstantArrayGenerator::String(1, str_8mb_minus_1); + + ExecBatch batch_8mb({values_8mb}, 1); + ExecBatch batch_8mb_minus_1({values_8mb_minus_1}, 1); + + auto num_rows = std::numeric_limits::max() / eight_mb; + std::vector body_row_ids(num_rows, 0); + std::vector tail_row_id(1, 0); + + { + // Building an array of (int32_max + 1) = (8mb * num_rows + 8mb) bytes should raise an + // error of overflow. + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(), + /*num_cols=*/1)); + std::stringstream ss; + ss << "Invalid: Overflow detected in ExecBatchBuilder when appending " << num_rows + 1 + << "-th element of length " << eight_mb << " bytes to current length " + << eight_mb * num_rows << " bytes"; + ASSERT_RAISES_WITH_MESSAGE( + Invalid, ss.str(), + builder.AppendSelected(pool, batch_8mb, 1, tail_row_id.data(), + /*num_cols=*/1)); + } + + { + // Building an array of int32_max = (8mb * num_rows + 8mb - 1) bytes should succeed. + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(), + /*num_cols=*/1)); + ASSERT_OK(builder.AppendSelected(pool, batch_8mb_minus_1, 1, tail_row_id.data(), + /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + auto datum = built[0]; + ASSERT_TRUE(datum.is_array()); + auto array = datum.array_as(); + ASSERT_EQ(array->length(), num_rows + 1); + for (int i = 0; i < num_rows; ++i) { + ASSERT_EQ(array->GetString(i), str_8mb); + } + ASSERT_EQ(array->GetString(num_rows), str_8mb_minus_1); + ASSERT_NE(0, pool->bytes_allocated()); + } + + ASSERT_EQ(0, pool->bytes_allocated()); +} + TEST(KeyColumnArray, FromExecBatch) { ExecBatch batch = JSONToExecBatch({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc index 36c88c20efe6e..5ea6a541e8922 100644 --- a/cpp/src/arrow/testing/generator.cc +++ b/cpp/src/arrow/testing/generator.cc @@ -38,6 +38,7 @@ #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/string.h" @@ -103,7 +104,13 @@ std::shared_ptr ConstantArrayGenerator::Float64(int64_t size, std::shared_ptr ConstantArrayGenerator::String(int64_t size, std::string value) { - return ConstantArray(size, value); + using BuilderType = typename TypeTraits::BuilderType; + auto type = TypeTraits::type_singleton(); + auto builder_fn = [&](BuilderType* builder) { + DCHECK_OK(builder->Append(std::string_view(value.data()))); + }; + return ArrayFromBuilderVisitor(type, value.size() * size, size, builder_fn) + .ValueOrDie(); } std::shared_ptr ConstantArrayGenerator::Zeroes(