Add physicalWrittenBytes into the PlanNodeStats (facebookincubator#8213)
Pass spark 3.4 unit test when enabling native parquet write (#466)
JkSelf authored and marin-ma committed Jan 5, 2024
1 parent 01ae31d commit e949717
Showing 12 changed files with 240 additions and 70 deletions.
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
@@ -153,6 +153,11 @@ bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* session) const {
return config_->get<bool>(kFileColumnNamesReadAsLowerCase, false);
}

bool HiveConfig::isPartitionPathAsLowerCaseSession(
const Config* session) const {
return config_->get<bool>(kPartitionPathAsLowerCaseSession, true);
}

int64_t HiveConfig::maxCoalescedBytes() const {
return config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20);
}
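Note: the new knob is read through the same `Config` lookup as the other Hive settings. A minimal sketch of toggling it, assuming a hypothetical key/value session config (the map-backed config below is illustrative, not the connector's actual `Config` implementation):

```cpp
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the session properties consulted via
// connectorQueryCtx->sessionProperties().
std::unordered_map<std::string, std::string> sessionProps{
    {"partition_path_as_lower_case", "false"}, // keep mixed-case paths
};

bool partitionPathAsLowerCase(
    const std::unordered_map<std::string, std::string>& props) {
  auto it = props.find("partition_path_as_lower_case");
  // Default mirrors the diff above: lower-casing is on unless disabled.
  return it == props.end() ? true : it->second == "true";
}
```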
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.h
@@ -110,6 +110,9 @@ class HiveConfig {
static constexpr const char* kFileColumnNamesReadAsLowerCaseSession =
"file_column_names_read_as_lower_case";

static constexpr const char* kPartitionPathAsLowerCaseSession =
"partition_path_as_lower_case";

/// Sets the max coalesce bytes for a request.
static constexpr const char* kMaxCoalescedBytes = "max-coalesced-bytes";

@@ -202,6 +205,8 @@ class HiveConfig {

bool isFileColumnNamesReadAsLowerCase(const Config* session) const;

bool isPartitionPathAsLowerCaseSession(const Config* session) const;

int64_t maxCoalescedBytes() const;

int32_t maxCoalescedDistanceBytes() const;
69 changes: 60 additions & 9 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -38,6 +38,28 @@ namespace facebook::velox::connector::hive {

namespace {

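// Returns a copy of 'input' that keeps only the non-partition columns,
// re-typed to 'dataType'. Partition key values are not written into the
// data file; they are encoded in the partition directory path instead.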
RowVectorPtr makeDataInput(
const std::vector<column_index_t>& partitionCols,
const RowVectorPtr& input,
const RowTypePtr& dataType) {
std::vector<VectorPtr> childVectors;
childVectors.reserve(dataType->size());
for (uint32_t i = 0; i < input->childrenSize(); i++) {
if (std::find(partitionCols.cbegin(), partitionCols.cend(), i) ==
partitionCols.cend()) {
childVectors.push_back(input->childAt(i));
}
}

return std::make_shared<RowVector>(
input->pool(),
dataType,
input->nulls(),
input->size(),
std::move(childVectors),
input->getNullCount());
}

// Returns a subset of column indices corresponding to partition keys.
std::vector<column_index_t> getPartitionChannels(
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
@@ -301,12 +323,15 @@ HiveDataSink::HiveDataSink(
connectorQueryCtx->sessionProperties())),
partitionChannels_(getPartitionChannels(insertTableHandle_)),
partitionIdGenerator_(
!partitionChannels_.empty() ? std::make_unique<PartitionIdGenerator>(
inputType_,
partitionChannels_,
maxOpenWriters_,
connectorQueryCtx_->memoryPool())
: nullptr),
!partitionChannels_.empty()
? std::make_unique<PartitionIdGenerator>(
inputType_,
partitionChannels_,
maxOpenWriters_,
connectorQueryCtx_->memoryPool(),
hiveConfig_->isPartitionPathAsLowerCaseSession(
connectorQueryCtx->sessionProperties()))
: nullptr),
bucketCount_(
insertTableHandle_->bucketProperty() == nullptr
? 0
@@ -331,6 +356,18 @@ HiveDataSink::HiveDataSink(
"Unsupported commit strategy: {}",
commitStrategyToString(commitStrategy_));

// Derive the type of the data columns from inputType_ by excluding the partition channels.
std::vector<TypePtr> childTypes;
std::vector<std::string> childNames;
for (uint32_t i = 0; i < inputType_->size(); i++) {
if (std::find(partitionChannels_.cbegin(), partitionChannels_.cend(), i) ==
partitionChannels_.cend()) {
childNames.push_back(inputType_->nameOf(i));
childTypes.push_back(inputType_->childAt(i));
}
}
dataType_ = ROW(std::move(childNames), std::move(childTypes));

if (!isBucketed()) {
return;
}
@@ -400,8 +437,17 @@ void HiveDataSink::appendData(RowVectorPtr input) {

void HiveDataSink::write(size_t index, const VectorPtr& input) {
WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
writers_[index]->write(input);
writerInfo_[index]->numWrittenRows += input->size();
// Skip the partition columns before writing.
auto dataInput = input;
if (!isBucketed()) {
dataInput = makeDataInput(
partitionChannels_,
std::dynamic_pointer_cast<RowVector>(input),
dataType_);
}

writers_[index]->write(dataInput);
writerInfo_[index]->numWrittenRows += dataInput->size();
}

std::string HiveDataSink::stateString(State state) {
@@ -597,7 +643,12 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
dwio::common::WriterOptions options;
const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();
options.schema = inputType_;
if (!isBucketed()) {
options.schema = dataType_;
} else {
options.schema = inputType_;
}

options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
if (canReclaim()) {
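The net effect for non-bucketed partitioned writes: partition key columns are stripped from both the writer schema (`dataType_`) and each written batch. A standalone sketch of the pruning rule, under an assumed two-column schema (the names are hypothetical):

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// Mirrors the constructor loop above: keep only the channels that are not
// partition keys. With names = {"c0", "ds"} and partitionChannels = {1},
// the result is {"c0"}; "ds" only appears in the path, e.g. .../ds=2024-01-05/.
std::vector<std::string> dataColumnNames(
    const std::vector<std::string>& names,
    const std::vector<uint32_t>& partitionChannels) {
  std::vector<std::string> dataNames;
  for (uint32_t i = 0; i < names.size(); ++i) {
    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
        partitionChannels.cend()) {
      dataNames.push_back(names[i]);
    }
  }
  return dataNames;
}
```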
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.h
@@ -541,6 +541,8 @@ class HiveDataSink : public DataSink {
void closeInternal();

const RowTypePtr inputType_;
// Type of the data columns written to the file; excludes partition key
// columns for non-bucketed writes.
RowTypePtr dataType_;
const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle_;
const ConnectorQueryCtx* const connectorQueryCtx_;
const CommitStrategy commitStrategy_;
9 changes: 6 additions & 3 deletions velox/connectors/hive/PartitionIdGenerator.cpp
@@ -27,9 +27,11 @@ PartitionIdGenerator::PartitionIdGenerator(
const RowTypePtr& inputType,
std::vector<column_index_t> partitionChannels,
uint32_t maxPartitions,
memory::MemoryPool* pool)
memory::MemoryPool* pool,
bool partitionPathAsLowerCase)
: partitionChannels_(std::move(partitionChannels)),
maxPartitions_(maxPartitions) {
maxPartitions_(maxPartitions),
partitionPathAsLowerCase_(partitionPathAsLowerCase) {
VELOX_USER_CHECK(
!partitionChannels_.empty(), "There must be at least one partition key.");
for (auto channel : partitionChannels_) {
@@ -98,7 +100,8 @@ void PartitionIdGenerator::run(

std::string PartitionIdGenerator::partitionName(uint64_t partitionId) const {
return FileUtils::makePartName(
extractPartitionKeyValues(partitionValues_, partitionId));
extractPartitionKeyValues(partitionValues_, partitionId),
partitionPathAsLowerCase_);
}

void PartitionIdGenerator::computeValueIds(
5 changes: 4 additions & 1 deletion velox/connectors/hive/PartitionIdGenerator.h
@@ -34,7 +34,8 @@ class PartitionIdGenerator {
const RowTypePtr& inputType,
std::vector<column_index_t> partitionChannels,
uint32_t maxPartitions,
memory::MemoryPool* pool);
memory::MemoryPool* pool,
bool partitionPathAsLowerCase = true);

/// Generate sequential partition IDs for input vector.
/// @param input Input RowVector.
@@ -77,6 +78,8 @@

const uint32_t maxPartitions_;

bool partitionPathAsLowerCase_;

std::vector<std::unique_ptr<exec::VectorHasher>> hashers_;

// A mapping from value ID produced by VectorHashers to a partition ID.
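Construction with the new trailing argument might look like the following sketch; the surrounding variables (`inputType`, `partitionChannels`, `pool`) are assumed to exist as in HiveDataSink above:

```cpp
auto idGenerator = std::make_unique<PartitionIdGenerator>(
    inputType,
    partitionChannels,
    /*maxPartitions=*/128,
    pool,
    /*partitionPathAsLowerCase=*/false); // preserve Spark-style mixed case
```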
12 changes: 9 additions & 3 deletions velox/dwio/catalog/fbhive/FileUtils.cpp
@@ -158,14 +158,20 @@ std::string FileUtils::unescapePathName(const std::string& data) {
}

std::string FileUtils::makePartName(
const std::vector<std::pair<std::string, std::string>>& entries) {
const std::vector<std::pair<std::string, std::string>>& entries,
bool partitionPathAsLowerCase) {
size_t size = 0;
size_t escapeCount = 0;
std::for_each(entries.begin(), entries.end(), [&](auto& pair) {
auto keySize = pair.first.size();
DWIO_ENSURE_GT(keySize, 0);
size += keySize;
escapeCount += countEscape(pair.first);
if (partitionPathAsLowerCase) {
escapeCount += countEscape(toLower(pair.first));
} else {
escapeCount += countEscape(pair.first);
}

auto valSize = pair.second.size();
if (valSize == 0) {
size += DEFAULT_PARTITION_VALUE.size();
@@ -182,7 +188,7 @@
if (ret.size() > 0) {
ret += "/";
}
ret += escapePathName(toLower(pair.first));
ret += escapePathName(pair.first);
ret += "=";
if (pair.second.size() == 0) {
ret += DEFAULT_PARTITION_VALUE;
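For reference, the escaping exercised by the test below percent-encodes reserved characters in keys and values. A minimal sketch of an equivalent encoder, covering only the characters seen in the test (`escapePathName` is the real implementation and handles the full hive-reserved set):

```cpp
#include <cstdio>
#include <string>

// Illustrative percent-encoder for '\n', ':', '#', and '=' only.
std::string percentEncode(const std::string& in) {
  std::string out;
  for (unsigned char c : in) {
    if (c == '\n' || c == ':' || c == '#' || c == '=') {
      char buf[4];
      std::snprintf(buf, sizeof(buf), "%%%02X", c);
      out += buf;
    } else {
      out += static_cast<char>(c);
    }
  }
  return out;
}
// percentEncode("a\nb:c") == "a%0Ab%3Ac"
// percentEncode("a#b=c") == "a%23b%3Dc"
```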
3 changes: 2 additions & 1 deletion velox/dwio/catalog/fbhive/FileUtils.h
@@ -40,7 +40,8 @@ class FileUtils {
/// Creates the partition directory path from the list of partition key/value
/// pairs, will do url-encoding when needed.
static std::string makePartName(
const std::vector<std::pair<std::string, std::string>>& entries);
const std::vector<std::pair<std::string, std::string>>& entries,
bool partitionPathAsLowerCase = true);

/// Converts the hive-metastore-compliant path name back to the corresponding
/// partition key/value pairs.
2 changes: 1 addition & 1 deletion velox/dwio/catalog/fbhive/test/FileUtilsTests.cpp
@@ -28,7 +28,7 @@ TEST(FileUtilsTests, MakePartName) {
{"ds", "2016-01-01"}, {"FOO", ""}, {"a\nb:c", "a#b=c"}};
ASSERT_EQ(
FileUtils::makePartName(pairs),
"ds=2016-01-01/foo=__HIVE_DEFAULT_PARTITION__/a%0Ab%3Ac=a%23b%3Dc");
"ds=2016-01-01/FOO=__HIVE_DEFAULT_PARTITION__/a%0Ab%3Ac=a%23b%3Dc");
}

TEST(FileUtilsTests, ParsePartKeyValues) {
3 changes: 3 additions & 0 deletions velox/exec/PlanNodeStats.cpp
@@ -54,6 +54,8 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {
peakMemoryBytes += stats.memoryStats.peakTotalMemoryReservation;
numMemoryAllocations += stats.memoryStats.numMemoryAllocations;

physicalWrittenBytes += stats.physicalWrittenBytes;

for (const auto& [name, runtimeStats] : stats.runtimeStats) {
if (UNLIKELY(customStats.count(name) == 0)) {
customStats.insert(std::make_pair(name, runtimeStats));
@@ -154,6 +156,7 @@ folly::dynamic toPlanStatsJson(const facebook::velox::exec::TaskStats& stats) {
stat["blockedWallNanos"] = operatorStat.second->blockedWallNanos;
stat["peakMemoryBytes"] = operatorStat.second->peakMemoryBytes;
stat["numMemoryAllocations"] = operatorStat.second->numMemoryAllocations;
stat["physicalWrittenBytes"] = operatorStat.second->physicalWrittenBytes;
stat["numDrivers"] = operatorStat.second->numDrivers;
stat["numSplits"] = operatorStat.second->numSplits;
stat["spilledInputBytes"] = operatorStat.second->spilledInputBytes;
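Once populated, the counter travels with the rest of the per-node stats. A hedged sketch of reading it back from the JSON produced by `toPlanStatsJson` (this assumes the returned `folly::dynamic` is an object keyed by plan node ID, as the loop above suggests):

```cpp
#include <folly/dynamic.h>
#include <iostream>

// 'planStats' is assumed to be the result of toPlanStatsJson(taskStats).
void printPhysicalWrittenBytes(const folly::dynamic& planStats) {
  for (const auto& [nodeId, stat] : planStats.items()) {
    // Non-zero only for nodes that write data, e.g. a table-writer node.
    std::cout << nodeId.asString() << ": "
              << stat["physicalWrittenBytes"].asInt() << " bytes written\n";
  }
}
```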
2 changes: 2 additions & 0 deletions velox/exec/PlanNodeStats.h
@@ -92,6 +92,8 @@ struct PlanNodeStats {

uint64_t numMemoryAllocations{0};

uint64_t physicalWrittenBytes{0};

/// Operator-specific counters.
std::unordered_map<std::string, RuntimeMetric> customStats;
