Skip to content

Commit

Permalink
add metrics for resource group (#8071)
Browse files Browse the repository at this point in the history
close #8053
  • Loading branch information
guo-shaoge authored Sep 13, 2023
1 parent d1a28a3 commit 987aa6e
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 9 deletions.
1 change: 0 additions & 1 deletion dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,4 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id)
registered_keyspace_sync_replica_ru_family->Remove(itr->second);
registered_keyspace_sync_replica_ru.erase(itr);
}

} // namespace DB
94 changes: 93 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,16 @@ namespace DB
"system calls duration in seconds", \
Histogram, \
F(type_fsync, {{"type", "fsync"}}, ExpBuckets{0.0001, 2, 20})) \
M(tiflash_storage_delta_index_cache, "", Counter, F(type_hit, {"type", "hit"}), F(type_miss, {"type", "miss"}))
M(tiflash_storage_delta_index_cache, "", Counter, F(type_hit, {"type", "hit"}), F(type_miss, {"type", "miss"})) \
M(tiflash_resource_group, \
"meta infos of resource groups", \
Gauge, \
F(type_remaining_tokens, {"type", "remaining_tokens"}), \
F(type_avg_speed, {"type", "avg_speed"}), \
F(type_total_consumption, {"type", "total_consumption"}), \
F(type_bucket_fill_rate, {"type", "bucket_fill_rate"}), \
F(type_bucket_capacity, {"type", "bucket_capacity"}), \
F(type_fetch_tokens_from_gac_count, {"type", "fetch_tokens_from_gac_count"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down Expand Up @@ -743,6 +752,8 @@ struct EqualWidthBuckets
}
};

/// Label key added to every per-resource-group metric; the label value is the resource group name.
/// Declared `inline` (C++17) so that all translation units including this header share a single
/// object instead of each one constructing its own internal-linkage copy.
inline const String METRIC_RESOURCE_GROUP_STR = "resource_group";

template <typename T>
struct MetricFamilyTrait
{
Expand All @@ -756,6 +767,15 @@ struct MetricFamilyTrait<prometheus::Counter>
{
return family.Add(std::forward<ArgType>(arg));
}
/// Adds a counter to `family` whose labels are those in `arg` plus a
/// METRIC_RESOURCE_GROUP_STR label carrying `resource_group_name`.
/// An existing label with that key in `arg` is overwritten.
static auto & add(
    prometheus::Family<prometheus::Counter> & family,
    const String & resource_group_name,
    ArgType && arg)
{
    std::map<String, String> labels = {std::forward<ArgType>(arg)};
    labels.insert_or_assign(METRIC_RESOURCE_GROUP_STR, resource_group_name);
    return family.Add(labels);
}
};
template <>
struct MetricFamilyTrait<prometheus::Gauge>
Expand All @@ -766,6 +786,15 @@ struct MetricFamilyTrait<prometheus::Gauge>
{
return family.Add(std::forward<ArgType>(arg));
}
/// Adds a gauge to `family` whose labels are those in `arg` plus a
/// METRIC_RESOURCE_GROUP_STR label carrying `resource_group_name`.
/// An existing label with that key in `arg` is overwritten.
static auto & add(
    prometheus::Family<prometheus::Gauge> & family,
    const String & resource_group_name,
    ArgType && arg)
{
    std::map<String, String> labels = {std::forward<ArgType>(arg)};
    labels.insert_or_assign(METRIC_RESOURCE_GROUP_STR, resource_group_name);
    return family.Add(labels);
}
};
template <>
struct MetricFamilyTrait<prometheus::Histogram>
Expand All @@ -776,6 +805,15 @@ struct MetricFamilyTrait<prometheus::Histogram>
{
return family.Add(std::move(std::get<0>(arg)), std::move(std::get<1>(arg)));
}
/// Adds a histogram to `family` whose labels are the first element of `arg` plus a
/// METRIC_RESOURCE_GROUP_STR label carrying `resource_group_name`; the second element
/// of `arg` is passed on to `family.Add` as the second argument.
/// Both tuple elements of `arg` are moved from.
static auto & add(
    prometheus::Family<prometheus::Histogram> & family,
    const String & resource_group_name,
    ArgType && arg)
{
    // `arg` is an rvalue whose second element is already moved from below, so take the
    // label map by move as well instead of copying it.
    std::map<String, String> args_map = std::move(std::get<0>(arg));
    args_map[METRIC_RESOURCE_GROUP_STR] = resource_group_name;
    return family.Add(args_map, std::move(std::get<1>(arg)));
}
};

