GH-45339: [Parquet][C++] Fix statistics load logic for no row group and multiple row groups (#45350)

### Rationale for this change

The logic for loading `arrow::ArrayStatistics` depends on `parquet::ColumnChunkMetaData`.

We can't get `parquet::ColumnChunkMetaData` when the requested row groups are empty, because no associated row group or column chunk exists.

We can't use multiple `parquet::ColumnChunkMetaData`s for now because we don't have statistics merge logic, so we can't load statistics when multiple row groups are used.

### What changes are included in this PR?

* Don't load statistics when no row groups are used
* Don't load statistics when multiple row groups are used
* Add `parquet::ArrowReaderProperties::{set_,}should_load_statistics()` to force loading statistics by reading row groups one by one (see the usage sketch below)
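
For illustration, here is a minimal usage sketch (not part of this diff) based on the APIs it adds. The file name and the assumption that column 0 is Int32 are hypothetical; the property setter, the `FileReader::Make()` overload, and the statistics access mirror the new `MultipleRowGroupsShouldLoadStatistics` test.

```cpp
#include <iostream>
#include <memory>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <parquet/properties.h>

// Sketch: read column 0 of a Parquet file with statistics loading enabled.
// "data.parquet" and the Int32 column type are assumptions for illustration.
::arrow::Status ReadColumnWithStatistics() {
  ARROW_ASSIGN_OR_RAISE(auto input,
                        ::arrow::io::ReadableFile::Open("data.parquet"));
  auto parquet_reader = parquet::ParquetFileReader::Open(input);

  parquet::ArrowReaderProperties reader_properties;
  // Opt in: read row groups one by one so that each chunk keeps the
  // statistics of its own row group.
  reader_properties.set_should_load_statistics(true);

  std::unique_ptr<parquet::arrow::FileReader> file_reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
      ::arrow::default_memory_pool(), std::move(parquet_reader),
      reader_properties, &file_reader));

  std::shared_ptr<::arrow::ChunkedArray> chunked_array;
  ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));

  // With should_load_statistics enabled, each chunk corresponds to one
  // row group and carries that row group's statistics. The cast-then-
  // statistics() access follows the pattern used in the new test.
  for (const auto& chunk : chunked_array->chunks()) {
    auto typed_chunk = std::static_pointer_cast<::arrow::Int32Array>(chunk);
    auto statistics = typed_chunk->statistics();
    if (statistics && statistics->null_count.has_value()) {
      std::cout << "null_count: " << statistics->null_count.value() << std::endl;
    }
  }
  return ::arrow::Status::OK();
}
```

Without `set_should_load_statistics(true)`, a read that spans multiple row groups produces a merged chunk with no attached statistics, as the `MultipleRowGroupsDefault` test checks.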

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* GitHub Issue: #45339

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
kou authored Jan 30, 2025
1 parent ca2f4d6 commit 3e6e8f3
Showing 7 changed files with 187 additions and 12 deletions.
102 changes: 102 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4296,6 +4296,108 @@ TEST(TestArrowReaderAdHoc, ReadFloat16Files) {
}
}

TEST(TestArrowFileReader, RecordBatchReaderEmptyRowGroups) {
const int num_columns = 1;
const int num_rows = 3;
const int num_chunks = 1;

std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));

const int64_t row_group_size = num_rows;
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
default_arrow_writer_properties(), &buffer));

auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
std::unique_ptr<FileReader> file_reader;
ASSERT_OK(
FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
// This is the important part in this test.
std::vector<int> row_group_indices = {};
ASSERT_OK_AND_ASSIGN(auto record_batch_reader,
file_reader->GetRecordBatchReader(row_group_indices));
std::shared_ptr<::arrow::RecordBatch> record_batch;
ASSERT_OK(record_batch_reader->ReadNext(&record_batch));
// No record batch is read for an empty row group request.
ASSERT_FALSE(record_batch);
}

TEST(TestArrowFileReader, RecordBatchReaderEmptyInput) {
const int num_columns = 1;
// This is the important part in this test.
const int num_rows = 0;
const int num_chunks = 1;

std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));

const int64_t row_group_size = num_rows;
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
default_arrow_writer_properties(), &buffer));

auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
std::unique_ptr<FileReader> file_reader;
ASSERT_OK(
FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
ASSERT_OK_AND_ASSIGN(auto record_batch_reader, file_reader->GetRecordBatchReader());
std::shared_ptr<::arrow::RecordBatch> record_batch;
ASSERT_OK(record_batch_reader->ReadNext(&record_batch));
// No record batch is read for empty data.
ASSERT_FALSE(record_batch);
}

TEST(TestArrowColumnReader, NextBatchZeroBatchSize) {
const int num_columns = 1;
const int num_rows = 3;
const int num_chunks = 1;

std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));

const int64_t row_group_size = num_rows;
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
default_arrow_writer_properties(), &buffer));

auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
std::unique_ptr<FileReader> file_reader;
ASSERT_OK(
FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
std::unique_ptr<arrow::ColumnReader> column_reader;
ASSERT_OK(file_reader->GetColumn(0, &column_reader));
std::shared_ptr<ChunkedArray> chunked_array;
// This is the important part in this test.
ASSERT_OK(column_reader->NextBatch(0, &chunked_array));
ASSERT_EQ(0, chunked_array->length());
}

TEST(TestArrowColumnReader, NextBatchEmptyInput) {
const int num_columns = 1;
// This is the important part in this test.
const int num_rows = 0;
const int num_chunks = 1;

std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));

const int64_t row_group_size = num_rows;
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
default_arrow_writer_properties(), &buffer));

auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
std::unique_ptr<FileReader> file_reader;
ASSERT_OK(
FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
std::unique_ptr<arrow::ColumnReader> column_reader;
ASSERT_OK(file_reader->GetColumn(0, &column_reader));
std::shared_ptr<ChunkedArray> chunked_array;
ASSERT_OK(column_reader->NextBatch(10, &chunked_array));
ASSERT_EQ(0, chunked_array->length());
}

// direct-as-possible translation of
// pyarrow/tests/test_parquet.py::test_validate_schema_write_table
TEST(TestArrowWriterAdHoc, SchemaMismatch) {
55 changes: 48 additions & 7 deletions cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -185,25 +185,26 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {

namespace {
::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
std::shared_ptr<::arrow::DataType> data_type, std::shared_ptr<::arrow::Array> array) {
std::shared_ptr<::arrow::DataType> data_type, std::shared_ptr<::arrow::Array> array,
std::shared_ptr<WriterProperties> writer_properties = default_writer_properties(),
const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()) {
auto schema = ::arrow::schema({::arrow::field("column", data_type)});
auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
const auto arrow_writer_properties =
parquet::ArrowWriterProperties::Builder().store_schema()->build();
ARROW_ASSIGN_OR_RAISE(
auto writer,
FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
default_writer_properties(), arrow_writer_properties));
ARROW_ASSIGN_OR_RAISE(auto writer,
FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
writer_properties, arrow_writer_properties));
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
ARROW_RETURN_NOT_OK(writer->Close());
ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());

auto reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
std::unique_ptr<FileReader> file_reader;
ARROW_RETURN_NOT_OK(
FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
ARROW_RETURN_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
reader_properties, &file_reader));
std::shared_ptr<::arrow::ChunkedArray> chunked_array;
ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
return chunked_array->chunk(0);
@@ -326,4 +327,44 @@ TEST(TestStatisticsRead, Duration) {
::arrow::duration(::arrow::TimeUnit::NANO));
}

TEST(TestStatisticsRead, MultipleRowGroupsDefault) {
auto arrow_type = ::arrow::int32();
auto built_array = ArrayFromJSON(arrow_type, R"([1, null, -1])");
auto writer_properties = WriterProperties::Builder().max_row_group_length(2)->build();
ASSERT_OK_AND_ASSIGN(
auto read_array,
StatisticsReadArray(arrow_type, std::move(built_array), writer_properties));
auto typed_read_array = std::static_pointer_cast<::arrow::Int32Array>(read_array);
auto statistics = typed_read_array->statistics();
ASSERT_EQ(nullptr, statistics);
}

TEST(TestStatisticsRead, MultipleRowGroupsShouldLoadStatistics) {
auto arrow_type = ::arrow::int32();
auto built_array = ArrayFromJSON(arrow_type, R"([1, null, -1])");
auto writer_properties = WriterProperties::Builder().max_row_group_length(2)->build();
ArrowReaderProperties reader_properties;
reader_properties.set_should_load_statistics(true);
ASSERT_OK_AND_ASSIGN(auto read_array,
StatisticsReadArray(arrow_type, std::move(built_array),
writer_properties, reader_properties));
// If we use should_load_statistics, the reader doesn't load multiple
// row groups at once, so the first array in the read chunked array
// has only 2 elements.
ASSERT_EQ(2, read_array->length());
auto typed_read_array = std::static_pointer_cast<::arrow::Int32Array>(read_array);
auto statistics = typed_read_array->statistics();
ASSERT_NE(nullptr, statistics);
ASSERT_EQ(true, statistics->null_count.has_value());
ASSERT_EQ(1, statistics->null_count.value());
ASSERT_EQ(false, statistics->distinct_count.has_value());
ASSERT_EQ(true, statistics->min.has_value());
// This is not -1 because this array has only the first 2 elements.
ASSERT_EQ(1, std::get<int64_t>(*statistics->min));
ASSERT_EQ(true, statistics->is_min_exact);
ASSERT_EQ(true, statistics->max.has_value());
ASSERT_EQ(1, std::get<int64_t>(*statistics->max));
ASSERT_EQ(true, statistics->is_max_exact);
}

} // namespace parquet::arrow
20 changes: 17 additions & 3 deletions cpp/src/parquet/arrow/reader.cc
@@ -218,6 +218,7 @@ class FileReaderImpl : public FileReader {
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
ctx->filter_leaves = true;
ctx->included_leaves = included_leaves;
ctx->reader_properties = &reader_properties_;
return GetReader(manifest_.schema_fields[i], ctx, out);
}

Expand Down Expand Up @@ -475,6 +476,8 @@ class LeafReader : public ColumnReaderImpl {
record_reader_->Reset();
// Pre-allocation gives much better performance for flat columns
record_reader_->Reserve(records_to_read);
const bool should_load_statistics = ctx_->reader_properties->should_load_statistics();
int64_t num_target_row_groups = 0;
while (records_to_read > 0) {
if (!record_reader_->HasMoreData()) {
break;
@@ -483,11 +486,21 @@
records_to_read -= records_read;
if (records_read == 0) {
NextRowGroup();
} else {
num_target_row_groups++;
// We can't mix multiple row groups when we load statistics
// because statistics are associated with a row group. If we
// want to mix multiple row groups and keep valid statistics,
// we need to implement statistics merge logic.
if (should_load_statistics) {
break;
}
}
}
RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
input_->column_chunk_metadata(), field_, descr_,
ctx_.get(), &out_));
RETURN_NOT_OK(TransferColumnData(
record_reader_.get(),
num_target_row_groups == 1 ? input_->column_chunk_metadata() : nullptr, field_,
descr_, ctx_.get(), &out_));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
@@ -1214,6 +1227,7 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
ctx->pool = pool_;
ctx->iterator_factory = iterator_factory;
ctx->filter_leaves = false;
ctx->reader_properties = &reader_properties_;
std::unique_ptr<ColumnReaderImpl> result;
RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
*out = std::move(result);
4 changes: 4 additions & 0 deletions cpp/src/parquet/arrow/reader_internal.cc
@@ -322,6 +322,10 @@ template <typename ArrowType, typename ParquetType>
void AttachStatistics(::arrow::ArrayData* data,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx) {
if (!metadata) {
return;
}

using ArrowCType = typename ArrowType::c_type;

auto statistics = metadata->statistics().get();
1 change: 1 addition & 0 deletions cpp/src/parquet/arrow/reader_internal.h
@@ -117,6 +117,7 @@ struct ReaderContext {
FileColumnIteratorFactory iterator_factory;
bool filter_leaves;
std::shared_ptr<std::unordered_set<int>> included_leaves;
ArrowReaderProperties* reader_properties;

bool IncludesLeaf(int leaf_index) const {
if (this->filter_leaves) {
4 changes: 3 additions & 1 deletion cpp/src/parquet/arrow/test_util.h
@@ -229,7 +229,9 @@ ::arrow::enable_if_floating_point<ArrowType, Status> NullableArray(
}

::arrow::NumericBuilder<ArrowType> builder;
RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
if (values.size() > 0) {
RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
}
return builder.Finish(out);
}

13 changes: 12 additions & 1 deletion cpp/src/parquet/properties.h
@@ -913,7 +913,8 @@ class PARQUET_EXPORT ArrowReaderProperties {
pre_buffer_(true),
cache_options_(::arrow::io::CacheOptions::LazyDefaults()),
coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO),
arrow_extensions_enabled_(false) {}
arrow_extensions_enabled_(false),
should_load_statistics_(false) {}

/// \brief Set whether to use the IO thread pool to parse columns in parallel.
///
@@ -996,6 +997,15 @@
}
bool get_arrow_extensions_enabled() const { return arrow_extensions_enabled_; }

/// \brief Set whether to load statistics whenever possible.
///
/// Default is false.
void set_should_load_statistics(bool should_load_statistics) {
should_load_statistics_ = should_load_statistics;
}
/// Return whether statistics are loaded whenever possible.
bool should_load_statistics() const { return should_load_statistics_; }

private:
bool use_threads_;
std::unordered_set<int> read_dict_indices_;
@@ -1005,6 +1015,7 @@
::arrow::io::CacheOptions cache_options_;
::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
bool arrow_extensions_enabled_;
bool should_load_statistics_;
};

/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
