From 7861409dd719728e9f551e9a3f67b8408da30cb0 Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Fri, 14 Feb 2025 09:56:35 -0800 Subject: [PATCH] feat: Add option to HiveDataSink to always create a file (#12331) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12331 This change adds an option to HiveDataSink (plumbed via HiveInsertTableHandle) to always create a file. This is useful for query engines that write bucketed tables by running a Task per output bucket rather than by setting the HiveBucketProperty. In this case, even if there is no data to process, Velox will still produce an empty file for the output bucket. Note that this option is not possible to support when the HiveBucketProperty is set or there a partition keys in the data, as this means that the number of files/directories respectively is determined at runtime from the input data. Checks for this are enforced at plan generation time. Reviewed By: Yuhta Differential Revision: D69629165 fbshipit-source-id: d3ea37c4ad58c1562e3940f35595ffdfade70c90 --- velox/connectors/hive/HiveDataSink.cpp | 15 ++- velox/connectors/hive/HiveDataSink.h | 32 ++++- .../hive/tests/HiveDataSinkTest.cpp | 111 +++++++++++++++++- .../tests/utils/HiveConnectorTestBase.cpp | 12 +- .../exec/tests/utils/HiveConnectorTestBase.h | 8 +- velox/exec/tests/utils/PlanBuilder.cpp | 7 +- velox/exec/tests/utils/PlanBuilder.h | 14 ++- 7 files changed, 181 insertions(+), 18 deletions(-) diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 3d8b9bb05bfb..36e1d78314da 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -412,6 +412,13 @@ HiveDataSink::HiveDataSink( "Unsupported commit strategy: {}", commitStrategyToString(commitStrategy_)); + if (insertTableHandle_->ensureFiles()) { + VELOX_CHECK( + !isPartitioned() && !isBucketed(), + "ensureFiles is not supported with bucketing or partition keys in the data"); + ensureWriter(HiveWriterId::unpartitionedId()); + } + if (!isBucketed()) { return; } @@ -1011,6 +1018,8 @@ folly::dynamic HiveInsertTableHandle::serialize() const { params[key] = value; } obj["serdeParameters"] = params; + + obj["ensureFiles"] = ensureFiles_; return obj; } @@ -1040,13 +1049,17 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create( serdeParameters.emplace(pair.first.asString(), pair.second.asString()); } + bool ensureFiles = obj["ensureFiles"].asBool(); + return std::make_shared( inputColumns, locationHandle, storageFormat, bucketProperty, compressionKind, - serdeParameters); + serdeParameters, + nullptr, // writerOptions is not serializable + ensureFiles); } void HiveInsertTableHandle::registerSerDe() { diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 8274d8204f03..ca8ea8adc86d 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -19,6 +19,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/PartitionIdGenerator.h" +#include "velox/connectors/hive/TableHandle.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" @@ -30,8 +31,6 @@ class Writer; namespace facebook::velox::connector::hive { -class HiveColumnHandle; - class LocationHandle; using LocationHandlePtr = std::shared_ptr; @@ -206,19 +205,39 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { std::optional compressionKind = {}, const std::unordered_map& serdeParameters = {}, const std::shared_ptr& writerOptions = - nullptr) + nullptr, + // When this option is set the HiveDataSink will always write a file even + // if there's no data. This is useful when the table is bucketed, but the + // engine handles ensuring a 1 to 1 mapping from task to bucket. + const bool ensureFiles = false) : inputColumns_(std::move(inputColumns)), locationHandle_(std::move(locationHandle)), storageFormat_(storageFormat), bucketProperty_(std::move(bucketProperty)), compressionKind_(compressionKind), serdeParameters_(serdeParameters), - writerOptions_(writerOptions) { + writerOptions_(writerOptions), + ensureFiles_(ensureFiles) { if (compressionKind.has_value()) { VELOX_CHECK( compressionKind.value() != common::CompressionKind_MAX, "Unsupported compression type: CompressionKind_MAX"); } + + if (ensureFiles_) { + // If ensureFiles is set and either the bucketProperty is set or some + // partition keys are in the data, there is not a 1:1 mapping from Task to + // files so we can't proactively create writers. + VELOX_CHECK( + bucketProperty_ == nullptr || bucketProperty_->bucketCount() == 0, + "ensureFiles is not supported with bucketing"); + + for (const auto& inputColumn : inputColumns_) { + VELOX_CHECK( + !inputColumn->isPartitionKey(), + "ensureFiles is not supported with partition keys in the data"); + } + } } virtual ~HiveInsertTableHandle() = default; @@ -248,6 +267,10 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { return writerOptions_; } + bool ensureFiles() const { + return ensureFiles_; + } + bool supportsMultiThreading() const override { return true; } @@ -276,6 +299,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { const std::optional compressionKind_; const std::unordered_map serdeParameters_; const std::shared_ptr writerOptions_; + const bool ensureFiles_; }; /// Parameters for Hive writers. diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index eef50a45090e..eec694818ed0 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -150,7 +150,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { const std::shared_ptr& bucketProperty = nullptr, const std::shared_ptr& writerOptions = - nullptr) { + nullptr, + const bool ensureFiles = false) { return makeHiveInsertTableHandle( outputRowType->names(), outputRowType->children(), @@ -163,7 +164,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { fileFormat, CompressionKind::CompressionKind_ZSTD, {}, - writerOptions); + writerOptions, + ensureFiles); } std::shared_ptr createDataSink( @@ -174,7 +176,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { const std::shared_ptr& bucketProperty = nullptr, const std::shared_ptr& writerOptions = - nullptr) { + nullptr, + const bool ensureFiles = false) { return std::make_shared( rowType, createHiveInsertTableHandle( @@ -183,7 +186,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { fileFormat, partitionedBy, bucketProperty, - writerOptions), + writerOptions, + ensureFiles), connectorQueryCtx_.get(), CommitStrategy::kNoCommit, connectorConfig_); @@ -1211,6 +1215,105 @@ TEST_F(HiveDataSinkTest, flushPolicyWithDWRF) { ASSERT_EQ(reader->getRowsPerStripe()[0], 500); } +TEST_F(HiveDataSinkTest, ensureFilesNoData) { + const auto outputDirectory = TempDirectoryPath::create(); + auto dataSink = createDataSink( + rowType_, + outputDirectory->getPath(), + dwio::common::FileFormat::DWRF, + {}, // partitionBy + nullptr, // bucketProperty + nullptr, // writeOptions + true // ensureFiles + ); + + ASSERT_TRUE(dataSink->finish()); + + auto partitions = dataSink->close(); + auto stats = dataSink->stats(); + ASSERT_FALSE(stats.empty()); + ASSERT_GT(stats.numWrittenBytes, 0); + ASSERT_EQ(stats.numWrittenFiles, 1); + ASSERT_EQ(partitions.size(), 1); + + std::vector vectors{RowVector::createEmpty(rowType_, pool())}; + createDuckDbTable(vectors); + verifyWrittenData(outputDirectory->getPath()); +} + +TEST_F(HiveDataSinkTest, ensureFilesWithData) { + const auto outputDirectory = TempDirectoryPath::create(); + auto dataSink = createDataSink( + rowType_, + outputDirectory->getPath(), + dwio::common::FileFormat::DWRF, + {}, // partitionBy + nullptr, // bucketProperty + nullptr, // writeOptions + true // ensureFiles + ); + + const int numBatches = 10; + const auto vectors = createVectors(500, numBatches); + for (const auto& vector : vectors) { + dataSink->appendData(vector); + } + + ASSERT_TRUE(dataSink->finish()); + + auto partitions = dataSink->close(); + auto stats = dataSink->stats(); + ASSERT_FALSE(stats.empty()); + ASSERT_GT(stats.numWrittenBytes, 0); + ASSERT_EQ(stats.numWrittenFiles, 1); + ASSERT_EQ(partitions.size(), 1); + + createDuckDbTable(vectors); + verifyWrittenData(outputDirectory->getPath()); +} + +TEST_F(HiveDataSinkTest, ensureFilesUnsupported) { + VELOX_ASSERT_THROW( + makeHiveInsertTableHandle( + rowType_->names(), + rowType_->children(), + {rowType_->names()[0]}, // partitionedBy + nullptr, // bucketProperty + makeLocationHandle( + "/path/to/test", + std::nullopt, + connector::hive::LocationHandle::TableType::kNew), + dwio::common::FileFormat::DWRF, + CompressionKind::CompressionKind_ZSTD, + {}, // serdeParameters + nullptr, // writeOptions + true // ensureFiles + ), + "ensureFiles is not supported with partition keys in the data"); + + VELOX_ASSERT_THROW( + makeHiveInsertTableHandle( + rowType_->names(), + rowType_->children(), + {}, // partitionedBy + {std::make_shared( + HiveBucketProperty::Kind::kPrestoNative, + 1, + std::vector{rowType_->names()[0]}, + std::vector{rowType_->children()[0]}, + std::vector>{})}, + makeLocationHandle( + "/path/to/test", + std::nullopt, + connector::hive::LocationHandle::TableType::kNew), + dwio::common::FileFormat::DWRF, + CompressionKind::CompressionKind_ZSTD, + {}, // serdeParameters + nullptr, // writeOptions + true // ensureFiles + ), + "ensureFiles is not supported with bucketing"); +} } // namespace } // namespace facebook::velox::connector::hive diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index bf17dd788d5e..351117910aaa 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -284,7 +284,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( std::shared_ptr locationHandle, const dwio::common::FileFormat tableStorageFormat, const std::optional compressionKind, - const std::shared_ptr& writerOptions) { + const std::shared_ptr& writerOptions, + const bool ensureFiles) { return makeHiveInsertTableHandle( tableColumnNames, tableColumnTypes, @@ -294,7 +295,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( tableStorageFormat, compressionKind, {}, - writerOptions); + writerOptions, + ensureFiles); } // static @@ -308,7 +310,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( const dwio::common::FileFormat tableStorageFormat, const std::optional compressionKind, const std::unordered_map& serdeParameters, - const std::shared_ptr& writerOptions) { + const std::shared_ptr& writerOptions, + const bool ensureFiles) { std::vector> columnHandles; std::vector bucketedBy; @@ -365,7 +368,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( bucketProperty, compressionKind, serdeParameters, - writerOptions); + writerOptions, + ensureFiles); } std::shared_ptr diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 98ca4b803757..c9ac54dfb297 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -175,6 +175,8 @@ class HiveConnectorTestBase : public OperatorTestBase { /// @param locationHandle Location handle for the table write. /// @param compressionKind compression algorithm to use for table write. /// @param serdeParameters Table writer configuration parameters. + /// @param ensureFiles When this option is set the HiveDataSink will always + /// create a file even if there is no data. static std::shared_ptr makeHiveInsertTableHandle( const std::vector& tableColumnNames, @@ -187,7 +189,8 @@ class HiveConnectorTestBase : public OperatorTestBase { const std::optional compressionKind = {}, const std::unordered_map& serdeParameters = {}, const std::shared_ptr& writerOptions = - nullptr); + nullptr, + const bool ensureFiles = false); static std::shared_ptr makeHiveInsertTableHandle( @@ -199,7 +202,8 @@ class HiveConnectorTestBase : public OperatorTestBase { dwio::common::FileFormat::DWRF, const std::optional compressionKind = {}, const std::shared_ptr& writerOptions = - nullptr); + nullptr, + const bool ensureFiles = false); static std::shared_ptr regularColumn( const std::string& name, diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 8739ae9cd965..68640fdf8c40 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -270,7 +270,8 @@ core::PlanNodePtr PlanBuilder::TableWriterBuilder::build(core::PlanNodeId id) { bucketProperty, compressionKind_, serdeParameters_, - options_); + options_, + ensureFiles_); auto insertHandle = std::make_shared(connectorId_, hiveHandle); @@ -508,7 +509,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::shared_ptr& options, const std::string& outputFileName, const common::CompressionKind compressionKind, - const RowTypePtr& schema) { + const RowTypePtr& schema, + const bool ensureFiles) { return TableWriterBuilder(*this) .outputDirectoryPath(outputDirectoryPath) .outputFileName(outputFileName) @@ -523,6 +525,7 @@ PlanBuilder& PlanBuilder::tableWrite( .serdeParameters(serdeParameters) .options(options) .compressionKind(compressionKind) + .ensureFiles(ensureFiles) .endTableWriter(); } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 0c4c5ee68b52..c944b409ad7b 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -408,6 +408,13 @@ class PlanBuilder { return *this; } + /// @param ensureFiles When set the Task will always output a file, even if + /// it's empty. + TableWriterBuilder& ensureFiles(const bool ensureFiles) { + ensureFiles_ = ensureFiles; + return *this; + } + /// Stop the TableWriterBuilder. PlanBuilder& endTableWriter() { planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId()); @@ -436,6 +443,8 @@ class PlanBuilder { dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; common::CompressionKind compressionKind_{common::CompressionKind_NONE}; + + bool ensureFiles_{false}; }; /// Start a TableWriterBuilder. @@ -629,6 +638,8 @@ class PlanBuilder { /// output data files. /// @param schema Output schema to be passed to the writer. By default use the /// output of the previous operator. + /// @param ensureFiles When this option is set the HiveDataSink will always + /// create a file even if there is no data. PlanBuilder& tableWrite( const std::string& outputDirectoryPath, const std::vector& partitionBy, @@ -644,7 +655,8 @@ class PlanBuilder { const std::shared_ptr& options = nullptr, const std::string& outputFileName = "", const common::CompressionKind = common::CompressionKind_NONE, - const RowTypePtr& schema = nullptr); + const RowTypePtr& schema = nullptr, + const bool ensureFiles = false); /// Add a TableWriteMergeNode. PlanBuilder& tableWriteMerge(