diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
index b1b04cf8ce69..7630425775b6 100644
--- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
+++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
@@ -22,6 +22,7 @@
 #include "velox/connectors/hive/HiveConnector.h" // @manual
 #include "velox/core/QueryCtx.h"
 #include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
+#include "velox/dwio/parquet/reader/PageReader.h"
 #include "velox/dwio/parquet/tests/ParquetTestBase.h"
 #include "velox/exec/Cursor.h"
 #include "velox/exec/tests/utils/AssertQueryBuilder.h"
@@ -77,6 +78,28 @@ class ParquetWriterTest : public ParquetTestBase {
         opts);
   };
 
+  // Reads the page header found at the data page offset of 'colChunkPtr' in
+  // the file held by 'sinkPtr' and returns that page's type.
+  parquet::thrift::PageType::type getDataPageVersion(
+      const dwio::common::MemorySink* sinkPtr,
+      const parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
+    std::string_view sinkData(sinkPtr->data(), sinkPtr->size());
+    auto readFile = std::make_shared(sinkData);
+    auto file = std::make_shared(std::move(readFile));
+    auto inputStream = std::make_unique(
+        std::move(file),
+        colChunkPtr.dataPageOffset(),
+        150,
+        *leafPool_,
+        LogType::TEST);
+    auto pageReader = std::make_unique(
+        std::move(inputStream),
+        *leafPool_,
+        colChunkPtr.compression(),
+        colChunkPtr.totalCompressedSize());
+    return pageReader->readPageHeader().type;
+  };
+
   inline static const std::string kHiveConnectorId = "test-hive";
 };
 
@@ -146,6 +169,51 @@ TEST_F(ParquetWriterTest, compression) {
   assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
 };
 
+// Verifies that WriterOptions::parquetDataPageVersion selects the page type
+// (DATA_PAGE vs. DATA_PAGE_V2) found in the written file.
+TEST_F(ParquetWriterTest, datapageVersion) {
+  auto schema = ROW({"c0"}, {INTEGER()});
+  const int64_t kRows = 1;
+  const auto data = makeRowVector({
+      makeFlatVector(kRows, [](auto row) { return 987; }),
+  });
+
+  // Set parquet datapage version and write data - then read to ensure the
+  // property took effect.
+  const auto testDataPageVersion =
+      [&](parquet::WriterOptions::ParquetDataPageVersion dataPageVersion) {
+        // Create an in-memory writer.
+        auto sink = std::make_unique(
+            200 * 1024 * 1024,
+            dwio::common::FileSink::Options{.pool = leafPool_.get()});
+        auto sinkPtr = sink.get();
+        parquet::WriterOptions writerOptions;
+        writerOptions.memoryPool = leafPool_.get();
+        writerOptions.parquetDataPageVersion = dataPageVersion;
+
+        auto writer = std::make_unique(
+            std::move(sink), writerOptions, rootPool_, schema);
+        writer->write(data);
+        writer->close();
+
+        dwio::common::ReaderOptions readerOptions{leafPool_.get()};
+        auto reader = createReaderInMemory(*sinkPtr, readerOptions);
+
+        auto colChunkPtr = reader->fileMetaData().rowGroup(0).columnChunk(0);
+        // Reuse the fixture helper rather than duplicating the page-header
+        // read here.
+        return getDataPageVersion(sinkPtr, colChunkPtr);
+      };
+
+  ASSERT_EQ(
+      testDataPageVersion(parquet::WriterOptions::ParquetDataPageVersion::V1),
+      thrift::PageType::type::DATA_PAGE);
+
+  ASSERT_EQ(
+      testDataPageVersion(parquet::WriterOptions::ParquetDataPageVersion::V2),
+      thrift::PageType::type::DATA_PAGE_V2);
+};
+
 DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
   SCOPED_TESTVALUE_SET(
       "facebook::velox::parquet::Writer::write",
diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp
index aa73361de3cf..556290fc3aa5 100644
--- a/velox/dwio/parquet/writer/Writer.cpp
+++ b/velox/dwio/parquet/writer/Writer.cpp
@@ -147,6 +147,16 @@ std::shared_ptr getArrowParquetWriterOptions(
       static_cast(flushPolicy->rowsInRowGroup()));
   properties = properties->codec_options(options.codecOptions);
   properties = properties->enable_store_decimal_as_integer();
+  // Anything other than an explicit V2 request, including an unset option,
+  // writes V1 data pages.
+  if (options.parquetDataPageVersion ==
+      parquet::WriterOptions::ParquetDataPageVersion::V2) {
+    properties =
+        properties->data_page_version(arrow::ParquetDataPageVersion::V2);
+  } else {
+    properties =
+        properties->data_page_version(arrow::ParquetDataPageVersion::V1);
+  }
   return properties->build();
 }
 
