Skip to content

Commit

Permalink
Merge branch 'facebookincubator:main' into export-D68287956
Browse files Browse the repository at this point in the history
  • Loading branch information
natashasehgal authored Jan 31, 2025
2 parents d594085 + eb91ba6 commit a54dcc4
Show file tree
Hide file tree
Showing 28 changed files with 152 additions and 76 deletions.
8 changes: 4 additions & 4 deletions velox/common/base/VeloxException.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,16 @@ bool isStackTraceEnabled(VeloxException::Type type) {
using namespace std::literals::chrono_literals;
const bool isSysException = type == VeloxException::Type::kSystem;
if ((isSysException &&
!config::globalConfig.exceptionSystemStacktraceEnabled) ||
!config::globalConfig().exceptionSystemStacktraceEnabled) ||
(!isSysException &&
!config::globalConfig.exceptionUserStacktraceEnabled)) {
!config::globalConfig().exceptionUserStacktraceEnabled)) {
// VeloxException stacktraces are disabled.
return false;
}

const int32_t rateLimitMs = isSysException
? config::globalConfig.exceptionSystemStacktraceRateLimitMs
: config::globalConfig.exceptionUserStacktraceRateLimitMs;
? config::globalConfig().exceptionSystemStacktraceRateLimitMs
: config::globalConfig().exceptionUserStacktraceRateLimitMs;
// not static so the global config can be manipulated at runtime
if (0 == rateLimitMs) {
// VeloxException stacktraces are not rate-limited
Expand Down
8 changes: 4 additions & 4 deletions velox/common/base/tests/ExceptionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ void testExceptionTraceCollectionControl(bool userException, bool enabled) {
SCOPED_TRACE(fmt::format(
"enabled: {}, user flag: {}, sys flag: {}",
enabled,
config::globalConfig.exceptionUserStacktraceEnabled,
config::globalConfig.exceptionSystemStacktraceEnabled));
config::globalConfig().exceptionUserStacktraceEnabled,
config::globalConfig().exceptionSystemStacktraceEnabled));
ASSERT_EQ(userException, e.exceptionType() == VeloxException::Type::kUser);
ASSERT_EQ(enabled, e.stackTrace() != nullptr);
}
Expand Down Expand Up @@ -179,8 +179,8 @@ void testExceptionTraceCollectionRateControl(
"userException: {}, hasRateLimit: {}, user limit: {}ms, sys limit: {}ms",
userException,
hasRateLimit,
config::globalConfig.exceptionUserStacktraceRateLimitMs,
config::globalConfig.exceptionSystemStacktraceRateLimitMs));
config::globalConfig().exceptionUserStacktraceRateLimitMs,
config::globalConfig().exceptionSystemStacktraceRateLimitMs));
ASSERT_EQ(
userException, e.exceptionType() == VeloxException::Type::kUser);
ASSERT_EQ(!hasRateLimit || ((iter % 2) == 0), e.stackTrace() != nullptr);
Expand Down
8 changes: 4 additions & 4 deletions velox/common/caching/tests/AsyncDataCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ TEST_P(AsyncDataCacheTest, pin) {

TEST_P(AsyncDataCacheTest, replace) {
constexpr int64_t kMaxBytes = 64 << 20;
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kMaxBytes);
// Load 10x the max size, inject an error every 21 batches.
loadLoop(0, kMaxBytes * 10, 21);
Expand All @@ -736,7 +736,7 @@ TEST_P(AsyncDataCacheTest, replace) {

TEST_P(AsyncDataCacheTest, evictAccounting) {
constexpr int64_t kMaxBytes = 64 << 20;
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kMaxBytes);
auto pool = manager_->addLeafPool("test");

Expand All @@ -760,7 +760,7 @@ TEST_P(AsyncDataCacheTest, evictAccounting) {
TEST_P(AsyncDataCacheTest, largeEvict) {
constexpr int64_t kMaxBytes = 256 << 20;
constexpr int32_t kNumThreads = 24;
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kMaxBytes);
// Load 10x the max size, inject an allocation of 1/8 the capacity every 4
// batches.
Expand Down Expand Up @@ -839,7 +839,7 @@ TEST_P(AsyncDataCacheTest, DISABLED_ssd) {
constexpr uint64_t kRamBytes = 32 << 20;
constexpr uint64_t kSsdBytes = 512UL << 20;
#endif
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kRamBytes, kSsdBytes);
cache_->setVerifyHook(
[&](const AsyncDataCacheEntry& entry) { checkContents(entry); });
Expand Down
5 changes: 4 additions & 1 deletion velox/common/config/GlobalConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

namespace facebook::velox::config {

GlobalConfiguration globalConfig;
GlobalConfiguration& globalConfig() {
static GlobalConfiguration config;
return config;
}

} // namespace facebook::velox::config
2 changes: 1 addition & 1 deletion velox/common/config/GlobalConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ struct GlobalConfiguration {
std::string saveInputOnExpressionSystemFailurePath;
};

extern GlobalConfiguration globalConfig;
GlobalConfiguration& globalConfig();

} // namespace facebook::velox::config
2 changes: 1 addition & 1 deletion velox/common/memory/MallocAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ MallocAllocator::MallocAllocator(size_t capacity, uint32_t reservationByteLimit)

MallocAllocator::~MallocAllocator() {
// TODO: Remove the check when memory leak issue is resolved.
if (config::globalConfig.memoryLeakCheckEnabled) {
if (config::globalConfig().memoryLeakCheckEnabled) {
VELOX_CHECK(
((allocatedBytes_ - reservations_.read()) == 0) &&
(numAllocated_ == 0) && (numMapped_ == 0),
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ std::vector<std::shared_ptr<MemoryPool>> createSharedLeafMemoryPools(
VELOX_CHECK_EQ(sysPool.name(), kSysRootName);
std::vector<std::shared_ptr<MemoryPool>> leafPools;
const size_t numSharedPools =
std::max(1, config::globalConfig.memoryNumSharedLeafPools);
std::max(1, config::globalConfig().memoryNumSharedLeafPools);
leafPools.reserve(numSharedPools);
for (size_t i = 0; i < numSharedPools; ++i) {
leafPools.emplace_back(
Expand Down Expand Up @@ -129,7 +129,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
sysRoot_->name());
VELOX_CHECK_EQ(
sharedLeafPools_.size(),
std::max(1, config::globalConfig.memoryNumSharedLeafPools));
std::max(1, config::globalConfig().memoryNumSharedLeafPools));
}

MemoryManager::~MemoryManager() {
Expand Down
6 changes: 3 additions & 3 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ struct MemoryManagerOptions {

/// If true, enable memory usage tracking in the default memory pool.
bool trackDefaultUsage{
config::globalConfig.enableMemoryUsageTrackInDefaultMemoryPool};
config::globalConfig().enableMemoryUsageTrackInDefaultMemoryPool};

/// If true, check the memory pool and usage leaks on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
/// have been fixed.
bool checkUsageLeak{config::globalConfig.memoryLeakCheckEnabled};
bool checkUsageLeak{config::globalConfig().memoryLeakCheckEnabled};

/// If true, the memory pool will be running in debug mode to track the
/// allocation and free call stacks to detect the source of memory leak for
/// testing purpose.
bool debugEnabled{config::globalConfig.memoryPoolDebugEnabled};
bool debugEnabled{config::globalConfig().memoryPoolDebugEnabled};

/// Terminates the process and generates a core file on an allocation failure
bool coreOnAllocationFailureEnabled{false};
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ void MemoryAllocator::useHugePages(
const ContiguousAllocation& data,
bool enable) {
#ifdef linux
if (!config::globalConfig.memoryUseHugepages) {
if (!config::globalConfig().memoryUseHugepages) {
return;
}
auto maybeRange = data.hugePageRange();
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ struct Stats {

template <typename Op>
void recordAllocate(int64_t bytes, int32_t count, Op op) {
if (config::globalConfig.timeAllocations) {
if (config::globalConfig().timeAllocations) {
auto index = sizeIndex(bytes);
velox::ClockTimer timer(sizes[index].allocateClocks);
op();
Expand All @@ -107,7 +107,7 @@ struct Stats {

template <typename Op>
void recordFree(int64_t bytes, Op op) {
if (config::globalConfig.timeAllocations) {
if (config::globalConfig().timeAllocations) {
auto index = sizeIndex(bytes);
ClockTimer timer(sizes[index].freeClocks);
op();
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ std::string MemoryPoolImpl::treeMemoryUsage(bool skipEmptyPool) const {
if (parent_ != nullptr) {
return parent_->treeMemoryUsage(skipEmptyPool);
}
if (config::globalConfig.suppressMemoryCapacityExceedingErrorMessage) {
if (config::globalConfig().suppressMemoryCapacityExceedingErrorMessage) {
return "";
}
std::stringstream out;
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {

/// If true, tracks the allocation and free call stacks to detect the source
/// of memory leak for testing purpose.
bool debugEnabled{config::globalConfig.memoryPoolDebugEnabled};
bool debugEnabled{config::globalConfig().memoryPoolDebugEnabled};

/// Terminates the process and generates a core file on an allocation
/// failure
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MmapAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ MachinePageCount MmapAllocator::freeNonContiguousInternal(
ClockTimer timer(clocks);
pages = sizeClass->free(allocation);
}
if ((pages > 0) && config::globalConfig.timeAllocations) {
if ((pages > 0) && config::globalConfig().timeAllocations) {
// Increment the free time only if the allocation contained
// pages in the class. Note that size class indices in the
// allocator are not necessarily the same as in the stats.
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class IndexSource {
velox::ContinueFuture& future) = 0;
};

virtual std::unique_ptr<LookupResultIterator> lookup(
virtual std::shared_ptr<LookupResultIterator> lookup(
const LookupRequest& request) = 0;

virtual std::unordered_map<std::string, RuntimeCounter> runtimeStats() = 0;
Expand Down
2 changes: 1 addition & 1 deletion velox/docs/functions/presto/map.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Map Functions
``n`` must be a non-negative BIGINT value.::

SELECT map_top_n(map(ARRAY['a', 'b', 'c'], ARRAY[2, 3, 1]), 2) --- {'b' -> 3, 'a' -> 2}
SELECT map_top_n(map(ARRAY['a', 'b', 'c'], ARRAY[NULL, 3, NULL]), 2) --- {'b' -> 3, 'a' -> NULL}
SELECT map_top_n(map(ARRAY['a', 'b', 'c'], ARRAY[NULL, 3, NULL]), 2) --- {'b' -> 3, 'c' -> NULL}

.. function:: map_top_n_keys(map(K,V), n) -> array(K)

Expand Down
13 changes: 7 additions & 6 deletions velox/dwio/common/tests/LoggedExceptionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ namespace {
void testTraceCollectionSwitchControl(bool enabled) {
// Logged exception is system type of velox exception.
// Disable rate control in the test.
config::globalConfig.exceptionUserStacktraceRateLimitMs = 0;
config::globalConfig.exceptionSystemStacktraceRateLimitMs = 0;
config::globalConfig().exceptionUserStacktraceRateLimitMs = 0;
config::globalConfig().exceptionSystemStacktraceRateLimitMs = 0;

// NOTE: the user config should not affect the tracing behavior of system type
// of exception collection.
config::globalConfig.exceptionUserStacktraceEnabled = folly::Random::oneIn(2);
config::globalConfig.exceptionSystemStacktraceEnabled =
config::globalConfig().exceptionUserStacktraceEnabled =
folly::Random::oneIn(2);
config::globalConfig().exceptionSystemStacktraceEnabled =
enabled ? true : false;

try {
Expand All @@ -40,8 +41,8 @@ void testTraceCollectionSwitchControl(bool enabled) {
SCOPED_TRACE(fmt::format(
"enabled: {}, user flag: {}, sys flag: {}",
enabled,
config::globalConfig.exceptionUserStacktraceEnabled,
config::globalConfig.exceptionSystemStacktraceEnabled));
config::globalConfig().exceptionUserStacktraceEnabled,
config::globalConfig().exceptionSystemStacktraceEnabled));
ASSERT_TRUE(e.exceptionType() == VeloxException::Type::kSystem);
ASSERT_EQ(enabled, e.stackTrace() != nullptr);
}
Expand Down
8 changes: 3 additions & 5 deletions velox/exec/IndexLookupJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ IndexLookupJoin::IndexLookupJoin(
expressionEvaluator_(connectorQueryCtx_->expressionEvaluator()),
connector_(connector::getConnector(lookupTableHandle_->connectorId())),
joinNode_{joinNode} {
VELOX_CHECK_EQ(joinNode_->sources()[1], joinNode_->lookupSource());
duplicateJoinKeyCheck(joinNode_->leftKeys());
duplicateJoinKeyCheck(joinNode_->rightKeys());
}
Expand Down Expand Up @@ -159,8 +158,8 @@ void IndexLookupJoin::initLookupInput() {
lookupIndexOpt.has_value(),
"Lookup condition column {} is not found",
columnName);
// A lookup column can only be used in on lookup condition.
VELOX_CHECK(
// A lookup column can only be used in one lookup condition.
VELOX_CHECK_EQ(
lookupConditionColumnNames.count(columnName),
0,
"Lookup condition column {} from lookup table used in more than one lookup conditions",
Expand Down Expand Up @@ -454,7 +453,7 @@ RowVectorPtr IndexLookupJoin::produceOutputForLeftJoin() {
bits::fillBits(
rawLookupOutputNulls_,
numOutputRows,
numOutputMissedInputRows,
numOutputRows + numOutputMissedInputRows,
bits::kNull);
for (auto i = 0; i < numOutputMissedInputRows; ++i) {
rawProbeOutputRowIndices_[numOutputRows++] = ++lastProcessedInputRow;
Expand Down Expand Up @@ -509,7 +508,6 @@ RowVectorPtr IndexLookupJoin::produceRemainingOutputForLeftJoin() {
VELOX_CHECK_NULL(lookupResult_);
VELOX_CHECK(hasRemainingOutputForLeftJoin());
VELOX_CHECK_NULL(rawLookupInputHitIndices_);

prepareOutputRowMappings(outputBatchSize_);
VELOX_CHECK_NOT_NULL(rawLookupOutputNulls_);

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/IndexLookupJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class IndexLookupJoin : public Operator {
ContinueFuture lookupFuture_{ContinueFuture::makeEmpty()};
// Used to fetch lookup results for each input batch, and reset after
// processing all the outputs from the result.
std::unique_ptr<connector::IndexSource::LookupResultIterator>
std::shared_ptr<connector::IndexSource::LookupResultIterator>
lookupResultIter_;
// Used to store the lookup result fetched from 'lookupResultIter_' for output
// processing. We might split the output result into multiple output batches
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ void Operator::MemoryReclaimer::enterArbitration() {
}

Driver* const runningDriver = driverThreadCtx->driverCtx()->driver;
if (!config::globalConfig.memoryPoolCapacityTransferAcrossTasks) {
if (!config::globalConfig().memoryPoolCapacityTransferAcrossTasks) {
if (auto opDriver = ensureDriver()) {
// NOTE: the current running driver might not be the driver of the
// operator that requests memory arbitration. The reason is that an
Expand Down Expand Up @@ -680,7 +680,7 @@ void Operator::MemoryReclaimer::leaveArbitration() noexcept {
return;
}
Driver* const runningDriver = driverThreadCtx->driverCtx()->driver;
if (!config::globalConfig.memoryPoolCapacityTransferAcrossTasks) {
if (!config::globalConfig().memoryPoolCapacityTransferAcrossTasks) {
if (auto opDriver = ensureDriver()) {
VELOX_CHECK_EQ(
runningDriver->task()->taskId(),
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ void setupMemory(
int64_t allocatorCapacity,
int64_t arbitratorCapacity,
bool enableGlobalArbitration) {
config::globalConfig.enableMemoryUsageTrackInDefaultMemoryPool = true;
config::globalConfig.memoryLeakCheckEnabled = true;
config::globalConfig().enableMemoryUsageTrackInDefaultMemoryPool = true;
config::globalConfig().memoryLeakCheckEnabled = true;
facebook::velox::memory::SharedArbitrator::registerFactory();
facebook::velox::memory::MemoryManagerOptions options;
options.allocatorCapacity = allocatorCapacity;
Expand Down
Loading

0 comments on commit a54dcc4

Please sign in to comment.