Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass spark 3.4 unit test when enabling native parquet write #466

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* session) const {
return config_->get<bool>(kFileColumnNamesReadAsLowerCase, false);
}

/// Returns whether partition directory names should be lower-cased when
/// building partition paths. Defaults to true.
/// NOTE(review): 'session' is accepted but not consulted — the value is read
/// from the connector config only. Given the "...Session" naming and the
/// session-scoped property key, confirm whether the session properties should
/// take precedence over the connector config here.
bool HiveConfig::isPartitionPathAsLowerCaseSession(
    const Config* session) const {
  return config_->get<bool>(kPartitionPathAsLowerCaseSession, true);
}

/// Returns the maximum number of bytes to coalesce per request, from the
/// connector config; defaults to 128MB.
int64_t HiveConfig::maxCoalescedBytes() const {
  // 128 MB default when the config key is unset.
  constexpr int64_t kDefaultMaxCoalescedBytes = int64_t{128} << 20;
  return config_->get<int64_t>(kMaxCoalescedBytes, kDefaultMaxCoalescedBytes);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class HiveConfig {
static constexpr const char* kFileColumnNamesReadAsLowerCaseSession =
"file_column_names_read_as_lower_case";

static constexpr const char* kPartitionPathAsLowerCaseSession =
"partition_path_as_lower_case";

/// Sets the max coalesce bytes for a request.
static constexpr const char* kMaxCoalescedBytes = "max-coalesced-bytes";

Expand Down Expand Up @@ -202,6 +205,8 @@ class HiveConfig {

bool isFileColumnNamesReadAsLowerCase(const Config* session) const;

bool isPartitionPathAsLowerCaseSession(const Config* session) const;

int64_t maxCoalescedBytes() const;

int32_t maxCoalescedDistanceBytes() const;
Expand Down
75 changes: 66 additions & 9 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ namespace facebook::velox::connector::hive {

namespace {

// Builds a RowVector holding only the non-partition-key children of 'input',
// preserving their original order. Used for non-bucketed writes where the
// partition key values are encoded in the partition directory path rather
// than stored in the data file.
//
// @param partitionCols Indices of the partition key columns to drop.
// @param input Full input row vector; nulls and size are carried over.
// @param dataType Row type of the remaining data columns; must match the
//        children kept from 'input'.
RowVectorPtr makeDataInput(
    const std::vector<column_index_t>& partitionCols,
    const RowVectorPtr& input,
    const RowTypePtr& dataType) {
  std::vector<VectorPtr> childVectors;
  childVectors.reserve(dataType->size());
  for (column_index_t i = 0; i < input->childrenSize(); ++i) {
    // Keep only the children that are not partition keys.
    if (std::find(partitionCols.cbegin(), partitionCols.cend(), i) ==
        partitionCols.cend()) {
      childVectors.push_back(input->childAt(i));
    }
  }

  return std::make_shared<RowVector>(
      input->pool(),
      dataType,
      input->nulls(),
      input->size(),
      std::move(childVectors),
      input->getNullCount());
}

// Returns a subset of column indices corresponding to partition keys.
std::vector<column_index_t> getPartitionChannels(
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
Expand Down Expand Up @@ -301,12 +323,15 @@ HiveDataSink::HiveDataSink(
connectorQueryCtx->sessionProperties())),
partitionChannels_(getPartitionChannels(insertTableHandle_)),
partitionIdGenerator_(
!partitionChannels_.empty() ? std::make_unique<PartitionIdGenerator>(
inputType_,
partitionChannels_,
maxOpenWriters_,
connectorQueryCtx_->memoryPool())
: nullptr),
!partitionChannels_.empty()
? std::make_unique<PartitionIdGenerator>(
inputType_,
partitionChannels_,
maxOpenWriters_,
connectorQueryCtx_->memoryPool(),
hiveConfig_->isPartitionPathAsLowerCaseSession(
connectorQueryCtx->sessionProperties()))
: nullptr),
bucketCount_(
insertTableHandle_->bucketProperty() == nullptr
? 0
Expand All @@ -331,6 +356,18 @@ HiveDataSink::HiveDataSink(
"Unsupported commit strategy: {}",
commitStrategyToString(commitStrategy_));

  // Derive the data column type by dropping the partition key channels from
  // the input type.
std::vector<TypePtr> childTypes;
std::vector<std::string> childNames;
for (auto i = 0; i < inputType_->size(); i++) {
if (std::find(partitionChannels_.cbegin(), partitionChannels_.cend(), i) ==
partitionChannels_.end()) {
childNames.push_back(inputType_->nameOf(i));
childTypes.push_back(inputType_->childAt(i));
}
}
dataType_ = ROW(std::move(childNames), std::move(childTypes));

if (!isBucketed()) {
return;
}
Expand Down Expand Up @@ -400,8 +437,17 @@ void HiveDataSink::appendData(RowVectorPtr input) {

void HiveDataSink::write(size_t index, const VectorPtr& input) {
WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
writers_[index]->write(input);
writerInfo_[index]->numWrittenRows += input->size();
// Skip the partition columns before writing.
auto dataInput = input;
if (!isBucketed()) {
dataInput = makeDataInput(
partitionChannels_,
std::dynamic_pointer_cast<RowVector>(input),
dataType_);
}

writers_[index]->write(dataInput);
writerInfo_[index]->numWrittenRows += dataInput->size();
}

std::string HiveDataSink::stateString(State state) {
Expand Down Expand Up @@ -597,7 +643,12 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
dwio::common::WriterOptions options;
const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();
options.schema = inputType_;
if (!isBucketed()) {
options.schema = dataType_;
} else {
options.schema = inputType_;
}

options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
if (canReclaim()) {
Expand Down Expand Up @@ -737,6 +788,12 @@ std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
const std::string writeFileName = isCommitRequired()
? fmt::format(".tmp.velox.{}_{}", targetFileName, makeUuid())
: targetFileName;
if (insertTableHandle_->tableStorageFormat() ==
dwio::common::FileFormat::PARQUET) {
return {
fmt::format("{}{}", targetFileName, ".parquet"),
fmt::format("{}{}", writeFileName, ".parquet")};
}
return {targetFileName, writeFileName};
}

Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ class HiveDataSink : public DataSink {
void closeInternal();

const RowTypePtr inputType_;
  // Row type of the columns written to the data file; for non-bucketed writes
  // this excludes the partition key columns.
RowTypePtr dataType_;
const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle_;
const ConnectorQueryCtx* const connectorQueryCtx_;
const CommitStrategy commitStrategy_;
Expand Down
9 changes: 6 additions & 3 deletions velox/connectors/hive/PartitionIdGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ PartitionIdGenerator::PartitionIdGenerator(
const RowTypePtr& inputType,
std::vector<column_index_t> partitionChannels,
uint32_t maxPartitions,
memory::MemoryPool* pool)
memory::MemoryPool* pool,
bool partitionPathAsLowerCase)
: partitionChannels_(std::move(partitionChannels)),
maxPartitions_(maxPartitions) {
maxPartitions_(maxPartitions),
partitionPathAsLowerCase_(partitionPathAsLowerCase) {
VELOX_USER_CHECK(
!partitionChannels_.empty(), "There must be at least one partition key.");
for (auto channel : partitionChannels_) {
Expand Down Expand Up @@ -98,7 +100,8 @@ void PartitionIdGenerator::run(

// Builds the "key1=val1/key2=val2" partition path for 'partitionId',
// lower-casing key names when partitionPathAsLowerCase_ is set.
std::string PartitionIdGenerator::partitionName(uint64_t partitionId) const {
  const auto keyValues =
      extractPartitionKeyValues(partitionValues_, partitionId);
  return FileUtils::makePartName(keyValues, partitionPathAsLowerCase_);
}

void PartitionIdGenerator::computeValueIds(
Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/PartitionIdGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class PartitionIdGenerator {
const RowTypePtr& inputType,
std::vector<column_index_t> partitionChannels,
uint32_t maxPartitions,
memory::MemoryPool* pool);
memory::MemoryPool* pool,
bool partitionPathAsLowerCase = true);

/// Generate sequential partition IDs for input vector.
/// @param input Input RowVector.
Expand Down Expand Up @@ -77,6 +78,8 @@ class PartitionIdGenerator {

const uint32_t maxPartitions_;

bool partitionPathAsLowerCase_;

std::vector<std::unique_ptr<exec::VectorHasher>> hashers_;

// A mapping from value ID produced by VectorHashers to a partition ID.
Expand Down
12 changes: 9 additions & 3 deletions velox/dwio/catalog/fbhive/FileUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,20 @@ std::string FileUtils::unescapePathName(const std::string& data) {
}

std::string FileUtils::makePartName(
const std::vector<std::pair<std::string, std::string>>& entries) {
const std::vector<std::pair<std::string, std::string>>& entries,
bool partitionPathAsLowerCase) {
size_t size = 0;
size_t escapeCount = 0;
std::for_each(entries.begin(), entries.end(), [&](auto& pair) {
auto keySize = pair.first.size();
DWIO_ENSURE_GT(keySize, 0);
size += keySize;
escapeCount += countEscape(pair.first);
if (partitionPathAsLowerCase) {
escapeCount += countEscape(toLower(pair.first));
} else {
escapeCount += countEscape(pair.first);
}

auto valSize = pair.second.size();
if (valSize == 0) {
size += DEFAULT_PARTITION_VALUE.size();
Expand All @@ -182,7 +188,7 @@ std::string FileUtils::makePartName(
if (ret.size() > 0) {
ret += "/";
}
ret += escapePathName(toLower(pair.first));
ret += escapePathName(pair.first);
ret += "=";
if (pair.second.size() == 0) {
ret += DEFAULT_PARTITION_VALUE;
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/catalog/fbhive/FileUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class FileUtils {
/// Creates the partition directory path from the list of partition key/value
/// pairs, will do url-encoding when needed.
static std::string makePartName(
const std::vector<std::pair<std::string, std::string>>& entries);
const std::vector<std::pair<std::string, std::string>>& entries,
bool partitionPathAsLowerCase = true);

/// Converts the hive-metastore-compliant path name back to the corresponding
/// partition key/value pairs.
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/catalog/fbhive/test/FileUtilsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ TEST(FileUtilsTests, MakePartName) {
{"ds", "2016-01-01"}, {"FOO", ""}, {"a\nb:c", "a#b=c"}};
ASSERT_EQ(
FileUtils::makePartName(pairs),
"ds=2016-01-01/foo=__HIVE_DEFAULT_PARTITION__/a%0Ab%3Ac=a%23b%3Dc");
"ds=2016-01-01/FOO=__HIVE_DEFAULT_PARTITION__/a%0Ab%3Ac=a%23b%3Dc");
}

TEST(FileUtilsTests, ParsePartKeyValues) {
Expand Down
Loading
Loading