@@ -238,6 +248,25 @@ std::optional getTimestampTimeZone(
   return std::nullopt;
 }
 
+// Reads 'configKey' from 'config' and maps "PARQUET_1_0" to V1 and
+// "PARQUET_2_0" to V2. Returns std::nullopt when the key is not set and fails
+// on any other value.
+std::optional
+getParquetDataPageVersion(
+    const config::ConfigBase& config,
+    const char* configKey) {
+  if (const auto version = config.get(configKey)) {
+    if (version == "PARQUET_1_0") {
+      return parquet::WriterOptions::ParquetDataPageVersion::V1;
+    } else if (version == "PARQUET_2_0") {
+      return parquet::WriterOptions::ParquetDataPageVersion::V2;
+    } else {
+      VELOX_FAIL("Unsupported parquet datapage version {}", version.value());
+    }
+  }
+  return std::nullopt;
+}
+
 } // namespace
 
 Writer::Writer(
@@ -470,6 +499,17 @@ void WriterOptions::processConfigs(
         : getTimestampTimeZone(
               connectorConfig, core::QueryConfig::kSessionTimezone);
   }
+
+  // The session property takes precedence; the connector config is consulted
+  // only when the session leaves the data page version unset.
+  if (!parquetDataPageVersion) {
+    parquetDataPageVersion =
+        getParquetDataPageVersion(session, kParquetSessionDataPageVersion);
+    if (!parquetDataPageVersion) {
+      parquetDataPageVersion = getParquetDataPageVersion(
+          connectorConfig, kParquetHiveConnectorDataPageVersion);
+    }
+  }
 }
 
 } // namespace facebook::velox::parquet
diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h
index 33824133dc38..7e1d01983706 100644
--- a/velox/dwio/parquet/writer/Writer.h
+++ b/velox/dwio/parquet/writer/Writer.h
@@ -24,6 +24,7 @@
 #include "velox/dwio/common/Options.h"
 #include "velox/dwio/common/Writer.h"
 #include "velox/dwio/common/WriterFactory.h"
+#include "velox/dwio/parquet/writer/arrow/Properties.h"
 #include "velox/dwio/parquet/writer/arrow/Types.h"
 #include "velox/dwio/parquet/writer/arrow/util/Compression.h"
 #include "velox/vector/ComplexVector.h"
@@ -109,6 +110,16 @@ struct WriterOptions : public dwio::common::WriterOptions {
   std::optional parquetWriteTimestampTimeZone;
   bool writeInt96AsTimestamp = false;
 
+  // Parquet data page format versions the writer can produce.
+  enum class ParquetDataPageVersion {
+    V1,
+    V2,
+  };
+
+  // Data page version to write. When unset, processConfigs() fills it in from
+  // the session or connector config; the writer uses V1 if it stays unset.
+  std::optional parquetDataPageVersion;
+
   // Parsing session and hive configs.
 
   // This isn't a typo; session and hive connector config names are different
@@ -117,6 +128,10 @@ struct WriterOptions : public dwio::common::WriterOptions {
       "hive.parquet.writer.timestamp_unit";
   static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
       "hive.parquet.writer.timestamp-unit";
+  static constexpr const char* kParquetSessionDataPageVersion =
+      "parquet_datapage_version";
+  static constexpr const char* kParquetHiveConnectorDataPageVersion =
+      "hive.parquet.datapage.version";
 
   // Process hive connector and session configs.
   void processConfigs(