Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

GH-39332: [C++] Explicit error in ExecBatchBuilder when appending var length data exceeds offset limit (int32 max) #39383

Merged
merged 8 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions cpp/src/arrow/compute/light_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <type_traits>

#include "arrow/util/bitmap_ops.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace compute {
Expand Down Expand Up @@ -325,11 +327,10 @@ Status ResizableArrayData::ResizeVaryingLengthBuffer() {
column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();

if (!column_metadata.is_fixed_length) {
int min_new_size = static_cast<int>(reinterpret_cast<const uint32_t*>(
buffers_[kFixedLengthBuffer]->data())[num_rows_]);
int64_t min_new_size = buffers_[kFixedLengthBuffer]->data_as<int32_t>()[num_rows_];
ARROW_DCHECK(var_len_buf_size_ > 0);
if (var_len_buf_size_ < min_new_size) {
int new_size = var_len_buf_size_;
int64_t new_size = var_len_buf_size_;
while (new_size < min_new_size) {
new_size *= 2;
}
Expand Down Expand Up @@ -465,12 +466,11 @@ void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_r

if (!metadata.is_fixed_length) {
const uint8_t* ptr_base = column->buffers[2]->data();
const uint32_t* offsets =
reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
const int32_t* offsets = column->GetValues<int32_t>(1);
for (int i = 0; i < num_rows; ++i) {
uint16_t row_id = row_ids[i];
const uint8_t* field_ptr = ptr_base + offsets[row_id];
uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
int32_t field_length = offsets[row_id + 1] - offsets[row_id];
process_value_fn(i, field_ptr, field_length);
}
} else {
Expand All @@ -480,7 +480,7 @@ void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_r
const uint8_t* field_ptr =
column->buffers[1]->data() +
(column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
process_value_fn(i, field_ptr, metadata.fixed_length);
process_value_fn(i, field_ptr, static_cast<int32_t>(metadata.fixed_length));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to change the type of metadata.fixed_length to int32_t, but that would bring a large number of related changes, which would overwhelm this small PR. So I tend to leave it as is and do a simple cast here. Changing fixed_length of RowTableMetadata and KeyColumnMetaData to a signed type could be a future enhancement.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing fixed_length of RowTableMetadata and KeyColumnMetaData to signed type could be a future enhancement.

Cool, thank you!

}
}
}
Expand Down Expand Up @@ -511,30 +511,30 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
break;
case 1:
Visit(source, num_rows_to_append, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
target->mutable_data(1)[num_rows_before + i] = *ptr;
});
break;
case 2:
Visit(
source, num_rows_to_append, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
*reinterpret_cast<const uint16_t*>(ptr);
});
break;
case 4:
Visit(
source, num_rows_to_append, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
*reinterpret_cast<const uint32_t*>(ptr);
});
break;
case 8:
Visit(
source, num_rows_to_append, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
*reinterpret_cast<const uint64_t*>(ptr);
});
Expand All @@ -544,7 +544,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
num_rows_to_append -
NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
Visit(source, num_rows_to_process, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(
target->mutable_data(1) +
static_cast<int64_t>(num_bytes) * (num_rows_before + i));
Expand All @@ -558,7 +558,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
if (num_rows_to_append > num_rows_to_process) {
Visit(source, num_rows_to_append - num_rows_to_process,
row_ids + num_rows_to_process,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(
target->mutable_data(1) +
static_cast<int64_t>(num_bytes) *
Expand All @@ -575,16 +575,23 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source

// Step 1: calculate target offsets
//
uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
int32_t* offsets = reinterpret_cast<int32_t*>(target->mutable_data(1));
int32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
Visit(source, num_rows_to_append, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
offsets[num_rows_before + i] = num_bytes;
});
for (int i = 0; i < num_rows_to_append; ++i) {
uint32_t length = offsets[num_rows_before + i];
int32_t length = offsets[num_rows_before + i];
offsets[num_rows_before + i] = sum;
sum += length;
int32_t new_sum_maybe_overflow = 0;
if (ARROW_PREDICT_FALSE(
arrow::internal::AddWithOverflow(sum, length, &new_sum_maybe_overflow))) {
return Status::Invalid("Overflow detected in ExecBatchBuilder when appending ",
num_rows_before + i + 1, "-th element of length ", length,
" bytes to current length ", sum, " bytes");
}
sum = new_sum_maybe_overflow;
}
offsets[num_rows_before + num_rows_to_append] = sum;

Expand All @@ -598,7 +605,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
num_rows_to_append -
NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
Visit(source, num_rows_to_process, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
offsets[num_rows_before + i]);
const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
Expand All @@ -608,7 +615,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
}
});
Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(
target->mutable_data(2) +
offsets[num_rows_before + num_rows_to_process + i]);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/light_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ class ARROW_EXPORT ResizableArrayData {
MemoryPool* pool_;
int num_rows_;
int num_rows_allocated_;
int var_len_buf_size_;
int64_t var_len_buf_size_;
static constexpr int kMaxBuffers = 3;
std::shared_ptr<ResizableBuffer> buffers_[kMaxBuffers];
};
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/compute/light_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,70 @@ TEST(ExecBatchBuilder, AppendValuesBeyondLimit) {
ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(ExecBatchBuilder, AppendVarLengthBeyondLimit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment referring to the GH issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// GH-39332: check appending variable-length data past 2GB.
if constexpr (sizeof(void*) == 4) {
GTEST_SKIP() << "Test only works on 64-bit platforms";
}

std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
MemoryPool* pool = owned_pool.get();
constexpr auto eight_mb = 8 * 1024 * 1024;
constexpr auto eight_mb_minus_one = eight_mb - 1;
// String of size 8mb to repetitively fill the heading multiple of 8mbs of an array
// of int32_max bytes.
std::string str_8mb(eight_mb, 'a');
// String of size (8mb - 1) to be the last element of an array of int32_max bytes.
std::string str_8mb_minus_1(eight_mb_minus_one, 'b');
std::shared_ptr<Array> values_8mb = ConstantArrayGenerator::String(1, str_8mb);
std::shared_ptr<Array> values_8mb_minus_1 =
ConstantArrayGenerator::String(1, str_8mb_minus_1);

ExecBatch batch_8mb({values_8mb}, 1);
ExecBatch batch_8mb_minus_1({values_8mb_minus_1}, 1);

auto num_rows = std::numeric_limits<int32_t>::max() / eight_mb;
std::vector<uint16_t> body_row_ids(num_rows, 0);
std::vector<uint16_t> tail_row_id(1, 0);

{
// Building an array of (int32_max + 1) = (8mb * num_rows + 8mb) bytes should raise an
// error of overflow.
ExecBatchBuilder builder;
ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(),
/*num_cols=*/1));
std::stringstream ss;
ss << "Invalid: Overflow detected in ExecBatchBuilder when appending " << num_rows + 1
<< "-th element of length " << eight_mb << " bytes to current length "
<< eight_mb * num_rows << " bytes";
ASSERT_RAISES_WITH_MESSAGE(
Invalid, ss.str(),
builder.AppendSelected(pool, batch_8mb, 1, tail_row_id.data(),
/*num_cols=*/1));
}

