Skip to content

Commit

Permalink
feat(fuzzer): Add TopNRowNumberFuzzer
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Feb 6, 2025
1 parent a7faee2 commit 04b0749
Show file tree
Hide file tree
Showing 12 changed files with 867 additions and 182 deletions.
19 changes: 17 additions & 2 deletions velox/exec/fuzzer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,35 @@ target_link_libraries(

add_library(velox_row_number_fuzzer_lib RowNumberFuzzer.cpp)

target_link_libraries(
velox_row_number_fuzzer_lib
velox_fuzzer_util
velox_type
velox_vector_fuzzer
velox_exec_test_lib
velox_expression_test_utility)

# RowNumber Fuzzer.
add_executable(velox_row_number_fuzzer RowNumberFuzzerRunner.cpp)

target_link_libraries(
velox_row_number_fuzzer velox_row_number_fuzzer_lib)

add_library(velox_topn_row_number_fuzzer_lib TopNRowNumberFuzzer.cpp)

# Link dependencies of the TopNRowNumber fuzzer library. The first argument is
# the target being linked; everything after it is a dependency.
target_link_libraries(
  velox_topn_row_number_fuzzer_lib
  velox_fuzzer_util
  velox_type
  velox_vector_fuzzer
  velox_exec_test_lib
  velox_expression_test_utility)

# TopNRowNumber Fuzzer.
add_executable(velox_topn_row_number_fuzzer TopNRowNumberFuzzerRunner.cpp)

target_link_libraries(
velox_topn_row_number_fuzzer velox_topn_row_number_fuzzer_lib)

add_library(velox_join_fuzzer JoinFuzzer.cpp)

target_link_libraries(
Expand Down
51 changes: 51 additions & 0 deletions velox/exec/fuzzer/DuckQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ std::optional<std::string> DuckQueryRunner::toSql(
return toSql(rowNumberNode);
}

if (const auto topNRowNumberNode =
std::dynamic_pointer_cast<const core::TopNRowNumberNode>(plan)) {
return toSql(topNRowNumberNode);
}

if (const auto joinNode =
std::dynamic_pointer_cast<const core::HashJoinNode>(plan)) {
return toSql(joinNode);
Expand Down Expand Up @@ -377,4 +382,50 @@ std::optional<std::string> DuckQueryRunner::toSql(

return sql.str();
}

// Renders a TopNRowNumberNode as SQL: an inner query selects every input
// column plus row_number() computed over the node's partitioning and sorting
// keys, and the outer query keeps the rows whose row number does not exceed
// the node's limit. Returns std::nullopt if the node's source cannot be
// converted to SQL.
std::optional<std::string> DuckQueryRunner::toSql(
    const std::shared_ptr<const core::TopNRowNumberNode>& topNRowNumberNode) {
  std::stringstream query;
  query << "SELECT * FROM (SELECT ";

  // Project all the columns produced by the source.
  const auto& sourceType = topNRowNumberNode->sources()[0]->outputType();
  for (auto column = 0; column < sourceType->size(); ++column) {
    appendComma(column, query);
    query << sourceType->nameOf(column);
  }

  query << ", row_number() OVER (";

  // Optional PARTITION BY clause.
  const auto& partitionKeys = topNRowNumberNode->partitionKeys();
  if (!partitionKeys.empty()) {
    query << "partition by ";
  }
  int partitionKeyIndex = 0;
  for (const auto& partitionKey : partitionKeys) {
    appendComma(partitionKeyIndex++, query);
    query << partitionKey->name();
  }

  // Optional ORDER BY clause; sorting keys and orders are parallel lists.
  const auto& sortingKeys = topNRowNumberNode->sortingKeys();
  const auto& sortingOrders = topNRowNumberNode->sortingOrders();
  if (!sortingKeys.empty()) {
    query << " ORDER BY ";
  }
  for (auto k = 0; k < sortingKeys.size(); ++k) {
    appendComma(k, query);
    query << sortingKeys[k]->name() << " " << sortingOrders[k].toString();
  }

  // TopNRowNumberNode should have a single source.
  const std::optional<std::string> sourceSql =
      toSql(topNRowNumberNode->sources()[0]);
  if (!sourceSql.has_value()) {
    return std::nullopt;
  }

  query << ") as row_number FROM " << sourceSql.value() << ") "
        << " where row_number <= " << topNRowNumberNode->limit();

  return query.str();
}

} // namespace facebook::velox::exec::test
3 changes: 3 additions & 0 deletions velox/exec/fuzzer/DuckQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class DuckQueryRunner : public ReferenceQueryRunner {
std::optional<std::string> toSql(
const std::shared_ptr<const core::RowNumberNode>& rowNumberNode);

/// Converts a TopNRowNumberNode plan into a SQL query that computes
/// row_number() over the node's partitioning and sorting keys and keeps the
/// rows whose row number is within the node's limit. Returns std::nullopt if
/// the node's source cannot be converted to SQL.
std::optional<std::string> toSql(
const std::shared_ptr<const core::TopNRowNumberNode>& topNRowNumberNode);

std::unordered_set<std::string> aggregateFunctionNames_;
};

Expand Down
108 changes: 108 additions & 0 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/dwio/catalog/fbhive/FileUtils.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/fuzzer/DuckQueryRunner.h"
#include "velox/exec/fuzzer/PrestoQueryRunner.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/expression/SignatureBinder.h"
#include "velox/functions/prestosql/types/IPPrefixType.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"

using namespace facebook::velox::dwio::catalog::fbhive;

Expand Down Expand Up @@ -192,6 +198,18 @@ RowTypePtr concat(const RowTypePtr& a, const RowTypePtr& b) {
return ROW(std::move(names), std::move(types));
}

std::vector<RowVectorPtr> flatten(const std::vector<RowVectorPtr>& vectors) {
std::vector<RowVectorPtr> flatVectors;
for (const auto& vector : vectors) {
auto flat = BaseVector::create<RowVector>(
vector->type(), vector->size(), vector->pool());
flat->copy(vector.get(), 0, 0, vector->size());
flatVectors.push_back(flat);
}

return flatVectors;
}

// Sometimes we generate zero-column input of type ROW({}) or a column of type
// UNKNOWN(). Such data cannot be written to a file and therefore cannot
// be tested with TableScan.
Expand All @@ -205,6 +223,11 @@ bool isTableScanSupported(const TypePtr& type) {
if (type->kind() == TypeKind::HUGEINT) {
return false;
}
// Disable testing with TableScan when input contains TIMESTAMP type, due to
// the issue #8127.
if (type->kind() == TypeKind::TIMESTAMP) {
return false;
}

for (auto i = 0; i < type->size(); ++i) {
if (!isTableScanSupported(type->childAt(i))) {
Expand Down Expand Up @@ -380,6 +403,26 @@ void registerHiveConnector(
connector::registerConnector(hiveConnector);
}

// Registers what the fuzzers need to read and write data: the local file
// system, the DWRF reader factory, the Presto / CompactRow / UnsafeRow named
// vector serdes (each only if not registered yet) and a Hive connector.
void setupReadWrite() {
  filesystems::registerLocalFileSystem();
  dwrf::registerDwrfReaderFactory();

  // Invokes 'registerSerde' only when no named serde of 'kind' exists yet.
  const auto registerIfMissing = [](auto kind, auto&& registerSerde) {
    if (!isRegisteredNamedVectorSerde(kind)) {
      registerSerde();
    }
  };

  registerIfMissing(VectorSerde::Kind::kPresto, [] {
    serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
  });
  registerIfMissing(VectorSerde::Kind::kCompactRow, [] {
    serializer::CompactRowVectorSerde::registerNamedVectorSerde();
  });
  registerIfMissing(VectorSerde::Kind::kUnsafeRow, [] {
    serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde();
  });

  // Make sure not to run out of open file descriptors.
  std::unordered_map<std::string, std::string> hiveConfig = {
      {connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}};
  registerHiveConnector(hiveConfig);
}

std::unique_ptr<ReferenceQueryRunner> setupReferenceQueryRunner(
memory::MemoryPool* aggregatePool,
const std::string& prestoUrl,
Expand Down Expand Up @@ -421,4 +464,69 @@ computeReferenceResultsAsVector(
ReferenceQueryRunner* referenceQueryRunner) {
return referenceQueryRunner->executeAndReturnVector(plan);
}

// Runs 'plan' (adding 'plan.splits' if there are any) and returns the results
// copied into 'pool'.
//
// If 'injectSpill' is true, spilling is enabled: 'spillConfig' must be set (it
// names a query config that gets set to true) and 'maxSpillLevel' must be
// >= 0. Spilling then goes to a temporary directory and spill injection is
// turned on with a value of 10.
//
// If 'injectOOM' is true, the OOM injector is enabled. A failure caused by the
// injected OOM is expected and reported by returning nullptr. Any other
// VeloxRuntimeError is rethrown to the caller.
RowVectorPtr execute(
    const PlanWithSplits& plan,
    const std::shared_ptr<memory::MemoryPool>& pool,
    bool injectSpill,
    bool injectOOM,
    const std::optional<std::string>& spillConfig,
    int maxSpillLevel) {
  LOG(INFO) << "Executing query plan: " << plan.plan->toString(true, true);

  AssertQueryBuilder builder(plan.plan);
  if (!plan.splits.empty()) {
    builder.splits(plan.splits);
  }

  // Declared at function scope so that the directory handle outlives the query
  // execution below. NOTE(review): presumably TempDirectoryPath removes the
  // directory when it is destroyed, so scoping it to the 'if' block would
  // delete the spill directory before the query runs - confirm.
  std::shared_ptr<TempDirectoryPath> spillDirectory;
  int32_t spillPct{0};
  if (injectSpill) {
    VELOX_CHECK(
        spillConfig.has_value(),
        "Spill config not set for execute with spilling");
    VELOX_CHECK_GE(
        maxSpillLevel, 0, "Max spill should be set for execute with spilling");
    spillDirectory = exec::test::TempDirectoryPath::create();
    builder.config(core::QueryConfig::kSpillEnabled, true)
        .config(core::QueryConfig::kMaxSpillLevel, maxSpillLevel)
        .config(spillConfig.value(), true)
        .spillDirectory(spillDirectory->getPath());
    spillPct = 10;
  }

  ScopedOOMInjector oomInjector(
      []() -> bool { return folly::Random::oneIn(10); },
      10); // Check the condition every 10 ms.
  if (injectOOM) {
    oomInjector.enable();
  }

  // Wait for the task to be destroyed before start next query execution to
  // avoid the potential interference of the background activities across query
  // executions.
  auto stopGuard = folly::makeGuard([&]() { waitForAllTasksToBeDeleted(); });

  TestScopedSpillInjection scopedSpillInjection(spillPct);
  RowVectorPtr result;
  try {
    result = builder.copyResults(pool.get());
  } catch (VeloxRuntimeError& e) {
    if (injectOOM &&
        e.errorCode() == facebook::velox::error_code::kMemCapExceeded &&
        e.message() == ScopedOOMInjector::kErrorMessage) {
      // If we enabled OOM injection we expect the exception thrown by the
      // ScopedOOMInjector.
      return nullptr;
    }

    // Rethrow the exception in flight. 'throw e;' would throw a copy whose
    // type is the static type of 'e', slicing any derived exception.
    throw;
  }

  if (VLOG_IS_ON(1)) {
    VLOG(1) << std::endl << result->toString(0, result->size());
  }

  return result;
}
} // namespace facebook::velox::exec::test
28 changes: 27 additions & 1 deletion velox/exec/fuzzer/FuzzerUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ struct SortingKeyAndOrder {
: key_(std::move(key)), sortOrder_(std::move(sortOrder)) {}
};

/// A query plan bundled with the splits to add when executing it.
struct PlanWithSplits {
  /// Plan to execute.
  core::PlanNodePtr plan;
  /// Splits for the plan. May be empty, in which case no splits are added.
  std::vector<Split> splits;

  /// Both arguments are sink parameters: they are taken by value and moved
  /// into the members, so callers passing temporaries avoid a copy of the
  /// splits.
  explicit PlanWithSplits(
      core::PlanNodePtr _plan,
      std::vector<Split> _splits = {})
      : plan(std::move(_plan)), splits(std::move(_splits)) {}
};

/// Write the vector to the path.
void writeToFile(
const std::string& path,
Expand Down Expand Up @@ -86,6 +96,9 @@ bool isTableScanSupported(const TypePtr& type);
/// Concat two RowTypes.
RowTypePtr concat(const RowTypePtr& a, const RowTypePtr& b);

/// Flatten the input vectors for printing.
std::vector<RowVectorPtr> flatten(const std::vector<RowVectorPtr>& vectors);

/// Skip queries that use Timestamp, Varbinary, and IntervalDayTime types.
/// DuckDB doesn't support nanosecond precision for timestamps or casting from
/// Bigint to Interval.
Expand Down Expand Up @@ -119,12 +132,16 @@ TypePtr sanitizeTryResolveType(
const std::unordered_map<std::string, TypePtr>& typeVariablesBindings,
std::unordered_map<std::string, int>& integerVariablesBindings);

/// Invoked to set up memory system with arbitration.
void setupMemory(
int64_t allocatorCapacity,
int64_t arbitratorCapacity,
bool enableGlobalArbitration = true);

/// Sets up the Dwrf reader/writer, serializers and Hive connector for the
/// fuzzers.
void setupReadWrite();

/// Registers hive connector with configs. It should be called in the
/// constructor of fuzzers that test plans with TableScan or use
/// PrestoQueryRunner that writes data to a local file.
Expand Down Expand Up @@ -157,4 +174,13 @@ computeReferenceResultsAsVector(
const core::PlanNodePtr& plan,
ReferenceQueryRunner* referenceQueryRunner);

/// Executes 'plan' and returns the results copied into 'pool', optionally
/// injecting spilling and OOM failures.
/// @param plan Plan to execute along with its splits, if any.
/// @param pool Memory pool the results are copied into.
/// @param injectSpill If true, enables spilling and spill injection. Requires
/// 'spillConfig' to be set and 'maxSpillLevel' to be >= 0.
/// @param injectOOM If true, enables OOM injection while the plan runs.
/// @param spillConfig Name of a query config that is set to true when
/// 'injectSpill' is true.
/// @param maxSpillLevel Value for the max spill level query config. Used only
/// when 'injectSpill' is true.
/// @return The query results, or nullptr if 'injectOOM' is true and the query
/// failed with the injected OOM error.
RowVectorPtr execute(
const PlanWithSplits& plan,
const std::shared_ptr<memory::MemoryPool>& pool,
bool injectSpill,
bool injectOOM,
const std::optional<std::string>& spillConfig = std::nullopt,
int maxSpillLevel = -1);

} // namespace facebook::velox::exec::test
54 changes: 54 additions & 0 deletions velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ std::optional<std::string> PrestoQueryRunner::toSql(
return toSql(rowNumberNode);
}

if (const auto topNRowNumberNode =
std::dynamic_pointer_cast<const core::TopNRowNumberNode>(plan)) {
return toSql(topNRowNumberNode);
}

if (auto tableWriteNode =
std::dynamic_pointer_cast<const core::TableWriteNode>(plan)) {
return toSql(tableWriteNode);
Expand Down Expand Up @@ -498,6 +503,55 @@ std::optional<std::string> PrestoQueryRunner::toSql(
return sql.str();
}

// Renders a TopNRowNumberNode as SQL. The generated query wraps the source in
// a sub-query that adds a row_number() column computed over the node's
// partitioning and sorting keys, then filters on that column using the node's
// limit. Returns std::nullopt if the source's output type is not a supported
// DWRF type or if the source cannot be converted to SQL.
std::optional<std::string> PrestoQueryRunner::toSql(
    const std::shared_ptr<const core::TopNRowNumberNode>& topNRowNumberNode) {
  const bool supportedInput =
      isSupportedDwrfType(topNRowNumberNode->sources()[0]->outputType());
  if (!supportedInput) {
    return std::nullopt;
  }

  std::stringstream out;
  out << "SELECT * FROM (SELECT ";

  // List every column of the source's output.
  const auto& rowType = topNRowNumberNode->sources()[0]->outputType();
  for (auto col = 0; col < rowType->size(); ++col) {
    appendComma(col, out);
    out << rowType->nameOf(col);
  }

  out << ", row_number() OVER (";

  // Partitioning keys, if any.
  const auto& partitionKeys = topNRowNumberNode->partitionKeys();
  if (!partitionKeys.empty()) {
    out << "partition by ";
  }
  int keyPosition = 0;
  for (const auto& key : partitionKeys) {
    appendComma(keyPosition++, out);
    out << key->name();
  }

  // Sorting keys with their orders, if any.
  const auto& sortingKeys = topNRowNumberNode->sortingKeys();
  const auto& sortingOrders = topNRowNumberNode->sortingOrders();
  if (!sortingKeys.empty()) {
    out << " ORDER BY ";
  }
  for (auto pos = 0; pos < sortingKeys.size(); ++pos) {
    appendComma(pos, out);
    out << sortingKeys[pos]->name() << " " << sortingOrders[pos].toString();
  }

  // TopNRowNumberNode should have a single source.
  const std::optional<std::string> sourceSql =
      toSql(topNRowNumberNode->sources()[0]);
  if (!sourceSql.has_value()) {
    return std::nullopt;
  }

  out << ") as row_number FROM " << sourceSql.value() << ") "
      << " where row_number <= " << topNRowNumberNode->limit();

  return out.str();
}

std::optional<std::string> PrestoQueryRunner::toSql(
const std::shared_ptr<const core::TableWriteNode>& tableWriteNode) {
auto insertTableHandle =
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/fuzzer/PrestoQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner {
std::optional<std::string> toSql(
const std::shared_ptr<const velox::core::RowNumberNode>& rowNumberNode);

/// Converts a TopNRowNumberNode plan into a SQL query. Returns std::nullopt if
/// the source's output type is not a supported DWRF type or if the source
/// cannot be converted to SQL.
std::optional<std::string> toSql(
    const std::shared_ptr<const core::TopNRowNumberNode>& topNRowNumberNode);

std::optional<std::string> toSql(
const std::shared_ptr<const core::TableWriteNode>& tableWriteNode);

Expand Down
Loading

0 comments on commit 04b0749

Please sign in to comment.