diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 3d8b9bb05bfb..36e1d78314da 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -412,6 +412,13 @@ HiveDataSink::HiveDataSink( "Unsupported commit strategy: {}", commitStrategyToString(commitStrategy_)); + if (insertTableHandle_->ensureFiles()) { + VELOX_CHECK( + !isPartitioned() && !isBucketed(), + "ensureFiles is not supported with bucketing or partition keys in the data"); + ensureWriter(HiveWriterId::unpartitionedId()); + } + if (!isBucketed()) { return; } @@ -1011,6 +1018,8 @@ folly::dynamic HiveInsertTableHandle::serialize() const { params[key] = value; } obj["serdeParameters"] = params; + + obj["ensureFiles"] = ensureFiles_; return obj; } @@ -1040,13 +1049,17 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create( serdeParameters.emplace(pair.first.asString(), pair.second.asString()); } + bool ensureFiles = obj["ensureFiles"].asBool(); + return std::make_shared( inputColumns, locationHandle, storageFormat, bucketProperty, compressionKind, - serdeParameters); + serdeParameters, + nullptr, // writerOptions is not serializable + ensureFiles); } void HiveInsertTableHandle::registerSerDe() { diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 8274d8204f03..ca8ea8adc86d 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -19,6 +19,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/PartitionIdGenerator.h" +#include "velox/connectors/hive/TableHandle.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" @@ -30,8 +31,6 @@ class Writer; namespace facebook::velox::connector::hive { -class HiveColumnHandle; - class LocationHandle; using LocationHandlePtr = std::shared_ptr; @@ -206,19 +205,39 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { std::optional compressionKind = {}, const std::unordered_map& serdeParameters = {}, const std::shared_ptr& writerOptions = - nullptr) + nullptr, + // When this option is set the HiveDataSink will always write a file even + // if there's no data. This is useful when the table is bucketed, but the + // engine handles ensuring a 1 to 1 mapping from task to bucket. + const bool ensureFiles = false) : inputColumns_(std::move(inputColumns)), locationHandle_(std::move(locationHandle)), storageFormat_(storageFormat), bucketProperty_(std::move(bucketProperty)), compressionKind_(compressionKind), serdeParameters_(serdeParameters), - writerOptions_(writerOptions) { + writerOptions_(writerOptions), + ensureFiles_(ensureFiles) { if (compressionKind.has_value()) { VELOX_CHECK( compressionKind.value() != common::CompressionKind_MAX, "Unsupported compression type: CompressionKind_MAX"); } + + if (ensureFiles_) { + // If ensureFiles is set and either the bucketProperty is set or some + // partition keys are in the data, there is not a 1:1 mapping from Task to + // files so we can't proactively create writers. + VELOX_CHECK( + bucketProperty_ == nullptr || bucketProperty_->bucketCount() == 0, + "ensureFiles is not supported with bucketing"); + + for (const auto& inputColumn : inputColumns_) { + VELOX_CHECK( + !inputColumn->isPartitionKey(), + "ensureFiles is not supported with partition keys in the data"); + } + } } virtual ~HiveInsertTableHandle() = default; @@ -248,6 +267,10 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { return writerOptions_; } + bool ensureFiles() const { + return ensureFiles_; + } + bool supportsMultiThreading() const override { return true; } @@ -276,6 +299,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { const std::optional compressionKind_; const std::unordered_map serdeParameters_; const std::shared_ptr writerOptions_; + const bool ensureFiles_; }; /// Parameters for Hive writers. diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index eef50a45090e..eec694818ed0 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -150,7 +150,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { const std::shared_ptr& bucketProperty = nullptr, const std::shared_ptr& writerOptions = - nullptr) { + nullptr, + const bool ensureFiles = false) { return makeHiveInsertTableHandle( outputRowType->names(), outputRowType->children(), @@ -163,7 +164,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { fileFormat, CompressionKind::CompressionKind_ZSTD, {}, - writerOptions); + writerOptions, + ensureFiles); } std::shared_ptr createDataSink( @@ -174,7 +176,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { const std::shared_ptr& bucketProperty = nullptr, const std::shared_ptr& writerOptions = - nullptr) { + nullptr, + const bool ensureFiles = false) { return std::make_shared( rowType, createHiveInsertTableHandle( @@ -183,7 +186,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { fileFormat, partitionedBy, bucketProperty, - writerOptions), + writerOptions, + ensureFiles), connectorQueryCtx_.get(), CommitStrategy::kNoCommit, connectorConfig_); @@ -1211,6 +1215,105 @@ TEST_F(HiveDataSinkTest, flushPolicyWithDWRF) { ASSERT_EQ(reader->getRowsPerStripe()[0], 500); } +TEST_F(HiveDataSinkTest, ensureFilesNoData) { + const auto outputDirectory = TempDirectoryPath::create(); + auto dataSink = createDataSink( + rowType_, + outputDirectory->getPath(), + dwio::common::FileFormat::DWRF, + {}, // partitionBy + nullptr, // bucketProperty + nullptr, // writeOptions + true // ensureFiles + ); + + ASSERT_TRUE(dataSink->finish()); + + auto partitions = dataSink->close(); + auto stats = dataSink->stats(); + ASSERT_FALSE(stats.empty()); + ASSERT_GT(stats.numWrittenBytes, 0); + ASSERT_EQ(stats.numWrittenFiles, 1); + ASSERT_EQ(partitions.size(), 1); + + std::vector vectors{RowVector::createEmpty(rowType_, pool())}; + createDuckDbTable(vectors); + verifyWrittenData(outputDirectory->getPath()); +} + +TEST_F(HiveDataSinkTest, ensureFilesWithData) { + const auto outputDirectory = TempDirectoryPath::create(); + auto dataSink = createDataSink( + rowType_, + outputDirectory->getPath(), + dwio::common::FileFormat::DWRF, + {}, // partitionBy + nullptr, // bucketProperty + nullptr, // writeOptions + true // ensureFiles + ); + + const int numBatches = 10; + const auto vectors = createVectors(500, numBatches); + for (const auto& vector : vectors) { + dataSink->appendData(vector); + } + + ASSERT_TRUE(dataSink->finish()); + + auto partitions = dataSink->close(); + auto stats = dataSink->stats(); + ASSERT_FALSE(stats.empty()); + ASSERT_GT(stats.numWrittenBytes, 0); + ASSERT_EQ(stats.numWrittenFiles, 1); + ASSERT_EQ(partitions.size(), 1); + + createDuckDbTable(vectors); + verifyWrittenData(outputDirectory->getPath()); +} + +TEST_F(HiveDataSinkTest, ensureFilesUnsupported) { + VELOX_ASSERT_THROW( + makeHiveInsertTableHandle( + rowType_->names(), + rowType_->children(), + {rowType_->names()[0]}, // partitionedBy + nullptr, // bucketProperty + makeLocationHandle( + "/path/to/test", + std::nullopt, + connector::hive::LocationHandle::TableType::kNew), + dwio::common::FileFormat::DWRF, + CompressionKind::CompressionKind_ZSTD, + {}, // serdeParameters + nullptr, // writeOptions + true // ensureFiles + ), + "ensureFiles is not supported with partition keys in the data"); + + VELOX_ASSERT_THROW( + makeHiveInsertTableHandle( + rowType_->names(), + rowType_->children(), + {}, // partitionedBy + {std::make_shared( + HiveBucketProperty::Kind::kPrestoNative, + 1, + std::vector{rowType_->names()[0]}, + std::vector{rowType_->children()[0]}, + std::vector>{})}, + makeLocationHandle( + "/path/to/test", + std::nullopt, + connector::hive::LocationHandle::TableType::kNew), + dwio::common::FileFormat::DWRF, + CompressionKind::CompressionKind_ZSTD, + {}, // serdeParameters + nullptr, // writeOptions + true // ensureFiles + ), + "ensureFiles is not supported with bucketing"); +} } // namespace } // namespace facebook::velox::connector::hive diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index bf17dd788d5e..351117910aaa 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -284,7 +284,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( std::shared_ptr locationHandle, const dwio::common::FileFormat tableStorageFormat, const std::optional compressionKind, - const std::shared_ptr& writerOptions) { + const std::shared_ptr& writerOptions, + const bool ensureFiles) { return makeHiveInsertTableHandle( tableColumnNames, tableColumnTypes, @@ -294,7 +295,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( tableStorageFormat, compressionKind, {}, - writerOptions); + writerOptions, + ensureFiles); } // static @@ -308,7 +310,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( const dwio::common::FileFormat tableStorageFormat, const std::optional compressionKind, const std::unordered_map& serdeParameters, - const std::shared_ptr& writerOptions) { + const std::shared_ptr& writerOptions, + const bool ensureFiles) { std::vector> columnHandles; std::vector bucketedBy; @@ -365,7 +368,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle( bucketProperty, compressionKind, serdeParameters, - writerOptions); + writerOptions, + ensureFiles); } std::shared_ptr diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 98ca4b803757..c9ac54dfb297 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -175,6 +175,8 @@ class HiveConnectorTestBase : public OperatorTestBase { /// @param locationHandle Location handle for the table write. /// @param compressionKind compression algorithm to use for table write. /// @param serdeParameters Table writer configuration parameters. + /// @param ensureFiles When this option is set the HiveDataSink will always + /// create a file even if there is no data. static std::shared_ptr makeHiveInsertTableHandle( const std::vector& tableColumnNames, @@ -187,7 +189,8 @@ class HiveConnectorTestBase : public OperatorTestBase { const std::optional compressionKind = {}, const std::unordered_map& serdeParameters = {}, const std::shared_ptr& writerOptions = - nullptr); + nullptr, + const bool ensureFiles = false); static std::shared_ptr makeHiveInsertTableHandle( @@ -199,7 +202,8 @@ class HiveConnectorTestBase : public OperatorTestBase { dwio::common::FileFormat::DWRF, const std::optional compressionKind = {}, const std::shared_ptr& writerOptions = - nullptr); + nullptr, + const bool ensureFiles = false); static std::shared_ptr regularColumn( const std::string& name, diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 8739ae9cd965..68640fdf8c40 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -270,7 +270,8 @@ core::PlanNodePtr PlanBuilder::TableWriterBuilder::build(core::PlanNodeId id) { bucketProperty, compressionKind_, serdeParameters_, - options_); + options_, + ensureFiles_); auto insertHandle = std::make_shared(connectorId_, hiveHandle); @@ -508,7 +509,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::shared_ptr& options, const std::string& outputFileName, const common::CompressionKind compressionKind, - const RowTypePtr& schema) { + const RowTypePtr& schema, + const bool ensureFiles) { return TableWriterBuilder(*this) .outputDirectoryPath(outputDirectoryPath) .outputFileName(outputFileName) @@ -523,6 +525,7 @@ PlanBuilder& PlanBuilder::tableWrite( .serdeParameters(serdeParameters) .options(options) .compressionKind(compressionKind) + .ensureFiles(ensureFiles) .endTableWriter(); } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 0c4c5ee68b52..c944b409ad7b 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -408,6 +408,13 @@ class PlanBuilder { return *this; } + /// @param ensureFiles When set the Task will always output a file, even if + /// it's empty. + TableWriterBuilder& ensureFiles(const bool ensureFiles) { + ensureFiles_ = ensureFiles; + return *this; + } + /// Stop the TableWriterBuilder. PlanBuilder& endTableWriter() { planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId()); @@ -436,6 +443,8 @@ class PlanBuilder { dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; common::CompressionKind compressionKind_{common::CompressionKind_NONE}; + + bool ensureFiles_{false}; }; /// Start a TableWriterBuilder. @@ -629,6 +638,8 @@ class PlanBuilder { /// output data files. /// @param schema Output schema to be passed to the writer. By default use the /// output of the previous operator. + /// @param ensureFiles When this option is set the HiveDataSink will always + /// create a file even if there is no data. PlanBuilder& tableWrite( const std::string& outputDirectoryPath, const std::vector& partitionBy, @@ -644,7 +655,8 @@ class PlanBuilder { const std::shared_ptr& options = nullptr, const std::string& outputFileName = "", const common::CompressionKind = common::CompressionKind_NONE, - const RowTypePtr& schema = nullptr); + const RowTypePtr& schema = nullptr, + const bool ensureFiles = false); /// Add a TableWriteMergeNode. PlanBuilder& tableWriteMerge(