From 87cb8c1383dbdc3c2d80ee82ce0cdb355ec08f1c Mon Sep 17 00:00:00 2001 From: Wei He Date: Thu, 14 Dec 2023 13:16:55 -0800 Subject: [PATCH] Extract AggregationFuzzerBase from AggregationFuzzer (#7916) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/7916 Extract AggregationFuzzerBase from AggregationFuzzer. This is needed for building WindowFuzzer that reuses common logic in AggregationFuzzerBase. This diff also moves fuzzer-related files to velox/exec/fuzzer, including – AggregationFuzzer.h/cpp – AggregationFuzzerRunner.h – ReferenceQueryRunner.h – DuckQueryRunner.h/cpp – PrestoQueryRunner.h/cpp – InputGenerator.h – ResultVerifier.h – TransformResultVerifier.h It also moves the Presto functions' custom input generators and result verifiers to velox/functions/prestosql/fuzzer, including – MinMaxInputGenerator.h – ApproxDistinctInputGenerator.h – ApproxDistinctResultVerifier.h – ApproxPercentileInputGenerator.h – ApproxPercentileResultVerifier.h This is the first piece of https://github.com/facebookincubator/velox/issues/7754. 
Reviewed By: mbasmanova Differential Revision: D51692940 fbshipit-source-id: cf8aec101f9d0b8d47a0db1e66e5f9b651f10414 --- velox/exec/CMakeLists.txt | 1 + .../utils => fuzzer}/AggregationFuzzer.cpp | 842 ++---------------- velox/exec/fuzzer/AggregationFuzzer.h | 46 + velox/exec/fuzzer/AggregationFuzzerBase.cpp | 652 ++++++++++++++ velox/exec/fuzzer/AggregationFuzzerBase.h | 277 ++++++ velox/exec/fuzzer/AggregationFuzzerOptions.h | 63 ++ .../AggregationFuzzerRunner.h | 94 +- velox/exec/fuzzer/CMakeLists.txt | 47 + .../utils => fuzzer}/DuckQueryRunner.cpp | 2 +- .../{tests/utils => fuzzer}/DuckQueryRunner.h | 2 +- velox/exec/fuzzer/InputGenerator.h | 38 + .../utils => fuzzer}/PrestoQueryRunner.cpp | 2 +- .../utils => fuzzer}/PrestoQueryRunner.h | 2 +- .../utils => fuzzer}/ReferenceQueryRunner.h | 0 .../ResultVerifier.h} | 45 +- velox/exec/fuzzer/TransformResultVerifier.h | 88 ++ velox/exec/tests/AggregationFuzzerTest.cpp | 775 +--------------- velox/exec/tests/AggregationRunnerTest.cpp | 4 +- velox/exec/tests/CMakeLists.txt | 15 +- velox/exec/tests/PrestoQueryRunnerTest.cpp | 2 +- .../exec/tests/SparkAggregationFuzzerTest.cpp | 8 +- velox/exec/tests/utils/CMakeLists.txt | 14 +- velox/expression/tests/utils/FuzzerToolkit.h | 45 + .../fuzzer/ApproxDistinctInputGenerator.h | 64 ++ .../fuzzer/ApproxDistinctResultVerifier.h | 185 ++++ .../fuzzer/ApproxPercentileInputGenerator.h | 126 +++ .../fuzzer/ApproxPercentileResultVerifier.h | 367 ++++++++ .../prestosql/fuzzer/MinMaxInputGenerator.h | 87 ++ 28 files changed, 2183 insertions(+), 1710 deletions(-) rename velox/exec/{tests/utils => fuzzer}/AggregationFuzzer.cpp (55%) create mode 100644 velox/exec/fuzzer/AggregationFuzzer.h create mode 100644 velox/exec/fuzzer/AggregationFuzzerBase.cpp create mode 100644 velox/exec/fuzzer/AggregationFuzzerBase.h create mode 100644 velox/exec/fuzzer/AggregationFuzzerOptions.h rename velox/exec/{tests/utils => fuzzer}/AggregationFuzzerRunner.h (55%) create mode 100644 
velox/exec/fuzzer/CMakeLists.txt rename velox/exec/{tests/utils => fuzzer}/DuckQueryRunner.cpp (99%) rename velox/exec/{tests/utils => fuzzer}/DuckQueryRunner.h (97%) create mode 100644 velox/exec/fuzzer/InputGenerator.h rename velox/exec/{tests/utils => fuzzer}/PrestoQueryRunner.cpp (99%) rename velox/exec/{tests/utils => fuzzer}/PrestoQueryRunner.h (98%) rename velox/exec/{tests/utils => fuzzer}/ReferenceQueryRunner.h (100%) rename velox/exec/{tests/utils/AggregationFuzzer.h => fuzzer/ResultVerifier.h} (63%) create mode 100644 velox/exec/fuzzer/TransformResultVerifier.h create mode 100644 velox/functions/prestosql/fuzzer/ApproxDistinctInputGenerator.h create mode 100644 velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h create mode 100644 velox/functions/prestosql/fuzzer/ApproxPercentileInputGenerator.h create mode 100644 velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h create mode 100644 velox/functions/prestosql/fuzzer/MinMaxInputGenerator.h diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 99e58c51c50c0..44cae04081a92 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -102,6 +102,7 @@ target_link_libraries( velox_common_compression) if(${VELOX_BUILD_TESTING}) + add_subdirectory(fuzzer) add_subdirectory(tests) elseif(${VELOX_BUILD_TEST_UTILS}) add_subdirectory(tests/utils) diff --git a/velox/exec/tests/utils/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp similarity index 55% rename from velox/exec/tests/utils/AggregationFuzzer.cpp rename to velox/exec/fuzzer/AggregationFuzzer.cpp index a1acaf330401f..253c8f4bc0799 100644 --- a/velox/exec/tests/utils/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -13,68 +13,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "velox/exec/tests/utils/AggregationFuzzer.h" +#include "velox/exec/fuzzer/AggregationFuzzer.h" + #include -#include "velox/common/base/Fs.h" -#include "velox/common/file/FileSystems.h" -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" -#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" -#include "velox/expression/SignatureBinder.h" -#include "velox/expression/tests/utils/ArgumentTypeFuzzer.h" #include "velox/exec/PartitionFunction.h" -#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/fuzzer/AggregationFuzzerBase.h" #include "velox/expression/tests/utils/FuzzerToolkit.h" #include "velox/vector/VectorSaver.h" #include "velox/vector/fuzzer/VectorFuzzer.h" -#include "velox/vector/tests/utils/VectorMaker.h" - -DEFINE_int32(steps, 10, "Number of plans to generate and execute."); - -DEFINE_int32( - duration_sec, - 0, - "For how long it should run (in seconds). If zero, " - "it executes exactly --steps iterations and exits."); - -DEFINE_int32( - batch_size, - 100, - "The number of elements on each generated vector."); - -DEFINE_int32(num_batches, 10, "The number of generated vectors."); - -DEFINE_int32( - max_num_varargs, - 5, - "The maximum number of variadic arguments fuzzer will generate for " - "functions that accept variadic arguments. Fuzzer will generate up to " - "max_num_varargs arguments for the variadic list in addition to the " - "required arguments by the function."); - -DEFINE_double( - null_ratio, - 0.1, - "Chance of adding a null constant to the plan, or null value in a vector " - "(expressed as double from 0 to 1)."); - -DEFINE_string( - repro_persist_path, - "", - "Directory path for persistence of data and SQL when fuzzer fails for " - "future reproduction. 
Empty string disables this feature."); - -DEFINE_bool( - enable_window_reference_verification, - false, - "When true, the results of the window aggregation are compared to reference DB results"); DEFINE_bool( enable_sorted_aggregations, @@ -82,24 +35,20 @@ DEFINE_bool( "When true, generates plans with aggregations over sorted inputs"); DEFINE_bool( - persist_and_run_once, - false, - "Persist repro info before evaluation and only run one iteration. " - "This is to rerun with the seed number and persist repro info upon a " - "crash failure. Only effective if repro_persist_path is set."); - -DEFINE_bool( - log_signature_stats, + enable_window_reference_verification, false, - "Log statistics about function signatures"); + "When true, the results of the window aggregation are compared to reference DB results"); using facebook::velox::test::CallableSignature; using facebook::velox::test::SignatureTemplate; namespace facebook::velox::exec::test { + +class AggregationFuzzerBase; + namespace { -class AggregationFuzzer { +class AggregationFuzzer : public AggregationFuzzerBase { public: AggregationFuzzer( AggregateFunctionSignatureMap signatureMap, @@ -112,28 +61,10 @@ class AggregationFuzzer { const std::unordered_map& queryConfigs, std::unique_ptr referenceQueryRunner); - struct PlanWithSplits { - core::PlanNodePtr plan; - std::vector splits; - }; - void go(); void go(const std::string& planPath); private: - static inline const std::string kHiveConnectorId = "test-hive"; - - std::shared_ptr findInputGenerator( - const CallableSignature& signature); - - static exec::Split makeSplit(const std::string& filePath); - - std::vector makeSplits( - const std::vector& inputs, - const std::string& path); - - PlanWithSplits deserialize(const folly::dynamic& obj); - struct Stats { // Names of functions that were tested. 
std::unordered_set functionNames; @@ -177,66 +108,8 @@ class AggregationFuzzer { void print(size_t numIterations) const; }; - static VectorFuzzer::Options getFuzzerOptions( - VectorFuzzer::Options::TimestampPrecision timestampPrecision) { - VectorFuzzer::Options opts; - opts.vectorSize = FLAGS_batch_size; - opts.stringVariableLength = true; - opts.stringLength = 4'000; - opts.nullRatio = FLAGS_null_ratio; - opts.timestampPrecision = timestampPrecision; - return opts; - } - - void seed(size_t seed) { - currentSeed_ = seed; - vectorFuzzer_.reSeed(seed); - rng_.seed(currentSeed_); - } - - void reSeed() { - seed(rng_()); - } - - // Generate at least one and up to 5 scalar columns to be used as grouping, - // partition or sorting keys. - // Column names are generated using template 'N', where N is - // zero-based ordinal number of the column. - std::vector generateKeys( - const std::string& prefix, - std::vector& names, - std::vector& types); - - // Similar to generateKeys, but restricts types to orderable types (i.e. no - // maps). - std::vector generateSortingKeys( - const std::string& prefix, - std::vector& names, - std::vector& types); - - struct SignatureStats { - /// Number of times a signature was chosen. - size_t numRuns{0}; - - /// Number of times generated query plan failed. - size_t numFailed{0}; - }; - - std::pair pickSignature(); - - std::vector generateInputData( - std::vector names, - std::vector types, - const std::optional& signature); - - // Generate a RowVector of the given types of children with an additional - // child named "row_number" of BIGINT row numbers that differentiates every - // row. Row numbers start from 0. This additional input vector is needed for - // result verification of window aggregations. 
- std::vector generateInputDataWithRowNumber( - std::vector names, - std::vector types, - const CallableSignature& signature); + void updateReferenceQueryStats( + AggregationFuzzerBase::ReferenceQueryErrorCode errorCode); // Return 'true' if query plans failed. bool verifyWindow( @@ -265,17 +138,6 @@ class AggregationFuzzer { void verifyAggregation(const std::vector& plans); - std::optional computeReferenceResults( - const core::PlanNodePtr& plan, - const std::vector& input); - - velox::test::ResultOrError execute( - const core::PlanNodePtr& plan, - const std::vector& splits = {}, - bool injectSpill = false, - bool abandonPartial = false, - int32_t maxDrivers = 2); - static bool hasPartialGroupBy(const core::PlanNodePtr& plan) { auto partialAgg = core::PlanNode::findFirstNode( plan.get(), [](const core::PlanNode* node) { @@ -335,47 +197,6 @@ class AggregationFuzzer { } } - // @param customVerification If false, results are compared as is. Otherwise, - // only row counts are compared. - // @param customVerifiers Custom verifier for each aggregate function. These - // can be null. If not null and customVerification is true, custom verifier is - // used to further verify the results. - void testPlan( - const PlanWithSplits& planWithSplits, - bool injectSpill, - bool abandonPartial, - bool customVerification, - const std::vector>& customVerifiers, - const velox::test::ResultOrError& expected, - int32_t maxDrivers = 2); - - void printSignatureStats(); - - const std::unordered_map> - customVerificationFunctions_; - const std::unordered_map> - customInputGenerators_; - const std::unordered_map queryConfigs_; - const bool persistAndRunOnce_; - const std::string reproPersistPath_; - - std::unique_ptr referenceQueryRunner_; - - std::vector signatures_; - std::vector signatureTemplates_; - - // Stats for 'signatures_' and 'signatureTemplates_'. Stats for 'signatures_' - // come before stats for 'signatureTemplates_'. 
- std::vector signatureStats_; - - FuzzerGenerator rng_; - size_t currentSeed_{0}; - - std::shared_ptr rootPool_{ - memory::defaultMemoryManager().addRootPool()}; - std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; - VectorFuzzer vectorFuzzer_; - Stats stats_; }; } // namespace @@ -405,34 +226,9 @@ void aggregateFuzzer( namespace { -std::string printStat(size_t n, size_t total) { - return fmt::format("{} ({:.2f}%)", n, (double)n / total * 100); -} - -void printStats( - size_t numFunctions, - size_t numSignatures, - size_t numSupportedFunctions, - size_t numSupportedSignatures) { - LOG(INFO) << fmt::format( - "Total functions: {} ({} signatures)", numFunctions, numSignatures); - LOG(INFO) << "Functions with at least one supported signature: " - << printStat(numSupportedFunctions, numFunctions); - - size_t numNotSupportedFunctions = numFunctions - numSupportedFunctions; - LOG(INFO) << "Functions with no supported signature: " - << printStat(numNotSupportedFunctions, numFunctions); - LOG(INFO) << "Supported function signatures: " - << printStat(numSupportedSignatures, numSignatures); - - size_t numNotSupportedSignatures = numSignatures - numSupportedSignatures; - LOG(INFO) << "Unsupported function signatures: " - << printStat(numNotSupportedSignatures, numSignatures); -} - AggregationFuzzer::AggregationFuzzer( AggregateFunctionSignatureMap signatureMap, - size_t initialSeed, + size_t seed, const std::unordered_map>& customVerificationFunctions, const std::unordered_map>& @@ -440,21 +236,13 @@ AggregationFuzzer::AggregationFuzzer( VectorFuzzer::Options::TimestampPrecision timestampPrecision, const std::unordered_map& queryConfigs, std::unique_ptr referenceQueryRunner) - : customVerificationFunctions_{customVerificationFunctions}, - customInputGenerators_{customInputGenerators}, - queryConfigs_{queryConfigs}, - persistAndRunOnce_{FLAGS_persist_and_run_once}, - reproPersistPath_{FLAGS_repro_persist_path}, - referenceQueryRunner_{std::move(referenceQueryRunner)}, - 
vectorFuzzer_{getFuzzerOptions(timestampPrecision), pool_.get()} { - filesystems::registerLocalFileSystem(); - auto hiveConnector = - connector::getConnectorFactory( - connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector(kHiveConnectorId, std::make_shared()); - connector::registerConnector(hiveConnector); - - seed(initialSeed); + : AggregationFuzzerBase{ + seed, + customVerificationFunctions, + customInputGenerators, + timestampPrecision, + queryConfigs, + std::move(referenceQueryRunner)} { VELOX_CHECK(!signatureMap.empty(), "No function signatures available."); if (persistAndRunOnce_ && reproPersistPath_.empty()) { @@ -464,88 +252,8 @@ AggregationFuzzer::AggregationFuzzer( exit(1); } - size_t numFunctions = 0; - size_t numSignatures = 0; - size_t numSupportedFunctions = 0; - size_t numSupportedSignatures = 0; - - for (auto& [name, signatures] : signatureMap) { - ++numFunctions; - bool hasSupportedSignature = false; - for (auto& signature : signatures) { - ++numSignatures; - - if (signature->variableArity()) { - LOG(WARNING) << "Skipping variadic function signature: " << name - << signature->toString(); - continue; - } - - if (!signature->variables().empty()) { - bool skip = false; - std::unordered_set typeVariables; - for (auto& [variableName, variable] : signature->variables()) { - if (variable.isIntegerParameter()) { - LOG(WARNING) << "Skipping generic function signature: " << name - << signature->toString(); - skip = true; - break; - } - - typeVariables.insert(variableName); - } - if (skip) { - continue; - } - - signatureTemplates_.push_back( - {name, signature.get(), std::move(typeVariables)}); - } else { - CallableSignature callable{ - .name = name, - .args = {}, - .returnType = SignatureBinder::tryResolveType( - signature->returnType(), {}, {}), - .constantArgs = {}}; - VELOX_CHECK_NOT_NULL(callable.returnType); - - // Process each argument and figure out its type. 
- for (const auto& arg : signature->argumentTypes()) { - auto resolvedType = SignatureBinder::tryResolveType(arg, {}, {}); - VELOX_CHECK_NOT_NULL(resolvedType); - - // SignatureBinder::tryResolveType produces ROW types with empty field - // names. These won't work with TableScan. - if (resolvedType->isRow()) { - std::vector names; - for (auto i = 0; i < resolvedType->size(); ++i) { - names.push_back(fmt::format("field{}", i)); - } - - std::vector types = resolvedType->asRow().children(); - - resolvedType = ROW(std::move(names), std::move(types)); - } - - callable.args.emplace_back(resolvedType); - } - - signatures_.emplace_back(callable); - } - - ++numSupportedSignatures; - hasSupportedSignature = true; - } - if (hasSupportedSignature) { - ++numSupportedFunctions; - } - } - - printStats( - numFunctions, - numSignatures, - numSupportedFunctions, - numSupportedSignatures); + addAggregationSignatures(signatureMap); + printStats(functionsStats); sortCallableSignatures(signatures_); sortSignatureTemplates(signatureTemplates_); @@ -553,271 +261,6 @@ AggregationFuzzer::AggregationFuzzer( signatureStats_.resize(signatures_.size() + signatureTemplates_.size()); } -template -bool isDone(size_t i, T startTime) { - if (FLAGS_duration_sec > 0) { - std::chrono::duration elapsed = - std::chrono::system_clock::now() - startTime; - return elapsed.count() >= FLAGS_duration_sec; - } - return i >= FLAGS_steps; -} - -std::string makeFunctionCall( - const std::string& name, - const std::vector& argNames, - bool sortedInputs) { - std::ostringstream call; - call << name << "(" << folly::join(", ", argNames); - if (sortedInputs) { - call << " ORDER BY " << folly::join(", ", argNames); - } - call << ")"; - - return call.str(); -} - -std::vector makeNames(size_t n) { - std::vector names; - for (auto i = 0; i < n; ++i) { - names.push_back(fmt::format("c{}", i)); - } - return names; -} - -folly::dynamic serialize( - const AggregationFuzzer::PlanWithSplits& planWithSplits, - const std::string& 
dirPath, - std::unordered_map& filePaths) { - folly::dynamic obj = folly::dynamic::object(); - obj["plan"] = planWithSplits.plan->serialize(); - if (planWithSplits.splits.empty()) { - return obj; - } - - folly::dynamic jsonSplits = folly::dynamic::array(); - jsonSplits.reserve(planWithSplits.splits.size()); - for (const auto& split : planWithSplits.splits) { - const auto filePath = - std::dynamic_pointer_cast( - split.connectorSplit) - ->filePath; - if (filePaths.count(filePath) == 0) { - const auto newFilePath = fmt::format("{}/{}", dirPath, filePaths.size()); - fs::copy(filePath, newFilePath); - filePaths.insert({filePath, newFilePath}); - } - jsonSplits.push_back(filePaths.at(filePath)); - } - obj["splits"] = jsonSplits; - return obj; -} - -void persistReproInfo( - const std::vector& plans, - const std::string& basePath) { - if (!common::generateFileDirectory(basePath.c_str())) { - return; - } - - // Create a new directory - const auto dirPathOptional = - common::generateTempFolderPath(basePath.c_str(), "aggregationVerifier"); - if (!dirPathOptional.has_value()) { - LOG(ERROR) - << "Failed to create directory for persisting plans using base path: " - << basePath; - return; - } - - const auto dirPath = dirPathOptional.value(); - - // Save plans and splits. 
- const std::string planPath = fmt::format("{}/{}", dirPath, kPlanNodeFileName); - std::unordered_map filePaths; - try { - folly::dynamic array = folly::dynamic::array(); - array.reserve(plans.size()); - for (auto planWithSplits : plans) { - array.push_back(serialize(planWithSplits, dirPath, filePaths)); - } - auto planJson = folly::toJson(array); - saveStringToFile(planJson, planPath.c_str()); - LOG(INFO) << "Persisted aggregation plans to " << planPath; - } catch (std::exception& e) { - LOG(ERROR) << "Failed to store aggregation plans to " << planPath << ": " - << e.what(); - } -} - -std::pair -AggregationFuzzer::pickSignature() { - size_t idx = boost::random::uniform_int_distribution( - 0, signatures_.size() + signatureTemplates_.size() - 1)(rng_); - CallableSignature signature; - if (idx < signatures_.size()) { - signature = signatures_[idx]; - } else { - const auto& signatureTemplate = - signatureTemplates_[idx - signatures_.size()]; - signature.name = signatureTemplate.name; - velox::test::ArgumentTypeFuzzer typeFuzzer( - *signatureTemplate.signature, rng_); - VELOX_CHECK(typeFuzzer.fuzzArgumentTypes(FLAGS_max_num_varargs)); - signature.args = typeFuzzer.argumentTypes(); - } - - return {signature, signatureStats_[idx]}; -} - -std::vector AggregationFuzzer::generateKeys( - const std::string& prefix, - std::vector& names, - std::vector& types) { - static const std::vector kNonFloatingPointTypes{ - BOOLEAN(), - TINYINT(), - SMALLINT(), - INTEGER(), - BIGINT(), - VARCHAR(), - VARBINARY(), - TIMESTAMP(), - }; - - auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); - std::vector keys; - for (auto i = 0; i < numKeys; ++i) { - keys.push_back(fmt::format("{}{}", prefix, i)); - - // Pick random, possibly complex, type. 
- types.push_back(vectorFuzzer_.randType(kNonFloatingPointTypes, 2)); - names.push_back(keys.back()); - } - return keys; -} - -std::vector AggregationFuzzer::generateSortingKeys( - const std::string& prefix, - std::vector& names, - std::vector& types) { - auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); - std::vector keys; - for (auto i = 0; i < numKeys; ++i) { - keys.push_back(fmt::format("{}{}", prefix, i)); - - // Pick random, possibly complex, type. - types.push_back(vectorFuzzer_.randOrderableType(2)); - names.push_back(keys.back()); - } - return keys; -} - -std::shared_ptr AggregationFuzzer::findInputGenerator( - const CallableSignature& signature) { - auto generatorIt = customInputGenerators_.find(signature.name); - if (generatorIt != customInputGenerators_.end()) { - return generatorIt->second; - } - - return nullptr; -} - -std::vector AggregationFuzzer::generateInputData( - std::vector names, - std::vector types, - const std::optional& signature) { - std::shared_ptr generator; - if (signature.has_value()) { - generator = findInputGenerator(signature.value()); - } - - const auto size = vectorFuzzer_.getOptions().vectorSize; - - auto inputType = ROW(std::move(names), std::move(types)); - std::vector input; - for (auto i = 0; i < FLAGS_num_batches; ++i) { - std::vector children; - - if (generator != nullptr) { - children = generator->generate( - signature->args, vectorFuzzer_, rng_, pool_.get()); - } - - for (auto i = children.size(); i < inputType->size(); ++i) { - children.push_back(vectorFuzzer_.fuzz(inputType->childAt(i), size)); - } - - input.push_back(std::make_shared( - pool_.get(), inputType, nullptr, size, std::move(children))); - } - - if (generator != nullptr) { - generator->reset(); - } - - return input; -} - -std::vector AggregationFuzzer::generateInputDataWithRowNumber( - std::vector names, - std::vector types, - const CallableSignature& signature) { - names.push_back("row_number"); - types.push_back(BIGINT()); - - auto 
generator = findInputGenerator(signature); - - std::vector input; - auto size = vectorFuzzer_.getOptions().vectorSize; - velox::test::VectorMaker vectorMaker{pool_.get()}; - int64_t rowNumber = 0; - for (auto j = 0; j < FLAGS_num_batches; ++j) { - std::vector children; - - if (generator != nullptr) { - children = - generator->generate(signature.args, vectorFuzzer_, rng_, pool_.get()); - } - - for (auto i = children.size(); i < types.size() - 1; ++i) { - children.push_back(vectorFuzzer_.fuzz(types[i], size)); - } - children.push_back(vectorMaker.flatVector( - size, [&](auto /*row*/) { return rowNumber++; })); - input.push_back(vectorMaker.rowVector(names, children)); - } - - if (generator != nullptr) { - generator->reset(); - } - - return input; -} - -// static -exec::Split AggregationFuzzer::makeSplit(const std::string& filePath) { - return exec::Split{std::make_shared( - kHiveConnectorId, filePath, dwio::common::FileFormat::DWRF)}; -} - -AggregationFuzzer::PlanWithSplits AggregationFuzzer::deserialize( - const folly::dynamic& obj) { - auto plan = velox::ISerializable::deserialize( - obj["plan"], pool_.get()); - - std::vector splits; - if (obj.count("splits") > 0) { - auto paths = - ISerializable::deserialize>(obj["splits"]); - for (const auto& path : paths) { - splits.push_back(makeSplit(path)); - } - } - - return PlanWithSplits{plan, splits}; -} - void AggregationFuzzer::go(const std::string& planPath) { Type::registerSerDe(); connector::hive::HiveTableHandle::registerSerDe(); @@ -992,103 +435,6 @@ void AggregationFuzzer::go() { printSignatureStats(); } -void AggregationFuzzer::printSignatureStats() { - if (!FLAGS_log_signature_stats) { - return; - } - - for (auto i = 0; i < signatureStats_.size(); ++i) { - const auto& stats = signatureStats_[i]; - if (stats.numRuns == 0) { - continue; - } - - if (stats.numFailed * 1.0 / stats.numRuns < 0.5) { - continue; - } - - if (i < signatures_.size()) { - LOG(INFO) << "Signature #" << i << " failed " << stats.numFailed - << 
" out of " << stats.numRuns - << " times: " << signatures_[i].toString(); - } else { - const auto& signatureTemplate = - signatureTemplates_[i - signatures_.size()]; - LOG(INFO) << "Signature template #" << i << " failed " << stats.numFailed - << " out of " << stats.numRuns - << " times: " << signatureTemplate.name << "(" - << signatureTemplate.signature->toString() << ")"; - } - } -} - -velox::test::ResultOrError AggregationFuzzer::execute( - const core::PlanNodePtr& plan, - const std::vector& splits, - bool injectSpill, - bool abandonPartial, - int32_t maxDrivers) { - LOG(INFO) << "Executing query plan: " << std::endl - << plan->toString(true, true); - - velox::test::ResultOrError resultOrError; - try { - std::shared_ptr spillDirectory; - AssertQueryBuilder builder(plan); - - builder.configs(queryConfigs_); - - if (injectSpill) { - spillDirectory = exec::test::TempDirectoryPath::create(); - builder.spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .config(core::QueryConfig::kTestingSpillPct, "100"); - } - - if (abandonPartial) { - builder.config(core::QueryConfig::kAbandonPartialAggregationMinRows, "1") - .config(core::QueryConfig::kAbandonPartialAggregationMinPct, "0") - .config(core::QueryConfig::kMaxPartialAggregationMemory, "0") - .config(core::QueryConfig::kMaxExtendedPartialAggregationMemory, "0"); - } - - if (!splits.empty()) { - builder.splits(splits); - } - - resultOrError.result = - builder.maxDrivers(maxDrivers).copyResults(pool_.get()); - } catch (VeloxUserError& e) { - // NOTE: velox user exception is accepted as it is caused by the invalid - // fuzzer test inputs. 
- resultOrError.exceptionPtr = std::current_exception(); - } - - return resultOrError; -} - -std::optional -AggregationFuzzer::computeReferenceResults( - const core::PlanNodePtr& plan, - const std::vector& input) { - if (auto sql = referenceQueryRunner_->toSql(plan)) { - try { - return referenceQueryRunner_->execute( - sql.value(), input, plan->outputType()); - } catch (std::exception& e) { - ++stats_.numReferenceQueryFailed; - LOG(WARNING) << "Query failed in the reference DB"; - return std::nullopt; - } - } else { - LOG(INFO) << "Query not supported by the reference DB"; - ++stats_.numVerificationNotSupported; - } - - return std::nullopt; -} - void makeAlternativePlansWithValues( const std::vector& groupingKeys, const std::vector& aggregates, @@ -1263,57 +609,12 @@ void makeStreamingPlansWithTableScan( .planNode()); } -void AggregationFuzzer::testPlan( - const PlanWithSplits& planWithSplits, - bool injectSpill, - bool abandonPartial, - bool customVerification, - const std::vector>& customVerifiers, - const velox::test::ResultOrError& expected, - int32_t maxDrivers) { - auto actual = execute( - planWithSplits.plan, - planWithSplits.splits, - injectSpill, - abandonPartial, - maxDrivers); - - // Compare results or exceptions (if any). Fail is anything is different. - if (expected.exceptionPtr || actual.exceptionPtr) { - // Throws in case exceptions are not compatible. 
- velox::test::compareExceptions(expected.exceptionPtr, actual.exceptionPtr); - return; - } - - if (!customVerification) { - VELOX_CHECK( - assertEqualResults({expected.result}, {actual.result}), - "Logically equivalent plans produced different results"); - return; - } - - VELOX_CHECK_EQ( - expected.result->size(), - actual.result->size(), - "Logically equivalent plans produced different number of rows"); - - for (auto& verifier : customVerifiers) { - if (verifier == nullptr) { - continue; - } - - if (verifier->supportsCompare()) { - VELOX_CHECK( - verifier->compare(expected.result, actual.result), - "Logically equivalent plans produced different results"); - } else if (verifier->supportsVerify()) { - VELOX_CHECK( - verifier->verify(actual.result), - "Result of a logically equivalent plan failed custom verification"); - } else { - VELOX_UNREACHABLE( - "Custom verifier must support either 'compare' or 'verify' API."); - } +void AggregationFuzzer::updateReferenceQueryStats( + AggregationFuzzerBase::ReferenceQueryErrorCode errorCode) { + if (errorCode == ReferenceQueryErrorCode::kReferenceQueryFail) { + ++stats_.numReferenceQueryFailed; + } else if (errorCode == ReferenceQueryErrorCode::kReferenceQueryUnsupported) { + ++stats_.numVerificationNotSupported; } } @@ -1348,7 +649,9 @@ bool AggregationFuzzer::verifyWindow( if (!customVerification && enableWindowVerification) { if (resultOrError.result) { - if (auto expectedResult = computeReferenceResults(plan, input)) { + auto referenceResult = computeReferenceResults(plan, input); + updateReferenceQueryStats(referenceResult.second); + if (auto expectedResult = referenceResult.first) { ++stats_.numVerified; VELOX_CHECK( assertEqualResults( @@ -1372,57 +675,6 @@ bool AggregationFuzzer::verifyWindow( } } -namespace { -void writeToFile( - const std::string& path, - const VectorPtr& vector, - memory::MemoryPool* pool) { - dwrf::WriterOptions options; - options.schema = vector->type(); - options.memoryPool = pool; - auto 
writeFile = std::make_unique(path, true, false); - auto sink = - std::make_unique(std::move(writeFile), path); - dwrf::Writer writer(std::move(sink), options); - writer.write(vector); - writer.close(); -} - -bool isTableScanSupported(const TypePtr& type) { - if (type->kind() == TypeKind::ROW && type->size() == 0) { - return false; - } - if (type->kind() == TypeKind::UNKNOWN) { - return false; - } - if (type->kind() == TypeKind::HUGEINT) { - return false; - } - - for (auto i = 0; i < type->size(); ++i) { - if (!isTableScanSupported(type->childAt(i))) { - return false; - } - } - - return true; -} -} // namespace - -std::vector AggregationFuzzer::makeSplits( - const std::vector& inputs, - const std::string& path) { - std::vector splits; - auto writerPool = rootPool_->addAggregateChild("writer"); - for (auto i = 0; i < inputs.size(); ++i) { - const std::string filePath = fmt::format("{}/{}", path, i); - writeToFile(filePath, inputs[i], writerPool.get()); - splits.push_back(makeSplit(filePath)); - } - - return splits; -} - void resetCustomVerifiers( const std::vector>& customVerifiers) { for (auto& verifier : customVerifiers) { @@ -1544,7 +796,9 @@ bool AggregationFuzzer::verifyAggregation( std::optional expectedResult; if (resultOrError.result != nullptr) { if (!customVerification) { - expectedResult = computeReferenceResults(firstPlan, input); + auto referenceResult = computeReferenceResults(firstPlan, input); + updateReferenceQueryStats(referenceResult.second); + expectedResult = referenceResult.first; } else { ++stats_.numVerificationSkipped; @@ -1595,7 +849,9 @@ bool AggregationFuzzer::verifySortedAggregation( ++stats_.numFailed; } - auto expectedResult = computeReferenceResults(firstPlan, input); + auto referenceResult = computeReferenceResults(firstPlan, input); + updateReferenceQueryStats(referenceResult.second); + auto expectedResult = referenceResult.first; if (expectedResult && resultOrError.result) { ++stats_.numVerified; VELOX_CHECK( @@ -1697,7 +953,9 @@ 
void AggregationFuzzer::verifyAggregation( std::optional expectedResult; if (!customVerification) { - expectedResult = computeReferenceResults(plan, input); + auto referenceResult = computeReferenceResults(plan, input); + updateReferenceQueryStats(referenceResult.second); + expectedResult = referenceResult.first; } if (expectedResult && resultOrError.result) { @@ -1716,26 +974,26 @@ void AggregationFuzzer::verifyAggregation( void AggregationFuzzer::Stats::print(size_t numIterations) const { LOG(INFO) << "Total functions tested: " << functionNames.size(); LOG(INFO) << "Total masked aggregations: " - << printStat(numMask, numIterations); + << printPercentageStat(numMask, numIterations); LOG(INFO) << "Total global aggregations: " - << printStat(numGlobal, numIterations); + << printPercentageStat(numGlobal, numIterations); LOG(INFO) << "Total group-by aggregations: " - << printStat(numGroupBy, numIterations); + << printPercentageStat(numGroupBy, numIterations); LOG(INFO) << "Total distinct aggregations: " - << printStat(numDistinct, numIterations); + << printPercentageStat(numDistinct, numIterations); LOG(INFO) << "Total aggregations over sorted inputs: " - << printStat(numSortedInputs, numIterations); + << printPercentageStat(numSortedInputs, numIterations); LOG(INFO) << "Total window expressions: " - << printStat(numWindow, numIterations); + << printPercentageStat(numWindow, numIterations); LOG(INFO) << "Total aggregations verified against reference DB: " - << printStat(numVerified, numIterations); + << printPercentageStat(numVerified, numIterations); LOG(INFO) << "Total aggregations not verified (non-deterministic function / not supported by reference DB / reference DB failed): " - << printStat(numVerificationSkipped, numIterations) << " / " - << printStat(numVerificationNotSupported, numIterations) << " / " - << printStat(numReferenceQueryFailed, numIterations); + << printPercentageStat(numVerificationSkipped, numIterations) << " / " + << 
printPercentageStat(numVerificationNotSupported, numIterations) + << " / " << printPercentageStat(numReferenceQueryFailed, numIterations); LOG(INFO) << "Total failed aggregations: " - << printStat(numFailed, numIterations); + << printPercentageStat(numFailed, numIterations); } } // namespace diff --git a/velox/exec/fuzzer/AggregationFuzzer.h b/velox/exec/fuzzer/AggregationFuzzer.h new file mode 100644 index 0000000000000..a07fad4522e8c --- /dev/null +++ b/velox/exec/fuzzer/AggregationFuzzer.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/Aggregate.h" +#include "velox/exec/fuzzer/AggregationFuzzerBase.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +namespace facebook::velox::exec::test { + +/// Runs the aggregation fuzzer. +/// @param signatureMap Map of all aggregate function signatures. +/// @param seed Random seed - Pass the same seed for reproducibility. +/// @param orderDependentFunctions Map of functions that depend on order of +/// input. +/// @param planPath Path to persisted plan information. If this is +/// supplied, fuzzer will only verify the plans. +/// @param referenceQueryRunner Reference query runner for results +/// verification. 
+void aggregateFuzzer( + AggregateFunctionSignatureMap signatureMap, + size_t seed, + const std::unordered_map>& + orderDependentFunctions, + const std::unordered_map>& + customInputGenerators, + VectorFuzzer::Options::TimestampPrecision timestampPrecision, + const std::unordered_map& queryConfigs, + const std::optional& planPath, + std::unique_ptr referenceQueryRunner); + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp new file mode 100644 index 0000000000000..cb2e0d0d34728 --- /dev/null +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -0,0 +1,652 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/fuzzer/AggregationFuzzerBase.h" + +#include +#include "velox/common/base/Fs.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/dwrf/reader/DwrfReader.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/expression/SignatureBinder.h" +#include "velox/expression/tests/utils/ArgumentTypeFuzzer.h" +#include "velox/vector/VectorSaver.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +DEFINE_int32(steps, 10, "Number of plans to generate and execute."); + +DEFINE_int32( + duration_sec, + 0, + "For how long it should run (in seconds). 
If zero, " + "it executes exactly --steps iterations and exits."); + +DEFINE_int32( + batch_size, + 100, + "The number of elements on each generated vector."); + +DEFINE_int32(num_batches, 10, "The number of generated vectors."); + +DEFINE_int32( + max_num_varargs, + 5, + "The maximum number of variadic arguments fuzzer will generate for " + "functions that accept variadic arguments. Fuzzer will generate up to " + "max_num_varargs arguments for the variadic list in addition to the " + "required arguments by the function."); + +DEFINE_double( + null_ratio, + 0.1, + "Chance of adding a null constant to the plan, or null value in a vector " + "(expressed as double from 0 to 1)."); + +DEFINE_string( + repro_persist_path, + "", + "Directory path for persistence of data and SQL when fuzzer fails for " + "future reproduction. Empty string disables this feature."); + +DEFINE_bool( + persist_and_run_once, + false, + "Persist repro info before evaluation and only run one iteration. " + "This is to rerun with the seed number and persist repro info upon a " + "crash failure. 
Only effective if repro_persist_path is set."); + +DEFINE_bool( + log_signature_stats, + false, + "Log statistics about function signatures"); + +namespace facebook::velox::exec::test { + +bool AggregationFuzzerBase::addSignature( + const std::string& name, + const FunctionSignaturePtr& signature) { + ++functionsStats.numSignatures; + + if (signature->variableArity()) { + LOG(WARNING) << "Skipping variadic function signature: " << name + << signature->toString(); + return false; + } + + if (!signature->variables().empty()) { + bool skip = false; + std::unordered_set typeVariables; + for (auto& [variableName, variable] : signature->variables()) { + if (variable.isIntegerParameter()) { + LOG(WARNING) << "Skipping generic function signature: " << name + << signature->toString(); + skip = true; + break; + } + + typeVariables.insert(variableName); + } + if (skip) { + return false; + } + + signatureTemplates_.push_back( + {name, signature.get(), std::move(typeVariables)}); + } else { + CallableSignature callable{ + .name = name, + .args = {}, + .returnType = + SignatureBinder::tryResolveType(signature->returnType(), {}, {}), + .constantArgs = {}}; + VELOX_CHECK_NOT_NULL(callable.returnType); + + // Process each argument and figure out its type. + for (const auto& arg : signature->argumentTypes()) { + auto resolvedType = SignatureBinder::tryResolveType(arg, {}, {}); + VELOX_CHECK_NOT_NULL(resolvedType); + + // SignatureBinder::tryResolveType produces ROW types with empty + // field names. These won't work with TableScan. 
+ if (resolvedType->isRow()) { + std::vector names; + for (auto i = 0; i < resolvedType->size(); ++i) { + names.push_back(fmt::format("field{}", i)); + } + + std::vector types = resolvedType->asRow().children(); + + resolvedType = ROW(std::move(names), std::move(types)); + } + + callable.args.emplace_back(resolvedType); + } + + signatures_.emplace_back(callable); + } + + ++functionsStats.numSupportedSignatures; + return true; +} + +void AggregationFuzzerBase::addAggregationSignatures( + const AggregateFunctionSignatureMap& signatureMap) { + for (auto& [name, signatures] : signatureMap) { + ++functionsStats.numFunctions; + bool hasSupportedSignature = false; + for (auto& signature : signatures) { + hasSupportedSignature |= addSignature(name, signature); + } + if (hasSupportedSignature) { + ++functionsStats.numSupportedFunctions; + } + } +} + +std::pair +AggregationFuzzerBase::pickSignature() { + size_t idx = boost::random::uniform_int_distribution( + 0, signatures_.size() + signatureTemplates_.size() - 1)(rng_); + CallableSignature signature; + if (idx < signatures_.size()) { + signature = signatures_[idx]; + } else { + const auto& signatureTemplate = + signatureTemplates_[idx - signatures_.size()]; + signature.name = signatureTemplate.name; + velox::test::ArgumentTypeFuzzer typeFuzzer( + *signatureTemplate.signature, rng_); + VELOX_CHECK(typeFuzzer.fuzzArgumentTypes(FLAGS_max_num_varargs)); + signature.args = typeFuzzer.argumentTypes(); + } + + return {signature, signatureStats_[idx]}; +} + +std::vector AggregationFuzzerBase::generateKeys( + const std::string& prefix, + std::vector& names, + std::vector& types) { + static const std::vector kNonFloatingPointTypes{ + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + VARCHAR(), + VARBINARY(), + TIMESTAMP(), + }; + + auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); + std::vector keys; + for (auto i = 0; i < numKeys; ++i) { + keys.push_back(fmt::format("{}{}", prefix, i)); + + // Pick 
random, possibly complex, type. + types.push_back(vectorFuzzer_.randType(kNonFloatingPointTypes, 2)); + names.push_back(keys.back()); + } + return keys; +} + +std::vector AggregationFuzzerBase::generateSortingKeys( + const std::string& prefix, + std::vector& names, + std::vector& types) { + std::vector keys; + auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); + for (auto i = 0; i < numKeys; ++i) { + keys.push_back(fmt::format("{}{}", prefix, i)); + + // Pick random, possibly complex, type. + types.push_back(vectorFuzzer_.randOrderableType(2)); + names.push_back(keys.back()); + } + + return keys; +} + +std::shared_ptr AggregationFuzzerBase::findInputGenerator( + const CallableSignature& signature) { + auto generatorIt = customInputGenerators_.find(signature.name); + if (generatorIt != customInputGenerators_.end()) { + return generatorIt->second; + } + + return nullptr; +} + +std::vector AggregationFuzzerBase::generateInputData( + std::vector names, + std::vector types, + const std::optional& signature) { + std::shared_ptr generator; + if (signature.has_value()) { + generator = findInputGenerator(signature.value()); + } + + const auto size = vectorFuzzer_.getOptions().vectorSize; + + auto inputType = ROW(std::move(names), std::move(types)); + std::vector input; + for (auto i = 0; i < FLAGS_num_batches; ++i) { + std::vector children; + + if (generator != nullptr) { + children = generator->generate( + signature->args, vectorFuzzer_, rng_, pool_.get()); + } + + for (auto j = children.size(); j < inputType->size(); ++j) { + children.push_back(vectorFuzzer_.fuzz(inputType->childAt(j), size)); + } + + input.push_back(std::make_shared( + pool_.get(), inputType, nullptr, size, std::move(children))); + } + + if (generator != nullptr) { + generator->reset(); + } + + return input; +} + +std::vector AggregationFuzzerBase::generateInputDataWithRowNumber( + std::vector names, + std::vector types, + const CallableSignature& signature) { + 
names.push_back("row_number"); + types.push_back(BIGINT()); + + auto generator = findInputGenerator(signature); + + std::vector input; + auto size = vectorFuzzer_.getOptions().vectorSize; + velox::test::VectorMaker vectorMaker{pool_.get()}; + int64_t rowNumber = 0; + for (auto j = 0; j < FLAGS_num_batches; ++j) { + std::vector children; + + if (generator != nullptr) { + children = + generator->generate(signature.args, vectorFuzzer_, rng_, pool_.get()); + } + + for (auto i = children.size(); i < types.size() - 1; ++i) { + children.push_back(vectorFuzzer_.fuzz(types[i], size)); + } + children.push_back(vectorMaker.flatVector( + size, [&](auto /*row*/) { return rowNumber++; })); + input.push_back(vectorMaker.rowVector(names, children)); + } + + if (generator != nullptr) { + generator->reset(); + } + + return input; +} + +// static +exec::Split AggregationFuzzerBase::makeSplit(const std::string& filePath) { + return exec::Split{std::make_shared( + kHiveConnectorId, filePath, dwio::common::FileFormat::DWRF)}; +} + +AggregationFuzzerBase::PlanWithSplits AggregationFuzzerBase::deserialize( + const folly::dynamic& obj) { + auto plan = velox::ISerializable::deserialize( + obj["plan"], pool_.get()); + + std::vector splits; + if (obj.count("splits") > 0) { + auto paths = + ISerializable::deserialize>(obj["splits"]); + for (const auto& path : paths) { + splits.push_back(makeSplit(path)); + } + } + + return PlanWithSplits{plan, splits}; +} + +void AggregationFuzzerBase::printSignatureStats() { + if (!FLAGS_log_signature_stats) { + return; + } + + for (auto i = 0; i < signatureStats_.size(); ++i) { + const auto& stats = signatureStats_[i]; + if (stats.numRuns == 0) { + continue; + } + + if (stats.numFailed * 1.0 / stats.numRuns < 0.5) { + continue; + } + + if (i < signatures_.size()) { + LOG(INFO) << "Signature #" << i << " failed " << stats.numFailed + << " out of " << stats.numRuns + << " times: " << signatures_[i].toString(); + } else { + const auto& signatureTemplate = + 
signatureTemplates_[i - signatures_.size()]; + LOG(INFO) << "Signature template #" << i << " failed " << stats.numFailed + << " out of " << stats.numRuns + << " times: " << signatureTemplate.name << "(" + << signatureTemplate.signature->toString() << ")"; + } + } +} + +velox::test::ResultOrError AggregationFuzzerBase::execute( + const core::PlanNodePtr& plan, + const std::vector& splits, + bool injectSpill, + bool abandonPartial, + int32_t maxDrivers) { + LOG(INFO) << "Executing query plan: " << std::endl + << plan->toString(true, true); + + velox::test::ResultOrError resultOrError; + try { + std::shared_ptr spillDirectory; + AssertQueryBuilder builder(plan); + + builder.configs(queryConfigs_); + + if (injectSpill) { + spillDirectory = exec::test::TempDirectoryPath::create(); + builder.spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config(core::QueryConfig::kTestingSpillPct, "100"); + } + + if (abandonPartial) { + builder.config(core::QueryConfig::kAbandonPartialAggregationMinRows, "1") + .config(core::QueryConfig::kAbandonPartialAggregationMinPct, "0") + .config(core::QueryConfig::kMaxPartialAggregationMemory, "0") + .config(core::QueryConfig::kMaxExtendedPartialAggregationMemory, "0"); + } + + if (!splits.empty()) { + builder.splits(splits); + } + + resultOrError.result = + builder.maxDrivers(maxDrivers).copyResults(pool_.get()); + } catch (VeloxUserError& e) { + // NOTE: velox user exception is accepted as it is caused by the invalid + // fuzzer test inputs. 
+ resultOrError.exceptionPtr = std::current_exception(); + } + + return resultOrError; +} + +std::pair< + std::optional, + AggregationFuzzerBase::ReferenceQueryErrorCode> +AggregationFuzzerBase::computeReferenceResults( + const core::PlanNodePtr& plan, + const std::vector& input) { + if (auto sql = referenceQueryRunner_->toSql(plan)) { + try { + return std::make_pair( + referenceQueryRunner_->execute( + sql.value(), input, plan->outputType()), + ReferenceQueryErrorCode::kSuccess); + } catch (std::exception& e) { + // ++stats_.numReferenceQueryFailed; + LOG(WARNING) << "Query failed in the reference DB"; + return std::make_pair( + std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail); + } + } else { + LOG(INFO) << "Query not supported by the reference DB"; + // ++stats_.numVerificationNotSupported; + } + + return std::make_pair( + std::nullopt, ReferenceQueryErrorCode::kReferenceQueryUnsupported); +} + +void AggregationFuzzerBase::testPlan( + const PlanWithSplits& planWithSplits, + bool injectSpill, + bool abandonPartial, + bool customVerification, + const std::vector>& customVerifiers, + const velox::test::ResultOrError& expected, + int32_t maxDrivers) { + auto actual = execute( + planWithSplits.plan, + planWithSplits.splits, + injectSpill, + abandonPartial, + maxDrivers); + + // Compare results or exceptions (if any). Fail if anything is different. + if (expected.exceptionPtr || actual.exceptionPtr) { + // Throws in case exceptions are not compatible.
+ velox::test::compareExceptions(expected.exceptionPtr, actual.exceptionPtr); + return; + } + + if (!customVerification) { + VELOX_CHECK( + assertEqualResults({expected.result}, {actual.result}), + "Logically equivalent plans produced different results"); + return; + } + + VELOX_CHECK_EQ( + expected.result->size(), + actual.result->size(), + "Logically equivalent plans produced different number of rows"); + + for (auto& verifier : customVerifiers) { + if (verifier == nullptr) { + continue; + } + + if (verifier->supportsCompare()) { + VELOX_CHECK( + verifier->compare(expected.result, actual.result), + "Logically equivalent plans produced different results"); + } else if (verifier->supportsVerify()) { + VELOX_CHECK( + verifier->verify(actual.result), + "Result of a logically equivalent plan failed custom verification"); + } else { + VELOX_UNREACHABLE( + "Custom verifier must support either 'compare' or 'verify' API."); + } + } +} + +namespace { +void writeToFile( + const std::string& path, + const VectorPtr& vector, + memory::MemoryPool* pool) { + dwrf::WriterOptions options; + options.schema = vector->type(); + options.memoryPool = pool; + auto writeFile = std::make_unique(path, true, false); + auto sink = + std::make_unique(std::move(writeFile), path); + dwrf::Writer writer(std::move(sink), options); + writer.write(vector); + writer.close(); +} +} // namespace + +bool isTableScanSupported(const TypePtr& type) { + if (type->kind() == TypeKind::ROW && type->size() == 0) { + return false; + } + if (type->kind() == TypeKind::UNKNOWN) { + return false; + } + if (type->kind() == TypeKind::HUGEINT) { + return false; + } + + for (auto i = 0; i < type->size(); ++i) { + if (!isTableScanSupported(type->childAt(i))) { + return false; + } + } + + return true; +} + +std::vector AggregationFuzzerBase::makeSplits( + const std::vector& inputs, + const std::string& path) { + std::vector splits; + auto writerPool = rootPool_->addAggregateChild("writer"); + for (auto i = 0; i < 
inputs.size(); ++i) { + const std::string filePath = fmt::format("{}/{}", path, i); + writeToFile(filePath, inputs[i], writerPool.get()); + splits.push_back(makeSplit(filePath)); + } + + return splits; +} + +std::string printPercentageStat(size_t n, size_t total) { + return fmt::format("{} ({:.2f}%)", n, (double)n / total * 100); +} + +void printStats(const AggregationFuzzerBase::FunctionsStats& stats) { + LOG(INFO) << fmt::format( + "Total functions: {} ({} signatures)", + stats.numFunctions, + stats.numSignatures); + LOG(INFO) << "Functions with at least one supported signature: " + << printPercentageStat( + stats.numSupportedFunctions, stats.numFunctions); + + size_t numNotSupportedFunctions = + stats.numFunctions - stats.numSupportedFunctions; + LOG(INFO) << "Functions with no supported signature: " + << printPercentageStat( + numNotSupportedFunctions, stats.numFunctions); + LOG(INFO) << "Supported function signatures: " + << printPercentageStat( + stats.numSupportedSignatures, stats.numSignatures); + + size_t numNotSupportedSignatures = + stats.numSignatures - stats.numSupportedSignatures; + LOG(INFO) << "Unsupported function signatures: " + << printPercentageStat( + numNotSupportedSignatures, stats.numSignatures); +} + +std::string makeFunctionCall( + const std::string& name, + const std::vector& argNames, + bool sortedInputs) { + std::ostringstream call; + call << name << "(" << folly::join(", ", argNames); + if (sortedInputs) { + call << " ORDER BY " << folly::join(", ", argNames); + } + call << ")"; + + return call.str(); +} + +std::vector makeNames(size_t n) { + std::vector names; + for (auto i = 0; i < n; ++i) { + names.push_back(fmt::format("c{}", i)); + } + return names; +} + +folly::dynamic serialize( + const AggregationFuzzerBase::PlanWithSplits& planWithSplits, + const std::string& dirPath, + std::unordered_map& filePaths) { + folly::dynamic obj = folly::dynamic::object(); + obj["plan"] = planWithSplits.plan->serialize(); + if 
(planWithSplits.splits.empty()) { + return obj; + } + + folly::dynamic jsonSplits = folly::dynamic::array(); + jsonSplits.reserve(planWithSplits.splits.size()); + for (const auto& split : planWithSplits.splits) { + const auto filePath = + std::dynamic_pointer_cast( + split.connectorSplit) + ->filePath; + if (filePaths.count(filePath) == 0) { + const auto newFilePath = fmt::format("{}/{}", dirPath, filePaths.size()); + fs::copy(filePath, newFilePath); + filePaths.insert({filePath, newFilePath}); + } + jsonSplits.push_back(filePaths.at(filePath)); + } + obj["splits"] = jsonSplits; + return obj; +} + +void persistReproInfo( + const std::vector& plans, + const std::string& basePath) { + if (!common::generateFileDirectory(basePath.c_str())) { + return; + } + + // Create a new directory + const auto dirPathOptional = + common::generateTempFolderPath(basePath.c_str(), "aggregationVerifier"); + if (!dirPathOptional.has_value()) { + LOG(ERROR) + << "Failed to create directory for persisting plans using base path: " + << basePath; + return; + } + + const auto dirPath = dirPathOptional.value(); + + // Save plans and splits. 
+ const std::string planPath = fmt::format("{}/{}", dirPath, kPlanNodeFileName); + std::unordered_map filePaths; + try { + folly::dynamic array = folly::dynamic::array(); + array.reserve(plans.size()); + for (auto planWithSplits : plans) { + array.push_back(serialize(planWithSplits, dirPath, filePaths)); + } + auto planJson = folly::toJson(array); + saveStringToFile(planJson, planPath.c_str()); + LOG(INFO) << "Persisted aggregation plans to " << planPath; + } catch (std::exception& e) { + LOG(ERROR) << "Failed to store aggregation plans to " << planPath << ": " + << e.what(); + } +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h new file mode 100644 index 0000000000000..9f5c494711079 --- /dev/null +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -0,0 +1,277 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/exec/Aggregate.h" +#include "velox/exec/Split.h" +#include "velox/exec/fuzzer/InputGenerator.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/ResultVerifier.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/expression/tests/utils/FuzzerToolkit.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" +#include "velox/vector/tests/utils/VectorMaker.h" + +DECLARE_int32(steps); + +DECLARE_int32(duration_sec); + +DECLARE_int32(batch_size); + +DECLARE_int32(num_batches); + +DECLARE_int32(max_num_varargs); + +DECLARE_double(null_ratio); + +DECLARE_string(repro_persist_path); + +DECLARE_bool(persist_and_run_once); + +DECLARE_bool(log_signature_stats); + +namespace facebook::velox::exec::test { + +using facebook::velox::test::CallableSignature; +using facebook::velox::test::SignatureTemplate; + +constexpr const std::string_view kPlanNodeFileName = "plan_nodes"; + +class AggregationFuzzerBase { + public: + AggregationFuzzerBase( + size_t initialSeed, + const std::unordered_map>& + customVerificationFunctions, + const std::unordered_map>& + customInputGenerators, + VectorFuzzer::Options::TimestampPrecision timestampPrecision, + const std::unordered_map& queryConfigs, + std::unique_ptr referenceQueryRunner) + : customVerificationFunctions_{customVerificationFunctions}, + customInputGenerators_{customInputGenerators}, + queryConfigs_{queryConfigs}, + persistAndRunOnce_{FLAGS_persist_and_run_once}, + reproPersistPath_{FLAGS_repro_persist_path}, + referenceQueryRunner_{std::move(referenceQueryRunner)}, + vectorFuzzer_{getFuzzerOptions(timestampPrecision), pool_.get()} { + filesystems::registerLocalFileSystem(); + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector( + kHiveConnectorId, std::make_shared()); + 
connector::registerConnector(hiveConnector); + + seed(initialSeed); + } + + struct PlanWithSplits { + core::PlanNodePtr plan; + std::vector splits; + }; + + struct FunctionsStats { + size_t numFunctions = 0; + size_t numSignatures = 0; + size_t numSupportedFunctions = 0; + size_t numSupportedSignatures = 0; + }; + + struct SignatureStats { + /// Number of times a signature was chosen. + size_t numRuns{0}; + + /// Number of times generated query plan failed. + size_t numFailed{0}; + }; + + enum ReferenceQueryErrorCode { + kSuccess, + kReferenceQueryFail, + kReferenceQueryUnsupported + }; + + protected: + static inline const std::string kHiveConnectorId = "test-hive"; + + bool addSignature( + const std::string& name, + const FunctionSignaturePtr& signature); + + void addAggregationSignatures( + const AggregateFunctionSignatureMap& signatureMap); + + std::shared_ptr findInputGenerator( + const CallableSignature& signature); + + static exec::Split makeSplit(const std::string& filePath); + + std::vector makeSplits( + const std::vector& inputs, + const std::string& path); + + PlanWithSplits deserialize(const folly::dynamic& obj); + + static VectorFuzzer::Options getFuzzerOptions( + VectorFuzzer::Options::TimestampPrecision timestampPrecision) { + VectorFuzzer::Options opts; + opts.vectorSize = FLAGS_batch_size; + opts.stringVariableLength = true; + opts.stringLength = 4'000; + opts.nullRatio = FLAGS_null_ratio; + opts.timestampPrecision = timestampPrecision; + return opts; + } + + void seed(size_t seed) { + currentSeed_ = seed; + vectorFuzzer_.reSeed(seed); + rng_.seed(currentSeed_); + } + + void reSeed() { + seed(rng_()); + } + + // Generate at least one and up to 5 scalar columns to be used as grouping, + // partition or sorting keys. + // Column names are generated as 'prefix' followed by N, where N is the + // zero-based ordinal number of the column.
+ std::vector generateKeys( + const std::string& prefix, + std::vector& names, + std::vector& types); + + // Similar to generateKeys, but restricts types to orderable types (i.e. no + // maps). + std::vector generateSortingKeys( + const std::string& prefix, + std::vector& names, + std::vector& types); + + std::pair pickSignature(); + + std::vector generateInputData( + std::vector names, + std::vector types, + const std::optional& signature); + + // Generate a RowVector of the given types of children with an additional + // child named "row_number" of BIGINT row numbers that differentiates every + // row. Row numbers start from 0. This additional input vector is needed for + // result verification of window aggregations. + std::vector generateInputDataWithRowNumber( + std::vector names, + std::vector types, + const CallableSignature& signature); + + std::pair, ReferenceQueryErrorCode> + computeReferenceResults( + const core::PlanNodePtr& plan, + const std::vector& input); + + velox::test::ResultOrError execute( + const core::PlanNodePtr& plan, + const std::vector& splits = {}, + bool injectSpill = false, + bool abandonPartial = false, + int32_t maxDrivers = 2); + + // @param customVerification If false, results are compared as is. Otherwise, + // only row counts are compared. + // @param customVerifiers Custom verifier for each aggregate function. These + // can be null. If not null and customVerification is true, custom verifier is + // used to further verify the results. 
+ void testPlan( + const PlanWithSplits& planWithSplits, + bool injectSpill, + bool abandonPartial, + bool customVerification, + const std::vector>& customVerifiers, + const velox::test::ResultOrError& expected, + int32_t maxDrivers = 2); + + void printSignatureStats(); + + const std::unordered_map> + customVerificationFunctions_; + const std::unordered_map> + customInputGenerators_; + const std::unordered_map queryConfigs_; + const bool persistAndRunOnce_; + const std::string reproPersistPath_; + + std::unique_ptr referenceQueryRunner_; + + std::vector signatures_; + std::vector signatureTemplates_; + + FunctionsStats functionsStats; + + // Stats for 'signatures_' and 'signatureTemplates_'. Stats for 'signatures_' + // come before stats for 'signatureTemplates_'. + std::vector signatureStats_; + + FuzzerGenerator rng_; + size_t currentSeed_{0}; + + std::shared_ptr rootPool_{ + memory::defaultMemoryManager().addRootPool()}; + std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; + VectorFuzzer vectorFuzzer_; +}; + +// Returns true if the elapsed time is greater than or equal to +// FLAGS_duration_sec. If FLAGS_duration_sec is 0, returns true if the +// iterations is greater than or equal to FLAGS_steps. +template +bool isDone(size_t i, T startTime) { + if (FLAGS_duration_sec > 0) { + std::chrono::duration elapsed = + std::chrono::system_clock::now() - startTime; + return elapsed.count() >= FLAGS_duration_sec; + } + return i >= FLAGS_steps; +} + +// Returns whether type is supported in TableScan. Empty ROW, UNKNOWN and +// HUGEINT types are not supported, including when nested in a complex type. +bool isTableScanSupported(const TypePtr& type); + +// Prints statistics about supported and unsupported function signatures. +void printStats(const AggregationFuzzerBase::FunctionsStats& stats); + +// Prints (n / total) in percentage format. +std::string printPercentageStat(size_t n, size_t total); + +// Make an aggregation call string for the given function name and arguments.
+std::string makeFunctionCall( + const std::string& name, + const std::vector& argNames, + bool sortedInputs); + +// Returns a list of column names from c0 to cn. +std::vector makeNames(size_t n); + +// Persist plans to files under basePath. +void persistReproInfo( + const std::vector& plans, + const std::string& basePath); + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/AggregationFuzzerOptions.h b/velox/exec/fuzzer/AggregationFuzzerOptions.h new file mode 100644 index 0000000000000..c657d90dcaec8 --- /dev/null +++ b/velox/exec/fuzzer/AggregationFuzzerOptions.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "velox/exec/fuzzer/InputGenerator.h" +#include "velox/exec/fuzzer/ResultVerifier.h" + +namespace facebook::velox::exec::test { + +struct AggregationFuzzerOptions { + /// Comma-separated list of functions to test. By default, all functions + /// are tested. + std::string onlyFunctions; + + /// Set of functions to not test. + std::unordered_set skipFunctions; + + /// Set of functions whose results are non-deterministic. These can be + /// order-dependent functions whose results depend on the order of input + /// rows, or functions that return complex-typed results containing + /// floating-point fields. + /// + /// For some functions, the result can be transformed to a deterministic + /// value. 
If such transformation exists, it can be specified to be used for + /// results verification. If no transformation is specified, results are not + /// verified. + /// + /// Keys are function names. Values are optional transformations. "{}" + /// should be used to indicate the original value, i.e. "f({})" + /// transformation applies function 'f' to aggregation result. + std::unordered_map> + customVerificationFunctions; + + std::unordered_map> + customInputGenerators; + + std::unordered_set orderDependentFunctions; + + /// Timestamp precision to use when generating inputs of type TIMESTAMP. + VectorFuzzer::Options::TimestampPrecision timestampPrecision{ + VectorFuzzer::Options::TimestampPrecision::kNanoSeconds}; + + /// A set of configuration properties to use when running query plans. + /// Could be used to specify timezone or enable/disable settings that + /// affect semantics of individual aggregate functions. + std::unordered_map queryConfigs; +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/AggregationFuzzerRunner.h b/velox/exec/fuzzer/AggregationFuzzerRunner.h similarity index 55% rename from velox/exec/tests/utils/AggregationFuzzerRunner.h rename to velox/exec/fuzzer/AggregationFuzzerRunner.h index 3a06381019d30..0697576551992 100644 --- a/velox/exec/tests/utils/AggregationFuzzerRunner.h +++ b/velox/exec/fuzzer/AggregationFuzzerRunner.h @@ -24,7 +24,8 @@ #include "velox/common/file/FileSystems.h" #include "velox/exec/Aggregate.h" -#include "velox/exec/tests/utils/AggregationFuzzer.h" +#include "velox/exec/fuzzer/AggregationFuzzer.h" +#include "velox/exec/fuzzer/AggregationFuzzerOptions.h" #include "velox/parse/TypeResolver.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -65,50 +66,12 @@ namespace facebook::velox::exec::test { /// --seed 123 \ /// --v=1 \ /// --only "min,max" - class AggregationFuzzerRunner { public: - struct Options { - /// Comma-separated list of 
functions to test. By default, all functions - /// are tested. - std::string onlyFunctions; - - /// Set of functions to not test. - std::unordered_set skipFunctions; - - /// Set of functions whose results are non-deterministic. These can be - /// order-dependent functions whose results depend on the order of input - /// rows, or functions that return complex-typed results containing - /// floating-point fields. - /// - /// For some functions, the result can be transformed to a deterministic - /// value. If such transformation exists, it can be specified to be used for - /// results verification. If no transformation is specified, results are not - /// verified. - /// - /// Keys are function names. Values are optional transformations. "{}" - /// should be used to indicate the original value, i.e. "f({})" - /// transformation applies function 'f' to aggregation result. - std::unordered_map> - customVerificationFunctions; - - std::unordered_map> - customInputGenerators; - - /// Timestamp precision to use when generating inputs of type TIMESTAMP. - VectorFuzzer::Options::TimestampPrecision timestampPrecision{ - VectorFuzzer::Options::TimestampPrecision::kNanoSeconds}; - - /// A set of configuration properties to use when running query plans. - /// Could be used to specify timezone or enable/disable settings that - /// affect semantics of individual aggregate functions. 
- std::unordered_map queryConfigs; - }; - static int run( size_t seed, std::unique_ptr referenceQueryRunner, - const Options& options) { + const AggregationFuzzerOptions& options) { return runFuzzer( seed, std::nullopt, std::move(referenceQueryRunner), options); } @@ -119,19 +82,19 @@ class AggregationFuzzerRunner { return runFuzzer(0, planPath, std::move(referenceQueryRunner), {}); } - private: + protected: static int runFuzzer( size_t seed, const std::optional& planPath, std::unique_ptr referenceQueryRunner, - const Options& options) { + const AggregationFuzzerOptions& options) { auto signatures = facebook::velox::exec::getAggregateFunctionSignatures(); if (signatures.empty()) { LOG(ERROR) << "No aggregate functions registered."; exit(1); } - auto filteredSignatures = filterSignatures( + auto filteredSignatures = velox::test::filterSignatures( signatures, options.onlyFunctions, options.skipFunctions); if (filteredSignatures.empty()) { LOG(ERROR) @@ -156,51 +119,6 @@ class AggregationFuzzerRunner { // Calling gtest here so that it can be recognized as tests in CI systems. return RUN_ALL_TESTS(); } - - static std::unordered_set splitNames(const std::string& names) { - // Parse, lower case and trim it. - std::vector nameList; - folly::split(',', names, nameList); - std::unordered_set nameSet; - - for (const auto& it : nameList) { - auto str = folly::trimWhitespace(it).toString(); - folly::toLowerAscii(str); - nameSet.insert(str); - } - return nameSet; - } - - // Parse the comma separated list of function names, and use it to filter the - // input signatures. 
- static facebook::velox::exec::AggregateFunctionSignatureMap filterSignatures( - const facebook::velox::exec::AggregateFunctionSignatureMap& input, - const std::string& onlyFunctions, - const std::unordered_set& skipFunctions) { - if (onlyFunctions.empty() && skipFunctions.empty()) { - return input; - } - - facebook::velox::exec::AggregateFunctionSignatureMap output; - if (!onlyFunctions.empty()) { - // Parse, lower case and trim it. - auto nameSet = splitNames(onlyFunctions); - for (const auto& it : input) { - if (nameSet.count(it.first) > 0) { - output.insert(it); - } - } - } else { - output = input; - } - - for (auto s : skipFunctions) { - auto str = s; - folly::toLowerAscii(str); - output.erase(str); - } - return output; - } }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt new file mode 100644 index 0000000000000..f924e8c3bcda7 --- /dev/null +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -0,0 +1,47 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +add_library(velox_fuzzer_util DuckQueryRunner.cpp PrestoQueryRunner.cpp) + +target_link_libraries(velox_fuzzer_util velox_core velox_exec_test_lib cpr::cpr + velox_type_parser Folly::folly) + +add_library(velox_aggregation_fuzzer_base AggregationFuzzerBase.cpp) + +target_link_libraries( + velox_aggregation_fuzzer_base + velox_exec_test_lib + velox_temp_path + velox_common_base + velox_file + velox_hive_connector + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + velox_type + velox_vector_fuzzer + velox_exec_test_lib + velox_expression_test_utility + velox_vector + velox_core) + +add_library(velox_aggregation_fuzzer AggregationFuzzer.cpp) + +target_link_libraries( + velox_aggregation_fuzzer + velox_type + velox_vector_fuzzer + velox_exec_test_lib + velox_expression_test_utility + velox_aggregation_fuzzer_base + velox_fuzzer_util) diff --git a/velox/exec/tests/utils/DuckQueryRunner.cpp b/velox/exec/fuzzer/DuckQueryRunner.cpp similarity index 99% rename from velox/exec/tests/utils/DuckQueryRunner.cpp rename to velox/exec/fuzzer/DuckQueryRunner.cpp index 7d85e69162510..23b1e01dd683f 100644 --- a/velox/exec/tests/utils/DuckQueryRunner.cpp +++ b/velox/exec/fuzzer/DuckQueryRunner.cpp @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "velox/exec/tests/utils/DuckQueryRunner.h" +#include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/exec/tests/utils/QueryAssertions.h" namespace facebook::velox::exec::test { diff --git a/velox/exec/tests/utils/DuckQueryRunner.h b/velox/exec/fuzzer/DuckQueryRunner.h similarity index 97% rename from velox/exec/tests/utils/DuckQueryRunner.h rename to velox/exec/fuzzer/DuckQueryRunner.h index 8b6a2f5375b46..a683652946a4d 100644 --- a/velox/exec/tests/utils/DuckQueryRunner.h +++ b/velox/exec/fuzzer/DuckQueryRunner.h @@ -15,7 +15,7 @@ */ #pragma once -#include "velox/exec/tests/utils/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" namespace facebook::velox::exec::test { diff --git a/velox/exec/fuzzer/InputGenerator.h b/velox/exec/fuzzer/InputGenerator.h new file mode 100644 index 0000000000000..3da5f820b137a --- /dev/null +++ b/velox/exec/fuzzer/InputGenerator.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/vector/fuzzer/VectorFuzzer.h" + +namespace facebook::velox::exec::test { + +class InputGenerator { + public: + virtual ~InputGenerator() = default; + + /// Generates function inputs of the specified types. May return an empty list + /// to let Fuzzer use randomly-generated inputs. 
+ virtual std::vector generate( + const std::vector& types, + VectorFuzzer& fuzzer, + FuzzerGenerator& rng, + memory::MemoryPool* pool) = 0; + + /// Called after generating all inputs for a given fuzzer iteration. + virtual void reset() = 0; +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp similarity index 99% rename from velox/exec/tests/utils/PrestoQueryRunner.cpp rename to velox/exec/fuzzer/PrestoQueryRunner.cpp index b8f9493d94444..d6ea1d59bfbff 100644 --- a/velox/exec/tests/utils/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "velox/exec/tests/utils/PrestoQueryRunner.h" +#include "velox/exec/fuzzer/PrestoQueryRunner.h" #include #include #include diff --git a/velox/exec/tests/utils/PrestoQueryRunner.h b/velox/exec/fuzzer/PrestoQueryRunner.h similarity index 98% rename from velox/exec/tests/utils/PrestoQueryRunner.h rename to velox/exec/fuzzer/PrestoQueryRunner.h index 05ffbaa73ecde..3d50612da3603 100644 --- a/velox/exec/tests/utils/PrestoQueryRunner.h +++ b/velox/exec/fuzzer/PrestoQueryRunner.h @@ -19,7 +19,7 @@ #include #include #include "velox/common/memory/Memory.h" -#include "velox/exec/tests/utils/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec::test { diff --git a/velox/exec/tests/utils/ReferenceQueryRunner.h b/velox/exec/fuzzer/ReferenceQueryRunner.h similarity index 100% rename from velox/exec/tests/utils/ReferenceQueryRunner.h rename to velox/exec/fuzzer/ReferenceQueryRunner.h diff --git a/velox/exec/tests/utils/AggregationFuzzer.h b/velox/exec/fuzzer/ResultVerifier.h similarity index 63% rename from velox/exec/tests/utils/AggregationFuzzer.h rename to velox/exec/fuzzer/ResultVerifier.h index 28be9e6a7109c..093fd4fe281f6 100644 --- 
a/velox/exec/tests/utils/AggregationFuzzer.h +++ b/velox/exec/fuzzer/ResultVerifier.h @@ -15,29 +15,12 @@ */ #pragma once -#include "velox/exec/Aggregate.h" -#include "velox/exec/tests/utils/ReferenceQueryRunner.h" -#include "velox/vector/fuzzer/VectorFuzzer.h" +#include -namespace facebook::velox::exec::test { - -static constexpr const std::string_view kPlanNodeFileName = "plan_nodes"; - -class InputGenerator { - public: - virtual ~InputGenerator() = default; +#include "velox/core/PlanNode.h" +#include "velox/vector/ComplexVector.h" - /// Generates function inputs of the specified types. May return an empty list - /// to let Fuzzer use randomly-generated inputs. - virtual std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) = 0; - - /// Called after generating all inputs for a given fuzzer iteration. - virtual void reset() = 0; -}; +namespace facebook::velox::exec::test { /// Verifies aggregation results either directly or by comparing with results /// from a logically equivalent plan or reference DB. @@ -92,24 +75,4 @@ class ResultVerifier { virtual void reset() = 0; }; -/// Runs the aggregation fuzzer. -/// @param signatureMap Map of all aggregate function signatures. -/// @param seed Random seed - Pass the same seed for reproducibility. -/// @param orderDependentFunctions Map of functions that depend on order of -/// input. -/// @param planPath Path to persisted plan information. If this is -/// supplied, fuzzer will only verify the plans. -/// @param referenceQueryRunner Reference query runner for results -/// verification. 
-void aggregateFuzzer( - AggregateFunctionSignatureMap signatureMap, - size_t seed, - const std::unordered_map>& - orderDependentFunctions, - const std::unordered_map>& - customInputGenerators, - VectorFuzzer::Options::TimestampPrecision timestampPrecision, - const std::unordered_map& queryConfigs, - const std::optional& planPath, - std::unique_ptr referenceQueryRunner); } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/TransformResultVerifier.h b/velox/exec/fuzzer/TransformResultVerifier.h new file mode 100644 index 0000000000000..15776275103b1 --- /dev/null +++ b/velox/exec/fuzzer/TransformResultVerifier.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/fuzzer/ResultVerifier.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::exec::test { + +// Applies specified SQL transformation to the results before comparing. For +// example, sorts an array before comparing results of array_agg. +// +// Supports 'compare' API. +class TransformResultVerifier : public ResultVerifier { + public: + // @param transform fmt::format-compatible SQL expression to use to transform + // aggregation results before comparison. 
The string must have a single + // placeholder for the column name that contains aggregation results. For + // example, "array_sort({})". + explicit TransformResultVerifier(const std::string& transform) + : transform_{transform} {} + + static std::shared_ptr create(const std::string& transform) { + return std::make_shared(transform); + } + + bool supportsCompare() override { + return true; + } + + bool supportsVerify() override { + return false; + } + + void initialize( + const std::vector& /*input*/, + const std::vector& groupingKeys, + const core::AggregationNode::Aggregate& /*aggregate*/, + const std::string& aggregateName) override { + projections_ = groupingKeys; + projections_.push_back( + fmt::format(fmt::runtime(transform_), aggregateName)); + } + + bool compare(const RowVectorPtr& result, const RowVectorPtr& altResult) + override { + return assertEqualResults({transform(result)}, {transform(altResult)}); + } + + bool verify(const RowVectorPtr& /*result*/) override { + VELOX_UNSUPPORTED(); + } + + void reset() override { + projections_.clear(); + } + + private: + RowVectorPtr transform(const RowVectorPtr& data) { + VELOX_CHECK(!projections_.empty()); + auto plan = PlanBuilder().values({data}).project(projections_).planNode(); + return AssertQueryBuilder(plan).copyResults(data->pool()); + } + + const std::string transform_; + + std::vector projections_; +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/AggregationFuzzerTest.cpp b/velox/exec/tests/AggregationFuzzerTest.cpp index a0123261ea58c..15dd3cd31d590 100644 --- a/velox/exec/tests/AggregationFuzzerTest.cpp +++ b/velox/exec/tests/AggregationFuzzerTest.cpp @@ -20,15 +20,18 @@ #include #include -#include "velox/exec/tests/utils/AggregationFuzzerRunner.h" -#include "velox/exec/tests/utils/AssertQueryBuilder.h" -#include "velox/exec/tests/utils/DuckQueryRunner.h" -#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/fuzzer/AggregationFuzzerOptions.h" +#include 
"velox/exec/fuzzer/AggregationFuzzerRunner.h" +#include "velox/exec/fuzzer/DuckQueryRunner.h" +#include "velox/exec/fuzzer/TransformResultVerifier.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/fuzzer/ApproxDistinctInputGenerator.h" +#include "velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h" +#include "velox/functions/prestosql/fuzzer/ApproxPercentileInputGenerator.h" +#include "velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h" +#include "velox/functions/prestosql/fuzzer/MinMaxInputGenerator.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/functions/prestosql/window/WindowFunctionsRegistration.h" -#include "velox/vector/FlatVector.h" -#include "velox/vector/tests/utils/VectorMaker.h" DEFINE_int64( seed, @@ -46,207 +49,6 @@ DEFINE_string( namespace facebook::velox::exec::test { namespace { -class MinMaxInputGenerator : public InputGenerator { - public: - MinMaxInputGenerator(const std::string& name) : indexOfN_{indexOfN(name)} {} - - std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) override { - // TODO Generate inputs free of nested nulls. - if (types.size() <= indexOfN_) { - return {}; - } - - // Make sure to use the same value of 'n' for all batches in a given Fuzzer - // iteration. 
- if (!n_.has_value()) { - n_ = boost::random::uniform_int_distribution(0, 9'999)(rng); - } - - const auto size = fuzzer.getOptions().vectorSize; - - std::vector inputs; - inputs.reserve(types.size()); - for (auto i = 0; i < types.size() - 1; ++i) { - inputs.push_back(fuzzer.fuzz(types[i])); - } - - VELOX_CHECK( - types.back()->isBigint(), - "Unexpected type: {}", - types.back()->toString()) - inputs.push_back( - BaseVector::createConstant(BIGINT(), n_.value(), size, pool)); - return inputs; - } - - void reset() override { - n_.reset(); - } - - private: - // Returns zero-based index of the 'n' argument, 1 for min and max. 2 for - // min_by and max_by. - static int32_t indexOfN(const std::string& name) { - if (name == "min" || name == "max") { - return 1; - } - - if (name == "min_by" || name == "max_by") { - return 2; - } - - VELOX_FAIL("Unexpected function name: {}", name) - } - - // Zero-based index of the 'n' argument. - const int32_t indexOfN_; - std::optional n_; -}; - -class ApproxDistinctInputGenerator : public InputGenerator { - public: - std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) override { - if (types.size() != 2) { - return {}; - } - - // Make sure to use the same value of 'e' for all batches in a given Fuzzer - // iteration. - if (!e_.has_value()) { - // Generate value in [0.0040625, 0.26] range. 
- static constexpr double kMin = 0.0040625; - static constexpr double kMax = 0.26; - e_ = kMin + (kMax - kMin) * boost::random::uniform_01()(rng); - } - - const auto size = fuzzer.getOptions().vectorSize; - - VELOX_CHECK( - types.back()->isDouble(), - "Unexpected type: {}", - types.back()->toString()) - return { - fuzzer.fuzz(types[0]), - BaseVector::createConstant(DOUBLE(), e_.value(), size, pool)}; - } - - void reset() override { - e_.reset(); - } - - private: - std::optional e_; -}; - -class ApproxPercentileInputGenerator : public InputGenerator { - public: - std::vector generate( - const std::vector& types, - VectorFuzzer& fuzzer, - FuzzerGenerator& rng, - memory::MemoryPool* pool) override { - // The arguments are: x, [w], percentile(s), [accuracy]. - // - // First argument is always 'x'. If second argument's type is BIGINT, then - // it is 'w'. Otherwise, it is percentile(x). - - const auto size = fuzzer.getOptions().vectorSize; - - std::vector inputs; - inputs.reserve(types.size()); - inputs.push_back(fuzzer.fuzz(types[0])); - - if (types[1]->isBigint()) { - velox::test::VectorMaker vectorMaker{pool}; - auto weight = vectorMaker.flatVector(size, [&](auto row) { - return boost::random::uniform_int_distribution(1, 1'000)(rng); - }); - - inputs.push_back(weight); - } - - const int percentileTypeIndex = types[1]->isBigint() ? 
2 : 1; - const TypePtr& percentileType = types[percentileTypeIndex]; - if (percentileType->isDouble()) { - if (!percentile_.has_value()) { - percentile_ = pickPercentile(fuzzer, rng); - } - - inputs.push_back(BaseVector::createConstant( - DOUBLE(), percentile_.value(), size, pool)); - } else { - VELOX_CHECK(percentileType->isArray()); - VELOX_CHECK(percentileType->childAt(0)->isDouble()); - - if (percentiles_.empty()) { - percentiles_.push_back(pickPercentile(fuzzer, rng)); - percentiles_.push_back(pickPercentile(fuzzer, rng)); - percentiles_.push_back(pickPercentile(fuzzer, rng)); - } - - auto arrayVector = - BaseVector::create(ARRAY(DOUBLE()), 1, pool); - auto elementsVector = arrayVector->elements()->asFlatVector(); - elementsVector->resize(percentiles_.size()); - for (auto i = 0; i < percentiles_.size(); ++i) { - elementsVector->set(i, percentiles_[i]); - } - arrayVector->setOffsetAndSize(0, 0, percentiles_.size()); - - inputs.push_back(BaseVector::wrapInConstant(size, 0, arrayVector)); - } - - if (types.size() > percentileTypeIndex + 1) { - // Last argument is 'accuracy'. - VELOX_CHECK(types.back()->isDouble()); - if (!accuracy_.has_value()) { - accuracy_ = boost::random::uniform_01()(rng); - } - - inputs.push_back( - BaseVector::createConstant(DOUBLE(), accuracy_.value(), size, pool)); - } - - return inputs; - } - - void reset() override { - percentile_.reset(); - percentiles_.clear(); - accuracy_.reset(); - } - - private: - double pickPercentile(VectorFuzzer& fuzzer, FuzzerGenerator& rng) { - // 10% of the times generate random value in [0, 1] range. - // 90% of the times use one of the common values. 
- if (fuzzer.coinToss(0.1)) { - return boost::random::uniform_01()(rng); - } - - static const std::vector kPercentiles = { - 0.1, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99, 0.999, 0.9999}; - - const auto index = - boost::random::uniform_int_distribution()(rng) % - kPercentiles.size(); - - return kPercentiles[index]; - } - - std::optional percentile_; - std::vector percentiles_; - std::optional accuracy_; -}; - std::unordered_map> getCustomInputGenerators() { return { @@ -260,562 +62,6 @@ getCustomInputGenerators() { }; } -// Applies specified SQL transformation to the results before comparing. For -// example, sorts an array before comparing results of array_agg. -// -// Supports 'compare' API. -class TransformResultVerifier : public ResultVerifier { - public: - // @param transform fmt::format-compatible SQL expression to use to transform - // aggregation results before comparison. The string must have a single - // placeholder for the column name that contains aggregation results. For - // example, "array_sort({})". 
- explicit TransformResultVerifier(const std::string& transform) - : transform_{transform} {} - - static std::shared_ptr create(const std::string& transform) { - return std::make_shared(transform); - } - - bool supportsCompare() override { - return true; - } - - bool supportsVerify() override { - return false; - } - - void initialize( - const std::vector& /*input*/, - const std::vector& groupingKeys, - const core::AggregationNode::Aggregate& /*aggregate*/, - const std::string& aggregateName) override { - projections_ = groupingKeys; - projections_.push_back( - fmt::format(fmt::runtime(transform_), aggregateName)); - } - - bool compare(const RowVectorPtr& result, const RowVectorPtr& altResult) - override { - return assertEqualResults({transform(result)}, {transform(altResult)}); - } - - bool verify(const RowVectorPtr& /*result*/) override { - VELOX_UNSUPPORTED(); - } - - void reset() override { - projections_.clear(); - } - - private: - RowVectorPtr transform(const RowVectorPtr& data) { - VELOX_CHECK(!projections_.empty()); - auto plan = PlanBuilder().values({data}).project(projections_).planNode(); - return AssertQueryBuilder(plan).copyResults(data->pool()); - } - - const std::string transform_; - - std::vector projections_; -}; - -// Compares results of approx_distinct(x[, e]) with count(distinct x). -// For each group calculates the difference between 2 values and counts number -// of groups where difference is > 2e. If total number of groups is >= 50, -// allows 2 groups > 2e. If number of groups is small (< 50), -// expects all groups to be under 2e. -class ApproxDistinctResultVerifier : public ResultVerifier { - public: - bool supportsCompare() override { - return false; - } - - bool supportsVerify() override { - return true; - } - - // Compute count(distinct x) over 'input'. 
- void initialize( - const std::vector& input, - const std::vector& groupingKeys, - const core::AggregationNode::Aggregate& aggregate, - const std::string& aggregateName) override { - auto plan = - PlanBuilder() - .values(input) - .singleAggregation(groupingKeys, {makeCountDistinctCall(aggregate)}) - .planNode(); - - expected_ = AssertQueryBuilder(plan).copyResults(input[0]->pool()); - groupingKeys_ = groupingKeys; - name_ = aggregateName; - error_ = extractError(aggregate, input[0]); - } - - bool compare( - const RowVectorPtr& /*result*/, - const RowVectorPtr& /*altResult*/) override { - VELOX_UNSUPPORTED(); - } - - bool verify(const RowVectorPtr& result) override { - // Union 'result' with 'expected_', group by on 'groupingKeys_' and produce - // pairs of actual and expected values per group. We cannot use join because - // grouping keys may have nulls. - auto planNodeIdGenerator = std::make_shared(); - auto expectedSource = PlanBuilder(planNodeIdGenerator) - .values({expected_}) - .appendColumns({"'expected' as label"}) - .planNode(); - - auto actualSource = PlanBuilder(planNodeIdGenerator) - .values({result}) - .appendColumns({"'actual' as label"}) - .planNode(); - - auto mapAgg = fmt::format("map_agg(label, {}) as m", name_); - auto plan = PlanBuilder(planNodeIdGenerator) - .localPartition({}, {expectedSource, actualSource}) - .singleAggregation(groupingKeys_, {mapAgg}) - .project({"m['actual'] as a", "m['expected'] as e"}) - .planNode(); - auto combined = AssertQueryBuilder(plan).copyResults(result->pool()); - - auto* actual = combined->childAt(0)->as>(); - auto* expected = combined->childAt(1)->as>(); - - const auto numGroups = result->size(); - VELOX_CHECK_EQ(numGroups, combined->size()); - - std::vector largeGaps; - for (auto i = 0; i < numGroups; ++i) { - VELOX_CHECK(!actual->isNullAt(i)) - VELOX_CHECK(!expected->isNullAt(i)) - - const auto actualCnt = actual->valueAt(i); - const auto expectedCnt = expected->valueAt(i); - if (actualCnt != expectedCnt) { - 
if (expectedCnt > 0) { - const auto gap = - std::abs(actualCnt - expectedCnt) * 1.0 / expectedCnt; - if (gap > 2 * error_) { - largeGaps.push_back(gap); - LOG(ERROR) << fmt::format( - "approx_distinct(x, {}) is more than 2 stddev away from " - "count(distinct x). Difference: {}, approx_distinct: {}, " - "count(distinct): {}. This is unusual, but doesn't necessarily " - "indicate a bug.", - error_, - gap, - actualCnt, - expectedCnt); - } - } else { - LOG(ERROR) << fmt::format( - "count(distinct x) returned 0, but approx_distinct(x, {}) is {}", - error_, - actualCnt); - return false; - } - } - } - - // We expect large deviations (>2 stddev) in < 5% of values. - if (numGroups >= 50) { - return largeGaps.size() <= 3; - } - - return largeGaps.empty(); - } - - void reset() override { - expected_.reset(); - } - - private: - static constexpr double kDefaultError = 0.023; - - static double extractError( - const core::AggregationNode::Aggregate& aggregate, - const RowVectorPtr& input) { - const auto& args = aggregate.call->inputs(); - - if (args.size() == 1) { - return kDefaultError; - } - - auto field = core::TypedExprs::asFieldAccess(args[1]); - VELOX_CHECK_NOT_NULL(field); - auto errorVector = - input->childAt(field->name())->as>(); - return errorVector->valueAt(0); - } - - static std::string makeCountDistinctCall( - const core::AggregationNode::Aggregate& aggregate) { - const auto& args = aggregate.call->inputs(); - VELOX_CHECK_GE(args.size(), 1) - - auto inputField = core::TypedExprs::asFieldAccess(args[0]); - VELOX_CHECK_NOT_NULL(inputField) - - std::string countDistinctCall = - fmt::format("count(distinct {})", inputField->name()); - - if (aggregate.mask != nullptr) { - countDistinctCall += - fmt::format(" filter (where {})", aggregate.mask->name()); - } - - return countDistinctCall; - } - - RowVectorPtr expected_; - std::vector groupingKeys_; - std::string name_; - double error_; -}; - -// Verifies results of approx_percentile by checking the range of percentiles -// 
represented by the result value and asserting that the requested percentile -// falls into that range within 'accuracy'. -class ApproxPercentileResultVerifier : public ResultVerifier { - public: - bool supportsCompare() override { - return false; - } - - bool supportsVerify() override { - return true; - } - - // Compute the range of percentiles represented by each of the input values. - void initialize( - const std::vector& input, - const std::vector& groupingKeys, - const core::AggregationNode::Aggregate& aggregate, - const std::string& aggregateName) override { - VELOX_CHECK(!input.empty()); - - int64_t numInputs = 0; - for (const auto& v : input) { - numInputs += v->size(); - } - - const auto& args = aggregate.call->inputs(); - const auto& valueField = fieldName(args[0]); - std::optional weightField; - if (args.size() >= 3 && args[1]->type()->isBigint()) { - weightField = fieldName(args[1]); - } - - groupingKeys_ = groupingKeys; - name_ = aggregateName; - - percentiles_ = extractPercentiles(input, aggregate); - VELOX_CHECK(!percentiles_.empty()); - - accuracy_ = extractAccuracy(aggregate, input[0]); - - // Compute percentiles for all values. - allRanges_ = - computePercentiles(input, valueField, weightField, aggregate.mask); - VELOX_CHECK_LE(allRanges_->size(), numInputs); - } - - bool compare( - const RowVectorPtr& /*result*/, - const RowVectorPtr& /*altResult*/) override { - VELOX_UNSUPPORTED(); - } - - bool verify(const RowVectorPtr& result) override { - // Compute acceptable ranges of percentiles for each value in 'result'. 
- auto ranges = getPercentileRanges(result); - // VELOX_CHECK_EQ(ranges->size(), result->size() * percentiles_.size()); - - auto& value = ranges->childAt(name_); - auto* minPct = ranges->childAt("min_pct")->as>(); - auto* maxPct = ranges->childAt("max_pct")->as>(); - auto* pctIndex = ranges->childAt("pct_index")->as>(); - - for (auto i = 0; i < ranges->size(); ++i) { - if (value->isNullAt(i)) { - VELOX_CHECK(minPct->isNullAt(i)); - VELOX_CHECK(maxPct->isNullAt(i)); - continue; - } - - VELOX_CHECK(!minPct->isNullAt(i)); - VELOX_CHECK(!maxPct->isNullAt(i)); - VELOX_CHECK(!pctIndex->isNullAt(i)); - - const auto pct = percentiles_[pctIndex->valueAt(i)]; - - std::pair range{minPct->valueAt(i), maxPct->valueAt(i)}; - if (!checkPercentileGap(pct, range, accuracy_)) { - return false; - } - } - - return true; - } - - void reset() override { - allRanges_.reset(); - } - - private: - static constexpr double kDefaultAccuracy = 0.0133; - - static double extractAccuracy( - const core::AggregationNode::Aggregate& aggregate, - const RowVectorPtr& input) { - const auto& args = aggregate.call->inputs(); - - column_index_t accuracyIndex = 2; - if (args.size() >= 3 && args[1]->type()->isBigint()) { - // We have a 'w' argument. - accuracyIndex = 3; - } - - if (args.size() <= accuracyIndex) { - return kDefaultAccuracy; - } - - auto field = core::TypedExprs::asFieldAccess(args[accuracyIndex]); - VELOX_CHECK_NOT_NULL(field); - auto accuracyVector = - input->childAt(field->name())->as>(); - return accuracyVector->valueAt(0); - } - - static bool checkPercentileGap( - double pct, - const std::pair& range, - double accuracy) { - double gap = 0.0; - if (pct < range.first) { - gap = range.first - pct; - } else if (pct > range.second) { - gap = pct - range.second; - } - - if (gap > accuracy) { - LOG(ERROR) << "approx_percentile(pct: " << pct - << ", accuracy: " << accuracy << ") is more than " << accuracy - << " away from acceptable range of [" << range.first << ", " - << range.second << "]. 
Difference: " << gap; - return false; - } - - return true; - } - - static std::vector append( - const std::vector& values, - const std::vector& newValues) { - auto combined = values; - combined.insert(combined.end(), newValues.begin(), newValues.end()); - return combined; - } - - // Groups input by 'groupingKeys_'. Within each group, sorts data on - // 'valueField', duplicates rows according to optional weight, filters out - // NULLs and rows where mask is not true, then computes ranges of row numbers - // and turns these into ranges of percentiles. - // - // @return A vector of grouping keys, followed by value column named 'name_', - // followed by min_pct and max_pct columns. - RowVectorPtr computePercentiles( - const std::vector& input, - const std::string& valueField, - const std::optional& weightField, - const core::FieldAccessTypedExprPtr& mask) { - VELOX_CHECK(!input.empty()) - const auto rowType = asRowType(input[0]->type()); - - const bool weighted = weightField.has_value(); - - std::vector projections = groupingKeys_; - projections.push_back(fmt::format("{} as x", valueField)); - if (weighted) { - projections.push_back(fmt::format("{} as w", weightField.value())); - } - - PlanBuilder planBuilder; - planBuilder.values(input); - - if (mask != nullptr) { - planBuilder.filter(mask->name()); - } - - planBuilder.project(projections).filter("x IS NOT NULL"); - - if (weighted) { - planBuilder.appendColumns({"sequence(1, w) as repeats"}) - .unnest(append(groupingKeys_, {"x"}), {"repeats"}); - } - - std::string partitionByClause; - if (!groupingKeys_.empty()) { - partitionByClause = - fmt::format("partition by {}", folly::join(", ", groupingKeys_)); - } - - std::vector windowCalls = { - fmt::format( - "row_number() OVER ({} order by x) as rn", partitionByClause), - fmt::format( - "count(1) OVER ({} order by x range between unbounded preceding and unbounded following) " - "as total", - partitionByClause), - }; - - planBuilder.window(windowCalls) - .appendColumns({ - 
"(rn::double - 1.0) / total::double as lower", - "rn::double / total::double as upper", - }) - .singleAggregation( - append(groupingKeys_, {"x"}), - {"min(lower) as min_pct", "max(upper) as max_pct"}) - .project(append( - groupingKeys_, - {fmt::format("x as {}", name_), "min_pct", "max_pct"})); - - auto plan = planBuilder.planNode(); - return AssertQueryBuilder(plan).copyResults(input[0]->pool()); - } - - static const std::string& fieldName(const core::TypedExprPtr& expression) { - auto field = core::TypedExprs::asFieldAccess(expression); - VELOX_CHECK_NOT_NULL(field); - return field->name(); - } - - // Extract 'percentile' argument. - static std::vector extractPercentiles( - const std::vector& input, - const core::AggregationNode::Aggregate& aggregate) { - const auto args = aggregate.call->inputs(); - column_index_t percentileIndex = 1; - if (args.size() >= 3 && args[1]->type()->isBigint()) { - percentileIndex = 2; - } - - const auto& percentileExpr = args[percentileIndex]; - - if (auto constantExpr = core::TypedExprs::asConstant(percentileExpr)) { - if (constantExpr->type()->isDouble()) { - return {constantExpr->value().value()}; - } - - return toList(constantExpr->valueVector()); - } - - const auto& percentileVector = input[0]->childAt(fieldName(percentileExpr)); - - if (percentileVector->type()->isDouble()) { - VELOX_CHECK(!percentileVector->isNullAt(0)); - return {percentileVector->as>()->valueAt(0)}; - } - - return toList(percentileVector); - } - - static std::vector toList(const VectorPtr& vector) { - VELOX_CHECK(vector->type()->equivalent(*ARRAY(DOUBLE()))); - - DecodedVector decoded(*vector); - auto arrayVector = decoded.base()->as(); - - VELOX_CHECK(!decoded.isNullAt(0)); - const auto offset = arrayVector->offsetAt(decoded.index(0)); - const auto size = arrayVector->sizeAt(decoded.index(0)); - - auto* elementsVector = arrayVector->elements()->as>(); - - std::vector percentiles; - percentiles.reserve(size); - for (auto i = 0; i < size; ++i) { - 
VELOX_CHECK(!elementsVector->isNullAt(offset + i)); - percentiles.push_back(elementsVector->valueAt(offset + i)); - } - return percentiles; - } - - // For each row ([k1, k2,] x) in 'result', lookup min_pct and max_pct in - // 'allRanges_'. Return a vector of ([k1, k2,] x, min_pct, max_pct) rows. - RowVectorPtr getPercentileRanges(const RowVectorPtr& result) { - auto planNodeIdGenerator = std::make_shared(); - - core::PlanNodePtr expectedSource; - core::PlanNodePtr actualSource; - if (result->childAt(name_)->type()->isArray()) { - expectedSource = - PlanBuilder(planNodeIdGenerator) - .values({allRanges_}) - .appendColumns({fmt::format( - "sequence(0, {}) as s", percentiles_.size() - 1)}) - .unnest( - append(groupingKeys_, {name_, "min_pct", "max_pct"}), - {"s"}, - "pct_index") - .project(append( - groupingKeys_, {name_, "min_pct", "max_pct", "pct_index"})) - .planNode(); - - actualSource = PlanBuilder(planNodeIdGenerator) - .values({result}) - .unnest(groupingKeys_, {name_}, "pct_index") - .project(append( - groupingKeys_, - { - fmt::format("{}_e as {}", name_, name_), - "null::double as min_pct", - "null::double as max_pct", - "pct_index - 1 as pct_index", - })) - .planNode(); - } else { - expectedSource = PlanBuilder(planNodeIdGenerator) - .values({allRanges_}) - .appendColumns({"0 as pct_index"}) - .planNode(); - - actualSource = PlanBuilder(planNodeIdGenerator) - .values({result}) - .appendColumns({ - "null::double as min_pct", - "null::double as max_pct", - "0 as pct_index", - }) - .planNode(); - } - - auto plan = PlanBuilder(planNodeIdGenerator) - .localPartition({}, {expectedSource, actualSource}) - .singleAggregation( - append(groupingKeys_, {name_, "pct_index"}), - { - "count(1) as cnt", - "arbitrary(min_pct) as min_pct", - "arbitrary(max_pct) as max_pct", - }) - .filter({"cnt = 2"}) - .planNode(); - return AssertQueryBuilder(plan).copyResults(result->pool()); - } - - std::vector groupingKeys_; - std::string name_; - std::vector percentiles_; - double 
accuracy_; - RowVectorPtr allRanges_; -}; - } // namespace } // namespace facebook::velox::exec::test @@ -905,8 +151,9 @@ int main(int argc, char** argv) { }; using Runner = facebook::velox::exec::test::AggregationFuzzerRunner; + using Options = facebook::velox::exec::test::AggregationFuzzerOptions; - Runner::Options options; + Options options; options.onlyFunctions = FLAGS_only; options.skipFunctions = skipFunctions; options.customVerificationFunctions = customVerificationFunctions; diff --git a/velox/exec/tests/AggregationRunnerTest.cpp b/velox/exec/tests/AggregationRunnerTest.cpp index 6496f11c50dec..fdfb69a59c389 100644 --- a/velox/exec/tests/AggregationRunnerTest.cpp +++ b/velox/exec/tests/AggregationRunnerTest.cpp @@ -20,8 +20,8 @@ #include #include #include "velox/common/base/Fs.h" -#include "velox/exec/tests/utils/AggregationFuzzerRunner.h" -#include "velox/exec/tests/utils/DuckQueryRunner.h" +#include "velox/exec/fuzzer/AggregationFuzzerRunner.h" +#include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 5c15a13d09e9d..e03e099888d50 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -156,6 +156,7 @@ target_link_libraries( velox_dwio_common_test_utils velox_exec velox_exec_test_lib + velox_fuzzer_util velox_functions_lib velox_functions_prestosql velox_functions_test_lib @@ -204,9 +205,9 @@ add_executable(velox_aggregation_fuzzer_test AggregationFuzzerTest.cpp) target_link_libraries( velox_aggregation_fuzzer_test velox_aggregation_fuzzer + velox_aggregation_fuzzer_base velox_aggregates velox_window - velox_vector_test_lib gtest gtest_main) @@ -215,7 +216,9 @@ add_executable(spark_aggregation_fuzzer_test SparkAggregationFuzzerTest.cpp) target_link_libraries( spark_aggregation_fuzzer_test 
velox_aggregation_fuzzer + velox_aggregation_fuzzer_base velox_functions_spark_aggregates + velox_fuzzer_util velox_window velox_vector_test_lib gtest @@ -246,8 +249,14 @@ target_link_libraries( add_executable(velox_aggregation_runner_test AggregationRunnerTest.cpp) -target_link_libraries(velox_aggregation_runner_test velox_aggregation_fuzzer - velox_aggregates velox_vector_test_lib gtest gtest_main) +target_link_libraries( + velox_aggregation_runner_test + velox_aggregation_fuzzer + velox_fuzzer_util + velox_aggregates + velox_vector_test_lib + gtest + gtest_main) add_library(velox_simple_aggregate SimpleAverageAggregate.cpp SimpleArrayAggAggregate.cpp) diff --git a/velox/exec/tests/PrestoQueryRunnerTest.cpp b/velox/exec/tests/PrestoQueryRunnerTest.cpp index 27c95fa637e9c..959e272b01293 100644 --- a/velox/exec/tests/PrestoQueryRunnerTest.cpp +++ b/velox/exec/tests/PrestoQueryRunnerTest.cpp @@ -16,9 +16,9 @@ #include +#include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/exec/tests/utils/PrestoQueryRunner.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/parse/TypeResolver.h" diff --git a/velox/exec/tests/SparkAggregationFuzzerTest.cpp b/velox/exec/tests/SparkAggregationFuzzerTest.cpp index dbcdfc2b9958e..114eb72e41a89 100644 --- a/velox/exec/tests/SparkAggregationFuzzerTest.cpp +++ b/velox/exec/tests/SparkAggregationFuzzerTest.cpp @@ -19,8 +19,9 @@ #include #include -#include "velox/exec/tests/utils/AggregationFuzzerRunner.h" -#include "velox/exec/tests/utils/DuckQueryRunner.h" +#include "velox/exec/fuzzer/AggregationFuzzerOptions.h" +#include "velox/exec/fuzzer/AggregationFuzzerRunner.h" +#include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/functions/sparksql/aggregates/Register.h" DEFINE_int64( @@ -78,8 +79,9 @@ int 
main(int argc, char** argv) { }); using Runner = facebook::velox::exec::test::AggregationFuzzerRunner; + using Options = facebook::velox::exec::test::AggregationFuzzerOptions; - Runner::Options options; + Options options; options.onlyFunctions = FLAGS_only; options.skipFunctions = skipFunctions; options.customVerificationFunctions = customVerificationFunctions; diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index 6e409defbd3b7..75cc5bc080dae 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -20,7 +20,6 @@ add_library( velox_exec_test_lib AssertQueryBuilder.cpp Cursor.cpp - DuckQueryRunner.cpp HiveConnectorTestBase.cpp LocalExchangeSource.cpp OperatorTestBase.cpp @@ -29,8 +28,7 @@ add_library( SumNonPODAggregate.cpp TpchQueryBuilder.cpp VectorTestUtil.cpp - PortUtil.cpp - PrestoQueryRunner.cpp) + PortUtil.cpp) target_link_libraries( velox_exec_test_lib @@ -50,12 +48,4 @@ target_link_libraries( velox_tpch_connector velox_presto_serializer velox_functions_prestosql - velox_aggregates - cpr::cpr - velox_type_parser - Folly::folly) - -add_library(velox_aggregation_fuzzer AggregationFuzzer.cpp) - -target_link_libraries(velox_aggregation_fuzzer velox_type velox_vector_fuzzer - velox_exec_test_lib velox_expression_test_utility) + velox_aggregates) diff --git a/velox/expression/tests/utils/FuzzerToolkit.h b/velox/expression/tests/utils/FuzzerToolkit.h index 2644cdb51688d..019395da2a48d 100644 --- a/velox/expression/tests/utils/FuzzerToolkit.h +++ b/velox/expression/tests/utils/FuzzerToolkit.h @@ -57,4 +57,49 @@ void sortSignatureTemplates(std::vector& signatures); void compareExceptions( std::exception_ptr exceptionPtr, std::exception_ptr otherExceptionPr); + +/// Parse the comma separated list of function names, and use it to filter the +/// input signatures. 
Return a signature map that (1) only includes functions +/// appearing in onlyFunctions if onlyFunctions is non-empty, and (2) does not +/// include any functions appearing in skipFunctions if skipFunctions is +/// non-empty. +/// @tparam SignatureMapType can be AggregateFunctionSignatureMap or +/// WindowFunctionMap. +template +SignatureMapType filterSignatures( + const SignatureMapType& input, + const std::string& onlyFunctions, + const std::unordered_set& skipFunctions) { + if (onlyFunctions.empty() && skipFunctions.empty()) { + return input; + } + + SignatureMapType output; + if (!onlyFunctions.empty()) { + // Parse, lower case and trim it. + std::vector nameList; + folly::split(',', onlyFunctions, nameList); + std::unordered_set nameSet; + for (const auto& it : nameList) { + auto str = folly::trimWhitespace(it).toString(); + folly::toLowerAscii(str); + nameSet.insert(str); + } + + for (const auto& it : input) { + if (nameSet.count(it.first) > 0) { + output.insert(it); + } + } + } else { + output = input; + } + + for (auto s : skipFunctions) { + auto str = s; + folly::toLowerAscii(str); + output.erase(str); + } + return output; +} } // namespace facebook::velox::test diff --git a/velox/functions/prestosql/fuzzer/ApproxDistinctInputGenerator.h b/velox/functions/prestosql/fuzzer/ApproxDistinctInputGenerator.h new file mode 100644 index 0000000000000..eba4733d319b4 --- /dev/null +++ b/velox/functions/prestosql/fuzzer/ApproxDistinctInputGenerator.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "velox/exec/fuzzer/InputGenerator.h" + +namespace facebook::velox::exec::test { + +class ApproxDistinctInputGenerator : public InputGenerator { + public: + std::vector generate( + const std::vector& types, + VectorFuzzer& fuzzer, + FuzzerGenerator& rng, + memory::MemoryPool* pool) override { + if (types.size() != 2) { + return {}; + } + + // Make sure to use the same value of 'e' for all batches in a given Fuzzer + // iteration. + if (!e_.has_value()) { + // Generate value in [0.0040625, 0.26] range. + static constexpr double kMin = 0.0040625; + static constexpr double kMax = 0.26; + e_ = kMin + (kMax - kMin) * boost::random::uniform_01()(rng); + } + + const auto size = fuzzer.getOptions().vectorSize; + + VELOX_CHECK( + types.back()->isDouble(), + "Unexpected type: {}", + types.back()->toString()) + return { + fuzzer.fuzz(types[0]), + BaseVector::createConstant(DOUBLE(), e_.value(), size, pool)}; + } + + void reset() override { + e_.reset(); + } + + private: + std::optional e_; +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h b/velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h new file mode 100644 index 0000000000000..36f40dc2786fa --- /dev/null +++ b/velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h @@ -0,0 +1,185 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/fuzzer/ResultVerifier.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::exec::test { + +// Compares results of approx_distinct(x[, e]) with count(distinct x). +// For each group calculates the difference between 2 values and counts number +// of groups where difference is > 2e. If total number of groups is >= 50, +// allows up to 3 groups > 2e. If number of groups is small (< 50), +// expects all groups to be under 2e. +class ApproxDistinctResultVerifier : public ResultVerifier { + public: + bool supportsCompare() override { + return false; + } + + bool supportsVerify() override { + return true; + } + + // Compute count(distinct x) over 'input'.
+ void initialize( + const std::vector& input, + const std::vector& groupingKeys, + const core::AggregationNode::Aggregate& aggregate, + const std::string& aggregateName) override { + auto plan = + PlanBuilder() + .values(input) + .singleAggregation(groupingKeys, {makeCountDistinctCall(aggregate)}) + .planNode(); + + expected_ = AssertQueryBuilder(plan).copyResults(input[0]->pool()); + groupingKeys_ = groupingKeys; + name_ = aggregateName; + error_ = extractError(aggregate, input[0]); + } + + bool compare( + const RowVectorPtr& /*result*/, + const RowVectorPtr& /*altResult*/) override { + VELOX_UNSUPPORTED(); + } + + bool verify(const RowVectorPtr& result) override { + // Union 'result' with 'expected_', group by on 'groupingKeys_' and produce + // pairs of actual and expected values per group. We cannot use join because + // grouping keys may have nulls. + auto planNodeIdGenerator = std::make_shared(); + auto expectedSource = PlanBuilder(planNodeIdGenerator) + .values({expected_}) + .appendColumns({"'expected' as label"}) + .planNode(); + + auto actualSource = PlanBuilder(planNodeIdGenerator) + .values({result}) + .appendColumns({"'actual' as label"}) + .planNode(); + + auto mapAgg = fmt::format("map_agg(label, {}) as m", name_); + auto plan = PlanBuilder(planNodeIdGenerator) + .localPartition({}, {expectedSource, actualSource}) + .singleAggregation(groupingKeys_, {mapAgg}) + .project({"m['actual'] as a", "m['expected'] as e"}) + .planNode(); + auto combined = AssertQueryBuilder(plan).copyResults(result->pool()); + + auto* actual = combined->childAt(0)->as>(); + auto* expected = combined->childAt(1)->as>(); + + const auto numGroups = result->size(); + VELOX_CHECK_EQ(numGroups, combined->size()); + + std::vector largeGaps; + for (auto i = 0; i < numGroups; ++i) { + VELOX_CHECK(!actual->isNullAt(i)) + VELOX_CHECK(!expected->isNullAt(i)) + + const auto actualCnt = actual->valueAt(i); + const auto expectedCnt = expected->valueAt(i); + if (actualCnt != expectedCnt) { + 
if (expectedCnt > 0) { + const auto gap = + std::abs(actualCnt - expectedCnt) * 1.0 / expectedCnt; + if (gap > 2 * error_) { + largeGaps.push_back(gap); + LOG(ERROR) << fmt::format( + "approx_distinct(x, {}) is more than 2 stddev away from " + "count(distinct x). Difference: {}, approx_distinct: {}, " + "count(distinct): {}. This is unusual, but doesn't necessarily " + "indicate a bug.", + error_, + gap, + actualCnt, + expectedCnt); + } + } else { + LOG(ERROR) << fmt::format( + "count(distinct x) returned 0, but approx_distinct(x, {}) is {}", + error_, + actualCnt); + return false; + } + } + } + + // We expect large deviations (>2 stddev) in < 5% of values. + if (numGroups >= 50) { + return largeGaps.size() <= 3; + } + + return largeGaps.empty(); + } + + void reset() override { + expected_.reset(); + } + + private: + static constexpr double kDefaultError = 0.023; + + static double extractError( + const core::AggregationNode::Aggregate& aggregate, + const RowVectorPtr& input) { + const auto& args = aggregate.call->inputs(); + + if (args.size() == 1) { + return kDefaultError; + } + + auto field = core::TypedExprs::asFieldAccess(args[1]); + VELOX_CHECK_NOT_NULL(field); + auto errorVector = + input->childAt(field->name())->as>(); + return errorVector->valueAt(0); + } + + static std::string makeCountDistinctCall( + const core::AggregationNode::Aggregate& aggregate) { + const auto& args = aggregate.call->inputs(); + VELOX_CHECK_GE(args.size(), 1) + + auto inputField = core::TypedExprs::asFieldAccess(args[0]); + VELOX_CHECK_NOT_NULL(inputField) + + std::string countDistinctCall = + fmt::format("count(distinct {})", inputField->name()); + + if (aggregate.mask != nullptr) { + countDistinctCall += + fmt::format(" filter (where {})", aggregate.mask->name()); + } + + return countDistinctCall; + } + + RowVectorPtr expected_; + std::vector groupingKeys_; + std::string name_; + double error_; +}; + +} // namespace facebook::velox::exec::test diff --git 
a/velox/functions/prestosql/fuzzer/ApproxPercentileInputGenerator.h b/velox/functions/prestosql/fuzzer/ApproxPercentileInputGenerator.h new file mode 100644 index 0000000000000..74fdc9048ac2c --- /dev/null +++ b/velox/functions/prestosql/fuzzer/ApproxPercentileInputGenerator.h @@ -0,0 +1,126 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "velox/exec/fuzzer/InputGenerator.h" + +namespace facebook::velox::exec::test { + +class ApproxPercentileInputGenerator : public InputGenerator { + public: + std::vector generate( + const std::vector& types, + VectorFuzzer& fuzzer, + FuzzerGenerator& rng, + memory::MemoryPool* pool) override { + // The arguments are: x, [w], percentile(s), [accuracy]. + // + // First argument is always 'x'. If second argument's type is BIGINT, then + // it is 'w'. Otherwise, it is percentile(x). + + const auto size = fuzzer.getOptions().vectorSize; + + std::vector inputs; + inputs.reserve(types.size()); + inputs.push_back(fuzzer.fuzz(types[0])); + + if (types[1]->isBigint()) { + velox::test::VectorMaker vectorMaker{pool}; + auto weight = vectorMaker.flatVector(size, [&](auto /*row*/) { + return boost::random::uniform_int_distribution(1, 1'000)(rng); + }); + + inputs.push_back(weight); + } + + const int percentileTypeIndex = types[1]->isBigint() ? 
2 : 1; + const TypePtr& percentileType = types[percentileTypeIndex]; + if (percentileType->isDouble()) { + if (!percentile_.has_value()) { + percentile_ = pickPercentile(fuzzer, rng); + } + + inputs.push_back(BaseVector::createConstant( + DOUBLE(), percentile_.value(), size, pool)); + } else { + VELOX_CHECK(percentileType->isArray()); + VELOX_CHECK(percentileType->childAt(0)->isDouble()); + + if (percentiles_.empty()) { + percentiles_.push_back(pickPercentile(fuzzer, rng)); + percentiles_.push_back(pickPercentile(fuzzer, rng)); + percentiles_.push_back(pickPercentile(fuzzer, rng)); + } + + auto arrayVector = + BaseVector::create(ARRAY(DOUBLE()), 1, pool); + auto elementsVector = arrayVector->elements()->asFlatVector(); + elementsVector->resize(percentiles_.size()); + for (auto i = 0; i < percentiles_.size(); ++i) { + elementsVector->set(i, percentiles_[i]); + } + arrayVector->setOffsetAndSize(0, 0, percentiles_.size()); + + inputs.push_back(BaseVector::wrapInConstant(size, 0, arrayVector)); + } + + if (types.size() > percentileTypeIndex + 1) { + // Last argument is 'accuracy'. + VELOX_CHECK(types.back()->isDouble()); + if (!accuracy_.has_value()) { + accuracy_ = boost::random::uniform_01()(rng); + } + + inputs.push_back( + BaseVector::createConstant(DOUBLE(), accuracy_.value(), size, pool)); + } + + return inputs; + } + + void reset() override { + percentile_.reset(); + percentiles_.clear(); + accuracy_.reset(); + } + + private: + double pickPercentile(VectorFuzzer& fuzzer, FuzzerGenerator& rng) { + // 10% of the times generate random value in [0, 1] range. + // 90% of the times use one of the common values. 
+ if (fuzzer.coinToss(0.1)) { + return boost::random::uniform_01()(rng); + } + + static const std::vector kPercentiles = { + 0.1, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99, 0.999, 0.9999}; + + const auto index = + boost::random::uniform_int_distribution()(rng) % + kPercentiles.size(); + + return kPercentiles[index]; + } + + std::optional percentile_; + std::vector percentiles_; + std::optional accuracy_; +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h b/velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h new file mode 100644 index 0000000000000..620f988ce8438 --- /dev/null +++ b/velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h @@ -0,0 +1,367 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/fuzzer/ResultVerifier.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::exec::test { + +// Verifies results of approx_percentile by checking the range of percentiles +// represented by the result value and asserting that the requested percentile +// falls into that range within 'accuracy'. 
+class ApproxPercentileResultVerifier : public ResultVerifier { + public: + bool supportsCompare() override { + return false; + } + + bool supportsVerify() override { + return true; + } + + // Compute the range of percentiles represented by each of the input values. + void initialize( + const std::vector& input, + const std::vector& groupingKeys, + const core::AggregationNode::Aggregate& aggregate, + const std::string& aggregateName) override { + VELOX_CHECK(!input.empty()); + + int64_t numInputs = 0; + for (const auto& v : input) { + numInputs += v->size(); + } + + const auto& args = aggregate.call->inputs(); + const auto& valueField = fieldName(args[0]); + std::optional weightField; + if (args.size() >= 3 && args[1]->type()->isBigint()) { + weightField = fieldName(args[1]); + } + + groupingKeys_ = groupingKeys; + name_ = aggregateName; + + percentiles_ = extractPercentiles(input, aggregate); + VELOX_CHECK(!percentiles_.empty()); + + accuracy_ = extractAccuracy(aggregate, input[0]); + + // Compute percentiles for all values. + allRanges_ = + computePercentiles(input, valueField, weightField, aggregate.mask); + VELOX_CHECK_LE(allRanges_->size(), numInputs); + } + + bool compare( + const RowVectorPtr& /*result*/, + const RowVectorPtr& /*altResult*/) override { + VELOX_UNSUPPORTED(); + } + + bool verify(const RowVectorPtr& result) override { + // Compute acceptable ranges of percentiles for each value in 'result'. 
+ auto ranges = getPercentileRanges(result); + // VELOX_CHECK_EQ(ranges->size(), result->size() * percentiles_.size()); + + auto& value = ranges->childAt(name_); + auto* minPct = ranges->childAt("min_pct")->as>(); + auto* maxPct = ranges->childAt("max_pct")->as>(); + auto* pctIndex = ranges->childAt("pct_index")->as>(); + + for (auto i = 0; i < ranges->size(); ++i) { + if (value->isNullAt(i)) { + VELOX_CHECK(minPct->isNullAt(i)); + VELOX_CHECK(maxPct->isNullAt(i)); + continue; + } + + VELOX_CHECK(!minPct->isNullAt(i)); + VELOX_CHECK(!maxPct->isNullAt(i)); + VELOX_CHECK(!pctIndex->isNullAt(i)); + + const auto pct = percentiles_[pctIndex->valueAt(i)]; + + std::pair range{minPct->valueAt(i), maxPct->valueAt(i)}; + if (!checkPercentileGap(pct, range, accuracy_)) { + return false; + } + } + + return true; + } + + void reset() override { + allRanges_.reset(); + } + + private: + static constexpr double kDefaultAccuracy = 0.0133; + + static double extractAccuracy( + const core::AggregationNode::Aggregate& aggregate, + const RowVectorPtr& input) { + const auto& args = aggregate.call->inputs(); + + column_index_t accuracyIndex = 2; + if (args.size() >= 3 && args[1]->type()->isBigint()) { + // We have a 'w' argument. + accuracyIndex = 3; + } + + if (args.size() <= accuracyIndex) { + return kDefaultAccuracy; + } + + auto field = core::TypedExprs::asFieldAccess(args[accuracyIndex]); + VELOX_CHECK_NOT_NULL(field); + auto accuracyVector = + input->childAt(field->name())->as>(); + return accuracyVector->valueAt(0); + } + + static bool checkPercentileGap( + double pct, + const std::pair& range, + double accuracy) { + double gap = 0.0; + if (pct < range.first) { + gap = range.first - pct; + } else if (pct > range.second) { + gap = pct - range.second; + } + + if (gap > accuracy) { + LOG(ERROR) << "approx_percentile(pct: " << pct + << ", accuracy: " << accuracy << ") is more than " << accuracy + << " away from acceptable range of [" << range.first << ", " + << range.second << "]. 
Difference: " << gap;
+      return false;
+    }
+
+    return true;
+  }
+
+  // Returns a new vector holding the elements of 'values' followed by the
+  // elements of 'newValues'. Neither argument is modified.
+  static std::vector<std::string> append(
+      const std::vector<std::string>& values,
+      const std::vector<std::string>& newValues) {
+    std::vector<std::string> combined;
+    // Single allocation instead of copy-then-grow.
+    combined.reserve(values.size() + newValues.size());
+    combined.insert(combined.end(), values.begin(), values.end());
+    combined.insert(combined.end(), newValues.begin(), newValues.end());
+    return combined;
+  }
+
+  // Groups input by 'groupingKeys_'. Within each group, sorts data on
+  // 'valueField', duplicates rows according to optional weight, filters out
+  // NULLs and rows where mask is not true, then computes ranges of row numbers
+  // and turns these into ranges of percentiles.
+  //
+  // @param input Batches of input rows. Must not be empty; results are
+  // allocated from the memory pool of the first batch.
+  // @param valueField Name of the column (or expression) projected as 'x'.
+  // @param weightField Optional name of the column projected as 'w'. When
+  // set, each row is repeated once per element of sequence(1, w).
+  // @param mask Optional (may be null) field whose name is used as a filter.
+  // @return A vector of grouping keys, followed by value column named 'name_',
+  // followed by min_pct and max_pct columns.
+  RowVectorPtr computePercentiles(
+      const std::vector<RowVectorPtr>& input,
+      const std::string& valueField,
+      const std::optional<std::string>& weightField,
+      const core::FieldAccessTypedExprPtr& mask) {
+    VELOX_CHECK(!input.empty());
+
+    const bool weighted = weightField.has_value();
+
+    std::vector<std::string> projections = groupingKeys_;
+    projections.push_back(fmt::format("{} as x", valueField));
+    if (weighted) {
+      projections.push_back(fmt::format("{} as w", weightField.value()));
+    }
+
+    PlanBuilder planBuilder;
+    planBuilder.values(input);
+
+    // The mask must be applied before the projection because the projection
+    // keeps only the grouping keys, 'x' and (optionally) 'w'.
+    if (mask != nullptr) {
+      planBuilder.filter(mask->name());
+    }
+
+    planBuilder.project(projections).filter("x IS NOT NULL");
+
+    if (weighted) {
+      // Expand each row into one row per element of sequence(1, w).
+      planBuilder.appendColumns({"sequence(1, w) as repeats"})
+          .unnest(append(groupingKeys_, {"x"}), {"repeats"});
+    }
+
+    std::string partitionByClause;
+    if (!groupingKeys_.empty()) {
+      partitionByClause =
+          fmt::format("partition by {}", folly::join(", ", groupingKeys_));
+    }
+
+    std::vector<std::string> windowCalls = {
+        fmt::format(
+            "row_number() OVER ({} order by x) as rn", partitionByClause),
+        fmt::format(
+            "count(1) OVER ({} order by x range between unbounded preceding and unbounded following) "
+            "as total",
+            partitionByClause),
+    };
+
+    // A row with 1-based row number 'rn' out of 'total' covers the percentile
+    // interval [(rn - 1) / total, rn / total]. Aggregating per distinct 'x'
+    // merges the intervals of equal values into a single [min_pct, max_pct].
+    planBuilder.window(windowCalls)
+        .appendColumns({
+            "(rn::double - 1.0) / total::double as lower",
+            "rn::double / total::double as upper",
+        })
+        .singleAggregation(
+            append(groupingKeys_, {"x"}),
+            {"min(lower) as min_pct", "max(upper) as max_pct"})
+        .project(append(
+            groupingKeys_,
+            {fmt::format("x as {}", name_), "min_pct", "max_pct"}));
+
+    auto plan = planBuilder.planNode();
+    return AssertQueryBuilder(plan).copyResults(input[0]->pool());
+  }
+
+  // Returns the name of the field referenced by 'expression'. Fails if
+  // 'expression' is not a field access.
+  static const std::string& fieldName(const core::TypedExprPtr& expression) {
+    auto field = core::TypedExprs::asFieldAccess(expression);
+    VELOX_CHECK_NOT_NULL(field);
+    return field->name();
+  }
+
+  // Extract 'percentile' argument.
+  //
+  // The percentile is the second argument of the aggregate call, or the third
+  // one when the call has at least three arguments and the second one is a
+  // BIGINT (presumably the weight; confirm against the function signature).
+  // A DOUBLE percentile yields a single-element list; otherwise the argument
+  // is converted with toList(). For a non-constant argument the value is read
+  // from row 0 of the first input batch.
+  static std::vector<double> extractPercentiles(
+      const std::vector<RowVectorPtr>& input,
+      const core::AggregationNode::Aggregate& aggregate) {
+    // Bind by reference: no need to copy the vector of argument expressions.
+    const auto& args = aggregate.call->inputs();
+    column_index_t percentileIndex = 1;
+    if (args.size() >= 3 && args[1]->type()->isBigint()) {
+      percentileIndex = 2;
+    }
+
+    VELOX_CHECK(
+        args.size() > percentileIndex,
+        "Aggregate call has no percentile argument");
+    const auto& percentileExpr = args[percentileIndex];
+
+    if (auto constantExpr = core::TypedExprs::asConstant(percentileExpr)) {
+      if (constantExpr->type()->isDouble()) {
+        return {constantExpr->value().value<double>()};
+      }
+
+      return toList(constantExpr->valueVector());
+    }
+
+    VELOX_CHECK(!input.empty());
+    const auto& percentileVector = input[0]->childAt(fieldName(percentileExpr));
+
+    if (percentileVector->type()->isDouble()) {
+      VELOX_CHECK(!percentileVector->isNullAt(0));
+      return {percentileVector->as<SimpleVector<double>>()->valueAt(0)};
+    }
+
+    return toList(percentileVector);
+  }
+
+  // Returns the elements of the ARRAY(DOUBLE) value stored in row 0 of
+  // 'vector'. Fails if the type is not ARRAY(DOUBLE), if row 0 is null or if
+  // any element of the array is null.
+  static std::vector<double> toList(const VectorPtr& vector) {
+    VELOX_CHECK(vector->type()->equivalent(*ARRAY(DOUBLE())));
+
+    DecodedVector decoded(*vector);
+    auto arrayVector = decoded.base()->as<ArrayVector>();
+    VELOX_CHECK_NOT_NULL(arrayVector);
+
+    VELOX_CHECK(!decoded.isNullAt(0));
+    const auto offset = arrayVector->offsetAt(decoded.index(0));
+    const auto size = arrayVector->sizeAt(decoded.index(0));
+
+    auto* elementsVector = arrayVector->elements()->as<SimpleVector<double>>();
+    VELOX_CHECK_NOT_NULL(elementsVector);
+
+    std::vector<double> percentiles;
+    percentiles.reserve(size);
+    for (auto i = 0; i < size; ++i) {
+      VELOX_CHECK(!elementsVector->isNullAt(offset + i));
+      percentiles.push_back(elementsVector->valueAt(offset + i));
+    }
+    return percentiles;
+  }
+
+  // For each row ([k1, k2,] x) in 'result', lookup min_pct and max_pct in
+  // 'allRanges_'. Return a vector of ([k1, k2,] x, min_pct, max_pct) rows.
+  //
+  // The lookup is done by combining 'allRanges_' and 'result' into a single
+  // stream, grouping by (keys, value, pct_index) and keeping only the groups
+  // that received exactly two rows (cnt = 2).
+  RowVectorPtr getPercentileRanges(const RowVectorPtr& result) {
+    auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+
+    core::PlanNodePtr expectedSource;
+    core::PlanNodePtr actualSource;
+    if (result->childAt(name_)->type()->isArray()) {
+      // 'percentiles_.size() - 1' is unsigned and would wrap around for an
+      // empty list.
+      VELOX_CHECK(!percentiles_.empty());
+
+      // Replicate every expected range once per requested percentile and tag
+      // the copies with a zero-based 'pct_index'.
+      expectedSource =
+          PlanBuilder(planNodeIdGenerator)
+              .values({allRanges_})
+              .appendColumns({fmt::format(
+                  "sequence(0, {}) as s", percentiles_.size() - 1)})
+              .unnest(
+                  append(groupingKeys_, {name_, "min_pct", "max_pct"}),
+                  {"s"},
+                  "pct_index")
+              .project(append(
+                  groupingKeys_, {name_, "min_pct", "max_pct", "pct_index"}))
+              .planNode();
+
+      // Flatten the array result into one row per element. The ordinality
+      // column is shifted by one to line up with the zero-based index used on
+      // the expected side (presumably because ordinality is 1-based).
+      actualSource = PlanBuilder(planNodeIdGenerator)
+                         .values({result})
+                         .unnest(groupingKeys_, {name_}, "pct_index")
+                         .project(append(
+                             groupingKeys_,
+                             {
+                                 fmt::format("{}_e as {}", name_, name_),
+                                 "null::double as min_pct",
+                                 "null::double as max_pct",
+                                 "pct_index - 1 as pct_index",
+                             }))
+                         .planNode();
+    } else {
+      // Scalar result: a single percentile, always at index 0.
+      expectedSource = PlanBuilder(planNodeIdGenerator)
+                           .values({allRanges_})
+                           .appendColumns({"0 as pct_index"})
+                           .planNode();
+
+      actualSource = PlanBuilder(planNodeIdGenerator)
+                         .values({result})
+                         .appendColumns({
+                             "null::double as min_pct",
+                             "null::double as max_pct",
+                             "0 as pct_index",
+                         })
+                         .planNode();
+    }
+
+    // Rows coming from 'result' carry NULL ranges; the ranges reported for a
+    // group are expected to come from the matching 'allRanges_' row.
+    auto plan = PlanBuilder(planNodeIdGenerator)
+                    .localPartition({}, {expectedSource, actualSource})
+                    .singleAggregation(
+                        append(groupingKeys_, {name_, "pct_index"}),
+                        {
+                            "count(1) as cnt",
+                            "arbitrary(min_pct) as min_pct",
+                            "arbitrary(max_pct) as max_pct",
+                        })
+                    .filter({"cnt = 2"})
+                    .planNode();
+    return AssertQueryBuilder(plan).copyResults(result->pool());
+  }
+
+  // Names of the grouping key columns.
+  std::vector<std::string> groupingKeys_;
+  // Name of the column that holds the aggregate's value.
+  std::string name_;
+  // Percentiles; only the count is used in the code shown here.
+  std::vector<double> percentiles_;
+  double
accuracy_;
+  RowVectorPtr allRanges_;
+};
+
+} // namespace facebook::velox::exec::test
diff --git a/velox/functions/prestosql/fuzzer/MinMaxInputGenerator.h b/velox/functions/prestosql/fuzzer/MinMaxInputGenerator.h
new file mode 100644
index 0000000000000..e46038838acd8
--- /dev/null
+++ b/velox/functions/prestosql/fuzzer/MinMaxInputGenerator.h
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <boost/random/uniform_int_distribution.hpp>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "velox/exec/fuzzer/InputGenerator.h"
+
+namespace facebook::velox::exec::test {
+
+// Input generator for the variants of min, max, min_by and max_by that take
+// an 'n' argument. All arguments except the last one are fuzzed; the last
+// one ('n') is a BIGINT constant that stays the same until reset() is called.
+class MinMaxInputGenerator : public InputGenerator {
+ public:
+  // @param name Function name. Must be one of min, max, min_by, max_by.
+  explicit MinMaxInputGenerator(const std::string& name)
+      : indexOfN_{indexOfN(name)} {}
+
+  // Generates one vector per entry in 'types'.
+  //
+  // @return An empty list if 'types' is too short to contain the 'n'
+  // argument. Otherwise, fuzzed vectors for all types but the last one,
+  // followed by a constant BIGINT vector holding 'n'.
+  std::vector<VectorPtr> generate(
+      const std::vector<TypePtr>& types,
+      VectorFuzzer& fuzzer,
+      FuzzerGenerator& rng,
+      memory::MemoryPool* pool) override {
+    // TODO Generate inputs free of nested nulls.
+    if (types.size() <= static_cast<size_t>(indexOfN_)) {
+      return {};
+    }
+
+    // Make sure to use the same value of 'n' for all batches in a given Fuzzer
+    // iteration.
+    if (!n_.has_value()) {
+      n_ = boost::random::uniform_int_distribution<int64_t>(0, 9'999)(rng);
+    }
+
+    const auto size = fuzzer.getOptions().vectorSize;
+
+    std::vector<VectorPtr> inputs;
+    inputs.reserve(types.size());
+    // 'types' has at least two entries here, hence 'i + 1' cannot skip a
+    // non-empty prefix.
+    for (size_t i = 0; i + 1 < types.size(); ++i) {
+      inputs.push_back(fuzzer.fuzz(types[i]));
+    }
+
+    VELOX_CHECK(
+        types.back()->isBigint(),
+        "Unexpected type: {}",
+        types.back()->toString());
+    inputs.push_back(
+        BaseVector::createConstant(BIGINT(), n_.value(), size, pool));
+    return inputs;
+  }
+
+  // Forgets the current value of 'n' so that the next call to generate()
+  // picks a new one.
+  void reset() override {
+    n_.reset();
+  }
+
+ private:
+  // Returns zero-based index of the 'n' argument, 1 for min and max. 2 for
+  // min_by and max_by.
+  static int32_t indexOfN(const std::string& name) {
+    if (name == "min" || name == "max") {
+      return 1;
+    }
+
+    if (name == "min_by" || name == "max_by") {
+      return 2;
+    }
+
+    VELOX_FAIL("Unexpected function name: {}", name);
+  }
+
+  // Zero-based index of the 'n' argument.
+  const int32_t indexOfN_;
+  // Value of 'n' shared by all batches generated since the last reset().
+  std::optional<int64_t> n_;
+};
+
+} // namespace facebook::velox::exec::test