Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build: Enable Spark query runner as reference in aggregation fuzzer test #9559

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 0 additions & 53 deletions .github/workflows/experimental.yml
assignUser marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,6 @@ jobs:
name: aggregation
path: velox/_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test

- name: Upload spark aggregation fuzzer
uses: actions/upload-artifact@v3
with:
name: spark_aggregation_fuzzer
path: velox/_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test

- name: Upload aggregation fuzzer
uses: actions/upload-artifact@v3
with:
name: aggregation
path: velox/_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test
assignUser marked this conversation as resolved.
Show resolved Hide resolved

- name: Upload join fuzzer
uses: actions/upload-artifact@v3
with:
Expand Down Expand Up @@ -180,47 +168,6 @@ jobs:
/tmp/aggregate_fuzzer_repro
/tmp/server.log

linux-spark-fuzzer-run:
runs-on: ubuntu-latest
needs: compile
timeout-minutes: 120
steps:

- name: "Checkout Repo"
uses: actions/checkout@v3
with:
ref: "${{ inputs.ref || 'main' }}"

- name: "Install dependencies"
run: source ./scripts/setup-ubuntu.sh && install_apt_deps

- name: Download spark aggregation fuzzer
uses: actions/download-artifact@v3
with:
name: spark_aggregation_fuzzer

- name: "Run Spark Aggregate Fuzzer"
run: |
mkdir -p /tmp/spark_aggregate_fuzzer_repro/
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
chmod +x spark_aggregation_fuzzer_test
./spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec 1800 \
--logtostderr=1 \
--minloglevel=0 \
--repro_persist_path=/tmp/spark_aggregate_fuzzer_repro \
--enable_sorted_aggregations=true \
&& echo -e "\n\nSpark Aggregation Fuzzer run finished successfully."

- name: Archive Spark aggregate production artifacts
if: always()
uses: actions/upload-artifact@v3
with:
name: spark-agg-fuzzer-failure-artifacts
path: |
/tmp/spark_aggregate_fuzzer_repro

linux-join-fuzzer-run:
runs-on: ubuntu-latest
needs: compile
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ jobs:
spark-aggregate-fuzzer-run:
name: Spark Aggregate Fuzzer
runs-on: ubuntu-latest
container: ghcr.io/facebookincubator/velox-dev:centos9
container: ghcr.io/facebookincubator/velox-dev:spark-server
needs: compile
timeout-minutes: 60
steps:
Expand All @@ -469,12 +469,16 @@ jobs:

- name: Run Spark Aggregate Fuzzer
run: |
bash /opt/start-spark.sh
# Sleep for 60 seconds to allow the Spark server to start.
sleep 60
mkdir -p /tmp/spark_aggregate_fuzzer_repro/logs/
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
chmod +x spark_aggregation_fuzzer_test
./spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec $DURATION \
--enable_sorted_aggregations=false \
--minloglevel=0 \
--stderrthreshold=2 \
--log_dir=/tmp/spark_aggregate_fuzzer_repro/logs \
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ void setupMemory(
void registerHiveConnector(
const std::unordered_map<std::string, std::string>& hiveConfigs) {
auto configs = hiveConfigs;
// Make sure not to run out of open file descriptors.
configs[connector::hive::HiveConfig::kNumCacheFileHandles] = "1000";
if (!connector::hasConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
connector::registerConnectorFactory(
Expand Down
39 changes: 18 additions & 21 deletions velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
#include <gtest/gtest.h>
#include <unordered_set>

#include "velox/dwio/parquet/RegisterParquetWriter.h"
#include "velox/exec/fuzzer/AggregationFuzzerOptions.h"
#include "velox/exec/fuzzer/AggregationFuzzerRunner.h"
#include "velox/exec/fuzzer/DuckQueryRunner.h"
#include "velox/exec/fuzzer/TransformResultVerifier.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/sparksql/aggregates/Register.h"
#include "velox/functions/sparksql/fuzzer/SparkQueryRunner.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"
Expand All @@ -45,6 +46,7 @@ DEFINE_string(
int main(int argc, char** argv) {
facebook::velox::functions::aggregate::sparksql::registerAggregateFunctions(
"", false);
facebook::velox::parquet::registerParquetWriterFactory();

::testing::InitGoogleTest(&argc, argv);

Expand All @@ -71,10 +73,13 @@ int main(int argc, char** argv) {
}
facebook::velox::memory::MemoryManager::initialize({});

// TODO: List of the functions that at some point crash or fail and need to
// be fixed before we can enable them. A constant argument to
// bloom_filter_agg causes the fuzzer test to fail.
std::unordered_set<std::string> skipFunctions = {"bloom_filter_agg"};
// Spark does not provide user-accessible aggregate functions with the
// following names.
std::unordered_set<std::string> skipFunctions = {
"bloom_filter_agg",
"first_ignore_null",
"last_ignore_null",
"regr_replacement"};

using facebook::velox::exec::test::TransformResultVerifier;

Expand Down Expand Up @@ -113,21 +118,9 @@ int main(int argc, char** argv) {
size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed;
std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool{
facebook::velox::memory::memoryManager()->addRootPool()};
auto duckQueryRunner =
std::make_unique<facebook::velox::exec::test::DuckQueryRunner>(
rootPool.get());
duckQueryRunner->disableAggregateFunctions(
{// https://github.com/facebookincubator/velox/issues/7677
"max_by",
"min_by",
// The skewness functions of Velox and DuckDB use different
// algorithms.
// https://github.com/facebookincubator/velox/issues/4845
"skewness",
// Spark's kurtosis uses Pearson's formula for calculating the kurtosis
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis"});
auto sparkQueryRunner = std::make_unique<
facebook::velox::functions::sparksql::fuzzer::SparkQueryRunner>(
rootPool.get(), "localhost:15002", "fuzzer", "aggregate");

using Runner = facebook::velox::exec::test::AggregationFuzzerRunner;
using Options = facebook::velox::exec::test::AggregationFuzzerOptions;
Expand All @@ -137,5 +130,9 @@ int main(int argc, char** argv) {
options.skipFunctions = skipFunctions;
options.customVerificationFunctions = customVerificationFunctions;
options.orderableGroupKeys = true;
return Runner::run(initialSeed, std::move(duckQueryRunner), options);
options.timestampPrecision =
facebook::velox::VectorFuzzer::Options::TimestampPrecision::kMicroSeconds;
options.hiveConfigs = {
{facebook::velox::connector::hive::HiveConfig::kReadTimestampUnit, "6"}};
return Runner::run(initialSeed, std::move(sparkQueryRunner), options);
}
Loading