{
// Building an array of int32_max = (8mb * num_rows + 8mb - 1) bytes should succeed.
ExecBatchBuilder builder;
ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(),
/*num_cols=*/1));
ASSERT_OK(builder.AppendSelected(pool, batch_8mb_minus_1, 1, tail_row_id.data(),
/*num_cols=*/1));
ExecBatch built = builder.Flush();
auto datum = built[0];
ASSERT_TRUE(datum.is_array());
auto array = datum.array_as<StringArray>();
ASSERT_EQ(array->length(), num_rows + 1);
for (int i = 0; i < num_rows; ++i) {
ASSERT_EQ(array->GetString(i), str_8mb);
}
ASSERT_EQ(array->GetString(num_rows), str_8mb_minus_1);
ASSERT_NE(0, pool->bytes_allocated());
}

ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(KeyColumnArray, FromExecBatch) {
ExecBatch batch =
JSONToExecBatch({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
Expand Down
9 changes: 8 additions & 1 deletion cpp/src/arrow/testing/generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/string.h"

Expand Down Expand Up @@ -103,7 +104,13 @@ std::shared_ptr<arrow::Array> ConstantArrayGenerator::Float64(int64_t size,

std::shared_ptr<arrow::Array> ConstantArrayGenerator::String(int64_t size,
std::string value) {
return ConstantArray<StringType>(size, value);
using BuilderType = typename TypeTraits<StringType>::BuilderType;
auto type = TypeTraits<StringType>::type_singleton();
auto builder_fn = [&](BuilderType* builder) {
DCHECK_OK(builder->Append(std::string_view(value.data())));
};
return ArrayFromBuilderVisitor(type, value.size() * size, size, builder_fn)
.ValueOrDie();
}

std::shared_ptr<arrow::Array> ConstantArrayGenerator::Zeroes(
Expand Down