Skip to content

Commit

Permalink
[enhancement](memtracker) Refactor mem tracker hierarchy (apache#13585)
Browse files Browse the repository at this point in the history
The mem tracker hierarchy can be logically divided into 4 layers: 1) process, 2) type, 3) task (query/load/compaction, etc.), and 4) exec node, etc.

The type layer includes:

enum Type {
        GLOBAL = 0,        // Life cycle is the same as the process, e.g. Cache and default Orphan
        QUERY = 1,         // Count the memory consumption of all Query tasks.
        LOAD = 2,          // Count the memory consumption of all Load tasks.
        COMPACTION = 3,    // Count the memory consumption of all Base and Cumulative tasks.
        SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
        CLONE = 5, // Count the memory consumption of all EngineCloneTask. Note: does not include the memory used to make/release snapshots.
        BATCHLOAD = 6,  // Count the memory consumption of all EngineBatchLoadTask.
        CONSISTENCY = 7 // Count the memory consumption of all EngineChecksumTask.
    }
Object pointers are no longer held between layers; instead, the consumption values of the process and of each type are aggregated periodically.

other fix:

In "[fix](memtracker) Fix transmit_tracker null pointer because phamp is not thread safe" (apache#13528), I tried to separate the memory that is manually abandoned within a query from the orphan mem tracker. However, in actual testing the accuracy of this part of the memory could not be guaranteed, so it is accounted to the orphan mem tracker again.
  • Loading branch information
xinyiZzz authored Nov 8, 2022
1 parent e1654bc commit 0b945fe
Show file tree
Hide file tree
Showing 90 changed files with 720 additions and 1,436 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ Status HashTable::resize_buckets(int64_t num_buckets) {

int64_t old_num_buckets = _num_buckets;
int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket);
Status st = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->check_limit(
Status st = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
delta_bytes);
if (!st) {
LOG_EVERY_N(WARNING, 100) << "resize bucket failed: " << st.to_string();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
size_t thread_slot_num = 0;
mem_consume = _scanner_mem_tracker->consumption();
// check limit for total memory and _scan_row_batches memory
if (mem_consume < (state->instance_mem_tracker()->limit() * 6) / 10 &&
if (mem_consume < (state->query_mem_tracker()->limit() * 6) / 10 &&
_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) {
thread_slot_num = max_thread - assigned_thread_num;
} else {
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/partitioned_aggregation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -911,14 +911,14 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
<< "Backend: " << BackendOptions::get_localhost() << ", "
<< "fragment: " << print_id(state_->fragment_instance_id()) << " "
<< "Used: "
<< thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->consumption()
<< thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->consumption()
<< ", Limit: "
<< thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->limit() << ". "
<< thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->limit() << ". "
<< "You can change the limit by session variable exec_mem_limit.";
string details = Substitute(str.str(), _id, tuple_data_size);
*status = thread_context()
->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()
->mem_limit_exceeded(state_, details, tuple_data_size);
->_thread_mem_tracker_mgr->limiter_mem_tracker()
->fragment_mem_limit_exceeded(state_, details, tuple_data_size);
return nullptr;
}
memset(tuple_data, 0, fixed_size);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/partitioned_hash_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));

int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
if (UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->check_limit(
if (UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
mem_usage))) {
capacity_ = 0;
string details = Substitute(
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int
: _parent(parent), _index_channel(index_channel), _node_id(node_id) {
_node_channel_tracker = std::make_unique<MemTracker>(fmt::format(
"NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id),
thread_context()->thread_id_str()));
thread_context()->get_thread_id()));
}

NodeChannel::~NodeChannel() noexcept {
Expand Down Expand Up @@ -624,15 +624,15 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
_add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
_add_batch_closure->cntl.http_request().set_content_type("application/json");
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
_brpc_http_stub->tablet_writer_add_batch_by_http(&_add_batch_closure->cntl, NULL,
&_add_batch_closure->result,
_add_batch_closure);
}
} else {
_add_batch_closure->cntl.http_request().Clear();
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
&_add_batch_closure->result, _add_batch_closure);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ReusableClosure final : public google::protobuf::Closure {
~ReusableClosure() override {
// shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
join();
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
cntl.Reset();
}

Expand Down Expand Up @@ -124,7 +124,7 @@ class ReusableClosure final : public google::protobuf::Closure {

// plz follow this order: reset() -> set_in_flight() -> send brpc batch
void reset() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
cntl.Reset();
cid = cntl.call_id();
}
Expand Down
66 changes: 40 additions & 26 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ void config_handler(const WebPageHandler::ArgumentMap& args, std::stringstream*
// Registered to handle "/memz", and prints out memory allocation statistics.
void mem_usage_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) {
(*output) << "<pre>"
<< "Mem Limit: "
<< PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(),
TUnit::BYTES)
<< "Mem Limit: " << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< std::endl
<< "Physical Mem From Perf: "
<< PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES) << std::endl
Expand Down Expand Up @@ -121,50 +119,66 @@ void display_tablets_callback(const WebPageHandler::ArgumentMap& args, EasyJson*
// Registered to handle "/mem_tracker", and prints out memory tracker information.
void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) {
(*output) << "<h1>Memory usage by subsystem</h1>\n";
std::vector<MemTracker::Snapshot> snapshots;
auto iter = args.find("type");
if (iter != args.end()) {
if (iter->second == "global") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL);
} else if (iter->second == "query") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::QUERY);
} else if (iter->second == "load") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::LOAD);
} else if (iter->second == "compaction") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::COMPACTION);
} else if (iter->second == "schema_change") {
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::SCHEMA_CHANGE);
} else if (iter->second == "clone") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::CLONE);
} else if (iter->second == "batch_load") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::BATCHLOAD);
} else if (iter->second == "consistency") {
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::CONSISTENCY);
}
} else {
(*output) << "<h4>*Note: (see documentation for details)</h4>\n";
(*output) << "<h4> 1.`/mem_tracker?type=global` to view the memory statistics of each "
"type</h4>\n";
(*output) << "<h4> 2.`/mem_tracker` counts virtual memory, which is equal to `Actual "
"memory used` in `/memz`</h4>\n";
(*output) << "<h4> 3.`process` is equal to the sum of all types of memory, "
"`/mem_tracker` can be logically divided into 4 layers: 1)`process` 2)`type` "
"3)`query/load/compation task etc.` 4)`exec node etc.`</h4>\n";
MemTrackerLimiter::make_process_snapshots(&snapshots);
}