template <typename T>
Expand All @@ -790,7 +828,9 @@ struct MetricFamily
const std::string & help,
std::initializer_list<MetricArgType> args)
{
store_args = args;
auto & family = MetricTrait::build().Name(name).Help(help).Register(registry);
store_family = &family;

metrics.reserve(args.size() ? args.size() : 1);
for (auto arg : args)
Expand All @@ -806,9 +846,39 @@ struct MetricFamily
}

/// Returns the metric stored at position `idx` of `metrics` (the first one by default).
/// `idx` is not range-checked.
T & get(size_t idx = 0)
{
    T * metric = metrics[idx];
    return *metric;
}
/// Returns the metric at position `idx` for the given resource group.
/// The group's metrics are created on first use via addMetricsForResourceGroup().
/// `idx` is not range-checked.
/// NOTE(review): `metrics_map` is read and modified here without any locking —
/// confirm that callers serialize access.
T & get(size_t idx, const String & resource_group_name)
{
    // Look the group up once and reuse the iterator; the second lookup only happens
    // on the first access of a resource group, right after its metrics are created.
    auto it = metrics_map.find(resource_group_name);
    if (it == metrics_map.end())
    {
        addMetricsForResourceGroup(resource_group_name);
        it = metrics_map.find(resource_group_name);
    }
    return *(it->second[idx]);
}

private:
void addMetricsForResourceGroup(const String & resource_group_name)
{
std::vector<T *> metrics_temp;

for (auto arg : store_args)
{
auto & metric = MetricTrait::add(*store_family, resource_group_name, std::forward<MetricArgType>(arg));
metrics_temp.emplace_back(&metric);
}

if (store_args.size() == 0)
{
auto & metric = MetricTrait::add(*store_family, resource_group_name, MetricArgType{});
metrics_temp.emplace_back(&metric);
}
metrics_map[resource_group_name] = metrics_temp;
}

// Metrics without a resource group label, indexed by position.
std::vector<T *> metrics;
// Family the metrics are added to; assigned in the constructor. Defaults to nullptr so
// the pointer never holds an indeterminate value.
prometheus::Family<T> * store_family = nullptr;
// Copy of the constructor's argument list, reused to create the metrics of each resource group.
std::vector<MetricArgType> store_args;
// <resource_group_name, metrics>
std::unordered_map<String, std::vector<T *>> metrics_map;
};

/// Centralized registry of TiFlash metrics.
Expand Down Expand Up @@ -881,21 +951,43 @@ APPLY_FOR_METRICS(MAKE_METRIC_ENUM_M, MAKE_METRIC_ENUM_F)

// Argument-count selector: expands to its third argument. Callers append candidate
// macro names after __VA_ARGS__ so that the number of user arguments decides which
// candidate lands in the NAME slot.
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_MACRO(_1, _2, NAME, ...) NAME
// Same selection trick with one more leading slot: expands to its fourth argument.
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_RESOURCE_GROUP_METRIC_MACRO(_1, _2, _3, NAME, ...) NAME

