Skip to content

Commit

Permalink
add support for parquet_datapage_version session property
Browse files Browse the repository at this point in the history
  • Loading branch information
svm1 committed Jan 25, 2025
1 parent 292ea9d commit 2f489a5
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
66 changes: 66 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include "velox/connectors/hive/HiveConnector.h" // @manual
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/exec/Cursor.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
Expand Down Expand Up @@ -77,6 +79,26 @@ class ParquetWriterTest : public ParquetTestBase {
opts);
};

/// Returns the page type recorded in the first page header of a column
/// chunk, read back from the bytes held by an in-memory sink.
///
/// @param sinkPtr Sink whose data()/size() buffer holds the written file.
/// @param colChunkPtr Column chunk metadata supplying the data page offset,
/// the compression kind and the total compressed size.
/// @return The `type` field of the header returned by readPageHeader().
facebook::velox::parquet::thrift::PageType::type getDataPageVersion(
    const dwio::common::MemorySink* sinkPtr,
    const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
  // NOTE(review): presumably the number of bytes exposed by the stream,
  // starting at the data page offset - confirm against
  // SeekableFileInputStream's constructor.
  constexpr int kStreamLength = 150;

  // Wrap the sink's buffer in a read-only, in-memory file.
  const std::string_view writtenBytes(sinkPtr->data(), sinkPtr->size());
  auto inMemoryFile = std::make_shared<InMemoryReadFile>(writtenBytes);
  auto fileStream =
      std::make_shared<ReadFileInputStream>(std::move(inMemoryFile));

  // Position a seekable stream at the chunk's first data page.
  auto pageStream = std::make_unique<SeekableFileInputStream>(
      std::move(fileStream),
      colChunkPtr.dataPageOffset(),
      kStreamLength,
      *leafPool_,
      LogType::TEST);

  auto headerReader = std::make_unique<PageReader>(
      std::move(pageStream),
      *leafPool_,
      colChunkPtr.compression(),
      colChunkPtr.totalCompressedSize());
  return headerReader->readPageHeader().type;
}

inline static const std::string kHiveConnectorId = "test-hive";
};

Expand Down Expand Up @@ -146,6 +168,50 @@ TEST_F(ParquetWriterTest, compression) {
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

// Writes a single-row file once per Parquet data page version and checks
// that the page header read back carries the matching page type.
TEST_F(ParquetWriterTest, datapageVersion) {
  using PageVersion = facebook::velox::parquet::arrow::ParquetDataPageVersion;

  const int64_t kRows = 1;
  auto schema = ROW({"c0"}, {INTEGER()});
  const auto data = makeRowVector({
      makeFlatVector<int32_t>(kRows, [](auto /*row*/) { return 987; }),
  });

  // Writes `data` with the requested data page version into an in-memory
  // sink, then returns the page type found in the first page header.
  const auto writeAndReadPageType = [&](PageVersion requestedVersion) {
    auto memorySink = std::make_unique<MemorySink>(
        200 * 1024 * 1024,
        dwio::common::FileSink::Options{.pool = leafPool_.get()});
    // Keep a raw handle: ownership of the sink moves into the writer below.
    auto rawSink = memorySink.get();

    facebook::velox::parquet::WriterOptions options;
    options.memoryPool = leafPool_.get();
    options.parquetDataPageVersion = requestedVersion;

    auto parquetWriter = std::make_unique<facebook::velox::parquet::Writer>(
        std::move(memorySink), options, rootPool_, schema);
    parquetWriter->write(data);
    parquetWriter->close();

    dwio::common::ReaderOptions readOptions{leafPool_.get()};
    auto parquetReader = createReaderInMemory(*rawSink, readOptions);
    return getDataPageVersion(
        rawSink, parquetReader->fileMetaData().rowGroup(0).columnChunk(0));
  };

  ASSERT_EQ(
      writeAndReadPageType(PageVersion::V1),
      thrift::PageType::type::DATA_PAGE);

  ASSERT_EQ(
      writeAndReadPageType(PageVersion::V2),
      thrift::PageType::type::DATA_PAGE_V2);
}

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
Expand Down
37 changes: 37 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
// The data page version is optional: it stays empty when the caller did not
// set it and no config supplied it. Calling value() on an empty optional
// throws std::bad_optional_access, so fall back to V1 (Arrow's own default
// data page version) instead.
properties = properties->data_page_version(
    options.parquetDataPageVersion.value_or(
        arrow::ParquetDataPageVersion::V1));

return properties->build();
}

Expand Down Expand Up @@ -238,6 +240,21 @@ std::optional<std::string> getTimestampTimeZone(
return std::nullopt;
}

/// Looks up `configKey` in `config` and maps its string value onto an Arrow
/// data page version.
///
/// @param config Config to read the key from.
/// @param configKey Name of the config entry holding the version string.
/// @return V1 for "PARQUET_1_0", V2 for "PARQUET_2_0", or std::nullopt when
/// the key has no value in `config`.
/// Fails via VELOX_FAIL when the key is set to any other value.
std::optional<arrow::ParquetDataPageVersion> getParquetDataPageVersion(
    const config::ConfigBase& config,
    const char* configKey) {
  const auto configured = config.get<std::string>(configKey);
  if (!configured.has_value()) {
    // Key absent: let the caller decide on a fallback.
    return std::nullopt;
  }

  const std::string& versionName = configured.value();
  if (versionName == "PARQUET_1_0") {
    return arrow::ParquetDataPageVersion::V1;
  }
  if (versionName == "PARQUET_2_0") {
    return arrow::ParquetDataPageVersion::V2;
  }
  VELOX_FAIL("Unsupported parquet datapage version {}", versionName);
}

} // namespace

Writer::Writer(
Expand Down Expand Up @@ -470,6 +487,26 @@ void WriterOptions::processConfigs(
: getTimestampTimeZone(
connectorConfig, core::QueryConfig::kSessionTimezone);
}

// Resolve the data page version only when it has not been set explicitly.
// The session config takes precedence; the connector config is consulted
// only when the session has no entry for the key. Each config is looked up
// and parsed at most once.
if (!parquetDataPageVersion) {
  parquetDataPageVersion =
      getParquetDataPageVersion(session, kParquetSessionDataPageVersion);
  if (!parquetDataPageVersion) {
    parquetDataPageVersion = getParquetDataPageVersion(
        connectorConfig, kParquetSessionDataPageVersion);
  }
}
}

} // namespace facebook::velox::parquet
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
Expand Down Expand Up @@ -108,6 +109,7 @@ struct WriterOptions : public dwio::common::WriterOptions {
/// Timestamp time zone for Parquet write through Arrow bridge.
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = false;
/// Parquet data page version handed to the Arrow writer properties. When
/// left unset, processConfigs() fills it from the session config and, if the
/// session has no entry, from the connector config.
std::optional<arrow::ParquetDataPageVersion> parquetDataPageVersion;

// Parsing session and hive configs.

Expand All @@ -117,6 +119,8 @@ struct WriterOptions : public dwio::common::WriterOptions {
"hive.parquet.writer.timestamp_unit";
static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
/// Config key selecting the Parquet data page version. Recognized values are
/// "PARQUET_1_0" (data page V1) and "PARQUET_2_0" (data page V2); any other
/// value is rejected when the configs are processed.
/// NOTE(review): despite the "Session" in the name, processConfigs() looks
/// this key up in the connector config as well - confirm that is intended.
static constexpr const char* kParquetSessionDataPageVersion =
"parquet_writer_version";

// Process hive connector and session configs.
void processConfigs(
Expand Down

0 comments on commit 2f489a5

Please sign in to comment.