(*output) << "<table data-toggle='table' "
" data-pagination='true' "
" data-search='true' "
" class='table table-striped'>\n";
(*output) << "<thead><tr>"
"<th data-sortable='true'>Level</th>"
"<th data-sortable='true'>Type</th>"
"<th data-sortable='true'>Label</th>"
"<th>Parent</th>"
"<th data-sortable='true'>Parent Label</th>"
"<th>Limit</th>"
"<th data-sortable='true' "
">Current Consumption(Bytes)</th>"
"<th>Current Consumption(Normalize)</th>"
"<th data-sortable='true' "
">Peak Consumption(Bytes)</th>"
"<th>Peak Consumption(Normalize)</th>"
"<th data-sortable='true' "
">Child Count</th>"
"</tr></thead>";
(*output) << "<tbody>\n";

size_t upper_level;
size_t cur_level = 1;
// the level equal or lower than upper_level will show in web page
auto iter = args.find("upper_level");
if (iter != args.end()) {
upper_level = std::stol(iter->second);
} else {
upper_level = 3;
}

std::vector<MemTracker::Snapshot> snapshots;
ExecEnv::GetInstance()->process_mem_tracker()->make_snapshot(&snapshots, cur_level,
upper_level);
MemTracker::make_global_mem_tracker_snapshot(&snapshots);
for (const auto& item : snapshots) {
string limit_str = item.limit == -1 ? "none" : AccurateItoaKMGT(item.limit);
string current_consumption_normalize = AccurateItoaKMGT(item.cur_consumption);
string peak_consumption_normalize = AccurateItoaKMGT(item.peak_consumption);
(*output) << strings::Substitute(
"<tr><td>$0</td><td>$1</td><td>$2</td><td>$3</td><td>$4</td><td>$5</td><td>$6</"
"td><td>$7</td><td>$8</td></tr>\n",
item.level, item.label, item.parent, limit_str, item.cur_consumption,
current_consumption_normalize, item.peak_consumption, peak_consumption_normalize,
item.child_count);
"td><td>$7</td></tr>\n",
item.type, item.label, item.parent_label, limit_str, item.cur_consumption,
current_consumption_normalize, item.peak_consumption, peak_consumption_normalize);
}
(*output) << "</tbody></table>\n";
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Status BaseCompaction::execute_compact_impl() {
return Status::OLAPInternalError(OLAP_ERR_BE_CLONE_OCCURRED);
}

SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
SCOPED_ATTACH_TASK(_mem_tracker);

// 2. do base compaction, merge rowsets
int64_t permits = get_compaction_permits();
Expand Down
8 changes: 1 addition & 7 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,7 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
_input_rowsets_size(0),
_input_row_num(0),
_state(CompactionState::INITED) {
#ifndef BE_TEST
_mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, label, StorageEngine::instance()->compaction_mem_tracker());
_mem_tracker->enable_reset_zero();
#else
_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, label);
#endif
_mem_tracker = std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::COMPACTION, label);
}

Compaction::~Compaction() {}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Status CumulativeCompaction::execute_compact_impl() {
return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
}

SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
SCOPED_ATTACH_TASK(_mem_tracker);

// 3. do cumulative compaction, merge rowsets
int64_t permits = get_compaction_permits();
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "service/backend_options.h"
Expand Down Expand Up @@ -283,12 +284,23 @@ void DeltaWriter::_reset_mem_table() {
if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == nullptr) {
_delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id()));
}
#ifndef BE_TEST
auto mem_table_insert_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()),
nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
auto mem_table_flush_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()),
nullptr, ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
#else
auto mem_table_insert_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()));
auto mem_table_flush_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()));
#endif
{
std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
_mem_table_tracker.push_back(mem_table_insert_tracker);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
_num_shards(num_shards),
_shards(nullptr),
_last_id(1) {
_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, name);
_mem_tracker = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, name);
CHECK(num_shards > 0) << "num_shards cannot be 0";
CHECK_EQ((num_shards & (num_shards - 1)), 0)
<< "num_shards should be power of two, but got " << num_shards;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ MemTable::~MemTable() {
_flush_mem_tracker->set_consumption(0);
DCHECK_EQ(_insert_mem_tracker->consumption(), 0)
<< std::endl
<< MemTracker::log_usage(_insert_mem_tracker->make_snapshot(0));
<< MemTracker::log_usage(_insert_mem_tracker->make_snapshot());
DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_scan_thread",
[this, data_dir]() {
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
this->_path_scan_thread_callback(data_dir);
},
&path_scan_thread));
Expand All @@ -135,7 +135,7 @@ Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_gc_thread",
[this, data_dir]() {
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
this->_path_gc_thread_callback(data_dir);
},
&path_gc_thread));
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics
}

Status BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
ThreadContext::TaskType::COMPACTION);
SCOPED_CONSUME_MEM_TRACKER(StorageEngine::instance()->segcompaction_mem_tracker());
// throttle segcompaction task if memory depleted.
if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
LOG(WARNING) << "skip segcompaction due to memory shortage";
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1596,10 +1596,10 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
}

RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) {
RETURN_IF_ERROR(create_rowset());

if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) {
LOG(WARNING) << "Memory limitation is too small for Schema Change."
<< " _memory_limitation=" << _memory_limitation
<< ", new_block->allocated_bytes()=" << new_block->allocated_bytes()
Expand Down
Loading

0 comments on commit 0b945fe

Please sign in to comment.