// Accessor macros. They resolve against TiFlashMetrics::instance() by default, and
// against TestMetrics::instance() when GTEST_TIFLASH_METRICS is defined.
#ifndef GTEST_TIFLASH_METRICS
// Family only: get() with its default index.
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_0(family) TiFlashMetrics::instance().family.get()
// Family + metric: indexes the family with the enumerator family##_metrics::metric.
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_1(family, metric) TiFlashMetrics::instance().family.get(family##_metrics::metric)
// Family + resource group: index 0 of the metrics of that resource group.
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_RESOURCE_GROUP_METRIC_0(family, resource_group) TiFlashMetrics::instance().family.get(0, resource_group)
// Family + metric + resource group.
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_RESOURCE_GROUP_METRIC_1(family, metric, resource_group) \
TiFlashMetrics::instance().family.get(family##_metrics::metric, resource_group)
#else
// Test build: same four accessors, backed by TestMetrics.
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_0(family) TestMetrics::instance().family.get()
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_METRIC_1(family, metric) TestMetrics::instance().family.get(family##_metrics::metric)
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_RESOURCE_GROUP_METRIC_0(family, resource_group) TestMetrics::instance().family.get(0, resource_group)
// NOLINTNEXTLINE(bugprone-reserved-identifier)
#define __GET_RESOURCE_GROUP_METRIC_1(family, metric, resource_group) \
TestMetrics::instance().family.get(family##_metrics::metric, resource_group)
#endif

// GET_METRIC(family) expands to __GET_METRIC_0(family);
// GET_METRIC(family, metric) expands to __GET_METRIC_1(family, metric).
#define GET_METRIC(...) \
__GET_METRIC_MACRO(__VA_ARGS__, __GET_METRIC_1, __GET_METRIC_0) \
(__VA_ARGS__)

// Dispatches on the number of arguments:
//   (family, metric, resource_group) -> __GET_RESOURCE_GROUP_METRIC_1
//   (family, resource_group)         -> __GET_RESOURCE_GROUP_METRIC_0
//   (family)                         -> __GET_METRIC_0, i.e. the metric without a resource group
#define GET_RESOURCE_GROUP_METRIC(...) \
__GET_RESOURCE_GROUP_METRIC_MACRO( \
__VA_ARGS__, \
__GET_RESOURCE_GROUP_METRIC_1, \
__GET_RESOURCE_GROUP_METRIC_0, \
__GET_METRIC_0) \
(__VA_ARGS__)

#define UPDATE_CUR_AND_MAX_METRIC(family, metric, metric_max) \
GET_METRIC(family, metric).Increment(); \
GET_METRIC(family, metric_max) \
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ void LocalAdmissionController::startBackgroudJob()
std::terminate();
}

auto last_metric_time_point = std::chrono::steady_clock::now();
while (!stopped.load())
{
std::function<void()> local_refill_token_callback = nullptr;
Expand All @@ -71,6 +72,14 @@ void LocalAdmissionController::startBackgroudJob()
std::unique_lock<std::mutex> lock(mu);
local_refill_token_callback = refill_token_callback;

auto now = std::chrono::steady_clock::now();
if (now - last_metric_time_point >= COLLECT_METRIC_INTERVAL)
{
last_metric_time_point = now;
for (const auto & resource_group : resource_groups)
resource_group.second->collectMetrics();
}

if (low_token_resource_groups.empty())
{
fetch_token_periodically = true;
Expand Down Expand Up @@ -364,7 +373,7 @@ void LocalAdmissionController::doWatch()
{
auto stream = etcd_client->watch(&watch_gac_grpc_context);
auto watch_req = setupWatchReq();
LOG_DEBUG(log, "watch req: {}", watch_req.DebugString());
LOG_DEBUG(log, "watchGAC req: {}", watch_req.DebugString());
const bool write_ok = stream->Write(watch_req);
if (!write_ok)
{
Expand Down
26 changes: 20 additions & 6 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Executor/toRU.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Flash/Pipeline/Schedule/Tasks/Task.h>
Expand All @@ -27,6 +28,7 @@
#include <pingcap/kv/Cluster.h>

#include <atomic>
#include <magic_enum.hpp>
#include <memory>
#include <mutex>

Expand Down Expand Up @@ -172,12 +174,6 @@ class ResourceGroup final : private boost::noncopyable
return !burstable && bucket->lowToken();
}

/// Returns the result of `bucket->peek()`, read while holding `mu`.
double getRU() const
{
    std::lock_guard guard(mu);
    const double current = bucket->peek();
    return current;
}

// Return how many tokens should acquire from GAC for the next n seconds.
double getAcquireRUNum(uint32_t n, double amplification) const
{
Expand Down Expand Up @@ -305,6 +301,7 @@ class ResourceGroup final : private boost::noncopyable
std::lock_guard lock(mu);
auto ori = ru_consumption_delta;
ru_consumption_delta = 0.0;
total_consumption += ori;
return ori;
}

Expand All @@ -320,6 +317,7 @@ class ResourceGroup final : private boost::noncopyable
std::lock_guard lock(mu);
assert(last_fetch_tokens_from_gac_timepoint <= tp);
last_fetch_tokens_from_gac_timepoint = tp;
++fetch_tokens_from_gac_count;
}

bool inTrickleModeLease(const std::chrono::steady_clock::time_point & tp)
Expand All @@ -328,6 +326,19 @@ class ResourceGroup final : private boost::noncopyable
return bucket_mode == trickle_mode && tp < stop_trickle_timepoint;
}

/// Publishes this resource group's current state to the `tiflash_resource_group`
/// gauges, labelled with this group's `name`. All values are read and published
/// while holding `mu`.
void collectMetrics() const
{
    std::lock_guard guard(mu);

    // Read the bucket first, in the same order the values are reported below.
    const auto & bucket_config = bucket->getConfig();
    const auto remaining_tokens = bucket_config.tokens;
    const auto avg_speed = bucket->getAvgSpeedPerSec();
    const auto fill_rate = bucket_config.fill_rate;
    const auto capacity = bucket_config.capacity;

    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_remaining_tokens, name).Set(remaining_tokens);
    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_avg_speed, name).Set(avg_speed);
    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_total_consumption, name).Set(total_consumption);
    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_fill_rate, name).Set(fill_rate);
    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_capacity, name).Set(capacity);
    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_fetch_tokens_from_gac_count, name)
        .Set(fetch_tokens_from_gac_count);
}

