From 3e6e8f3fc59777957334c90e02d7826fb27e2488 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei
Date: Thu, 30 Jan 2025 13:27:08 +0900
Subject: [PATCH] GH-45339: [Parquet][C++] Fix statistics load logic for no row group and multiple row groups (#45350)

### Rationale for this change

The logic that loads `arrow::ArrayStatistics` depends on
`parquet::ColumnChunkMetaData`. We can't get a `parquet::ColumnChunkMetaData`
when the requested row groups are empty, because no associated row group and
column chunk exist. We also can't use multiple `parquet::ColumnChunkMetaData`s
for now because we don't have logic to merge statistics, so we can't load
statistics when multiple row groups are read at once.

### What changes are included in this PR?

* Don't load statistics when no row groups are read
* Don't load statistics when multiple row groups are read
* Add `parquet::ArrowReaderProperties::{set_,}should_load_statistics()` to
  force loading statistics by reading row groups one by one

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* GitHub Issue: #45339

Authored-by: Sutou Kouhei
Signed-off-by: Sutou Kouhei
---
 .../parquet/arrow/arrow_reader_writer_test.cc | 102 ++++++++++++++++++
 .../parquet/arrow/arrow_statistics_test.cc    |  55 ++++++++--
 cpp/src/parquet/arrow/reader.cc               |  20 +++-
 cpp/src/parquet/arrow/reader_internal.cc      |   4 +
 cpp/src/parquet/arrow/reader_internal.h       |   1 +
 cpp/src/parquet/arrow/test_util.h             |   4 +-
 cpp/src/parquet/properties.h                  |  13 ++-
 7 files changed, 187 insertions(+), 12 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 47a00016b94b0..27cb849365ca7 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4296,6 +4296,108 @@ TEST(TestArrowReaderAdHoc, ReadFloat16Files) {
   }
 }
 
+TEST(TestArrowFileReader, RecordBatchReaderEmptyRowGroups) {
+  const int num_columns = 1;
+  const int num_rows = 3;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+  // This is the important part in this test.
+  std::vector<int> row_group_indices = {};
+  ASSERT_OK_AND_ASSIGN(auto record_batch_reader,
+                       file_reader->GetRecordBatchReader(row_group_indices));
+  std::shared_ptr<::arrow::RecordBatch> record_batch;
+  ASSERT_OK(record_batch_reader->ReadNext(&record_batch));
+  // No record batch is read when the requested row group list is empty.
+  ASSERT_FALSE(record_batch);
+}
+
+TEST(TestArrowFileReader, RecordBatchReaderEmptyInput) {
+  const int num_columns = 1;
+  // This is the important part in this test.
+  const int num_rows = 0;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+  ASSERT_OK_AND_ASSIGN(auto record_batch_reader, file_reader->GetRecordBatchReader());
+  std::shared_ptr<::arrow::RecordBatch> record_batch;
+  ASSERT_OK(record_batch_reader->ReadNext(&record_batch));
+  // No record batch is read for empty data.
+  ASSERT_FALSE(record_batch);
+}
+
+TEST(TestArrowColumnReader, NextBatchZeroBatchSize) {
+  const int num_columns = 1;
+  const int num_rows = 3;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+  std::unique_ptr<ColumnReader> column_reader;
+  ASSERT_OK(file_reader->GetColumn(0, &column_reader));
+  std::shared_ptr<ChunkedArray> chunked_array;
+  // This is the important part in this test.
+  ASSERT_OK(column_reader->NextBatch(0, &chunked_array));
+  ASSERT_EQ(0, chunked_array->length());
+}
+
+TEST(TestArrowColumnReader, NextBatchEmptyInput) {
+  const int num_columns = 1;
+  // This is the important part in this test.
+  const int num_rows = 0;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, &table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
+
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+  std::unique_ptr<ColumnReader> column_reader;
+  ASSERT_OK(file_reader->GetColumn(0, &column_reader));
+  std::shared_ptr<ChunkedArray> chunked_array;
+  ASSERT_OK(column_reader->NextBatch(10, &chunked_array));
+  ASSERT_EQ(0, chunked_array->length());
+}
+
 // direct-as-possible translation of
 // pyarrow/tests/test_parquet.py::test_validate_schema_write_table
 TEST(TestArrowWriterAdHoc, SchemaMismatch) {
diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc b/cpp/src/parquet/arrow/arrow_statistics_test.cc
index a8e2287d37085..048518644c6eb 100644
--- a/cpp/src/parquet/arrow/arrow_statistics_test.cc
+++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -185,16 +185,17 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {
 namespace {
 
 ::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
-    std::shared_ptr<::arrow::DataType> data_type, std::shared_ptr<::arrow::Array> array) {
+    std::shared_ptr<::arrow::DataType> data_type, std::shared_ptr<::arrow::Array> array,
+    std::shared_ptr<WriterProperties> writer_properties = default_writer_properties(),
+    const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()) {
   auto schema = ::arrow::schema({::arrow::field("column", data_type)});
   auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
   ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
   const auto arrow_writer_properties =
       parquet::ArrowWriterProperties::Builder().store_schema()->build();
-  ARROW_ASSIGN_OR_RAISE(
-      auto writer,
-      FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
-                       default_writer_properties(), arrow_writer_properties));
+  ARROW_ASSIGN_OR_RAISE(auto writer,
+                        FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
+                                         writer_properties, arrow_writer_properties));
   ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
   ARROW_RETURN_NOT_OK(writer->Close());
   ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());
@@ -202,8 +203,8 @@ ::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
   auto reader =
       ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
   std::unique_ptr<FileReader> file_reader;
-  ARROW_RETURN_NOT_OK(
-      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+  ARROW_RETURN_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
+                                       reader_properties, &file_reader));
   std::shared_ptr<::arrow::ChunkedArray> chunked_array;
   ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
   return chunked_array->chunk(0);
@@ -326,4 +327,44 @@ TEST(TestStatisticsRead, Duration) {
                                  ::arrow::duration(::arrow::TimeUnit::NANO));
 }
 
+TEST(TestStatisticsRead, MultipleRowGroupsDefault) {
+  auto arrow_type = ::arrow::int32();
+  auto built_array = ArrayFromJSON(arrow_type, R"([1, null, -1])");
+  auto writer_properties = WriterProperties::Builder().max_row_group_length(2)->build();
+  ASSERT_OK_AND_ASSIGN(
+      auto read_array,
+      StatisticsReadArray(arrow_type, std::move(built_array), writer_properties));
+  auto typed_read_array = std::static_pointer_cast<::arrow::Int32Array>(read_array);
+  auto statistics = typed_read_array->statistics();
+  ASSERT_EQ(nullptr, statistics);
+}
+
+TEST(TestStatisticsRead, MultipleRowGroupsShouldLoadStatistics) {
+  auto arrow_type = ::arrow::int32();
+  auto built_array = ArrayFromJSON(arrow_type, R"([1, null, -1])");
+  auto writer_properties = WriterProperties::Builder().max_row_group_length(2)->build();
+  ArrowReaderProperties reader_properties;
+  reader_properties.set_should_load_statistics(true);
+  ASSERT_OK_AND_ASSIGN(auto read_array,
+                       StatisticsReadArray(arrow_type, std::move(built_array),
+                                           writer_properties, reader_properties));
+  // If we use should_load_statistics, the reader doesn't load multiple
+  // row groups at once. So the first array in the read chunked array
+  // has only 2 elements.
+  ASSERT_EQ(2, read_array->length());
+  auto typed_read_array = std::static_pointer_cast<::arrow::Int32Array>(read_array);
+  auto statistics = typed_read_array->statistics();
+  ASSERT_NE(nullptr, statistics);
+  ASSERT_EQ(true, statistics->null_count.has_value());
+  ASSERT_EQ(1, statistics->null_count.value());
+  ASSERT_EQ(false, statistics->distinct_count.has_value());
+  ASSERT_EQ(true, statistics->min.has_value());
+  // This is not -1 because this array has only the first 2 elements.
+  ASSERT_EQ(1, std::get<int64_t>(*statistics->min));
+  ASSERT_EQ(true, statistics->is_min_exact);
+  ASSERT_EQ(true, statistics->max.has_value());
+  ASSERT_EQ(1, std::get<int64_t>(*statistics->max));
+  ASSERT_EQ(true, statistics->is_max_exact);
+}
+
 }  // namespace parquet::arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 465ad5844d31a..03b725beb2a01 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -218,6 +218,7 @@ class FileReaderImpl : public FileReader {
     ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
     ctx->filter_leaves = true;
     ctx->included_leaves = included_leaves;
+    ctx->reader_properties = &reader_properties_;
     return GetReader(manifest_.schema_fields[i], ctx, out);
   }
 
@@ -475,6 +476,8 @@ class LeafReader : public ColumnReaderImpl {
     record_reader_->Reset();
     // Pre-allocation gives much better performance for flat columns
     record_reader_->Reserve(records_to_read);
+    const bool should_load_statistics = ctx_->reader_properties->should_load_statistics();
+    int64_t num_target_row_groups = 0;
     while (records_to_read > 0) {
       if (!record_reader_->HasMoreData()) {
         break;
@@ -483,11 +486,21 @@ class LeafReader : public ColumnReaderImpl {
       records_to_read -= records_read;
       if (records_read == 0) {
         NextRowGroup();
+      } else {
+        num_target_row_groups++;
+        // We can't mix multiple row groups when we load statistics
+        // because statistics are associated with a row group. If we
+        // want to mix multiple row groups and keep valid statistics,
+        // we need to implement statistics merge logic.
+        if (should_load_statistics) {
+          break;
+        }
       }
     }
-    RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
-                                     input_->column_chunk_metadata(), field_, descr_,
-                                     ctx_.get(), &out_));
+    RETURN_NOT_OK(TransferColumnData(
+        record_reader_.get(),
+        num_target_row_groups == 1 ? input_->column_chunk_metadata() : nullptr, field_,
+        descr_, ctx_.get(), &out_));
     return Status::OK();
     END_PARQUET_CATCH_EXCEPTIONS
   }
 
@@ -1214,6 +1227,7 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
   ctx->pool = pool_;
   ctx->iterator_factory = iterator_factory;
   ctx->filter_leaves = false;
+  ctx->reader_properties = &reader_properties_;
   std::unique_ptr<ColumnReaderImpl> result;
   RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
   *out = std::move(result);
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 9d3171ea1a95d..59fe2b4600209 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -322,6 +322,10 @@ template <typename ArrowType>
 void AttachStatistics(::arrow::ArrayData* data,
                       std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
                       const ReaderContext* ctx) {
+  if (!metadata) {
+    return;
+  }
+
   using ArrowCType = typename ArrowType::c_type;
 
   auto statistics = metadata->statistics().get();
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index fab56c888045d..4ee3bf98bc54c 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -117,6 +117,7 @@ struct ReaderContext {
   FileColumnIteratorFactory iterator_factory;
   bool filter_leaves;
   std::shared_ptr<std::unordered_set<int>> included_leaves;
+  ArrowReaderProperties* reader_properties;
 
   bool IncludesLeaf(int leaf_index) const {
     if (this->filter_leaves) {
diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h
index c8fcbbb65d1b6..cfc57ce6ea743 100644
--- a/cpp/src/parquet/arrow/test_util.h
+++ b/cpp/src/parquet/arrow/test_util.h
@@ -229,7 +229,9 @@ ::arrow::enable_if_floating_point<ArrowType, Status> NullableArray(
   }
 
   ::arrow::NumericBuilder<ArrowType> builder;
-  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
+  if (values.size() > 0) {
+    RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
+  }
   return builder.Finish(out);
 }
 
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 8ae3660014f76..19436b84a379b 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -913,7 +913,8 @@ class PARQUET_EXPORT ArrowReaderProperties {
         pre_buffer_(true),
         cache_options_(::arrow::io::CacheOptions::LazyDefaults()),
         coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO),
-        arrow_extensions_enabled_(false) {}
+        arrow_extensions_enabled_(false),
+        should_load_statistics_(false) {}
 
   /// \brief Set whether to use the IO thread pool to parse columns in parallel.
   ///
@@ -996,6 +997,15 @@ class PARQUET_EXPORT ArrowReaderProperties {
   }
   bool get_arrow_extensions_enabled() const { return arrow_extensions_enabled_; }
 
+  /// \brief Set whether to load statistics as much as possible.
+  ///
+  /// Default is false.
+  void set_should_load_statistics(bool should_load_statistics) {
+    should_load_statistics_ = should_load_statistics;
+  }
+  /// Return whether statistics are loaded as much as possible.
+  bool should_load_statistics() const { return should_load_statistics_; }
+
  private:
   bool use_threads_;
   std::unordered_set<int> read_dict_indices_;
@@ -1005,6 +1015,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
   ::arrow::io::CacheOptions cache_options_;
   ::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
   bool arrow_extensions_enabled_;
+  bool should_load_statistics_;
 };
 
 /// EXPERIMENTAL: Constructs the default ArrowReaderProperties
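
For reviewers who want to try the new option outside the test suite, here is a minimal usage sketch (not part of the patch). It mirrors the calls exercised in `arrow_statistics_test.cc` above; the file path `data.parquet`, the helper name `ReadColumnStatistics()`, and the assumption that column 0 is an `int32` column are illustrative only.

```cpp
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <parquet/properties.h>

// Sketch: read column 0 of "data.parquet" (hypothetical file) with
// should_load_statistics enabled and inspect the attached ArrayStatistics.
::arrow::Status ReadColumnStatistics() {
  ARROW_ASSIGN_OR_RAISE(auto input,
                        ::arrow::io::ReadableFile::Open("data.parquet"));
  auto parquet_reader = parquet::ParquetFileReader::Open(input);

  parquet::ArrowReaderProperties reader_properties;
  // Added by this patch: read row groups one by one so that per-row-group
  // statistics stay attached to the decoded arrays.
  reader_properties.set_should_load_statistics(true);

  std::unique_ptr<parquet::arrow::FileReader> file_reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
      ::arrow::default_memory_pool(), std::move(parquet_reader), reader_properties,
      &file_reader));

  std::shared_ptr<::arrow::ChunkedArray> chunked_array;
  ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));

  // Each chunk corresponds to one row group, so its statistics (when present
  // in the file) are valid for that chunk. Assumes an int32 column.
  for (const auto& chunk : chunked_array->chunks()) {
    auto array = std::static_pointer_cast<::arrow::Int32Array>(chunk);
    auto statistics = array->statistics();
    if (statistics && statistics->null_count.has_value()) {
      // Use *statistics->null_count, statistics->min, statistics->max, ...
    }
  }
  return ::arrow::Status::OK();
}
```

The trade-off, as the `MultipleRowGroupsShouldLoadStatistics` test shows, is that the resulting `ChunkedArray` has one chunk per row group instead of a single large chunk.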