Skip to content

Commit

Permalink
Merge branch 'facebookincubator:main' into orc_read
Browse files Browse the repository at this point in the history
  • Loading branch information
wypb authored Dec 20, 2023
2 parents 2d588a1 + 6b17ea5 commit 0d21b06
Show file tree
Hide file tree
Showing 77 changed files with 1,856 additions and 416 deletions.
8 changes: 4 additions & 4 deletions .circleci/dist_compile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ jobs:
command: |
mkdir -p /tmp/spark_aggregate_fuzzer_repro/
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
_build/debug/velox/exec/tests/spark_aggregation_fuzzer_test \
_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec 60 \
--logtostderr=1 \
Expand All @@ -332,7 +332,7 @@ jobs:
mkdir -p /tmp/aggregate_fuzzer_repro/
rm -rfv /tmp/aggregate_fuzzer_repro/*
chmod -R 777 /tmp/aggregate_fuzzer_repro
_build/debug/velox/exec/tests/velox_aggregation_fuzzer_test \
_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec 1800 \
--logtostderr=1 \
Expand Down Expand Up @@ -503,7 +503,7 @@ jobs:
fuzzer_output: "/tmp/spark_aggregate_fuzzer.log"
fuzzer_repro: "/tmp/spark_aggregate_fuzzer_repro"
fuzzer_name: "SparkAggregate"
fuzzer_exe: "_build/debug/velox/exec/tests/spark_aggregation_fuzzer_test"
fuzzer_exe: "_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test"
fuzzer_args: " --seed ${RANDOM} --duration_sec 600 --logtostderr=1 --minloglevel=0 \
--repro_persist_path=/tmp/spark_aggregate_fuzzer_repro"

Expand All @@ -518,7 +518,7 @@ jobs:
fuzzer_output: "/tmp/aggregate_fuzzer.log"
fuzzer_repro: "/tmp/aggregate_fuzzer_repro"
fuzzer_name: "Aggregate"
fuzzer_exe: "_build/debug/velox/exec/tests/velox_aggregation_fuzzer_test"
fuzzer_exe: "_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test"
fuzzer_args: " --seed ${RANDOM} --duration_sec 3600 --logtostderr=1 --minloglevel=0 \
--repro_persist_path=/tmp/aggregate_fuzzer_repro"

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ jobs:
name: spark
path: |
velox/_build/debug/velox/expression/tests/spark_expression_fuzzer_test
velox/_build/debug/velox/expression/tests/spark_aggregation_fuzzer_test
velox/_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test
- name: Upload aggregation fuzzer
uses: actions/upload-artifact@v3
with:
name: aggregation
path: velox/_build/debug/velox/exec/tests/velox_aggregation_fuzzer_test
path: velox/_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test

- name: Upload join fuzzer
uses: actions/upload-artifact@v3
Expand Down
2 changes: 1 addition & 1 deletion scripts/setup-adapters.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)}

function install_aws-sdk-cpp {
local AWS_REPO_NAME="aws/aws-sdk-cpp"
local AWS_SDK_VERSION="1.10.57"
local AWS_SDK_VERSION="1.11.169"

github_checkout $AWS_REPO_NAME $AWS_SDK_VERSION --depth 1 --recurse-submodules
cmake_install -DCMAKE_BUILD_TYPE=Debug -DBUILD_SHARED_LIBS:BOOL=OFF -DMINIMIZE_SIZE:BOOL=ON -DENABLE_TESTING:BOOL=OFF -DBUILD_ONLY:STRING="s3;identity-management"
Expand Down
28 changes: 15 additions & 13 deletions velox/benchmarks/basic/VectorFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ namespace {

using namespace facebook::velox;

std::shared_ptr<memory::MemoryPool> pool{
memory::MemoryManager::getInstance()->addLeafPool()};
/// Returns a non-owning pointer to the leaf memory pool used by the benchmarks
/// in this file. The pool is obtained from the process-wide MemoryManager the
/// first time this function is called (function-local static) and the same
/// pointer is returned on every later call.
memory::MemoryPool* pool() {
  static const auto kLeafPool =
      memory::MemoryManager::getInstance()->addLeafPool();
  return kLeafPool.get();
}

VectorFuzzer::Options getOpts(size_t n, double nullRatio = 0) {
VectorFuzzer::Options opts;
Expand All @@ -39,25 +41,25 @@ VectorFuzzer::Options getOpts(size_t n, double nullRatio = 0) {
}

BENCHMARK_MULTI(flatInteger, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(BIGINT()));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatIntegerHalfNull, n) {
VectorFuzzer fuzzer(getOpts(n, 0.5), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n, 0.5), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(BIGINT()));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatDouble, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(DOUBLE()));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatBool, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(BOOLEAN()));
return n;
}
Expand All @@ -66,7 +68,7 @@ BENCHMARK_RELATIVE_MULTI(flatVarcharAscii, n) {
auto opts = getOpts(n);
opts.charEncodings = {UTF8CharList::ASCII};

VectorFuzzer fuzzer(opts, pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(opts, pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(VARCHAR()));
return n;
}
Expand All @@ -75,37 +77,37 @@ BENCHMARK_RELATIVE_MULTI(flatVarcharUtf8, n) {
auto opts = getOpts(n);
opts.charEncodings = {UTF8CharList::EXTENDED_UNICODE};

VectorFuzzer fuzzer(opts, pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(opts, pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(VARCHAR()));
return n;
}

BENCHMARK_DRAW_LINE();

BENCHMARK_RELATIVE_MULTI(constantInteger, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzConstant(BIGINT()));
return n;
}

BENCHMARK_RELATIVE_MULTI(dictionaryInteger, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzDictionary(fuzzer.fuzzFlat(BIGINT())));
return n;
}

BENCHMARK_DRAW_LINE();

BENCHMARK_RELATIVE_MULTI(flatArray, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
const size_t elementsSize = n * fuzzer.getOptions().containerLength;
folly::doNotOptimizeAway(
fuzzer.fuzzArray(fuzzer.fuzzFlat(BIGINT(), elementsSize), n));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatMap, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
const size_t elementsSize = n * fuzzer.getOptions().containerLength;
folly::doNotOptimizeAway(fuzzer.fuzzMap(
fuzzer.fuzzFlat(BIGINT(), elementsSize),
Expand All @@ -115,7 +117,7 @@ BENCHMARK_RELATIVE_MULTI(flatMap, n) {
}

BENCHMARK_RELATIVE_MULTI(flatMapArrayNested, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
const size_t elementsSize = n * fuzzer.getOptions().containerLength;

folly::doNotOptimizeAway(fuzzer.fuzzMap(
Expand Down
15 changes: 15 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
namespace facebook::velox {

void registerVeloxMetrics() {
/// ================== Task Execution Counters =================
// The number of times a driver yields when it exceeds the per-driver cpu time
// slice limit, if enforced.
DEFINE_METRIC(kMetricDriverYieldCount, facebook::velox::StatType::COUNT);

/// ================== Cache Counters =================

// Tracks hive handle generation latency in range of [0, 100s] and reports
// P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
Expand Down Expand Up @@ -96,6 +103,14 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricArbitratorFreeCapacityBytes, facebook::velox::StatType::AVG);

// Tracks the memory pool usage leak in bytes.
DEFINE_METRIC(
kMetricMemoryPoolUsageLeakBytes, facebook::velox::StatType::SUM);

// Tracks the memory pool reservation leak in bytes.
DEFINE_METRIC(
kMetricMemoryPoolReservationLeakBytes, facebook::velox::StatType::SUM);

/// ================== Spill related Counters =================

// The number of bytes in memory to spill.
Expand Down
9 changes: 9 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ constexpr folly::StringPiece kMetricMemoryReclaimWaitTimeoutCount{
constexpr folly::StringPiece kMetricMemoryNonReclaimableCount{
"velox.memory_non_reclaimable_count"};

constexpr folly::StringPiece kMetricMemoryPoolUsageLeakBytes{
"velox.memory_pool_usage_leak_bytes"};

constexpr folly::StringPiece kMetricMemoryPoolReservationLeakBytes{
"velox.memory_pool_reservation_leak_bytes"};

constexpr folly::StringPiece kMetricArbitratorRequestsCount{
"velox.arbitrator_requests_count"};

Expand All @@ -67,6 +73,9 @@ constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{
constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{
"velox.arbitrator_free_capacity_bytes"};

constexpr folly::StringPiece kMetricDriverYieldCount{
"velox.driver_yield_count"};

constexpr folly::StringPiece kMetricSpilledInputBytes{
"velox.spill_input_bytes"};

Expand Down
2 changes: 2 additions & 0 deletions velox/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ SpillConfig::SpillConfig(
uint8_t _startPartitionBit,
uint8_t _joinPartitionBits,
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
int32_t _testSpillPct,
const std::string& _compressionKind,
Expand All @@ -46,6 +47,7 @@ SpillConfig::SpillConfig(
startPartitionBit(_startPartitionBit),
joinPartitionBits(_joinPartitionBits),
maxSpillLevel(_maxSpillLevel),
maxSpillRunRows(_maxSpillRunRows),
writerFlushThresholdSize(_writerFlushThresholdSize),
testSpillPct(_testSpillPct),
compressionKind(common::stringToCompressionKind(_compressionKind)),
Expand Down
6 changes: 6 additions & 0 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct SpillConfig {
uint8_t _startPartitionBit,
uint8_t _joinPartitionBits,
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
int32_t _testSpillPct,
const std::string& _compressionKind,
Expand Down Expand Up @@ -104,6 +105,11 @@ struct SpillConfig {
/// partition bits at the end.
int32_t maxSpillLevel;

/// The max number of rows to fill and spill for each spill run. This is used to
/// cap the memory used for spilling. If it is zero, then there is no limit
/// and spilling might run out of memory.
uint64_t maxSpillRunRows;

/// Minimum memory footprint size required to reclaim memory from a file
/// writer by flushing its buffered data to disk.
uint64_t writerFlushThresholdSize;
Expand Down
5 changes: 4 additions & 1 deletion velox/common/base/tests/SpillConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ TEST(SpillConfig, spillLevel) {
0,
0,
0,
0,
"none");
struct {
uint8_t bitOffset;
Expand Down Expand Up @@ -76,7 +77,7 @@ TEST(SpillConfig, spillLevelLimit) {
int32_t numBits;
uint8_t bitOffset;
int32_t maxSpillLevel;
int32_t expectedExceeds;
bool expectedExceeds;

std::string debugString() const {
return fmt::format(
Expand Down Expand Up @@ -124,6 +125,7 @@ TEST(SpillConfig, spillLevelLimit) {
testData.maxSpillLevel,
0,
0,
0,
"none");

ASSERT_EQ(
Expand Down Expand Up @@ -168,6 +170,7 @@ TEST(SpillConfig, spillableReservationPercentages) {
0,
0,
0,
1'000'000,
0,
0,
"none");
Expand Down
23 changes: 23 additions & 0 deletions velox/common/io/IoStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,42 @@ class IoCounter {
return sum_;
}

/// Returns the smallest amount seen so far by increment() or folded in by
/// merge(). While nothing has been recorded this is the initial value of
/// 'min_', i.e. std::numeric_limits<uint64_t>::max().
uint64_t min() const {
  return min_.load();
}

/// Returns the largest amount seen so far by increment() or folded in by
/// merge(). While nothing has been recorded this is the initial value of
/// 'max_', i.e. 0.
uint64_t max() const {
  return max_.load();
}

/// Records one observation of 'amount': bumps the observation count by one,
/// adds 'amount' to the running sum and widens the [min, max] range so that
/// it includes 'amount'. Each counter is updated with its own atomic
/// operation; the four updates are not applied as a single atomic step.
void increment(uint64_t amount) {
  count_.fetch_add(1);
  sum_.fetch_add(amount);
  // std::greater lowers 'min_' when it is above 'amount'; std::less raises
  // 'max_' when it is below 'amount'.
  casLoop(min_, amount, std::greater());
  casLoop(max_, amount, std::less());
}

/// Folds the statistics of 'other' into this counter: sums and counts are
/// added, and this counter's min/max are widened to also cover the min/max
/// of 'other'. Each field of 'other' is read with a separate atomic load.
void merge(const IoCounter& other) {
  sum_.fetch_add(other.sum_.load());
  count_.fetch_add(other.count_.load());
  // Only replaces 'min_' / 'max_' when 'other' has a smaller / larger value.
  casLoop(min_, other.min_.load(), std::greater());
  casLoop(max_, other.max_.load(), std::less());
}

private:
/// Atomically stores 'newValue' into 'value' as long as
/// compare(current value, newValue) holds. With std::greater this lowers
/// 'value' to the minimum of the two; with std::less it raises 'value' to the
/// maximum. If the comparison is false the stored value is left untouched.
template <typename Compare>
static void
casLoop(std::atomic<uint64_t>& value, uint64_t newValue, Compare compare) {
  uint64_t observed = value.load();
  while (compare(observed, newValue)) {
    // On failure compare_exchange_weak reloads the current value into
    // 'observed', so the comparison above is re-evaluated against fresh data.
    if (value.compare_exchange_weak(observed, newValue)) {
      break;
    }
  }
}

std::atomic<uint64_t> count_{0};
std::atomic<uint64_t> sum_{0};
std::atomic<uint64_t> min_{std::numeric_limits<uint64_t>::max()};
std::atomic<uint64_t> max_{0};
};

class IoStatistics {
Expand Down
2 changes: 0 additions & 2 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
.alignment = alignment_,
.maxCapacity = kMaxMemory,
.trackUsage = options.trackDefaultUsage,
.checkUsageLeak = options.checkUsageLeak,
.debugEnabled = options.debugEnabled,
.coreOnAllocationFailureEnabled =
options.coreOnAllocationFailureEnabled})} {
Expand Down Expand Up @@ -161,7 +160,6 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
options.alignment = alignment_;
options.maxCapacity = capacity;
options.trackUsage = true;
options.checkUsageLeak = checkUsageLeak_;
options.debugEnabled = debugEnabled_;
options.coreOnAllocationFailureEnabled = coreOnAllocationFailureEnabled_;

Expand Down
Loading

0 comments on commit 0d21b06

Please sign in to comment.