const std::string name;

uint32_t user_priority = 0;
Expand All @@ -354,6 +365,8 @@ class ResourceGroup final : private boost::noncopyable
std::chrono::time_point<std::chrono::steady_clock> last_fetch_tokens_from_gac_timepoint
= std::chrono::steady_clock::now();
std::chrono::time_point<std::chrono::steady_clock> stop_trickle_timepoint = std::chrono::steady_clock::now();
uint64_t fetch_tokens_from_gac_count = 0;
double total_consumption = 0.0;
};

using ResourceGroupPtr = std::shared_ptr<ResourceGroup>;
Expand Down Expand Up @@ -463,6 +476,7 @@ class LocalAdmissionController final : private boost::noncopyable
// If we cannot get GAC resp for DEGRADE_MODE_DURATION seconds, enter degrade mode.
static constexpr auto DEGRADE_MODE_DURATION = std::chrono::seconds(120);
// NOTE(review): presumably the target period between token requests sent to GAC — confirm against its users.
static constexpr auto TARGET_REQUEST_PERIOD_MS = std::chrono::milliseconds(5000);
// Minimum time between two rounds of ResourceGroup::collectMetrics() in the background job loop.
static constexpr auto COLLECT_METRIC_INTERVAL = std::chrono::seconds(5);
// NOTE(review): presumably the `amplification` factor passed to ResourceGroup::getAcquireRUNum() — confirm.
static constexpr double ACQUIRE_RU_AMPLIFICATION = 1.1;

static const std::string GAC_RESOURCE_GROUP_ETCD_PATH;
Expand Down
Loading

0 comments on commit 987aa6e

Please sign in to comment.