diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index b1b04cf8ce69..2d7f063c13df 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -22,8 +22,10 @@ #include "velox/connectors/hive/HiveConnector.h" // @manual #include "velox/core/QueryCtx.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual +#include "velox/dwio/parquet/reader/PageReader.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" #include "velox/exec/Cursor.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/QueryAssertions.h" @@ -77,6 +79,26 @@ class ParquetWriterTest : public ParquetTestBase { opts); }; + facebook::velox::parquet::thrift::PageType::type getDataPageVersion( + const dwio::common::MemorySink* sinkPtr, + const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) { + std::string_view sinkData(sinkPtr->data(), sinkPtr->size()); + auto readFile = std::make_shared(sinkData); + auto file = std::make_shared(std::move(readFile)); + auto inputStream = std::make_unique( + std::move(file), + colChunkPtr.dataPageOffset(), + 150, + *leafPool_, + LogType::TEST); + auto pageReader = std::make_unique( + std::move(inputStream), + *leafPool_, + colChunkPtr.compression(), + colChunkPtr.totalCompressedSize()); + return pageReader->readPageHeader().type; + }; + inline static const std::string kHiveConnectorId = "test-hive"; }; @@ -146,6 +168,50 @@ TEST_F(ParquetWriterTest, compression) { assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_); }; +TEST_F(ParquetWriterTest, datapageVersion) { + auto schema = ROW({"c0"}, {INTEGER()}); + const int64_t kRows = 1; + const auto data = makeRowVector({ + makeFlatVector(kRows, [](auto row) { return 987; }), + }); + + // Set parquet datapage version and write data - then read to ensure the + // property took effect. + const auto testDataPageVersion = + [&](facebook::velox::parquet::arrow::ParquetDataPageVersion + dataPageVersion) { + // Create an in-memory writer. + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto sinkPtr = sink.get(); + facebook::velox::parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetDataPageVersion = dataPageVersion; + + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, schema); + writer->write(data); + writer->close(); + + dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + auto reader = createReaderInMemory(*sinkPtr, readerOptions); + auto readDataPageVersion = getDataPageVersion( + sinkPtr, reader->fileMetaData().rowGroup(0).columnChunk(0)); + return readDataPageVersion; + }; + + ASSERT_EQ( + testDataPageVersion( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V1), + thrift::PageType::type::DATA_PAGE); + + ASSERT_EQ( + testDataPageVersion( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V2), + thrift::PageType::type::DATA_PAGE_V2); +}; + DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { SCOPED_TESTVALUE_SET( "facebook::velox::parquet::Writer::write", diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index aa73361de3cf..e58b3a91fd47 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -147,6 +147,8 @@ std::shared_ptr getArrowParquetWriterOptions( static_cast(flushPolicy->rowsInRowGroup())); properties = properties->codec_options(options.codecOptions); properties = properties->enable_store_decimal_as_integer(); + properties = properties->data_page_version(options.parquetDataPageVersion.value()); + return properties->build(); } @@ -238,6 +240,21 @@ std::optional getTimestampTimeZone( return std::nullopt; } +std::optional getParquetDataPageVersion( + const config::ConfigBase& config, + const char* configKey) { + if (const auto version = config.get(configKey)) { + if (version == "PARQUET_1_0") { + return arrow::ParquetDataPageVersion::V1; + } else if (version == "PARQUET_2_0") { + return arrow::ParquetDataPageVersion::V2; + } else { + VELOX_FAIL("Unsupported parquet datapage version {}", version.value()); + } + } + return std::nullopt; +} + } // namespace Writer::Writer( @@ -470,6 +487,13 @@ void WriterOptions::processConfigs( : getTimestampTimeZone( connectorConfig, core::QueryConfig::kSessionTimezone); } + + if (!parquetDataPageVersion) { + parquetDataPageVersion = + getParquetDataPageVersion(session, kParquetSessionDataPageVersion).has_value() + ? getParquetDataPageVersion(session, kParquetSessionDataPageVersion) + : getParquetDataPageVersion(connectorConfig, kParquetSessionDataPageVersion); + } } } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 33824133dc38..c9dcdbf057bd 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -24,6 +24,7 @@ #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" @@ -108,6 +109,7 @@ struct WriterOptions : public dwio::common::WriterOptions { /// Timestamp time zone for Parquet write through Arrow bridge. std::optional parquetWriteTimestampTimeZone; bool writeInt96AsTimestamp = false; + std::optional parquetDataPageVersion; // Parsing session and hive configs. @@ -117,6 +119,8 @@ struct WriterOptions : public dwio::common::WriterOptions { "hive.parquet.writer.timestamp_unit"; static constexpr const char* kParquetHiveConnectorWriteTimestampUnit = "hive.parquet.writer.timestamp-unit"; + static constexpr const char* kParquetSessionDataPageVersion = + "parquet_writer_version"; // Process hive connector and session configs. void processConfigs(