Skip to content

Commit

Permalink
Add data verification support for partitioned table in writer fuzzer (#…
Browse files Browse the repository at this point in the history
…10088)

Summary: Pull Request resolved: #10088

Reviewed By: xiaoxmeng

Differential Revision: D58250169

Pulled By: kewang1024

fbshipit-source-id: f485711af6d450293a16d6ad5d121b003fcec888
  • Loading branch information
kewang1024 authored and facebook-github-bot committed Jun 9, 2024
1 parent ccaad8e commit 102526b
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 28 deletions.
5 changes: 2 additions & 3 deletions velox/dwio/catalog/fbhive/FileUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ namespace fbhive {
namespace {

constexpr size_t HEX_WIDTH = 2;
const std::string DEFAULT_PARTITION_VALUE{"__HIVE_DEFAULT_PARTITION__"};

constexpr auto charsToEscape = folly::make_array(
'"',
Expand Down Expand Up @@ -170,7 +169,7 @@ std::string FileUtils::makePartName(

auto valSize = pair.second.size();
if (valSize == 0) {
size += DEFAULT_PARTITION_VALUE.size();
size += kDefaultPartitionValue.size();
} else {
size += valSize;
escapeCount += countEscape(pair.second);
Expand All @@ -192,7 +191,7 @@ std::string FileUtils::makePartName(

ret += "=";
if (pair.second.size() == 0) {
ret += DEFAULT_PARTITION_VALUE;
ret += kDefaultPartitionValue;
} else {
ret += escapePathName(pair.second);
}
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/catalog/fbhive/FileUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class FileUtils {

/// Converts a path name to a hive-metastore-compliant path name.
static std::string extractPartitionName(const std::string& filePath);

inline static const std::string kDefaultPartitionValue =
"__HIVE_DEFAULT_PARTITION__";
};

} // namespace fbhive
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/fuzzer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ target_link_libraries(
Folly::folly
velox_hive_connector
velox_dwio_dwrf_reader
velox_dwio_dwrf_writer)
velox_dwio_dwrf_writer
velox_dwio_catalog_fbhive)

add_library(velox_aggregation_fuzzer_base AggregationFuzzerBase.cpp)

Expand Down
67 changes: 61 additions & 6 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,26 @@
#include <filesystem>
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/dwio/catalog/fbhive/FileUtils.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
#include "velox/dwio/dwrf/writer/Writer.h"

using namespace facebook::velox::dwio::catalog::fbhive;

namespace facebook::velox::exec::test {

const std::string kPartitionDelimiter{"="};

// Extracts partition column name and partition value from directoryName.
std::pair<std::string, std::string> extractPartition(
const std::string& directoryName) {
auto partitionColumn =
directoryName.substr(0, directoryName.find(kPartitionDelimiter));
auto partitionValue = FileUtils::unescapePathName(
directoryName.substr(directoryName.find(kPartitionDelimiter) + 1));
return std::pair(partitionColumn, partitionValue);
}

void writeToFile(
const std::string& path,
const VectorPtr& vector,
Expand All @@ -36,6 +52,37 @@ void writeToFile(
writer.close();
}

// Recursive function to create splits with their corresponding schemas and
// store in splits.
// In a table directory, each partition would be stored as a
// sub-directory, multiple partition columns would make up nested directory
// structure.
//
// For example for a file path such as /p0=0/p1=0/0000_file1, creates
// split with partition keys (p0, 0), (p1 0)
void makeSplitsWithSchema(
const std::string& directory,
std::unordered_map<std::string, std::optional<std::string>>& partitionKeys,
std::vector<Split>& splits) {
for (auto const& entry : std::filesystem::directory_iterator{directory}) {
if (entry.is_directory()) {
auto directoryName = entry.path().string();
auto partition =
extractPartition(directoryName.substr(directory.size() + 1));
partitionKeys.insert(
{partition.first,
partition.second == FileUtils::kDefaultPartitionValue
? std::nullopt
: std::optional(partition.second)});
makeSplitsWithSchema(directoryName, partitionKeys, splits);
partitionKeys.erase(partition.first);
} else {
splits.emplace_back(
makeSplit(entry.path().string(), partitionKeys, std::nullopt));
}
}
}

std::vector<Split> makeSplits(
const std::vector<RowVectorPtr>& inputs,
const std::string& path,
Expand All @@ -52,16 +99,24 @@ std::vector<Split> makeSplits(

std::vector<Split> makeSplits(const std::string& directory) {
std::vector<Split> splits;
for (auto const& p : std::filesystem::directory_iterator{directory}) {
VELOX_CHECK(!p.is_directory());
splits.emplace_back(makeSplit(p.path().string()));
}
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
makeSplitsWithSchema(directory, partitionKeys, splits);
return splits;
}

Split makeSplit(const std::string& filePath) {
Split makeSplit(
const std::string& filePath,
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKeys,
std::optional<int32_t> tableBucketNumber) {
return Split{std::make_shared<connector::hive::HiveConnectorSplit>(
kHiveConnectorId, filePath, dwio::common::FileFormat::DWRF)};
kHiveConnectorId,
filePath,
dwio::common::FileFormat::DWRF,
0,
std::numeric_limits<uint64_t>::max(),
partitionKeys,
tableBucketNumber)};
}

std::shared_ptr<connector::ConnectorSplit> makeConnectorSplit(
Expand Down
17 changes: 12 additions & 5 deletions velox/exec/fuzzer/FuzzerUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,21 @@ std::vector<Split> makeSplits(
const std::string& path,
const std::shared_ptr<memory::MemoryPool>& writerPool);

/// Create splits from files in a directory. Don't support nested directory.
///
/// TODO: Add support for nested directory, need to parse directory information
/// into split schema.
/// Create splits with schema information from a directory.
/// For example under directory /table1:
/// /table1/p0=0/p1=0/00000_file1
/// /table1/p0=0/p1=1/00000_file1
/// It would return splits:
/// split1 with partition keys (p0, 0), (p1, 0)
/// split2 with partition keys (p0, 0), (p1, 1)
std::vector<Split> makeSplits(const std::string& directory);

/// Create a split from an exsiting file.
Split makeSplit(const std::string& filePath);
Split makeSplit(
const std::string& filePath,
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKeys = {},
std::optional<int32_t> tableBucketNumber = std::nullopt);

/// Create a connector split from an exsiting file.
std::shared_ptr<connector::ConnectorSplit> makeConnectorSplit(
Expand Down
75 changes: 62 additions & 13 deletions velox/exec/fuzzer/WriterFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "velox/common/base/Fs.h"
#include "velox/common/encode/Base64.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include "velox/exec/fuzzer/PrestoQueryRunner.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
Expand Down Expand Up @@ -53,6 +55,8 @@ DEFINE_double(
"Chance of adding a null value in a vector "
"(expressed as double from 0 to 1).");

using namespace facebook::velox::connector::hive;

namespace facebook::velox::exec::test {

namespace {
Expand Down Expand Up @@ -108,11 +112,21 @@ class WriterFuzzer {

void verifyWriter(
const std::vector<RowVectorPtr>& input,
const std::vector<std::string>& names,
const std::vector<TypePtr>& types,
int32_t partitionOffset,
int32_t bucketCount,
const std::vector<std::string>& bucketColumns,
const std::vector<std::string>& partitionKeys,
const std::string& outputDirectoryPath);

// Generate table column handles based on table column properties
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
getTableColumnHandles(
const std::vector<std::string>& names,
const std::vector<TypePtr>& types,
int32_t partitionOffset);

// Executes velox query plan and returns the result.
RowVectorPtr execute(
const core::PlanNodePtr& plan,
Expand Down Expand Up @@ -233,6 +247,7 @@ void WriterFuzzer::go() {
std::vector<std::string> bucketColumns;
std::vector<std::string> partitionKeys;

// Regular table columns
generateColumns(5, "c", kRegularColumnTypes_, 2, names, types);

// 50% of times test bucketed write.
Expand All @@ -255,6 +270,9 @@ void WriterFuzzer::go() {
auto tempDirPath = exec::test::TempDirectoryPath::create();
verifyWriter(
input,
names,
types,
partitionOffset,
bucketCount,
bucketColumns,
partitionKeys,
Expand Down Expand Up @@ -322,7 +340,10 @@ std::vector<RowVectorPtr> WriterFuzzer::generateInputData(

void WriterFuzzer::verifyWriter(
const std::vector<RowVectorPtr>& input,
int32_t bucketCount,
const std::vector<std::string>& names,
const std::vector<TypePtr>& types,
const int32_t partitionOffset,
const int32_t bucketCount,
const std::vector<std::string>& bucketColumns,
const std::vector<std::string>& partitionKeys,
const std::string& outputDirectoryPath) {
Expand Down Expand Up @@ -368,22 +389,50 @@ void WriterFuzzer::verifyWriter(
}

// 3. Verifies data itself.
// TODO: verify partitioned (bucketed) once makeSplits support nested folders.
if (partitionKeys.empty()) {
auto splits = makeSplits(outputDirectoryPath);
auto readPlan =
PlanBuilder().tableScan(asRowType(input[0]->type())).planNode();
auto actual = execute(readPlan, maxDrivers, splits);
auto reference_data =
referenceQueryRunner_->execute("SELECT * FROM tmp_write");
VELOX_CHECK(
assertEqualResults(reference_data, {actual}),
"Velox and reference DB results don't match");
}
auto splits = makeSplits(outputDirectoryPath);
const auto columnHandles =
getTableColumnHandles(names, types, partitionOffset);
auto readPlan = PlanBuilder()
.tableScan(
asRowType(input[0]->type()),
{},
"",
asRowType(input[0]->type()),
columnHandles)
.project(names)
.planNode();
auto actual = execute(readPlan, maxDrivers, splits);
auto reference_data =
referenceQueryRunner_->execute("SELECT * FROM tmp_write");
VELOX_CHECK(
assertEqualResults(reference_data, {actual}),
"Velox and reference DB results don't match");

LOG(INFO) << "Verified results against reference DB";
}

std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
WriterFuzzer::getTableColumnHandles(
const std::vector<std::string>& names,
const std::vector<TypePtr>& types,
const int32_t partitionOffset) {
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
columnHandle;
for (int i = 0; i < names.size(); ++i) {
HiveColumnHandle::ColumnType columnType;
if (i < partitionOffset) {
columnType = HiveColumnHandle::ColumnType::kRegular;
} else {
columnType = HiveColumnHandle::ColumnType::kPartitionKey;
}
columnHandle.insert(
{names.at(i),
std::make_shared<HiveColumnHandle>(
names.at(i), columnType, types.at(i), types.at(i))});
}
return columnHandle;
}

RowVectorPtr WriterFuzzer::execute(
const core::PlanNodePtr& plan,
const int32_t maxDrivers,
Expand Down

0 comments on commit 102526b

Please sign in to comment.