Skip to content

Commit

Permalink
Async gc. (#4362)
Browse files Browse the repository at this point in the history
* Add GC.

* Impl gc.

* Fix linking.

* Using thread pool.

* Using try_dequeue.

* Stop workers when destruct.

* Set default worker size to 8.

* Add repeat task for all.

* Update flags.

* Add comments.

* Fix nightly.
  • Loading branch information
CPWstatic authored Jun 30, 2022
1 parent 8503c92 commit 1de182c
Show file tree
Hide file tree
Showing 20 changed files with 119 additions and 25 deletions.
20 changes: 0 additions & 20 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -194,26 +194,6 @@ jobs:
with:
files: fastcov.info
fail_ci_if_error: false
- name: Setup concurrently mode cluster
run: |
make CONTAINERIZED=true ENABLE_SSL=true CA_SIGNED=true QUERY_CONCURRENTLY=true up
working-directory: tests/
timeout-minutes: 2
- name: Pytest on concurrently mode server
run: |
make RM_DIR=false DEBUG=false J=8 test
working-directory: tests/
timeout-minutes: 15
- name: TCK on concurrently mode server
run: |
make RM_DIR=false DEBUG=false J=8 tck
working-directory: tests/
timeout-minutes: 60
- name: Down concurrently mode cluster
run: |
make RM_DIR=false down
working-directory: tests/
timeout-minutes: 2
- name: Upload logs
uses: actions/upload-artifact@v2
if: ${{ failure() }}
Expand Down
1 change: 1 addition & 0 deletions src/common/expression/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ set(expression_test_common_libs
$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:gc_obj>
)


Expand Down
16 changes: 16 additions & 0 deletions src/common/thread/GenericThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ class GenericThreadPool final : public boost::noncopyable, public nebula::cpp::N
*/
void purgeTimerTask(uint64_t id);

/**
* To add a repeated timer task to every worker of this pool; each worker
* executes its own copy of the task once per period.
* @ms interval in milliseconds
* @f a callable object
* @args variadic arguments passed to the callable
* No task ID is returned (the function is void), so the tasks registered
* through this call cannot be addressed individually by the caller.
*/
template <typename F, typename... Args>
void addRepeatTaskForAll(size_t ms, F &&f, Args &&... args);

private:
size_t nrThreads_{0};
std::atomic<size_t> nextThread_{0};
Expand Down Expand Up @@ -156,6 +166,12 @@ uint64_t GenericThreadPool::addRepeatTask(size_t ms, F &&f, Args &&... args) {
return ((idx << GenericWorker::TIMER_ID_BITS) | id);
}

/**
 * Registers the same repeated timer task on every worker of the pool.
 *
 * @ms   interval in milliseconds between two runs on a given worker
 * @f    a callable object; it is copied once per worker
 * @args arguments bound to the callable; they are copied once per worker
 *
 * The per-worker task IDs returned by GenericWorker::addRepeatTask are
 * discarded, so nothing is returned to the caller.
 */
template <typename F, typename... Args>
void GenericThreadPool::addRepeatTaskForAll(size_t ms, F &&f, Args &&... args) {
  for (auto idx = 0UL; idx < nrThreads_; ++idx) {
    // Deliberately pass `f` and `args` as lvalues instead of std::forward-ing
    // them: forwarding inside the loop would move from rvalue arguments on the
    // first iteration and hand moved-from objects to every following worker.
    pool_[idx]->addRepeatTask(ms, f, args...);
  }
}
} // namespace thread
} // namespace nebula

Expand Down
2 changes: 2 additions & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:ft_es_graph_adapter_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:gc_obj>
${common_deps}
LIBRARIES
${PROXYGEN_LIBRARIES}
Expand Down Expand Up @@ -245,6 +246,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:meta_v2_thrift_obj>
$<TARGET_OBJECTS:gc_obj>
${storage_meta_deps}
${common_deps}
LIBRARIES
Expand Down
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ nebula_add_subdirectory(stats)
nebula_add_subdirectory(util)
nebula_add_subdirectory(validator)
nebula_add_subdirectory(visitor)
nebula_add_subdirectory(gc)
4 changes: 0 additions & 4 deletions src/graph/context/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

