Skip to content

Commit

Permalink
Add bucket verification in TableWriter Fuzzer (#10039)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #10039

Reviewed By: xiaoxmeng

Differential Revision: D58141952

Pulled By: kewang1024

fbshipit-source-id: e80a93029e6df947354af68818ce0c1f467149eb
  • Loading branch information
kewang1024 authored and facebook-github-bot committed Jun 6, 2024
1 parent 4845e90 commit 7637d56
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 41 deletions.
7 changes: 4 additions & 3 deletions velox/docs/develop/testing/writer-fuzzer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
Writer Fuzzer
=============

Writer fuzzer tests table write plan with up to 5 regular columns and
up to 3 partition keys.
Writer fuzzer tests table write plan with up to 5 regular columns, up to
3 partition keys and up to 3 bucket columns.

At each iteration, fuzzer randomly generate a table write plan with different
table properties, as of now, only support partitioned and unpartitioned table.
table properties including un-partitioned and partitioned, non-bucketed and bucketed.

The fuzzer then generates inputs and runs the query plan and compares the
results with PrestoDB.
As of now, we compare:
1. How many rows were written.
2. Output directories have the same directory layout and hierarchy.
3. Same data were written by velox and prestoDB.

How to run
----------
Expand Down
20 changes: 17 additions & 3 deletions velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@ std::optional<std::string> PrestoQueryRunner::toSql(

// Returns a CTAS sql with specified table properties from TableWriteNode,
// example sql:
// CREATE TABLE tmp_write WITH (PARTITIONED_BY = ARRAY['p0'])
// AS SELECT * FROM tmp
// CREATE TABLE tmp_write WITH (PARTITIONED_BY = ARRAY['p0'], BUCKETED_COUNT =
// 20, BUCKETED_BY = ARRAY['b0', 'b1']) AS SELECT * FROM tmp
std::stringstream sql;
sql << "CREATE TABLE tmp_write";
std::vector<std::string> partitionKeys;
Expand All @@ -558,7 +558,21 @@ std::optional<std::string> PrestoQueryRunner::toSql(
appendComma(i, sql);
sql << "'" << partitionKeys[i] << "'";
}
sql << "])";
sql << "]";

if (insertTableHandle->bucketProperty() != nullptr) {
const auto bucketCount =
insertTableHandle->bucketProperty()->bucketCount();
const auto bucketColumns =
insertTableHandle->bucketProperty()->bucketedBy();
sql << ", BUCKET_COUNT = " << bucketCount << ", BUCKETED_BY = ARRAY[";
for (int i = 0; i < bucketColumns.size(); ++i) {
appendComma(i, sql);
sql << "'" << bucketColumns[i] << "'";
}
sql << "]";
}
sql << ")";
}

sql << " AS SELECT * FROM tmp";
Expand Down
141 changes: 106 additions & 35 deletions velox/exec/fuzzer/WriterFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class WriterFuzzer {

void verifyWriter(
const std::vector<RowVectorPtr>& input,
int32_t bucketCount,
const std::vector<std::string>& bucketColumns,
const std::vector<std::string>& partitionKeys,
const std::string& outputDirectoryPath);

Expand All @@ -122,13 +124,15 @@ class WriterFuzzer {
// Query Presto to find out table's location on disk.
std::string getReferenceOutputDirectoryPath(int32_t layers);

// Compares if two directories have same partitions
void comparePartitions(
// Compares if two directories have same partitions and each partition has
// same number of buckets.
void comparePartitionAndBucket(
const std::string& outputDirectoryPath,
const std::string& referenceOutputDirectoryPath);
const std::string& referenceOutputDirectoryPath,
int32_t bucketCount);

// Returns all the partition names in tableDirectoryPath.
std::set<std::string> getPartitionNames(
// Returns all the partition name and how many files in each partition.
std::map<std::string, int32_t> getPartitionNameAndFilecount(
const std::string& tableDirectoryPath);

const std::vector<TypePtr> kRegularColumnTypes_{
Expand All @@ -141,6 +145,17 @@ class WriterFuzzer {
VARBINARY(),
TIMESTAMP(),
};
// Supported bucket column types:
// https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java#L142
const std::vector<TypePtr> kSupportedBucketColumnTypes_{
BOOLEAN(),
TINYINT(),
SMALLINT(),
INTEGER(),
BIGINT(),
VARCHAR(),
TIMESTAMP(),
};
// Supported partition key column types
// According to VectorHasher::typeKindSupportsValueIds and
// https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L575
Expand Down Expand Up @@ -215,19 +230,35 @@ void WriterFuzzer::go() {

std::vector<std::string> names;
std::vector<TypePtr> types;
std::vector<std::string> bucketColumns;
std::vector<std::string> partitionKeys;

generateColumns(5, "c", kRegularColumnTypes_, 2, names, types);
const auto partitionOffset = names.size();

// 50% of times test bucketed write.
int32_t bucketCount = 0;
if (vectorFuzzer_.coinToss(0.5)) {
bucketColumns = generateColumns(
5, "b", kSupportedBucketColumnTypes_, 1, names, types);
bucketCount =
boost::random::uniform_int_distribution<int32_t>(1, 3)(rng_);
}

// 50% of times test partitioned write.
const auto partitionOffset = names.size();
if (vectorFuzzer_.coinToss(0.5)) {
partitionKeys =
generateColumns(3, "p", kPartitionKeyTypes_, 1, names, types);
}
auto input = generateInputData(names, types, partitionOffset);

auto tempDirPath = exec::test::TempDirectoryPath::create();
verifyWriter(input, partitionKeys, tempDirPath->getPath());
verifyWriter(
input,
bucketCount,
bucketColumns,
partitionKeys,
tempDirPath->getPath());

LOG(INFO) << "==============================> Done with iteration "
<< iteration++;
Expand Down Expand Up @@ -263,9 +294,8 @@ std::vector<RowVectorPtr> WriterFuzzer::generateInputData(
auto inputType = ROW(std::move(names), std::move(types));
std::vector<RowVectorPtr> input;

// For partition keys, limit the distinct value to 4 to avoid exceeding
// partition number limit of 100. Since we could have up to 3 partition
// keys, it would generate up to 64 partitions.
// For partition keys, limit the distinct value to 4. Since we could have up
// to 3 partition keys, it would generate up to 64 partitions.
std::vector<VectorPtr> partitionValues;
for (auto i = partitionOffset; i < inputType->size(); ++i) {
partitionValues.push_back(vectorFuzzer_.fuzz(inputType->childAt(i), 4));
Expand All @@ -292,12 +322,16 @@ std::vector<RowVectorPtr> WriterFuzzer::generateInputData(

void WriterFuzzer::verifyWriter(
const std::vector<RowVectorPtr>& input,
int32_t bucketCount,
const std::vector<std::string>& bucketColumns,
const std::vector<std::string>& partitionKeys,
const std::string& outputDirectoryPath) {
const auto plan = PlanBuilder()
.values(input)
.tableWrite(outputDirectoryPath, partitionKeys)
.planNode();
const auto plan =
PlanBuilder()
.values(input)
.tableWrite(
outputDirectoryPath, partitionKeys, bucketCount, bucketColumns)
.planNode();

const auto maxDrivers =
boost::random::uniform_int_distribution<int32_t>(1, 16)(rng_);
Expand Down Expand Up @@ -325,11 +359,12 @@ void WriterFuzzer::verifyWriter(
assertEqualResults(expectedResult, plan->outputType(), {result}),
"Velox and reference DB results don't match");

// 2. Verifies directory layout.
// 2. Verifies directory layout for partitioned (bucketed) table.
if (!partitionKeys.empty()) {
const auto referencedOutputDirectoryPath =
getReferenceOutputDirectoryPath(partitionKeys.size());
comparePartitions(outputDirectoryPath, referencedOutputDirectoryPath);
comparePartitionAndBucket(
outputDirectoryPath, referencedOutputDirectoryPath, bucketCount);
}

// 3. Verifies data itself.
Expand Down Expand Up @@ -360,7 +395,12 @@ RowVectorPtr WriterFuzzer::execute(
if (!splits.empty()) {
builder.splits(splits);
}
return builder.maxDrivers(maxDrivers).copyResults(pool_.get());
return builder.maxDrivers(maxDrivers)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::kMaxPartitionsPerWritersSession,
"400")
.copyResults(pool_.get());
}

RowVectorPtr WriterFuzzer::veloxToPrestoResult(const RowVectorPtr& result) {
Expand Down Expand Up @@ -390,47 +430,78 @@ std::string WriterFuzzer::getReferenceOutputDirectoryPath(int32_t layers) {
return tableDirectoryPath.string();
}

void WriterFuzzer::comparePartitions(
void WriterFuzzer::comparePartitionAndBucket(
const std::string& outputDirectoryPath,
const std::string& referenceOutputDirectoryPath) {
const auto partitions = getPartitionNames(outputDirectoryPath);
LOG(INFO) << "Velox written partitions:" << std::endl;
for (const std::string& partition : partitions) {
LOG(INFO) << partition << std::endl;
const std::string& referenceOutputDirectoryPath,
int32_t bucketCount) {
LOG(INFO) << "Velox output directory:" << outputDirectoryPath << std::endl;
const auto partitionNameAndFileCount =
getPartitionNameAndFilecount(outputDirectoryPath);
LOG(INFO) << "Partitions and file count:" << std::endl;
std::vector<std::string> partitionNames;
partitionNames.reserve(partitionNameAndFileCount.size());
for (const auto& i : partitionNameAndFileCount) {
LOG(INFO) << i.first << ":" << i.second << std::endl;
partitionNames.emplace_back(i.first);
}

const auto referencedPartitions =
getPartitionNames(referenceOutputDirectoryPath);
LOG(INFO) << "Presto written partitions:" << std::endl;
for (const std::string& partition : referencedPartitions) {
LOG(INFO) << partition << std::endl;
LOG(INFO) << "Presto output directory:" << referenceOutputDirectoryPath
<< std::endl;
const auto referencedPartitionNameAndFileCount =
getPartitionNameAndFilecount(referenceOutputDirectoryPath);
LOG(INFO) << "Partitions and file count:" << std::endl;
std::vector<std::string> referencePartitionNames;
referencePartitionNames.reserve(referencedPartitionNameAndFileCount.size());
for (const auto& i : referencedPartitionNameAndFileCount) {
LOG(INFO) << i.first << ":" << i.second << std::endl;
referencePartitionNames.emplace_back(i.first);
}

VELOX_CHECK(
partitions == referencedPartitions,
"Velox and reference DB output directory hierarchies don't match");
if (bucketCount == 0) {
// If not bucketed, only verify if their partition names match
VELOX_CHECK(
partitionNames == referencePartitionNames,
"Velox and reference DB output partitions don't match");
} else {
VELOX_CHECK(
partitionNameAndFileCount == referencedPartitionNameAndFileCount,
"Velox and reference DB output partition and bucket don't match");
}
}

std::set<std::string> WriterFuzzer::getPartitionNames(
// static
std::map<std::string, int32_t> WriterFuzzer::getPartitionNameAndFilecount(
const std::string& tableDirectoryPath) {
auto fileSystem = filesystems::getFileSystem("/", nullptr);
auto directories = listFolders(tableDirectoryPath);
std::set<std::string> partitionNames;
std::map<std::string, int32_t> partitionNameAndFileCount;

for (std::string directory : directories) {
// If it's a hidden directory, ignore
if (directory.find("/.") != std::string::npos) {
continue;
}

// Count non-empty non-hidden files
const auto files = fileSystem->list(directory);
int32_t fileCount = 0;
for (const auto& file : files) {
// Presto query runner sometime creates empty files, ignore those.
if (file.find("/.") == std::string::npos &&
fileSystem->openFileForRead(file)->size() > 0) {
fileCount++;
}
}

// Remove the path prefix to get the partition name
// For example: /test/tmp_write/p0=1/p1=2020
// partition name is /p0=1/p1=2020
directory.erase(0, fileSystem->extractPath(tableDirectoryPath).length());
partitionNames.insert(directory);

partitionNameAndFileCount.emplace(directory, fileCount);
}

return partitionNames;
return partitionNameAndFileCount;
}

} // namespace
Expand Down

0 comments on commit 7637d56

Please sign in to comment.