From 9ff293934e3c09e812e42668ef81ad69bc791fc7 Mon Sep 17 00:00:00 2001 From: Wei He Date: Fri, 8 Dec 2023 16:29:22 -0800 Subject: [PATCH] Extract AggregationFuzzerBase from AggregationFuzzer (#7916) Summary: Extract AggregationFuzzerBase from AggregationFuzzer. This is needed for building WindowFuzzer that reuses common logic in AggregaitonFuzzerBase. This is the first piece of https://github.com/facebookincubator/velox/issues/7754. Differential Revision: D51692940 --- velox/exec/tests/AggregationFuzzerTest.cpp | 205 +---- velox/exec/tests/CMakeLists.txt | 2 + .../exec/tests/SparkAggregationFuzzerTest.cpp | 4 +- velox/exec/tests/utils/AggregationFuzzer.cpp | 827 +----------------- velox/exec/tests/utils/AggregationFuzzer.h | 73 +- .../tests/utils/AggregationFuzzerBase.cpp | 650 ++++++++++++++ .../exec/tests/utils/AggregationFuzzerBase.h | 343 ++++++++ .../tests/utils/AggregationFuzzerRunner.h | 90 +- .../tests/utils/AggregationFuzzerUtils.cpp | 34 + .../exec/tests/utils/AggregationFuzzerUtils.h | 298 +++++++ velox/exec/tests/utils/CMakeLists.txt | 22 +- 11 files changed, 1407 insertions(+), 1141 deletions(-) create mode 100644 velox/exec/tests/utils/AggregationFuzzerBase.cpp create mode 100644 velox/exec/tests/utils/AggregationFuzzerBase.h create mode 100644 velox/exec/tests/utils/AggregationFuzzerUtils.cpp create mode 100644 velox/exec/tests/utils/AggregationFuzzerUtils.h diff --git a/velox/exec/tests/AggregationFuzzerTest.cpp b/velox/exec/tests/AggregationFuzzerTest.cpp index a0123261ea58c..4e764c4d08506 100644 --- a/velox/exec/tests/AggregationFuzzerTest.cpp +++ b/velox/exec/tests/AggregationFuzzerTest.cpp @@ -21,6 +21,7 @@ #include #include "velox/exec/tests/utils/AggregationFuzzerRunner.h" +#include "velox/exec/tests/utils/AggregationFuzzerUtils.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/DuckQueryRunner.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -46,207 +47,6 @@ DEFINE_string( namespace facebook::velox::exec::test { namespace { -class MinMaxInputGenerator : public InputGenerator { - public: - MinMaxInputGenerator(const std::string& name) : indexOfN_{indexOfN(name)} {} - - std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) override { - // TODO Generate inputs free of nested nulls. - if (types.size() <= indexOfN_) { - return {}; - } - - // Make sure to use the same value of 'n' for all batches in a given Fuzzer - // iteration. - if (!n_.has_value()) { - n_ = boost::random::uniform_int_distribution(0, 9'999)(rng); - } - - const auto size = fuzzer.getOptions().vectorSize; - - std::vector inputs; - inputs.reserve(types.size()); - for (auto i = 0; i < types.size() - 1; ++i) { - inputs.push_back(fuzzer.fuzz(types[i])); - } - - VELOX_CHECK( - types.back()->isBigint(), - "Unexpected type: {}", - types.back()->toString()) - inputs.push_back( - BaseVector::createConstant(BIGINT(), n_.value(), size, pool)); - return inputs; - } - - void reset() override { - n_.reset(); - } - - private: - // Returns zero-based index of the 'n' argument, 1 for min and max. 2 for - // min_by and max_by. - static int32_t indexOfN(const std::string& name) { - if (name == "min" || name == "max") { - return 1; - } - - if (name == "min_by" || name == "max_by") { - return 2; - } - - VELOX_FAIL("Unexpected function name: {}", name) - } - - // Zero-based index of the 'n' argument. - const int32_t indexOfN_; - std::optional n_; -}; - -class ApproxDistinctInputGenerator : public InputGenerator { - public: - std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) override { - if (types.size() != 2) { - return {}; - } - - // Make sure to use the same value of 'e' for all batches in a given Fuzzer - // iteration. - if (!e_.has_value()) { - // Generate value in [0.0040625, 0.26] range. - static constexpr double kMin = 0.0040625; - static constexpr double kMax = 0.26; - e_ = kMin + (kMax - kMin) * boost::random::uniform_01()(rng); - } - - const auto size = fuzzer.getOptions().vectorSize; - - VELOX_CHECK( - types.back()->isDouble(), - "Unexpected type: {}", - types.back()->toString()) - return { - fuzzer.fuzz(types[0]), - BaseVector::createConstant(DOUBLE(), e_.value(), size, pool)}; - } - - void reset() override { - e_.reset(); - } - - private: - std::optional e_; -}; - -class ApproxPercentileInputGenerator : public InputGenerator { - public: - std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) override { - // The arguments are: x, [w], percentile(s), [accuracy]. - // - // First argument is always 'x'. If second argument's type is BIGINT, then - // it is 'w'. Otherwise, it is percentile(x). - - const auto size = fuzzer.getOptions().vectorSize; - - std::vector inputs; - inputs.reserve(types.size()); - inputs.push_back(fuzzer.fuzz(types[0])); - - if (types[1]->isBigint()) { - velox::test::VectorMaker vectorMaker{pool}; - auto weight = vectorMaker.flatVector(size, [&](auto row) { - return boost::random::uniform_int_distribution(1, 1'000)(rng); - }); - - inputs.push_back(weight); - } - - const int percentileTypeIndex = types[1]->isBigint() ? 2 : 1; - const TypePtr& percentileType = types[percentileTypeIndex]; - if (percentileType->isDouble()) { - if (!percentile_.has_value()) { - percentile_ = pickPercentile(fuzzer, rng); - } - - inputs.push_back(BaseVector::createConstant( - DOUBLE(), percentile_.value(), size, pool)); - } else { - VELOX_CHECK(percentileType->isArray()); - VELOX_CHECK(percentileType->childAt(0)->isDouble()); - - if (percentiles_.empty()) { - percentiles_.push_back(pickPercentile(fuzzer, rng)); - percentiles_.push_back(pickPercentile(fuzzer, rng)); - percentiles_.push_back(pickPercentile(fuzzer, rng)); - } - - auto arrayVector = - BaseVector::create(ARRAY(DOUBLE()), 1, pool); - auto elementsVector = arrayVector->elements()->asFlatVector(); - elementsVector->resize(percentiles_.size()); - for (auto i = 0; i < percentiles_.size(); ++i) { - elementsVector->set(i, percentiles_[i]); - } - arrayVector->setOffsetAndSize(0, 0, percentiles_.size()); - - inputs.push_back(BaseVector::wrapInConstant(size, 0, arrayVector)); - } - - if (types.size() > percentileTypeIndex + 1) { - // Last argument is 'accuracy'. - VELOX_CHECK(types.back()->isDouble()); - if (!accuracy_.has_value()) { - accuracy_ = boost::random::uniform_01()(rng); - } - - inputs.push_back( - BaseVector::createConstant(DOUBLE(), accuracy_.value(), size, pool)); - } - - return inputs; - } - - void reset() override { - percentile_.reset(); - percentiles_.clear(); - accuracy_.reset(); - } - - private: - double pickPercentile(VectorFuzzer& fuzzer, FuzzerGenerator& rng) { - // 10% of the times generate random value in [0, 1] range. - // 90% of the times use one of the common values. - if (fuzzer.coinToss(0.1)) { - return boost::random::uniform_01()(rng); - } - - static const std::vector kPercentiles = { - 0.1, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99, 0.999, 0.9999}; - - const auto index = - boost::random::uniform_int_distribution()(rng) % - kPercentiles.size(); - - return kPercentiles[index]; - } - - std::optional percentile_; - std::vector percentiles_; - std::optional accuracy_; -}; - std::unordered_map> getCustomInputGenerators() { return { @@ -905,8 +705,9 @@ int main(int argc, char** argv) { }; using Runner = facebook::velox::exec::test::AggregationFuzzerRunner; + using Options = facebook::velox::exec::test::AggregationFuzzerOptions; - Runner::Options options; + Options options; options.onlyFunctions = FLAGS_only; options.skipFunctions = skipFunctions; options.customVerificationFunctions = customVerificationFunctions; diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 47517b75e994a..e0f6dcb104d33 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -204,6 +204,7 @@ add_executable(velox_aggregation_fuzzer_test AggregationFuzzerTest.cpp) target_link_libraries( velox_aggregation_fuzzer_test velox_aggregation_fuzzer + velox_aggregation_fuzzer_base velox_aggregates velox_window velox_vector_test_lib @@ -215,6 +216,7 @@ add_executable(spark_aggregation_fuzzer_test SparkAggregationFuzzerTest.cpp) target_link_libraries( spark_aggregation_fuzzer_test velox_aggregation_fuzzer + velox_aggregation_fuzzer_base velox_functions_spark_aggregates velox_window velox_vector_test_lib diff --git a/velox/exec/tests/SparkAggregationFuzzerTest.cpp b/velox/exec/tests/SparkAggregationFuzzerTest.cpp index dbcdfc2b9958e..2c84ea206840c 100644 --- a/velox/exec/tests/SparkAggregationFuzzerTest.cpp +++ b/velox/exec/tests/SparkAggregationFuzzerTest.cpp @@ -20,6 +20,7 @@ #include #include "velox/exec/tests/utils/AggregationFuzzerRunner.h" +#include "velox/exec/tests/utils/AggregationFuzzerUtils.h" #include "velox/exec/tests/utils/DuckQueryRunner.h" #include "velox/functions/sparksql/aggregates/Register.h" @@ -78,8 +79,9 @@ int main(int argc, char** argv) { }); using Runner = facebook::velox::exec::test::AggregationFuzzerRunner; + using Options = facebook::velox::exec::test::AggregationFuzzerOptions; - Runner::Options options; + Options options; options.onlyFunctions = FLAGS_only; options.skipFunctions = skipFunctions; options.customVerificationFunctions = customVerificationFunctions; diff --git a/velox/exec/tests/utils/AggregationFuzzer.cpp b/velox/exec/tests/utils/AggregationFuzzer.cpp index 20bfa5697bf5a..bd2cc1e32d1a6 100644 --- a/velox/exec/tests/utils/AggregationFuzzer.cpp +++ b/velox/exec/tests/utils/AggregationFuzzer.cpp @@ -30,76 +30,33 @@ #include "velox/expression/tests/utils/ArgumentTypeFuzzer.h" #include "velox/exec/PartitionFunction.h" +#include "velox/exec/tests/utils/AggregationFuzzerBase.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/expression/tests/utils/FuzzerToolkit.h" #include "velox/vector/VectorSaver.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorMaker.h" -DEFINE_int32(steps, 10, "Number of plans to generate and execute."); - -DEFINE_int32( - duration_sec, - 0, - "For how long it should run (in seconds). If zero, " - "it executes exactly --steps iterations and exits."); - -DEFINE_int32( - batch_size, - 100, - "The number of elements on each generated vector."); - -DEFINE_int32(num_batches, 10, "The number of generated vectors."); - -DEFINE_int32( - max_num_varargs, - 5, - "The maximum number of variadic arguments fuzzer will generate for " - "functions that accept variadic arguments. Fuzzer will generate up to " - "max_num_varargs arguments for the variadic list in addition to the " - "required arguments by the function."); - -DEFINE_double( - null_ratio, - 0.1, - "Chance of adding a null constant to the plan, or null value in a vector " - "(expressed as double from 0 to 1)."); - -DEFINE_string( - repro_persist_path, - "", - "Directory path for persistence of data and SQL when fuzzer fails for " - "future reproduction. Empty string disables this feature."); - -DEFINE_bool( - enable_window_reference_verification, - false, - "When true, the results of the window aggregation are compared to reference DB results"); - DEFINE_bool( enable_sorted_aggregations, false, "When true, generates plans with aggregations over sorted inputs"); DEFINE_bool( - persist_and_run_once, - false, - "Persist repro info before evaluation and only run one iteration. " - "This is to rerun with the seed number and persist repro info upon a " - "crash failure. Only effective if repro_persist_path is set."); - -DEFINE_bool( - log_signature_stats, + enable_window_reference_verification, false, - "Log statistics about function signatures"); + "When true, the results of the window aggregation are compared to reference DB results"); using facebook::velox::test::CallableSignature; using facebook::velox::test::SignatureTemplate; namespace facebook::velox::exec::test { + +class AggregationFuzzerBase; + namespace { -class AggregationFuzzer { +class AggregationFuzzer : public AggregationFuzzerBase { public: AggregationFuzzer( AggregateFunctionSignatureMap signatureMap, @@ -112,28 +69,10 @@ class AggregationFuzzer { const std::unordered_map& queryConfigs, std::unique_ptr referenceQueryRunner); - struct PlanWithSplits { - core::PlanNodePtr plan; - std::vector splits; - }; - void go(); void go(const std::string& planPath); private: - static inline const std::string kHiveConnectorId = "test-hive"; - - std::shared_ptr findInputGenerator( - const CallableSignature& signature); - - static exec::Split makeSplit(const std::string& filePath); - - std::vector makeSplits( - const std::vector& inputs, - const std::string& path); - - PlanWithSplits deserialize(const folly::dynamic& obj); - struct Stats { // Names of functions that were tested. std::unordered_set functionNames; @@ -177,66 +116,8 @@ class AggregationFuzzer { void print(size_t numIterations) const; }; - static VectorFuzzer::Options getFuzzerOptions( - VectorFuzzer::Options::TimestampPrecision timestampPrecision) { - VectorFuzzer::Options opts; - opts.vectorSize = FLAGS_batch_size; - opts.stringVariableLength = true; - opts.stringLength = 4'000; - opts.nullRatio = FLAGS_null_ratio; - opts.timestampPrecision = timestampPrecision; - return opts; - } - - void seed(size_t seed) { - currentSeed_ = seed; - vectorFuzzer_.reSeed(seed); - rng_.seed(currentSeed_); - } - - void reSeed() { - seed(rng_()); - } - - // Generate at least one and up to 5 scalar columns to be used as grouping, - // partition or sorting keys. - // Column names are generated using template 'N', where N is - // zero-based ordinal number of the column. - std::vector generateKeys( - const std::string& prefix, - std::vector& names, - std::vector& types); - - // Similar to generateKeys, but restricts types to orderable types (i.e. no - // maps). - std::vector generateSortingKeys( - const std::string& prefix, - std::vector& names, - std::vector& types); - - struct SignatureStats { - /// Number of times a signature was chosen. - size_t numRuns{0}; - - /// Number of times generated query plan failed. - size_t numFailed{0}; - }; - - std::pair pickSignature(); - - std::vector generateInputData( - std::vector names, - std::vector types, - const std::optional& signature); - - // Generate a RowVector of the given types of children with an additional - // child named "row_number" of BIGINT row numbers that differentiates every - // row. Row numbers start from 0. This additional input vector is needed for - // result verification of window aggregations. - std::vector generateInputDataWithRowNumber( - std::vector names, - std::vector types, - const CallableSignature& signature); + void updateReferenceQueryStats( + AggregationFuzzerBase::ReferenceQueryErrorCode ec); // Return 'true' if query plans failed. bool verifyWindow( @@ -265,17 +146,6 @@ class AggregationFuzzer { void verifyAggregation(const std::vector& plans); - std::optional computeReferenceResults( - const core::PlanNodePtr& plan, - const std::vector& input); - - velox::test::ResultOrError execute( - const core::PlanNodePtr& plan, - const std::vector& splits = {}, - bool injectSpill = false, - bool abandonPartial = false, - int32_t maxDrivers = 2); - static bool hasPartialGroupBy(const core::PlanNodePtr& plan) { auto partialAgg = core::PlanNode::findFirstNode( plan.get(), [](const core::PlanNode* node) { @@ -335,47 +205,6 @@ class AggregationFuzzer { } } - // @param customVerification If false, results are compared as is. Otherwise, - // only row counts are compared. - // @param customVerifiers Custom verifier for each aggregate function. These - // can be null. If not null and customVerification is true, custom verifier is - // used to further verify the results. - void testPlan( - const PlanWithSplits& planWithSplits, - bool injectSpill, - bool abandonPartial, - bool customVerification, - const std::vector>& customVerifiers, - const velox::test::ResultOrError& expected, - int32_t maxDrivers = 2); - - void printSignatureStats(); - - const std::unordered_map> - customVerificationFunctions_; - const std::unordered_map> - customInputGenerators_; - const std::unordered_map queryConfigs_; - const bool persistAndRunOnce_; - const std::string reproPersistPath_; - - std::unique_ptr referenceQueryRunner_; - - std::vector signatures_; - std::vector signatureTemplates_; - - // Stats for 'signatures_' and 'signatureTemplates_'. Stats for 'signatures_' - // come before stats for 'signatureTemplates_'. - std::vector signatureStats_; - - FuzzerGenerator rng_; - size_t currentSeed_{0}; - - std::shared_ptr rootPool_{ - memory::defaultMemoryManager().addRootPool()}; - std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; - VectorFuzzer vectorFuzzer_; - Stats stats_; }; } // namespace @@ -405,34 +234,9 @@ void aggregateFuzzer( namespace { -std::string printStat(size_t n, size_t total) { - return fmt::format("{} ({:.2f}%)", n, (double)n / total * 100); -} - -void printStats( - size_t numFunctions, - size_t numSignatures, - size_t numSupportedFunctions, - size_t numSupportedSignatures) { - LOG(INFO) << fmt::format( - "Total functions: {} ({} signatures)", numFunctions, numSignatures); - LOG(INFO) << "Functions with at least one supported signature: " - << printStat(numSupportedFunctions, numFunctions); - - size_t numNotSupportedFunctions = numFunctions - numSupportedFunctions; - LOG(INFO) << "Functions with no supported signature: " - << printStat(numNotSupportedFunctions, numFunctions); - LOG(INFO) << "Supported function signatures: " - << printStat(numSupportedSignatures, numSignatures); - - size_t numNotSupportedSignatures = numSignatures - numSupportedSignatures; - LOG(INFO) << "Unsupported function signatures: " - << printStat(numNotSupportedSignatures, numSignatures); -} - AggregationFuzzer::AggregationFuzzer( AggregateFunctionSignatureMap signatureMap, - size_t initialSeed, + size_t seed, const std::unordered_map>& customVerificationFunctions, const std::unordered_map>& @@ -440,21 +244,13 @@ AggregationFuzzer::AggregationFuzzer( VectorFuzzer::Options::TimestampPrecision timestampPrecision, const std::unordered_map& queryConfigs, std::unique_ptr referenceQueryRunner) - : customVerificationFunctions_{customVerificationFunctions}, - customInputGenerators_{customInputGenerators}, - queryConfigs_{queryConfigs}, - persistAndRunOnce_{FLAGS_persist_and_run_once}, - reproPersistPath_{FLAGS_repro_persist_path}, - referenceQueryRunner_{std::move(referenceQueryRunner)}, - vectorFuzzer_{getFuzzerOptions(timestampPrecision), pool_.get()} { - filesystems::registerLocalFileSystem(); - auto hiveConnector = - connector::getConnectorFactory( - connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector(kHiveConnectorId, nullptr); - connector::registerConnector(hiveConnector); - - seed(initialSeed); + : AggregationFuzzerBase{ + seed, + customVerificationFunctions, + customInputGenerators, + timestampPrecision, + queryConfigs, + std::move(referenceQueryRunner)} { VELOX_CHECK(!signatureMap.empty(), "No function signatures available."); if (persistAndRunOnce_ && reproPersistPath_.empty()) { @@ -464,88 +260,8 @@ AggregationFuzzer::AggregationFuzzer( exit(1); } - size_t numFunctions = 0; - size_t numSignatures = 0; - size_t numSupportedFunctions = 0; - size_t numSupportedSignatures = 0; - - for (auto& [name, signatures] : signatureMap) { - ++numFunctions; - bool hasSupportedSignature = false; - for (auto& signature : signatures) { - ++numSignatures; - - if (signature->variableArity()) { - LOG(WARNING) << "Skipping variadic function signature: " << name - << signature->toString(); - continue; - } - - if (!signature->variables().empty()) { - bool skip = false; - std::unordered_set typeVariables; - for (auto& [variableName, variable] : signature->variables()) { - if (variable.isIntegerParameter()) { - LOG(WARNING) << "Skipping generic function signature: " << name - << signature->toString(); - skip = true; - break; - } - - typeVariables.insert(variableName); - } - if (skip) { - continue; - } - - signatureTemplates_.push_back( - {name, signature.get(), std::move(typeVariables)}); - } else { - CallableSignature callable{ - .name = name, - .args = {}, - .returnType = SignatureBinder::tryResolveType( - signature->returnType(), {}, {}), - .constantArgs = {}}; - VELOX_CHECK_NOT_NULL(callable.returnType); - - // Process each argument and figure out its type. - for (const auto& arg : signature->argumentTypes()) { - auto resolvedType = SignatureBinder::tryResolveType(arg, {}, {}); - VELOX_CHECK_NOT_NULL(resolvedType); - - // SignatureBinder::tryResolveType produces ROW types with empty field - // names. These won't work with TableScan. - if (resolvedType->isRow()) { - std::vector names; - for (auto i = 0; i < resolvedType->size(); ++i) { - names.push_back(fmt::format("field{}", i)); - } - - std::vector types = resolvedType->asRow().children(); - - resolvedType = ROW(std::move(names), std::move(types)); - } - - callable.args.emplace_back(resolvedType); - } - - signatures_.emplace_back(callable); - } - - ++numSupportedSignatures; - hasSupportedSignature = true; - } - if (hasSupportedSignature) { - ++numSupportedFunctions; - } - } - - printStats( - numFunctions, - numSignatures, - numSupportedFunctions, - numSupportedSignatures); + addAggregationSignatures(signatureMap); + printStats(functionsStats); sortCallableSignatures(signatures_); sortSignatureTemplates(signatureTemplates_); @@ -553,271 +269,6 @@ AggregationFuzzer::AggregationFuzzer( signatureStats_.resize(signatures_.size() + signatureTemplates_.size()); } -template -bool isDone(size_t i, T startTime) { - if (FLAGS_duration_sec > 0) { - std::chrono::duration elapsed = - std::chrono::system_clock::now() - startTime; - return elapsed.count() >= FLAGS_duration_sec; - } - return i >= FLAGS_steps; -} - -std::string makeFunctionCall( - const std::string& name, - const std::vector& argNames, - bool sortedInputs) { - std::ostringstream call; - call << name << "(" << folly::join(", ", argNames); - if (sortedInputs) { - call << " ORDER BY " << folly::join(", ", argNames); - } - call << ")"; - - return call.str(); -} - -std::vector makeNames(size_t n) { - std::vector names; - for (auto i = 0; i < n; ++i) { - names.push_back(fmt::format("c{}", i)); - } - return names; -} - -folly::dynamic serialize( - const AggregationFuzzer::PlanWithSplits& planWithSplits, - const std::string& dirPath, - std::unordered_map& filePaths) { - folly::dynamic obj = folly::dynamic::object(); - obj["plan"] = planWithSplits.plan->serialize(); - if (planWithSplits.splits.empty()) { - return obj; - } - - folly::dynamic jsonSplits = folly::dynamic::array(); - jsonSplits.reserve(planWithSplits.splits.size()); - for (const auto& split : planWithSplits.splits) { - const auto filePath = - std::dynamic_pointer_cast( - split.connectorSplit) - ->filePath; - if (filePaths.count(filePath) == 0) { - const auto newFilePath = fmt::format("{}/{}", dirPath, filePaths.size()); - fs::copy(filePath, newFilePath); - filePaths.insert({filePath, newFilePath}); - } - jsonSplits.push_back(filePaths.at(filePath)); - } - obj["splits"] = jsonSplits; - return obj; -} - -void persistReproInfo( - const std::vector& plans, - const std::string& basePath) { - if (!common::generateFileDirectory(basePath.c_str())) { - return; - } - - // Create a new directory - const auto dirPathOptional = - common::generateTempFolderPath(basePath.c_str(), "aggregationVerifier"); - if (!dirPathOptional.has_value()) { - LOG(ERROR) - << "Failed to create directory for persisting plans using base path: " - << basePath; - return; - } - - const auto dirPath = dirPathOptional.value(); - - // Save plans and splits. - const std::string planPath = fmt::format("{}/{}", dirPath, kPlanNodeFileName); - std::unordered_map filePaths; - try { - folly::dynamic array = folly::dynamic::array(); - array.reserve(plans.size()); - for (auto planWithSplits : plans) { - array.push_back(serialize(planWithSplits, dirPath, filePaths)); - } - auto planJson = folly::toJson(array); - saveStringToFile(planJson, planPath.c_str()); - LOG(INFO) << "Persisted aggregation plans to " << planPath; - } catch (std::exception& e) { - LOG(ERROR) << "Failed to store aggregation plans to " << planPath << ": " - << e.what(); - } -} - -std::pair -AggregationFuzzer::pickSignature() { - size_t idx = boost::random::uniform_int_distribution( - 0, signatures_.size() + signatureTemplates_.size() - 1)(rng_); - CallableSignature signature; - if (idx < signatures_.size()) { - signature = signatures_[idx]; - } else { - const auto& signatureTemplate = - signatureTemplates_[idx - signatures_.size()]; - signature.name = signatureTemplate.name; - velox::test::ArgumentTypeFuzzer typeFuzzer( - *signatureTemplate.signature, rng_); - VELOX_CHECK(typeFuzzer.fuzzArgumentTypes(FLAGS_max_num_varargs)); - signature.args = typeFuzzer.argumentTypes(); - } - - return {signature, signatureStats_[idx]}; -} - -std::vector AggregationFuzzer::generateKeys( - const std::string& prefix, - std::vector& names, - std::vector& types) { - static const std::vector kNonFloatingPointTypes{ - BOOLEAN(), - TINYINT(), - SMALLINT(), - INTEGER(), - BIGINT(), - VARCHAR(), - VARBINARY(), - TIMESTAMP(), - }; - - auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); - std::vector keys; - for (auto i = 0; i < numKeys; ++i) { - keys.push_back(fmt::format("{}{}", prefix, i)); - - // Pick random, possibly complex, type. - types.push_back(vectorFuzzer_.randType(kNonFloatingPointTypes, 2)); - names.push_back(keys.back()); - } - return keys; -} - -std::vector AggregationFuzzer::generateSortingKeys( - const std::string& prefix, - std::vector& names, - std::vector& types) { - auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); - std::vector keys; - for (auto i = 0; i < numKeys; ++i) { - keys.push_back(fmt::format("{}{}", prefix, i)); - - // Pick random, possibly complex, type. - types.push_back(vectorFuzzer_.randOrderableType(2)); - names.push_back(keys.back()); - } - return keys; -} - -std::shared_ptr AggregationFuzzer::findInputGenerator( - const CallableSignature& signature) { - auto generatorIt = customInputGenerators_.find(signature.name); - if (generatorIt != customInputGenerators_.end()) { - return generatorIt->second; - } - - return nullptr; -} - -std::vector AggregationFuzzer::generateInputData( - std::vector names, - std::vector types, - const std::optional& signature) { - std::shared_ptr generator; - if (signature.has_value()) { - generator = findInputGenerator(signature.value()); - } - - const auto size = vectorFuzzer_.getOptions().vectorSize; - - auto inputType = ROW(std::move(names), std::move(types)); - std::vector input; - for (auto i = 0; i < FLAGS_num_batches; ++i) { - std::vector children; - - if (generator != nullptr) { - children = generator->generate( - signature->args, vectorFuzzer_, rng_, pool_.get()); - } - - for (auto i = children.size(); i < inputType->size(); ++i) { - children.push_back(vectorFuzzer_.fuzz(inputType->childAt(i), size)); - } - - input.push_back(std::make_shared( - pool_.get(), inputType, nullptr, size, std::move(children))); - } - - if (generator != nullptr) { - generator->reset(); - } - - return input; -} - -std::vector AggregationFuzzer::generateInputDataWithRowNumber( - std::vector names, - std::vector types, - const CallableSignature& signature) { - names.push_back("row_number"); - types.push_back(BIGINT()); - - auto generator = findInputGenerator(signature); - - std::vector input; - auto size = vectorFuzzer_.getOptions().vectorSize; - velox::test::VectorMaker vectorMaker{pool_.get()}; - int64_t rowNumber = 0; - for (auto j = 0; j < FLAGS_num_batches; ++j) { - std::vector children; - - if (generator != nullptr) { - children = - generator->generate(signature.args, vectorFuzzer_, rng_, pool_.get()); - } - - for (auto i = children.size(); i < types.size() - 1; ++i) { - children.push_back(vectorFuzzer_.fuzz(types[i], size)); - } - children.push_back(vectorMaker.flatVector( - size, [&](auto /*row*/) { return rowNumber++; })); - input.push_back(vectorMaker.rowVector(names, children)); - } - - if (generator != nullptr) { - generator->reset(); - } - - return input; -} - -// static -exec::Split AggregationFuzzer::makeSplit(const std::string& filePath) { - return exec::Split{std::make_shared( - kHiveConnectorId, filePath, dwio::common::FileFormat::DWRF)}; -} - -AggregationFuzzer::PlanWithSplits AggregationFuzzer::deserialize( - const folly::dynamic& obj) { - auto plan = velox::ISerializable::deserialize( - obj["plan"], pool_.get()); - - std::vector splits; - if (obj.count("splits") > 0) { - auto paths = - ISerializable::deserialize>(obj["splits"]); - for (const auto& path : paths) { - splits.push_back(makeSplit(path)); - } - } - - return PlanWithSplits{plan, splits}; -} - void AggregationFuzzer::go(const std::string& planPath) { Type::registerSerDe(); connector::hive::HiveTableHandle::registerSerDe(); @@ -992,103 +443,6 @@ void AggregationFuzzer::go() { printSignatureStats(); } -void AggregationFuzzer::printSignatureStats() { - if (!FLAGS_log_signature_stats) { - return; - } - - for (auto i = 0; i < signatureStats_.size(); ++i) { - const auto& stats = signatureStats_[i]; - if (stats.numRuns == 0) { - continue; - } - - if (stats.numFailed * 1.0 / stats.numRuns < 0.5) { - continue; - } - - if (i < signatures_.size()) { - LOG(INFO) << "Signature #" << i << " failed " << stats.numFailed - << " out of " << stats.numRuns - << " times: " << signatures_[i].toString(); - } else { - const auto& signatureTemplate = - signatureTemplates_[i - signatures_.size()]; - LOG(INFO) << "Signature template #" << i << " failed " << stats.numFailed - << " out of " << stats.numRuns - << " times: " << signatureTemplate.name << "(" - << signatureTemplate.signature->toString() << ")"; - } - } -} - -velox::test::ResultOrError AggregationFuzzer::execute( - const core::PlanNodePtr& plan, - const std::vector& splits, - bool injectSpill, - bool abandonPartial, - int32_t maxDrivers) { - LOG(INFO) << "Executing query plan: " << std::endl - << plan->toString(true, true); - - velox::test::ResultOrError resultOrError; - try { - std::shared_ptr spillDirectory; - AssertQueryBuilder builder(plan); - - builder.configs(queryConfigs_); - - if (injectSpill) { - spillDirectory = exec::test::TempDirectoryPath::create(); - builder.spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .config(core::QueryConfig::kTestingSpillPct, "100"); - } - - if (abandonPartial) { - builder.config(core::QueryConfig::kAbandonPartialAggregationMinRows, "1") - .config(core::QueryConfig::kAbandonPartialAggregationMinPct, "0") - .config(core::QueryConfig::kMaxPartialAggregationMemory, "0") - .config(core::QueryConfig::kMaxExtendedPartialAggregationMemory, "0"); - } - - if (!splits.empty()) { - builder.splits(splits); - } - - resultOrError.result = - builder.maxDrivers(maxDrivers).copyResults(pool_.get()); - } catch (VeloxUserError& e) { - // NOTE: velox user exception is accepted as it is caused by the invalid - // fuzzer test inputs. - resultOrError.exceptionPtr = std::current_exception(); - } - - return resultOrError; -} - -std::optional -AggregationFuzzer::computeReferenceResults( - const core::PlanNodePtr& plan, - const std::vector& input) { - if (auto sql = referenceQueryRunner_->toSql(plan)) { - try { - return referenceQueryRunner_->execute( - sql.value(), input, plan->outputType()); - } catch (std::exception& e) { - ++stats_.numReferenceQueryFailed; - LOG(WARNING) << "Query failed in the reference DB"; - return std::nullopt; - } - } else { - LOG(INFO) << "Query not supported by the reference DB"; - ++stats_.numVerificationNotSupported; - } - - return std::nullopt; -} - void makeAlternativePlansWithValues( const std::vector& groupingKeys, const std::vector& aggregates, @@ -1263,57 +617,12 @@ void makeStreamingPlansWithTableScan( .planNode()); } -void AggregationFuzzer::testPlan( - const PlanWithSplits& planWithSplits, - bool injectSpill, - bool abandonPartial, - bool customVerification, - const std::vector>& customVerifiers, - const velox::test::ResultOrError& expected, - int32_t maxDrivers) { - auto actual = execute( - planWithSplits.plan, - planWithSplits.splits, - injectSpill, - abandonPartial, - maxDrivers); - - // Compare results or exceptions (if any). Fail is anything is different. - if (expected.exceptionPtr || actual.exceptionPtr) { - // Throws in case exceptions are not compatible. - velox::test::compareExceptions(expected.exceptionPtr, actual.exceptionPtr); - return; - } - - if (!customVerification) { - VELOX_CHECK( - assertEqualResults({expected.result}, {actual.result}), - "Logically equivalent plans produced different results"); - return; - } - - VELOX_CHECK_EQ( - expected.result->size(), - actual.result->size(), - "Logically equivalent plans produced different number of rows"); - - for (auto& verifier : customVerifiers) { - if (verifier == nullptr) { - continue; - } - - if (verifier->supportsCompare()) { - VELOX_CHECK( - verifier->compare(expected.result, actual.result), - "Logically equivalent plans produced different results"); - } else if (verifier->supportsVerify()) { - VELOX_CHECK( - verifier->verify(actual.result), - "Result of a logically equivalent plan failed custom verification"); - } else { - VELOX_UNREACHABLE( - "Custom verifier must support either 'compare' or 'verify' API."); - } +void AggregationFuzzer::updateReferenceQueryStats( + AggregationFuzzerBase::ReferenceQueryErrorCode ec) { + if (ec == ReferenceQueryErrorCode::kReferenceQueryFail) { + ++stats_.numReferenceQueryFailed; + } else if (ec == ReferenceQueryErrorCode::kReferenceQueryUnsupported) { + ++stats_.numVerificationNotSupported; } } @@ -1348,7 +657,9 @@ bool AggregationFuzzer::verifyWindow( if (!customVerification && enableWindowVerification) { if (resultOrError.result) { - if (auto expectedResult = computeReferenceResults(plan, input)) { + auto referenceResult = computeReferenceResults(plan, input); + updateReferenceQueryStats(referenceResult.second); + if (auto expectedResult = referenceResult.first) { ++stats_.numVerified; VELOX_CHECK( assertEqualResults( @@ -1372,54 +683,6 @@ bool AggregationFuzzer::verifyWindow( } } -namespace { -void writeToFile( - const std::string& path, - const VectorPtr& vector, - memory::MemoryPool* pool) { - dwrf::WriterOptions options; - options.schema = vector->type(); - options.memoryPool = pool; - auto writeFile = std::make_unique(path, true, false); - auto sink = - std::make_unique(std::move(writeFile), path); - dwrf::Writer writer(std::move(sink), options); - writer.write(vector); - writer.close(); -} - -bool isTableScanSupported(const TypePtr& type) { - if (type->kind() == TypeKind::ROW && type->size() == 0) { - return false; - } - if (type->kind() == TypeKind::UNKNOWN) { - return false; - } - - for (auto i = 0; i < type->size(); ++i) { - if (!isTableScanSupported(type->childAt(i))) { - return false; - } - } - - return true; -} -} // namespace - -std::vector AggregationFuzzer::makeSplits( - const std::vector& inputs, - const std::string& path) { - std::vector splits; - auto writerPool = rootPool_->addAggregateChild("writer"); - for (auto i = 0; i < inputs.size(); ++i) { - const std::string filePath = fmt::format("{}/{}", path, i); - writeToFile(filePath, inputs[i], writerPool.get()); - splits.push_back(makeSplit(filePath)); - } - - return splits; -} - void resetCustomVerifiers( const std::vector>& customVerifiers) { for (auto& verifier : customVerifiers) { @@ -1541,7 +804,9 @@ bool AggregationFuzzer::verifyAggregation( std::optional expectedResult; if (resultOrError.result != nullptr) { if (!customVerification) { - expectedResult = computeReferenceResults(firstPlan, input); + auto referenceResult = computeReferenceResults(firstPlan, input); + updateReferenceQueryStats(referenceResult.second); + expectedResult = referenceResult.first; } else { ++stats_.numVerificationSkipped; @@ -1592,7 +857,9 @@ bool AggregationFuzzer::verifySortedAggregation( ++stats_.numFailed; } - auto expectedResult = computeReferenceResults(firstPlan, input); + auto referenceResult = computeReferenceResults(firstPlan, input); + updateReferenceQueryStats(referenceResult.second); + auto expectedResult = referenceResult.first; if (expectedResult && resultOrError.result) { ++stats_.numVerified; VELOX_CHECK( @@ -1694,7 +961,9 @@ void AggregationFuzzer::verifyAggregation( std::optional expectedResult; if (!customVerification) { - expectedResult = computeReferenceResults(plan, input); + auto referenceResult = computeReferenceResults(plan, input); + updateReferenceQueryStats(referenceResult.second); + expectedResult = referenceResult.first; } if (expectedResult && resultOrError.result) { @@ -1713,26 +982,26 @@ void AggregationFuzzer::verifyAggregation( void AggregationFuzzer::Stats::print(size_t numIterations) const { LOG(INFO) << "Total functions tested: " << functionNames.size(); LOG(INFO) << "Total masked aggregations: " - << printStat(numMask, numIterations); + << printPercentageStat(numMask, numIterations); LOG(INFO) << "Total global aggregations: " - << printStat(numGlobal, numIterations); + << printPercentageStat(numGlobal, numIterations); LOG(INFO) << "Total group-by aggregations: " - << printStat(numGroupBy, numIterations); + << printPercentageStat(numGroupBy, numIterations); LOG(INFO) << "Total distinct aggregations: " - << printStat(numDistinct, numIterations); + << printPercentageStat(numDistinct, numIterations); LOG(INFO) << "Total aggregations over sorted inputs: " - << printStat(numSortedInputs, numIterations); + << printPercentageStat(numSortedInputs, numIterations); LOG(INFO) << "Total window expressions: " - << printStat(numWindow, numIterations); + << printPercentageStat(numWindow, numIterations); LOG(INFO) << "Total aggregations verified against reference DB: " - << printStat(numVerified, numIterations); + << printPercentageStat(numVerified, numIterations); LOG(INFO) << "Total aggregations not verified (non-deterministic function / not supported by reference DB / reference DB failed): " - << printStat(numVerificationSkipped, numIterations) << " / " - << printStat(numVerificationNotSupported, numIterations) << " / " - << printStat(numReferenceQueryFailed, numIterations); + << printPercentageStat(numVerificationSkipped, numIterations) << " / " + << printPercentageStat(numVerificationNotSupported, numIterations) + << " / " << printPercentageStat(numReferenceQueryFailed, numIterations); LOG(INFO) << "Total failed aggregations: " - << printStat(numFailed, numIterations); + << printPercentageStat(numFailed, numIterations); } } // namespace diff --git a/velox/exec/tests/utils/AggregationFuzzer.h b/velox/exec/tests/utils/AggregationFuzzer.h index 28be9e6a7109c..4a5e831c6dea8 100644 --- a/velox/exec/tests/utils/AggregationFuzzer.h +++ b/velox/exec/tests/utils/AggregationFuzzer.h @@ -16,82 +16,12 @@ #pragma once #include "velox/exec/Aggregate.h" +#include "velox/exec/tests/utils/AggregationFuzzerBase.h" #include "velox/exec/tests/utils/ReferenceQueryRunner.h" #include "velox/vector/fuzzer/VectorFuzzer.h" namespace facebook::velox::exec::test { -static constexpr const std::string_view kPlanNodeFileName = "plan_nodes"; - -class InputGenerator { - public: - virtual ~InputGenerator() = default; - - /// Generates function inputs of the specified types. May return an empty list - /// to let Fuzzer use randomly-generated inputs. - virtual std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) = 0; - - /// Called after generating all inputs for a given fuzzer iteration. - virtual void reset() = 0; -}; - -/// Verifies aggregation results either directly or by comparing with results -/// from a logically equivalent plan or reference DB. -/// -/// Can be used to sort results of array_agg before comparing (uses 'compare' -/// API) or verify approx_distinct by comparing its results with results of -/// count(distinct) (uses 'verify' API). -class ResultVerifier { - public: - virtual ~ResultVerifier() = default; - - /// Returns true if 'compare' API is supported. The verifier must support - /// either 'compare' or 'verify' API. If both are supported, 'compare' API is - /// used and 'verify' API is ignored. - virtual bool supportsCompare() = 0; - - /// Return true if 'verify' API is support. The verifier must support either - /// 'compare' or 'verify' API. - virtual bool supportsVerify() = 0; - - /// Called once before possibly multiple calls to 'compare' or 'verify' APIs - /// to specify the input data, grouping keys (may be empty), the aggregate - /// function and the name of the column that will store aggregate function - /// results. - /// - /// Can be used by array_distinct verifier to compute count(distinct) once and - /// re-use its results for multiple 'verify' calls. - virtual void initialize( - const std::vector& input, - const std::vector& groupingKeys, - const core::AggregationNode::Aggregate& aggregate, - const std::string& aggregateName) = 0; - - /// Compares results of two logically equivalent Velox plans or a Velox plan - /// and a reference DB query. - /// - /// 'initialize' must be called first. 'compare' may be called multiple times - /// after single 'initialize' call. - virtual bool compare( - const RowVectorPtr& result, - const RowVectorPtr& otherResult) = 0; - - /// Verifies results of a Velox plan or reference DB query. - /// - /// 'initialize' must be called first. 'verify' may be called multiple times - /// after single 'initialize' call. - virtual bool verify(const RowVectorPtr& result) = 0; - - /// Clears internal state after possibly multiple calls to 'compare' and - /// 'verify'. 'initialize' must be called again after 'reset' to allow calling - /// 'compare' or 'verify' again. - virtual void reset() = 0; -}; - /// Runs the aggregation fuzzer. /// @param signatureMap Map of all aggregate function signatures. /// @param seed Random seed - Pass the same seed for reproducibility. @@ -112,4 +42,5 @@ void aggregateFuzzer( const std::unordered_map& queryConfigs, const std::optional& planPath, std::unique_ptr referenceQueryRunner); + } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/AggregationFuzzerBase.cpp b/velox/exec/tests/utils/AggregationFuzzerBase.cpp new file mode 100644 index 0000000000000..3df52f56bb85d --- /dev/null +++ b/velox/exec/tests/utils/AggregationFuzzerBase.cpp @@ -0,0 +1,650 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/tests/utils/AggregationFuzzerBase.h" + +#include +#include "velox/common/base/Fs.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/dwrf/reader/DwrfReader.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/expression/SignatureBinder.h" +#include "velox/expression/tests/utils/ArgumentTypeFuzzer.h" +#include "velox/vector/VectorSaver.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +DEFINE_int32(steps, 10, "Number of plans to generate and execute."); + +DEFINE_int32( + duration_sec, + 0, + "For how long it should run (in seconds). If zero, " + "it executes exactly --steps iterations and exits."); + +DEFINE_int32( + batch_size, + 100, + "The number of elements on each generated vector."); + +DEFINE_int32(num_batches, 10, "The number of generated vectors."); + +DEFINE_int32( + max_num_varargs, + 5, + "The maximum number of variadic arguments fuzzer will generate for " + "functions that accept variadic arguments. Fuzzer will generate up to " + "max_num_varargs arguments for the variadic list in addition to the " + "required arguments by the function."); + +DEFINE_double( + null_ratio, + 0.1, + "Chance of adding a null constant to the plan, or null value in a vector " + "(expressed as double from 0 to 1)."); + +DEFINE_string( + repro_persist_path, + "", + "Directory path for persistence of data and SQL when fuzzer fails for " + "future reproduction. Empty string disables this feature."); + +DEFINE_bool( + persist_and_run_once, + false, + "Persist repro info before evaluation and only run one iteration. " + "This is to rerun with the seed number and persist repro info upon a " + "crash failure. Only effective if repro_persist_path is set."); + +DEFINE_bool( + log_signature_stats, + false, + "Log statistics about function signatures"); + +namespace facebook::velox::exec::test { + +bool AggregationFuzzerBase::addSignature( + const std::string& name, + const FunctionSignaturePtr& signature) { + ++functionsStats.numSignatures; + + if (signature->variableArity()) { + LOG(WARNING) << "Skipping variadic function signature: " << name + << signature->toString(); + return false; + } + + if (!signature->variables().empty()) { + bool skip = false; + std::unordered_set typeVariables; + for (auto& [variableName, variable] : signature->variables()) { + if (variable.isIntegerParameter()) { + LOG(WARNING) << "Skipping generic function signature: " << name + << signature->toString(); + skip = true; + break; + } + + typeVariables.insert(variableName); + } + if (skip) { + return false; + } + + signatureTemplates_.push_back( + {name, signature.get(), std::move(typeVariables)}); + } else { + CallableSignature callable{ + .name = name, + .args = {}, + .returnType = + SignatureBinder::tryResolveType(signature->returnType(), {}, {}), + .constantArgs = {}}; + VELOX_CHECK_NOT_NULL(callable.returnType); + + // Process each argument and figure out its type. + for (const auto& arg : signature->argumentTypes()) { + auto resolvedType = SignatureBinder::tryResolveType(arg, {}, {}); + VELOX_CHECK_NOT_NULL(resolvedType); + + // SignatureBinder::tryResolveType produces ROW types with empty + // field names. These won't work with TableScan. + if (resolvedType->isRow()) { + std::vector names; + for (auto i = 0; i < resolvedType->size(); ++i) { + names.push_back(fmt::format("field{}", i)); + } + + std::vector types = resolvedType->asRow().children(); + + resolvedType = ROW(std::move(names), std::move(types)); + } + + callable.args.emplace_back(resolvedType); + } + + signatures_.emplace_back(callable); + } + + ++functionsStats.numSupportedSignatures; + return true; +} + +void AggregationFuzzerBase::addAggregationSignatures( + const AggregateFunctionSignatureMap& signatureMap) { + for (auto& [name, signatures] : signatureMap) { + ++functionsStats.numFunctions; + bool hasSupportedSignature = false; + for (auto& signature : signatures) { + hasSupportedSignature |= addSignature(name, signature); + } + if (hasSupportedSignature) { + ++functionsStats.numSupportedFunctions; + } + } +} + +std::pair +AggregationFuzzerBase::pickSignature() { + size_t idx = boost::random::uniform_int_distribution( + 0, signatures_.size() + signatureTemplates_.size() - 1)(rng_); + CallableSignature signature; + if (idx < signatures_.size()) { + signature = signatures_[idx]; + } else { + const auto& signatureTemplate = + signatureTemplates_[idx - signatures_.size()]; + signature.name = signatureTemplate.name; + velox::test::ArgumentTypeFuzzer typeFuzzer( + *signatureTemplate.signature, rng_); + VELOX_CHECK(typeFuzzer.fuzzArgumentTypes(FLAGS_max_num_varargs)); + signature.args = typeFuzzer.argumentTypes(); + } + + return {signature, signatureStats_[idx]}; +} + +std::vector AggregationFuzzerBase::generateKeys( + const std::string& prefix, + std::vector& names, + std::vector& types) { + static const std::vector kNonFloatingPointTypes{ + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + VARCHAR(), + VARBINARY(), + TIMESTAMP(), + }; + + auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); + std::vector keys; + for (auto i = 0; i < numKeys; ++i) { + keys.push_back(fmt::format("{}{}", prefix, i)); + + // Pick random, possibly complex, type. + types.push_back(vectorFuzzer_.randType(kNonFloatingPointTypes, 2)); + names.push_back(keys.back()); + } + return keys; +} + +std::vector AggregationFuzzerBase::generateSortingKeys( + const std::string& prefix, + std::vector& names, + std::vector& types) { + std::vector keys; + auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); + for (auto i = 0; i < numKeys; ++i) { + keys.push_back(fmt::format("{}{}", prefix, i)); + + // Pick random, possibly complex, type. + types.push_back(vectorFuzzer_.randOrderableType(2)); + names.push_back(keys.back()); + } + + return keys; +} + +std::shared_ptr AggregationFuzzerBase::findInputGenerator( + const CallableSignature& signature) { + auto generatorIt = customInputGenerators_.find(signature.name); + if (generatorIt != customInputGenerators_.end()) { + return generatorIt->second; + } + + return nullptr; +} + +std::vector AggregationFuzzerBase::generateInputData( + std::vector names, + std::vector types, + const std::optional& signature) { + std::shared_ptr generator; + if (signature.has_value()) { + generator = findInputGenerator(signature.value()); + } + + const auto size = vectorFuzzer_.getOptions().vectorSize; + + auto inputType = ROW(std::move(names), std::move(types)); + std::vector input; + for (auto i = 0; i < FLAGS_num_batches; ++i) { + std::vector children; + + if (generator != nullptr) { + children = generator->generate( + signature->args, vectorFuzzer_, rng_, pool_.get()); + } + + for (auto i = children.size(); i < inputType->size(); ++i) { + children.push_back(vectorFuzzer_.fuzz(inputType->childAt(i), size)); + } + + input.push_back(std::make_shared( + pool_.get(), inputType, nullptr, size, std::move(children))); + } + + if (generator != nullptr) { + generator->reset(); + } + + return input; +} + +std::vector AggregationFuzzerBase::generateInputDataWithRowNumber( + std::vector names, + std::vector types, + const CallableSignature& signature) { + names.push_back("row_number"); + types.push_back(BIGINT()); + + auto generator = findInputGenerator(signature); + + std::vector input; + auto size = vectorFuzzer_.getOptions().vectorSize; + velox::test::VectorMaker vectorMaker{pool_.get()}; + int64_t rowNumber = 0; + for (auto j = 0; j < FLAGS_num_batches; ++j) { + std::vector children; + + if (generator != nullptr) { + children = + generator->generate(signature.args, vectorFuzzer_, rng_, pool_.get()); + } + + for (auto i = children.size(); i < types.size() - 1; ++i) { + children.push_back(vectorFuzzer_.fuzz(types[i], size)); + } + children.push_back(vectorMaker.flatVector( + size, [&](auto /*row*/) { return rowNumber++; })); + input.push_back(vectorMaker.rowVector(names, children)); + } + + if (generator != nullptr) { + generator->reset(); + } + + return input; +} + +// static +exec::Split AggregationFuzzerBase::makeSplit(const std::string& filePath) { + return exec::Split{std::make_shared( + kHiveConnectorId, filePath, dwio::common::FileFormat::DWRF)}; +} + +AggregationFuzzerBase::PlanWithSplits AggregationFuzzerBase::deserialize( + const folly::dynamic& obj) { + auto plan = velox::ISerializable::deserialize( + obj["plan"], pool_.get()); + + std::vector splits; + if (obj.count("splits") > 0) { + auto paths = + ISerializable::deserialize>(obj["splits"]); + for (const auto& path : paths) { + splits.push_back(makeSplit(path)); + } + } + + return PlanWithSplits{plan, splits}; +} + +void AggregationFuzzerBase::printSignatureStats() { + if (!FLAGS_log_signature_stats) { + return; + } + + for (auto i = 0; i < signatureStats_.size(); ++i) { + const auto& stats = signatureStats_[i]; + if (stats.numRuns == 0) { + continue; + } + + if (stats.numFailed * 1.0 / stats.numRuns < 0.5) { + continue; + } + + if (i < signatures_.size()) { + LOG(INFO) << "Signature #" << i << " failed " << stats.numFailed + << " out of " << stats.numRuns + << " times: " << signatures_[i].toString(); + } else { + const auto& signatureTemplate = + signatureTemplates_[i - signatures_.size()]; + LOG(INFO) << "Signature template #" << i << " failed " << stats.numFailed + << " out of " << stats.numRuns + << " times: " << signatureTemplate.name << "(" + << signatureTemplate.signature->toString() << ")"; + } + } +} + +velox::test::ResultOrError AggregationFuzzerBase::execute( + const core::PlanNodePtr& plan, + const std::vector& splits, + bool injectSpill, + bool abandonPartial, + int32_t maxDrivers) { + LOG(INFO) << "Executing query plan: " << std::endl + << plan->toString(true, true); + + velox::test::ResultOrError resultOrError; + try { + std::shared_ptr spillDirectory; + AssertQueryBuilder builder(plan); + + builder.configs(queryConfigs_); + + if (injectSpill) { + spillDirectory = exec::test::TempDirectoryPath::create(); + builder.spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config(core::QueryConfig::kTestingSpillPct, "100"); + } + + if (abandonPartial) { + builder.config(core::QueryConfig::kAbandonPartialAggregationMinRows, "1") + .config(core::QueryConfig::kAbandonPartialAggregationMinPct, "0") + .config(core::QueryConfig::kMaxPartialAggregationMemory, "0") + .config(core::QueryConfig::kMaxExtendedPartialAggregationMemory, "0"); + } + + if (!splits.empty()) { + builder.splits(splits); + } + + resultOrError.result = + builder.maxDrivers(maxDrivers).copyResults(pool_.get()); + } catch (VeloxUserError& e) { + // NOTE: velox user exception is accepted as it is caused by the invalid + // fuzzer test inputs. + resultOrError.exceptionPtr = std::current_exception(); + } + + return resultOrError; +} + +std::pair< + std::optional, + AggregationFuzzerBase::ReferenceQueryErrorCode> +AggregationFuzzerBase::computeReferenceResults( + const core::PlanNodePtr& plan, + const std::vector& input) { + if (auto sql = referenceQueryRunner_->toSql(plan)) { + try { + return std::make_pair( + referenceQueryRunner_->execute( + sql.value(), input, plan->outputType()), + ReferenceQueryErrorCode::kSuccess); + } catch (std::exception& e) { + // ++stats_.numReferenceQueryFailed; + LOG(WARNING) << "Query failed in the reference DB"; + return std::make_pair( + std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail); + } + } else { + LOG(INFO) << "Query not supported by the reference DB"; + // ++stats_.numVerificationNotSupported; + } + + return std::make_pair( + std::nullopt, ReferenceQueryErrorCode::kReferenceQueryUnsupported); +} + +void AggregationFuzzerBase::testPlan( + const PlanWithSplits& planWithSplits, + bool injectSpill, + bool abandonPartial, + bool customVerification, + const std::vector>& customVerifiers, + const velox::test::ResultOrError& expected, + int32_t maxDrivers) { + auto actual = execute( + planWithSplits.plan, + planWithSplits.splits, + injectSpill, + abandonPartial, + maxDrivers); + + // Compare results or exceptions (if any). Fail is anything is different. + if (expected.exceptionPtr || actual.exceptionPtr) { + // Throws in case exceptions are not compatible. + velox::test::compareExceptions(expected.exceptionPtr, actual.exceptionPtr); + return; + } + + if (!customVerification) { + VELOX_CHECK( + assertEqualResults({expected.result}, {actual.result}), + "Logically equivalent plans produced different results"); + return; + } + + VELOX_CHECK_EQ( + expected.result->size(), + actual.result->size(), + "Logically equivalent plans produced different number of rows"); + + for (auto& verifier : customVerifiers) { + if (verifier == nullptr) { + continue; + } + + if (verifier->supportsCompare()) { + VELOX_CHECK( + verifier->compare(expected.result, actual.result), + "Logically equivalent plans produced different results"); + } else if (verifier->supportsVerify()) { + VELOX_CHECK( + verifier->verify(actual.result), + "Result of a logically equivalent plan failed custom verification"); + } else { + VELOX_UNREACHABLE( + "Custom verifier must support either 'compare' or 'verify' API."); + } + } +} + +namespace { +void writeToFile( + const std::string& path, + const VectorPtr& vector, + memory::MemoryPool* pool) { + dwrf::WriterOptions options; + options.schema = vector->type(); + options.memoryPool = pool; + auto writeFile = std::make_unique(path, true, false); + auto sink = + std::make_unique(std::move(writeFile), path); + dwrf::Writer writer(std::move(sink), options); + writer.write(vector); + writer.close(); +} +} // namespace + +bool isTableScanSupported(const TypePtr& type) { + if (type->kind() == TypeKind::ROW && type->size() == 0) { + return false; + } + if (type->kind() == TypeKind::UNKNOWN) { + return false; + } + + for (auto i = 0; i < type->size(); ++i) { + if (!isTableScanSupported(type->childAt(i))) { + return false; + } + } + + return true; +} + +std::vector AggregationFuzzerBase::makeSplits( + const std::vector& inputs, + const std::string& path) { + std::vector splits; + auto writerPool = rootPool_->addAggregateChild("writer"); + for (auto i = 0; i < inputs.size(); ++i) { + const std::string filePath = fmt::format("{}/{}", path, i); + writeToFile(filePath, inputs[i], writerPool.get()); + splits.push_back(makeSplit(filePath)); + } + + return splits; +} + +std::string printPercentageStat(size_t n, size_t total) { + return fmt::format("{} ({:.2f}%)", n, (double)n / total * 100); +} + +void printStats(const AggregationFuzzerBase::FunctionsStats& stats) { + LOG(INFO) << fmt::format( + "Total functions: {} ({} signatures)", + stats.numFunctions, + stats.numSignatures); + LOG(INFO) << "Functions with at least one supported signature: " + << printPercentageStat( + stats.numSupportedFunctions, stats.numFunctions); + + size_t numNotSupportedFunctions = + stats.numFunctions - stats.numSupportedFunctions; + LOG(INFO) << "Functions with no supported signature: " + << printPercentageStat( + numNotSupportedFunctions, stats.numFunctions); + LOG(INFO) << "Supported function signatures: " + << printPercentageStat( + stats.numSupportedSignatures, stats.numSignatures); + + size_t numNotSupportedSignatures = + stats.numSignatures - stats.numSupportedSignatures; + LOG(INFO) << "Unsupported function signatures: " + << printPercentageStat( + numNotSupportedSignatures, stats.numSignatures); +} + +std::string makeFunctionCall( + const std::string& name, + const std::vector& argNames, + bool sortedInputs) { + std::ostringstream call; + call << name << "(" << folly::join(", ", argNames); + if (sortedInputs) { + call << " ORDER BY " << folly::join(", ", argNames); + } + call << ")"; + + return call.str(); +} + +std::vector makeNames(size_t n) { + std::vector names; + for (auto i = 0; i < n; ++i) { + names.push_back(fmt::format("c{}", i)); + } + return names; +} + +folly::dynamic serialize( + const AggregationFuzzerBase::PlanWithSplits& planWithSplits, + const std::string& dirPath, + std::unordered_map& filePaths) { + folly::dynamic obj = folly::dynamic::object(); + obj["plan"] = planWithSplits.plan->serialize(); + if (planWithSplits.splits.empty()) { + return obj; + } + + folly::dynamic jsonSplits = folly::dynamic::array(); + jsonSplits.reserve(planWithSplits.splits.size()); + for (const auto& split : planWithSplits.splits) { + const auto filePath = + std::dynamic_pointer_cast( + split.connectorSplit) + ->filePath; + if (filePaths.count(filePath) == 0) { + const auto newFilePath = fmt::format("{}/{}", dirPath, filePaths.size()); + fs::copy(filePath, newFilePath); + filePaths.insert({filePath, newFilePath}); + } + jsonSplits.push_back(filePaths.at(filePath)); + } + obj["splits"] = jsonSplits; + return obj; +} + +void persistReproInfo( + const std::vector& plans, + const std::string& basePath) { + if (!common::generateFileDirectory(basePath.c_str())) { + return; + } + + // Create a new directory + const auto dirPathOptional = + common::generateTempFolderPath(basePath.c_str(), "aggregationVerifier"); + if (!dirPathOptional.has_value()) { + LOG(ERROR) + << "Failed to create directory for persisting plans using base path: " + << basePath; + return; + } + + const auto dirPath = dirPathOptional.value(); + + // Save plans and splits. + const std::string planPath = fmt::format("{}/{}", dirPath, kPlanNodeFileName); + std::unordered_map filePaths; + try { + folly::dynamic array = folly::dynamic::array(); + array.reserve(plans.size()); + for (auto planWithSplits : plans) { + array.push_back(serialize(planWithSplits, dirPath, filePaths)); + } + auto planJson = folly::toJson(array); + saveStringToFile(planJson, planPath.c_str()); + LOG(INFO) << "Persisted aggregation plans to " << planPath; + } catch (std::exception& e) { + LOG(ERROR) << "Failed to store aggregation plans to " << planPath << ": " + << e.what(); + } +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/AggregationFuzzerBase.h b/velox/exec/tests/utils/AggregationFuzzerBase.h new file mode 100644 index 0000000000000..d57c2c8f508f2 --- /dev/null +++ b/velox/exec/tests/utils/AggregationFuzzerBase.h @@ -0,0 +1,343 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/exec/Aggregate.h" +#include "velox/exec/Split.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/ReferenceQueryRunner.h" +#include "velox/expression/tests/utils/FuzzerToolkit.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" +#include "velox/vector/tests/utils/VectorMaker.h" + +DECLARE_int32(steps); + +DECLARE_int32(duration_sec); + +DECLARE_int32(batch_size); + +DECLARE_int32(num_batches); + +DECLARE_int32(max_num_varargs); + +DECLARE_double(null_ratio); + +DECLARE_string(repro_persist_path); + +DECLARE_bool(persist_and_run_once); + +DECLARE_bool(log_signature_stats); + +using facebook::velox::test::CallableSignature; +using facebook::velox::test::SignatureTemplate; + +namespace facebook::velox::exec::test { + +constexpr const std::string_view kPlanNodeFileName = "plan_nodes"; + +class InputGenerator { + public: + virtual ~InputGenerator() = default; + + /// Generates function inputs of the specified types. May return an empty list + /// to let Fuzzer use randomly-generated inputs. + virtual std::vector generate( + const std::vector& types, + VectorFuzzer& fuzzer, + FuzzerGenerator& rng, + memory::MemoryPool* pool) = 0; + + /// Called after generating all inputs for a given fuzzer iteration. + virtual void reset() = 0; +}; + +/// Verifies aggregation results either directly or by comparing with results +/// from a logically equivalent plan or reference DB. +/// +/// Can be used to sort results of array_agg before comparing (uses 'compare' +/// API) or verify approx_distinct by comparing its results with results of +/// count(distinct) (uses 'verify' API). +class ResultVerifier { + public: + virtual ~ResultVerifier() = default; + + /// Returns true if 'compare' API is supported. The verifier must support + /// either 'compare' or 'verify' API. If both are supported, 'compare' API is + /// used and 'verify' API is ignored. + virtual bool supportsCompare() = 0; + + /// Return true if 'verify' API is support. The verifier must support either + /// 'compare' or 'verify' API. + virtual bool supportsVerify() = 0; + + /// Called once before possibly multiple calls to 'compare' or 'verify' APIs + /// to specify the input data, grouping keys (may be empty), the aggregate + /// function and the name of the column that will store aggregate function + /// results. + /// + /// Can be used by array_distinct verifier to compute count(distinct) once and + /// re-use its results for multiple 'verify' calls. + virtual void initialize( + const std::vector& input, + const std::vector& groupingKeys, + const core::AggregationNode::Aggregate& aggregate, + const std::string& aggregateName) = 0; + + /// Compares results of two logically equivalent Velox plans or a Velox plan + /// and a reference DB query. + /// + /// 'initialize' must be called first. 'compare' may be called multiple times + /// after single 'initialize' call. + virtual bool compare( + const RowVectorPtr& result, + const RowVectorPtr& otherResult) = 0; + + /// Verifies results of a Velox plan or reference DB query. + /// + /// 'initialize' must be called first. 'verify' may be called multiple times + /// after single 'initialize' call. + virtual bool verify(const RowVectorPtr& result) = 0; + + /// Clears internal state after possibly multiple calls to 'compare' and + /// 'verify'. 'initialize' must be called again after 'reset' to allow calling + /// 'compare' or 'verify' again. + virtual void reset() = 0; +}; + +class AggregationFuzzerBase { + public: + AggregationFuzzerBase( + size_t initialSeed, + const std::unordered_map>& + customVerificationFunctions, + const std::unordered_map>& + customInputGenerators, + VectorFuzzer::Options::TimestampPrecision timestampPrecision, + const std::unordered_map& queryConfigs, + std::unique_ptr referenceQueryRunner) + : customVerificationFunctions_{customVerificationFunctions}, + customInputGenerators_{customInputGenerators}, + queryConfigs_{queryConfigs}, + persistAndRunOnce_{FLAGS_persist_and_run_once}, + reproPersistPath_{FLAGS_repro_persist_path}, + referenceQueryRunner_{std::move(referenceQueryRunner)}, + vectorFuzzer_{getFuzzerOptions(timestampPrecision), pool_.get()} { + filesystems::registerLocalFileSystem(); + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector(kHiveConnectorId, nullptr); + connector::registerConnector(hiveConnector); + + seed(initialSeed); + } + + struct PlanWithSplits { + core::PlanNodePtr plan; + std::vector splits; + }; + + struct FunctionsStats { + size_t numFunctions = 0; + size_t numSignatures = 0; + size_t numSupportedFunctions = 0; + size_t numSupportedSignatures = 0; + }; + + struct SignatureStats { + /// Number of times a signature was chosen. + size_t numRuns{0}; + + /// Number of times generated query plan failed. + size_t numFailed{0}; + }; + + enum ReferenceQueryErrorCode { + kSuccess, + kReferenceQueryFail, + kReferenceQueryUnsupported + }; + + protected: + static inline const std::string kHiveConnectorId = "test-hive"; + + bool addSignature( + const std::string& name, + const FunctionSignaturePtr& signature); + + void addAggregationSignatures( + const AggregateFunctionSignatureMap& signatureMap); + + std::shared_ptr findInputGenerator( + const CallableSignature& signature); + + static exec::Split makeSplit(const std::string& filePath); + + std::vector makeSplits( + const std::vector& inputs, + const std::string& path); + + PlanWithSplits deserialize(const folly::dynamic& obj); + + static VectorFuzzer::Options getFuzzerOptions( + VectorFuzzer::Options::TimestampPrecision timestampPrecision) { + VectorFuzzer::Options opts; + opts.vectorSize = FLAGS_batch_size; + opts.stringVariableLength = true; + opts.stringLength = 4'000; + opts.nullRatio = FLAGS_null_ratio; + opts.timestampPrecision = timestampPrecision; + return opts; + } + + void seed(size_t seed) { + currentSeed_ = seed; + vectorFuzzer_.reSeed(seed); + rng_.seed(currentSeed_); + } + + void reSeed() { + seed(rng_()); + } + + // Generate at least one and up to 5 scalar columns to be used as grouping, + // partition or sorting keys. + // Column names are generated using template 'N', where N is + // zero-based ordinal number of the column. + std::vector generateKeys( + const std::string& prefix, + std::vector& names, + std::vector& types); + + // Similar to generateKeys, but restricts types to orderable types (i.e. no + // maps). + std::vector generateSortingKeys( + const std::string& prefix, + std::vector& names, + std::vector& types); + + std::pair pickSignature(); + + std::vector generateInputData( + std::vector names, + std::vector types, + const std::optional& signature); + + // Generate a RowVector of the given types of children with an additional + // child named "row_number" of BIGINT row numbers that differentiates every + // row. Row numbers start from 0. This additional input vector is needed for + // result verification of window aggregations. + std::vector generateInputDataWithRowNumber( + std::vector names, + std::vector types, + const CallableSignature& signature); + + std::pair, ReferenceQueryErrorCode> + computeReferenceResults( + const core::PlanNodePtr& plan, + const std::vector& input); + + velox::test::ResultOrError execute( + const core::PlanNodePtr& plan, + const std::vector& splits = {}, + bool injectSpill = false, + bool abandonPartial = false, + int32_t maxDrivers = 2); + + // @param customVerification If false, results are compared as is. Otherwise, + // only row counts are compared. + // @param customVerifiers Custom verifier for each aggregate function. These + // can be null. If not null and customVerification is true, custom verifier is + // used to further verify the results. + void testPlan( + const PlanWithSplits& planWithSplits, + bool injectSpill, + bool abandonPartial, + bool customVerification, + const std::vector>& customVerifiers, + const velox::test::ResultOrError& expected, + int32_t maxDrivers = 2); + + void printSignatureStats(); + + const std::unordered_map> + customVerificationFunctions_; + const std::unordered_map> + customInputGenerators_; + const std::unordered_map queryConfigs_; + const bool persistAndRunOnce_; + const std::string reproPersistPath_; + + std::unique_ptr referenceQueryRunner_; + + std::vector signatures_; + std::vector signatureTemplates_; + + FunctionsStats functionsStats; + + // Stats for 'signatures_' and 'signatureTemplates_'. Stats for 'signatures_' + // come before stats for 'signatureTemplates_'. + std::vector signatureStats_; + + FuzzerGenerator rng_; + size_t currentSeed_{0}; + + std::shared_ptr rootPool_{ + memory::defaultMemoryManager().addRootPool()}; + std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; + VectorFuzzer vectorFuzzer_; +}; + +// Returns true if the elapsed time is greater than or equal to +// FLAGS_duration_sec. If FLAGS_duration_sec is 0, returns true if the +// iterations is greater than or equal to FLAGS_steps. +template +bool isDone(size_t i, T startTime) { + if (FLAGS_duration_sec > 0) { + std::chrono::duration elapsed = + std::chrono::system_clock::now() - startTime; + return elapsed.count() >= FLAGS_duration_sec; + } + return i >= FLAGS_steps; +} + +// Returns whether type is supported in TableScan. Empty Row type and Unknown +// type are not supported. +bool isTableScanSupported(const TypePtr& type); + +// Prints statistics about supported and unsupported function signatures. +void printStats(const AggregationFuzzerBase::FunctionsStats& stats); + +// Prints (n / total) in percentage format. +std::string printPercentageStat(size_t n, size_t total); + +// Make an aggregation call string for the given function name and arguments. +std::string makeFunctionCall( + const std::string& name, + const std::vector& argNames, + bool sortedInputs); + +// Returns a list of column names for c0 to cn. +std::vector makeNames(size_t n); + +// Persist plans to files under basePath. +void persistReproInfo( + const std::vector& plans, + const std::string& basePath); + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/AggregationFuzzerRunner.h b/velox/exec/tests/utils/AggregationFuzzerRunner.h index 3a06381019d30..06464332313ba 100644 --- a/velox/exec/tests/utils/AggregationFuzzerRunner.h +++ b/velox/exec/tests/utils/AggregationFuzzerRunner.h @@ -25,6 +25,7 @@ #include "velox/common/file/FileSystems.h" #include "velox/exec/Aggregate.h" #include "velox/exec/tests/utils/AggregationFuzzer.h" +#include "velox/exec/tests/utils/AggregationFuzzerUtils.h" #include "velox/parse/TypeResolver.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -65,50 +66,12 @@ namespace facebook::velox::exec::test { /// --seed 123 \ /// --v=1 \ /// --only "min,max" - class AggregationFuzzerRunner { public: - struct Options { - /// Comma-separated list of functions to test. By default, all functions - /// are tested. - std::string onlyFunctions; - - /// Set of functions to not test. - std::unordered_set skipFunctions; - - /// Set of functions whose results are non-deterministic. These can be - /// order-dependent functions whose results depend on the order of input - /// rows, or functions that return complex-typed results containing - /// floating-point fields. - /// - /// For some functions, the result can be transformed to a deterministic - /// value. If such transformation exists, it can be specified to be used for - /// results verification. If no transformation is specified, results are not - /// verified. - /// - /// Keys are function names. Values are optional transformations. "{}" - /// should be used to indicate the original value, i.e. "f({})" - /// transformation applies function 'f' to aggregation result. - std::unordered_map> - customVerificationFunctions; - - std::unordered_map> - customInputGenerators; - - /// Timestamp precision to use when generating inputs of type TIMESTAMP. - VectorFuzzer::Options::TimestampPrecision timestampPrecision{ - VectorFuzzer::Options::TimestampPrecision::kNanoSeconds}; - - /// A set of configuration properties to use when running query plans. - /// Could be used to specify timezone or enable/disable settings that - /// affect semantics of individual aggregate functions. - std::unordered_map queryConfigs; - }; - static int run( size_t seed, std::unique_ptr referenceQueryRunner, - const Options& options) { + const AggregationFuzzerOptions& options) { return runFuzzer( seed, std::nullopt, std::move(referenceQueryRunner), options); } @@ -119,12 +82,12 @@ class AggregationFuzzerRunner { return runFuzzer(0, planPath, std::move(referenceQueryRunner), {}); } - private: + protected: static int runFuzzer( size_t seed, const std::optional& planPath, std::unique_ptr referenceQueryRunner, - const Options& options) { + const AggregationFuzzerOptions& options) { auto signatures = facebook::velox::exec::getAggregateFunctionSignatures(); if (signatures.empty()) { LOG(ERROR) << "No aggregate functions registered."; @@ -156,51 +119,6 @@ class AggregationFuzzerRunner { // Calling gtest here so that it can be recognized as tests in CI systems. return RUN_ALL_TESTS(); } - - static std::unordered_set splitNames(const std::string& names) { - // Parse, lower case and trim it. - std::vector nameList; - folly::split(',', names, nameList); - std::unordered_set nameSet; - - for (const auto& it : nameList) { - auto str = folly::trimWhitespace(it).toString(); - folly::toLowerAscii(str); - nameSet.insert(str); - } - return nameSet; - } - - // Parse the comma separated list of function names, and use it to filter the - // input signatures. - static facebook::velox::exec::AggregateFunctionSignatureMap filterSignatures( - const facebook::velox::exec::AggregateFunctionSignatureMap& input, - const std::string& onlyFunctions, - const std::unordered_set& skipFunctions) { - if (onlyFunctions.empty() && skipFunctions.empty()) { - return input; - } - - facebook::velox::exec::AggregateFunctionSignatureMap output; - if (!onlyFunctions.empty()) { - // Parse, lower case and trim it. - auto nameSet = splitNames(onlyFunctions); - for (const auto& it : input) { - if (nameSet.count(it.first) > 0) { - output.insert(it); - } - } - } else { - output = input; - } - - for (auto s : skipFunctions) { - auto str = s; - folly::toLowerAscii(str); - output.erase(str); - } - return output; - } }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/AggregationFuzzerUtils.cpp b/velox/exec/tests/utils/AggregationFuzzerUtils.cpp new file mode 100644 index 0000000000000..86811f83b2b5a --- /dev/null +++ b/velox/exec/tests/utils/AggregationFuzzerUtils.cpp @@ -0,0 +1,34 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/tests/utils/AggregationFuzzerUtils.h" + +namespace facebook::velox::exec::test { + +std::unordered_set splitNames(const std::string& names) { + // Parse, lower case and trim it. + std::vector nameList; + folly::split(',', names, nameList); + std::unordered_set nameSet; + + for (const auto& it : nameList) { + auto str = folly::trimWhitespace(it).toString(); + folly::toLowerAscii(str); + nameSet.insert(str); + } + return nameSet; +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/AggregationFuzzerUtils.h b/velox/exec/tests/utils/AggregationFuzzerUtils.h new file mode 100644 index 0000000000000..7eeff500c82fd --- /dev/null +++ b/velox/exec/tests/utils/AggregationFuzzerUtils.h @@ -0,0 +1,298 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "velox/exec/tests/utils/AggregationFuzzerBase.h" + +namespace facebook::velox::exec::test { + +class MinMaxInputGenerator : public InputGenerator { + public: + MinMaxInputGenerator(const std::string& name) : indexOfN_{indexOfN(name)} {} + + std::vector generate( + const std::vector& types, + VectorFuzzer& fuzzer, + FuzzerGenerator& rng, + memory::MemoryPool* pool) override { + // TODO Generate inputs free of nested nulls. + if (types.size() <= indexOfN_) { + return {}; + } + + // Make sure to use the same value of 'n' for all batches in a given Fuzzer + // iteration. + if (!n_.has_value()) { + n_ = boost::random::uniform_int_distribution(0, 9'999)(rng); + } + + const auto size = fuzzer.getOptions().vectorSize; + + std::vector inputs; + inputs.reserve(types.size()); + for (auto i = 0; i < types.size() - 1; ++i) { + inputs.push_back(fuzzer.fuzz(types[i])); + } + + VELOX_CHECK( + types.back()->isBigint(), + "Unexpected type: {}", + types.back()->toString()) + inputs.push_back( + BaseVector::createConstant(BIGINT(), n_.value(), size, pool)); + return inputs; + } + + void reset() override { + n_.reset(); + } + + private: + // Returns zero-based index of the 'n' argument, 1 for min and max. 2 for + // min_by and max_by. + static int32_t indexOfN(const std::string& name) { + if (name == "min" || name == "max") { + return 1; + } + + if (name == "min_by" || name == "max_by") { + return 2; + } + + VELOX_FAIL("Unexpected function name: {}", name) + } + + // Zero-based index of the 'n' argument. + const int32_t indexOfN_; + std::optional n_; +}; + +class ApproxDistinctInputGenerator : public InputGenerator { + public: + std::vector generate( + const std::vector& types, + VectorFuzzer& fuzzer, + FuzzerGenerator& rng, + memory::MemoryPool* pool) override { + if (types.size() != 2) { + return {}; + } + + // Make sure to use the same value of 'e' for all batches in a given Fuzzer + // iteration. + if (!e_.has_value()) { + // Generate value in [0.0040625, 0.26] range. + static constexpr double kMin = 0.0040625; + static constexpr double kMax = 0.26; + e_ = kMin + (kMax - kMin) * boost::random::uniform_01()(rng); + } + + const auto size = fuzzer.getOptions().vectorSize; + + VELOX_CHECK( + types.back()->isDouble(), + "Unexpected type: {}", + types.back()->toString()) + return { + fuzzer.fuzz(types[0]), + BaseVector::createConstant(DOUBLE(), e_.value(), size, pool)}; + } + + void reset() override { + e_.reset(); + } + + private: + std::optional e_; +}; + +class ApproxPercentileInputGenerator : public InputGenerator { + public: + std::vector generate( + const std::vector& types, + VectorFuzzer& fuzzer, + FuzzerGenerator& rng, + memory::MemoryPool* pool) override { + // The arguments are: x, [w], percentile(s), [accuracy]. + // + // First argument is always 'x'. If second argument's type is BIGINT, then + // it is 'w'. Otherwise, it is percentile(x). + + const auto size = fuzzer.getOptions().vectorSize; + + std::vector inputs; + inputs.reserve(types.size()); + inputs.push_back(fuzzer.fuzz(types[0])); + + if (types[1]->isBigint()) { + velox::test::VectorMaker vectorMaker{pool}; + auto weight = vectorMaker.flatVector(size, [&](auto row) { + return boost::random::uniform_int_distribution(1, 1'000)(rng); + }); + + inputs.push_back(weight); + } + + const int percentileTypeIndex = types[1]->isBigint() ? 2 : 1; + const TypePtr& percentileType = types[percentileTypeIndex]; + if (percentileType->isDouble()) { + if (!percentile_.has_value()) { + percentile_ = pickPercentile(fuzzer, rng); + } + + inputs.push_back(BaseVector::createConstant( + DOUBLE(), percentile_.value(), size, pool)); + } else { + VELOX_CHECK(percentileType->isArray()); + VELOX_CHECK(percentileType->childAt(0)->isDouble()); + + if (percentiles_.empty()) { + percentiles_.push_back(pickPercentile(fuzzer, rng)); + percentiles_.push_back(pickPercentile(fuzzer, rng)); + percentiles_.push_back(pickPercentile(fuzzer, rng)); + } + + auto arrayVector = + BaseVector::create(ARRAY(DOUBLE()), 1, pool); + auto elementsVector = arrayVector->elements()->asFlatVector(); + elementsVector->resize(percentiles_.size()); + for (auto i = 0; i < percentiles_.size(); ++i) { + elementsVector->set(i, percentiles_[i]); + } + arrayVector->setOffsetAndSize(0, 0, percentiles_.size()); + + inputs.push_back(BaseVector::wrapInConstant(size, 0, arrayVector)); + } + + if (types.size() > percentileTypeIndex + 1) { + // Last argument is 'accuracy'. + VELOX_CHECK(types.back()->isDouble()); + if (!accuracy_.has_value()) { + accuracy_ = boost::random::uniform_01()(rng); + } + + inputs.push_back( + BaseVector::createConstant(DOUBLE(), accuracy_.value(), size, pool)); + } + + return inputs; + } + + void reset() override { + percentile_.reset(); + percentiles_.clear(); + accuracy_.reset(); + } + + private: + double pickPercentile(VectorFuzzer& fuzzer, FuzzerGenerator& rng) { + // 10% of the times generate random value in [0, 1] range. + // 90% of the times use one of the common values. + if (fuzzer.coinToss(0.1)) { + return boost::random::uniform_01()(rng); + } + + static const std::vector kPercentiles = { + 0.1, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99, 0.999, 0.9999}; + + const auto index = + boost::random::uniform_int_distribution()(rng) % + kPercentiles.size(); + + return kPercentiles[index]; + } + + std::optional percentile_; + std::vector percentiles_; + std::optional accuracy_; +}; + +struct AggregationFuzzerOptions { + /// Comma-separated list of functions to test. By default, all functions + /// are tested. + std::string onlyFunctions; + + /// Set of functions to not test. + std::unordered_set skipFunctions; + + /// Set of functions whose results are non-deterministic. These can be + /// order-dependent functions whose results depend on the order of input + /// rows, or functions that return complex-typed results containing + /// floating-point fields. + /// + /// For some functions, the result can be transformed to a deterministic + /// value. If such transformation exists, it can be specified to be used for + /// results verification. If no transformation is specified, results are not + /// verified. + /// + /// Keys are function names. Values are optional transformations. "{}" + /// should be used to indicate the original value, i.e. "f({})" + /// transformation applies function 'f' to aggregation result. + std::unordered_map> + customVerificationFunctions; + + std::unordered_map> + customInputGenerators; + + std::unordered_set orderDependentFunctions; + + /// Timestamp precision to use when generating inputs of type TIMESTAMP. + VectorFuzzer::Options::TimestampPrecision timestampPrecision{ + VectorFuzzer::Options::TimestampPrecision::kNanoSeconds}; + + /// A set of configuration properties to use when running query plans. + /// Could be used to specify timezone or enable/disable settings that + /// affect semantics of individual aggregate functions. + std::unordered_map queryConfigs; +}; + +// Parse the comma separated list of function names and return a set of them. +std::unordered_set splitNames(const std::string& names); + +// Parse the comma separated list of function names, and use it to filter the +// input signatures. +template +SignatureMapType filterSignatures( + const SignatureMapType& input, + const std::string& onlyFunctions, + const std::unordered_set& skipFunctions) { + if (onlyFunctions.empty() && skipFunctions.empty()) { + return input; + } + + SignatureMapType output; + if (!onlyFunctions.empty()) { + // Parse, lower case and trim it. + auto nameSet = splitNames(onlyFunctions); + for (const auto& it : input) { + if (nameSet.count(it.first) > 0) { + output.insert(it); + } + } + } else { + output = input; + } + + for (auto s : skipFunctions) { + auto str = s; + folly::toLowerAscii(str); + output.erase(str); + } + return output; +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index 6e409defbd3b7..80b48f24e73b3 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -55,7 +55,25 @@ target_link_libraries( velox_type_parser Folly::folly) +add_library(velox_aggregation_fuzzer_base AggregationFuzzerBase.cpp + AggregationFuzzerUtils.cpp) + +target_link_libraries( + velox_aggregation_fuzzer_base + velox_exec_test_lib + velox_temp_path + velox_common_base + velox_file + velox_hive_connector + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + velox_type + velox_vector_fuzzer + velox_exec_test_lib + velox_expression_test_utility) + add_library(velox_aggregation_fuzzer AggregationFuzzer.cpp) -target_link_libraries(velox_aggregation_fuzzer velox_type velox_vector_fuzzer - velox_exec_test_lib velox_expression_test_utility) +target_link_libraries( + velox_aggregation_fuzzer velox_type velox_vector_fuzzer velox_exec_test_lib + velox_expression_test_utility velox_aggregation_fuzzer_base)