feat: Add option to HiveDataSink to always create a file (#12331)
Summary:
Pull Request resolved: #12331

This change adds an option to HiveDataSink (plumbed via HiveInsertTableHandle) to always
create a file.  This is useful for query engines that write bucketed tables by running a Task per
output bucket rather than by setting the HiveBucketProperty. In this case, even if there is no data
to process, Velox will still produce an empty file for the output bucket.

Note that this option cannot be supported when the HiveBucketProperty is set or when there are
partition keys in the data, since in those cases the number of files or directories, respectively, is
determined at runtime from the input data. Checks for this are enforced at plan generation time.
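
As a minimal construction sketch (not part of this diff — the inputColumns and locationHandle
variables and any namespace aliases are assumed), the new flag is the trailing constructor
argument of HiveInsertTableHandle:

// Sketch only: inputColumns must contain no partition-key columns and the
// bucket property must stay null, or the constructor throws.
auto insertHandle = std::make_shared<connector::hive::HiveInsertTableHandle>(
    inputColumns,   // assumed std::vector<std::shared_ptr<const HiveColumnHandle>>
    locationHandle, // assumed to point at the target write directory
    dwio::common::FileFormat::DWRF,
    /*bucketProperty=*/nullptr,
    common::CompressionKind::CompressionKind_ZSTD,
    /*serdeParameters=*/{},
    /*writerOptions=*/nullptr,
    /*ensureFiles=*/true);
// Even if the resulting HiveDataSink never receives any input, close() reports
// one written (empty) file for the target location.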

Reviewed By: Yuhta

Differential Revision: D69629165

fbshipit-source-id: d3ea37c4ad58c1562e3940f35595ffdfade70c90
Kevin Wilfong authored and facebook-github-bot committed Feb 14, 2025
1 parent 704aaf7 commit 7861409
Showing 7 changed files with 181 additions and 18 deletions.
15 changes: 14 additions & 1 deletion velox/connectors/hive/HiveDataSink.cpp
@@ -412,6 +412,13 @@ HiveDataSink::HiveDataSink(
"Unsupported commit strategy: {}",
commitStrategyToString(commitStrategy_));

if (insertTableHandle_->ensureFiles()) {
VELOX_CHECK(
!isPartitioned() && !isBucketed(),
"ensureFiles is not supported with bucketing or partition keys in the data");
ensureWriter(HiveWriterId::unpartitionedId());
}

if (!isBucketed()) {
return;
}
@@ -1011,6 +1018,8 @@ folly::dynamic HiveInsertTableHandle::serialize() const {
params[key] = value;
}
obj["serdeParameters"] = params;

obj["ensureFiles"] = ensureFiles_;
return obj;
}

@@ -1040,13 +1049,17 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create(
serdeParameters.emplace(pair.first.asString(), pair.second.asString());
}

bool ensureFiles = obj["ensureFiles"].asBool();

return std::make_shared<HiveInsertTableHandle>(
inputColumns,
locationHandle,
storageFormat,
bucketProperty,
compressionKind,
serdeParameters);
serdeParameters,
nullptr, // writerOptions is not serializable
ensureFiles);
}

void HiveInsertTableHandle::registerSerDe() {
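A quick round-trip sketch (assuming registerSerDe() has been called and that create() takes the
serialized folly::dynamic produced by serialize() above; insertHandle is the handle from the
earlier sketch):

// Sketch: the new flag survives the serialize()/create() round trip.
folly::dynamic obj = insertHandle->serialize();
auto restored = connector::hive::HiveInsertTableHandle::create(obj);
VELOX_CHECK(restored->ensureFiles());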
32 changes: 28 additions & 4 deletions velox/connectors/hive/HiveDataSink.h
@@ -19,6 +19,7 @@
#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/PartitionIdGenerator.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
@@ -30,8 +31,6 @@ class Writer;

namespace facebook::velox::connector::hive {

class HiveColumnHandle;

class LocationHandle;
using LocationHandlePtr = std::shared_ptr<const LocationHandle>;

@@ -206,19 +205,39 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
std::optional<common::CompressionKind> compressionKind = {},
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr)
nullptr,
// When this option is set, the HiveDataSink will always write a file even
// if there's no data. This is useful when the table is bucketed, but the
// engine itself ensures a 1:1 mapping from Task to output bucket.
const bool ensureFiles = false)
: inputColumns_(std::move(inputColumns)),
locationHandle_(std::move(locationHandle)),
storageFormat_(storageFormat),
bucketProperty_(std::move(bucketProperty)),
compressionKind_(compressionKind),
serdeParameters_(serdeParameters),
writerOptions_(writerOptions) {
writerOptions_(writerOptions),
ensureFiles_(ensureFiles) {
if (compressionKind.has_value()) {
VELOX_CHECK(
compressionKind.value() != common::CompressionKind_MAX,
"Unsupported compression type: CompressionKind_MAX");
}

if (ensureFiles_) {
// If ensureFiles is set and either the bucketProperty is set or some
// partition keys are in the data, there is not a 1:1 mapping from Task to
// files so we can't proactively create writers.
VELOX_CHECK(
bucketProperty_ == nullptr || bucketProperty_->bucketCount() == 0,
"ensureFiles is not supported with bucketing");

for (const auto& inputColumn : inputColumns_) {
VELOX_CHECK(
!inputColumn->isPartitionKey(),
"ensureFiles is not supported with partition keys in the data");
}
}
}

virtual ~HiveInsertTableHandle() = default;
@@ -248,6 +267,10 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
return writerOptions_;
}

bool ensureFiles() const {
return ensureFiles_;
}

bool supportsMultiThreading() const override {
return true;
}
@@ -276,6 +299,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
const std::optional<common::CompressionKind> compressionKind_;
const std::unordered_map<std::string, std::string> serdeParameters_;
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_;
const bool ensureFiles_;
};

/// Parameters for Hive writers.
111 changes: 107 additions & 4 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
@@ -150,7 +150,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
const std::shared_ptr<connector::hive::HiveBucketProperty>&
bucketProperty = nullptr,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr) {
nullptr,
const bool ensureFiles = false) {
return makeHiveInsertTableHandle(
outputRowType->names(),
outputRowType->children(),
@@ -163,7 +164,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
fileFormat,
CompressionKind::CompressionKind_ZSTD,
{},
writerOptions);
writerOptions,
ensureFiles);
}

std::shared_ptr<HiveDataSink> createDataSink(
@@ -174,7 +176,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
const std::shared_ptr<connector::hive::HiveBucketProperty>&
bucketProperty = nullptr,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr) {
nullptr,
const bool ensureFiles = false) {
return std::make_shared<HiveDataSink>(
rowType,
createHiveInsertTableHandle(
@@ -183,7 +186,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
fileFormat,
partitionedBy,
bucketProperty,
writerOptions),
writerOptions,
ensureFiles),
connectorQueryCtx_.get(),
CommitStrategy::kNoCommit,
connectorConfig_);
@@ -1211,6 +1215,105 @@ TEST_F(HiveDataSinkTest, flushPolicyWithDWRF) {
ASSERT_EQ(reader->getRowsPerStripe()[0], 500);
}

TEST_F(HiveDataSinkTest, ensureFilesNoData) {
const auto outputDirectory = TempDirectoryPath::create();
auto dataSink = createDataSink(
rowType_,
outputDirectory->getPath(),
dwio::common::FileFormat::DWRF,
{}, // partitionBy
nullptr, // bucketProperty
nullptr, // writeOptions
true // ensureFiles
);

ASSERT_TRUE(dataSink->finish());

auto partitions = dataSink->close();
auto stats = dataSink->stats();
ASSERT_FALSE(stats.empty());
ASSERT_GT(stats.numWrittenBytes, 0);
ASSERT_EQ(stats.numWrittenFiles, 1);
ASSERT_EQ(partitions.size(), 1);

std::vector<RowVectorPtr> vectors{RowVector::createEmpty(rowType_, pool())};
createDuckDbTable(vectors);
verifyWrittenData(outputDirectory->getPath());
}

TEST_F(HiveDataSinkTest, ensureFilesWithData) {
const auto outputDirectory = TempDirectoryPath::create();
auto dataSink = createDataSink(
rowType_,
outputDirectory->getPath(),
dwio::common::FileFormat::DWRF,
{}, // partitionBy
nullptr, // bucketProperty
nullptr, // writeOptions
true // ensureFiles
);

const int numBatches = 10;
const auto vectors = createVectors(500, numBatches);
for (const auto& vector : vectors) {
dataSink->appendData(vector);
}

ASSERT_TRUE(dataSink->finish());

auto partitions = dataSink->close();
auto stats = dataSink->stats();
ASSERT_FALSE(stats.empty());
ASSERT_GT(stats.numWrittenBytes, 0);
ASSERT_EQ(stats.numWrittenFiles, 1);
ASSERT_EQ(partitions.size(), 1);

createDuckDbTable(vectors);
verifyWrittenData(outputDirectory->getPath());
}

TEST_F(HiveDataSinkTest, ensureFilesUnsupported) {
VELOX_ASSERT_THROW(
makeHiveInsertTableHandle(
rowType_->names(),
rowType_->children(),
{rowType_->names()[0]}, // partitionedBy
nullptr, // bucketProperty
makeLocationHandle(
"/path/to/test",
std::nullopt,
connector::hive::LocationHandle::TableType::kNew),
dwio::common::FileFormat::DWRF,
CompressionKind::CompressionKind_ZSTD,
{}, // serdeParameters
nullptr, // writeOptions
true // ensureFiles
),
"ensureFiles is not supported with partition keys in the data");

VELOX_ASSERT_THROW(
makeHiveInsertTableHandle(
rowType_->names(),
rowType_->children(),
{}, // partitionedBy
{std::make_shared<HiveBucketProperty>(
HiveBucketProperty::Kind::kPrestoNative,
1,
std::vector<std::string>{rowType_->names()[0]},
std::vector<TypePtr>{rowType_->children()[0]},
std::vector<std::shared_ptr<const HiveSortingColumn>>{})},
makeLocationHandle(
"/path/to/test",
std::nullopt,
connector::hive::LocationHandle::TableType::kNew),
dwio::common::FileFormat::DWRF,
CompressionKind::CompressionKind_ZSTD,
{}, // serdeParameters
nullptr, // writeOptions
true // ensureFiles
),
"ensureFiles is not supported with bucketing");
}
} // namespace
} // namespace facebook::velox::connector::hive

12 changes: 8 additions & 4 deletions velox/exec/tests/utils/HiveConnectorTestBase.cpp
@@ -284,7 +284,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
std::shared_ptr<connector::hive::LocationHandle> locationHandle,
const dwio::common::FileFormat tableStorageFormat,
const std::optional<common::CompressionKind> compressionKind,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions) {
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
const bool ensureFiles) {
return makeHiveInsertTableHandle(
tableColumnNames,
tableColumnTypes,
@@ -294,7 +295,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
tableStorageFormat,
compressionKind,
{},
writerOptions);
writerOptions,
ensureFiles);
}

// static
@@ -308,7 +310,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
const dwio::common::FileFormat tableStorageFormat,
const std::optional<common::CompressionKind> compressionKind,
const std::unordered_map<std::string, std::string>& serdeParameters,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions) {
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
const bool ensureFiles) {
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
columnHandles;
std::vector<std::string> bucketedBy;
@@ -365,7 +368,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
bucketProperty,
compressionKind,
serdeParameters,
writerOptions);
writerOptions,
ensureFiles);
}

std::shared_ptr<connector::hive::HiveColumnHandle>
8 changes: 6 additions & 2 deletions velox/exec/tests/utils/HiveConnectorTestBase.h
@@ -175,6 +175,8 @@ class HiveConnectorTestBase : public OperatorTestBase {
/// @param locationHandle Location handle for the table write.
/// @param compressionKind compression algorithm to use for table write.
/// @param serdeParameters Table writer configuration parameters.
/// @param ensureFiles When this option is set, the HiveDataSink will always
/// create a file even if there is no data.
static std::shared_ptr<connector::hive::HiveInsertTableHandle>
makeHiveInsertTableHandle(
const std::vector<std::string>& tableColumnNames,
@@ -187,7 +189,8 @@ class HiveConnectorTestBase : public OperatorTestBase {
const std::optional<common::CompressionKind> compressionKind = {},
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr);
nullptr,
const bool ensureFiles = false);

static std::shared_ptr<connector::hive::HiveInsertTableHandle>
makeHiveInsertTableHandle(
Expand All @@ -199,7 +202,8 @@ class HiveConnectorTestBase : public OperatorTestBase {
dwio::common::FileFormat::DWRF,
const std::optional<common::CompressionKind> compressionKind = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr);
nullptr,
const bool ensureFiles = false);

static std::shared_ptr<connector::hive::HiveColumnHandle> regularColumn(
const std::string& name,
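Together with the @param ensureFiles note above, a hedged usage sketch of the extended helper
(written as if inside a test deriving from HiveConnectorTestBase, mirroring the argument order
used in the ensureFilesUnsupported test; the row type and output path are made up for
illustration):

// Sketch: build a handle that always produces a file via the test helper.
auto rowType = ROW({"c0", "c1"}, {BIGINT(), VARCHAR()});
auto handle = makeHiveInsertTableHandle(
    rowType->names(),
    rowType->children(),
    {}, // partitionedBy must be empty when ensureFiles is set
    nullptr, // bucketProperty must be null when ensureFiles is set
    makeLocationHandle(
        "/tmp/ensure_files_demo",
        std::nullopt,
        connector::hive::LocationHandle::TableType::kNew),
    dwio::common::FileFormat::DWRF,
    CompressionKind::CompressionKind_ZSTD,
    {}, // serdeParameters
    nullptr, // writerOptions
    true); // ensureFiles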
7 changes: 5 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
@@ -270,7 +270,8 @@ core::PlanNodePtr PlanBuilder::TableWriterBuilder::build(core::PlanNodeId id) {
bucketProperty,
compressionKind_,
serdeParameters_,
options_);
options_,
ensureFiles_);

auto insertHandle =
std::make_shared<core::InsertTableHandle>(connectorId_, hiveHandle);
@@ -508,7 +509,8 @@
const std::shared_ptr<dwio::common::WriterOptions>& options,
const std::string& outputFileName,
const common::CompressionKind compressionKind,
const RowTypePtr& schema) {
const RowTypePtr& schema,
const bool ensureFiles) {
return TableWriterBuilder(*this)
.outputDirectoryPath(outputDirectoryPath)
.outputFileName(outputFileName)
@@ -523,6 +525,7 @@
.serdeParameters(serdeParameters)
.options(options)
.compressionKind(compressionKind)
.ensureFiles(ensureFiles)
.endTableWriter();
}