nebula_add_library(
graph_context_obj OBJECT
QueryContext.cpp
Expand Down
10 changes: 9 additions & 1 deletion src/graph/context/ExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#include "graph/context/ExecutionContext.h"

#include "graph/gc/GC.h"
#include "graph/service/GraphFlags.h"

namespace nebula {
namespace graph {
constexpr int64_t ExecutionContext::kLatestVersion;
Expand All @@ -23,7 +26,12 @@ void ExecutionContext::setResult(const std::string& name, Result&& result) {
}

// Discards every stored version of the variable `name`.
// With --enable_async_gc the versions are moved to the background GC so the
// release does not happen on the calling thread; otherwise they are cleared
// right here.
void ExecutionContext::dropResult(const std::string& name) {
  // NOTE(review): operator[] presumably creates an empty entry when `name` is
  // unknown, as the standard map containers do — confirm against valueMap_.
  auto& versions = valueMap_[name];
  if (!FLAGS_enable_async_gc) {
    versions.clear();
    return;
  }
  GC::instance().clear(std::move(versions));
}

size_t ExecutionContext::numVersions(const std::string& name) const {
Expand Down
1 change: 1 addition & 0 deletions src/graph/context/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ SET(CONTEXT_TEST_LIBS
$<TARGET_OBJECTS:graph_stats_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
$<TARGET_OBJECTS:storage_client_stats_obj>
$<TARGET_OBJECTS:gc_obj>
)

if(ENABLE_STANDALONE_VERSION)
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ SET(EXEC_QUERY_TEST_OBJS
$<TARGET_OBJECTS:graph_stats_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
$<TARGET_OBJECTS:storage_client_stats_obj>
$<TARGET_OBJECTS:gc_obj>
)

if(ENABLE_STANDALONE_VERSION)
Expand Down
4 changes: 4 additions & 0 deletions src/graph/gc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
nebula_add_library(
gc_obj OBJECT
GC.cpp
)
34 changes: 34 additions & 0 deletions src/graph/gc/GC.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/gc/GC.h"

#include "graph/service/GraphFlags.h"

namespace nebula {
namespace graph {
// Returns the single GC object, which is created on the first call.
GC& GC::instance() {
  static GC singleton;
  return singleton;
}

// Starts the background worker pool and registers the periodic release task
// on every worker.
//
// The pool size comes from --gc_worker_size; 0 means "use the number of
// hardware threads".
GC::GC() {
  // Interval, in milliseconds, between two runs of periodicTask() per worker.
  constexpr size_t kReleaseIntervalMs = 50;

  size_t nrWorkers = FLAGS_gc_worker_size;
  if (nrWorkers == 0) {
    nrWorkers = std::thread::hardware_concurrency();
    if (nrWorkers == 0) {
      // hardware_concurrency() is allowed to return 0 when the value cannot
      // be determined; keep at least one worker so garbage is still released.
      nrWorkers = 1;
    }
  }
  workers_.start(nrWorkers, "GC");
  workers_.addRepeatTaskForAll(kReleaseIntervalMs, &GC::periodicTask, this);
}

// Takes ownership of `garbage` by moving it into the queue. Nothing is
// released here; queued entries are picked up later by periodicTask().
void GC::clear(std::vector<Result>&& garbage) {
queue_.enqueue(std::move(garbage));
}

// Body of the repeating task registered in the constructor: removes at most
// one queued entry per invocation.
void GC::periodicTask() {
// TODO: maybe could release by batch
// The result of try_dequeue() is discarded on purpose; presumably it carries
// the dequeued vector, which is then destroyed on this worker thread —
// confirm against the folly UnboundedQueue API.
// NOTE(review): at one entry per tick per worker, the queue can grow if
// garbage is enqueued faster than it is drained.
queue_.try_dequeue();
}
} // namespace graph
} // namespace nebula
36 changes: 36 additions & 0 deletions src/graph/gc/GC.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_GC_H_
#define GRAPH_GC_H_

#include "common/base/Base.h"
#include "common/thread/GenericThreadPool.h"
#include "graph/context/Result.h"

namespace nebula {
namespace graph {

// Releases unused memory on background threads. This is helpful for big
// queries, since releasing the memory of interim results inline may cost
// too much time.
//
// Used as a singleton: the constructor is private and the object is obtained
// through instance().
class GC {
public:
// Returns the shared GC object.
static GC& instance();

// Stops the worker pool explicitly before the members are destroyed.
~GC() {
workers_.stop();
}

// Hands `garbage` over to the GC; the caller gives up ownership.
void clear(std::vector<Result>&& garbage);

private:
GC();
// Task run repeatedly by the workers to release queued garbage.
void periodicTask();
// Garbage waiting to be released. Declared before workers_, so it is
// constructed first and destroyed last; keep this order, because the worker
// tasks access the queue.
folly::UMPMCQueue<std::vector<Result>, false> queue_;
// Background threads that run periodicTask().
thread::GenericThreadPool workers_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_GC_H_
1 change: 1 addition & 0 deletions src/graph/optimizer/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ set(OPTIMIZER_TEST_LIB
$<TARGET_OBJECTS:graph_stats_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
$<TARGET_OBJECTS:storage_client_stats_obj>
$<TARGET_OBJECTS:gc_obj>
)

if(ENABLE_STANDALONE_VERSION)
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ nebula_add_test(
$<TARGET_OBJECTS:idgenerator_obj>
$<TARGET_OBJECTS:graph_context_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:gc_obj>
LIBRARIES
gtest
${PROXYGEN_LIBRARIES}
Expand Down
6 changes: 6 additions & 0 deletions src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,9 @@ DEFINE_int32(min_batch_size,
"The min batch size for handling dataset in multi job mode, only enabled when "
"max_job_size is greater than 1.");
DEFINE_int32(max_job_size, 1, "The max job size in multi job mode.");

// Off by default. When enabled, ExecutionContext::dropResult hands dropped
// results to the background GC instead of clearing them inline.
DEFINE_bool(enable_async_gc, false, "If enable async gc.");
// Number of background GC worker threads; 0 makes the GC constructor use
// std::thread::hardware_concurrency().
DEFINE_uint32(
gc_worker_size,
0,
"Background garbage clean workers, default number is 0 which means using hardware core size.");
2 changes: 2 additions & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ DECLARE_int32(num_rows_to_check_memory);
DECLARE_int32(min_batch_size);
DECLARE_int32(max_job_size);

DECLARE_bool(enable_async_gc);
DECLARE_uint32(gc_worker_size);
#endif // GRAPH_GRAPHFLAGS_H_
1 change: 1 addition & 0 deletions src/graph/util/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ nebula_add_test(
$<TARGET_OBJECTS:graph_stats_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
$<TARGET_OBJECTS:storage_client_stats_obj>
$<TARGET_OBJECTS:gc_obj>
LIBRARIES
gtest
gtest_main
Expand Down
1 change: 1 addition & 0 deletions src/graph/validator/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ set(VALIDATOR_TEST_LIBS
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:version_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:gc_obj>
)

if(ENABLE_STANDALONE_VERSION)
Expand Down
1 change: 1 addition & 0 deletions src/graph/visitor/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ nebula_add_test(
$<TARGET_OBJECTS:version_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:gc_obj>
LIBRARIES
gtest
${THRIFT_LIBRARIES}
Expand Down
1 change: 1 addition & 0 deletions src/parser/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ set(PARSER_TEST_LIBS
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:plan_obj>
$<TARGET_OBJECTS:gc_obj>
)

if(ENABLE_STANDALONE_VERSION)
Expand Down

0 comments on commit 1de182c

Please sign in to comment.