From 4b368631626c042e3512bdb6aeefae264c54225d Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 21 Jul 2022 14:07:08 +0800 Subject: [PATCH 01/15] fix issue 5401 Signed-off-by: Lloyd-Pottiger --- dbms/src/Server/StorageConfigParser.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index d43ccb850f1..e8db575912c 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -455,42 +455,42 @@ UInt64 StorageIORateLimitConfig::totalWeight() const UInt64 StorageIORateLimitConfig::getFgWriteMaxBytesPerSec() const { - if (totalWeight() <= 0 || writeWeight() <= 0) + if (writeWeight() <= 0 || totalWeight() <= 0) { return 0; } - return use_max_bytes_per_sec ? max_bytes_per_sec / totalWeight() * fg_write_weight - : max_write_bytes_per_sec / writeWeight() * fg_write_weight; + return use_max_bytes_per_sec ? static_cast(1.0 * max_bytes_per_sec / totalWeight() * fg_write_weight) + : static_cast(1.0 * max_write_bytes_per_sec / writeWeight() * fg_write_weight); } UInt64 StorageIORateLimitConfig::getBgWriteMaxBytesPerSec() const { - if (totalWeight() <= 0 || writeWeight() <= 0) + if (writeWeight() <= 0 || totalWeight() <= 0) { return 0; } - return use_max_bytes_per_sec ? max_bytes_per_sec / totalWeight() * bg_write_weight - : max_write_bytes_per_sec / writeWeight() * bg_write_weight; + return use_max_bytes_per_sec ? static_cast(1.0 * max_bytes_per_sec / totalWeight() * bg_write_weight) + : static_cast(1.0 * max_write_bytes_per_sec / writeWeight() * bg_write_weight); } UInt64 StorageIORateLimitConfig::getFgReadMaxBytesPerSec() const { - if (totalWeight() <= 0 || readWeight() <= 0) + if (readWeight() <= 0 || totalWeight() <= 0) { return 0; } - return use_max_bytes_per_sec ? max_bytes_per_sec / totalWeight() * fg_read_weight - : max_read_bytes_per_sec / readWeight() * fg_read_weight; + return use_max_bytes_per_sec ? static_cast(1.0 * max_bytes_per_sec / totalWeight() * fg_read_weight) + : static_cast(1.0 * max_read_bytes_per_sec / readWeight() * fg_read_weight); } UInt64 StorageIORateLimitConfig::getBgReadMaxBytesPerSec() const { - if (totalWeight() <= 0 || readWeight() <= 0) + if (readWeight() <= 0 || totalWeight() <= 0) { return 0; } - return use_max_bytes_per_sec ? max_bytes_per_sec / totalWeight() * bg_read_weight - : max_read_bytes_per_sec / readWeight() * bg_read_weight; + return use_max_bytes_per_sec ? static_cast(1.0 * max_bytes_per_sec / totalWeight() * bg_read_weight) + : static_cast(1.0 * max_read_bytes_per_sec / readWeight() * bg_read_weight); } UInt64 StorageIORateLimitConfig::getWriteMaxBytesPerSec() const From d5c3c4a17caa68ed19a47f532819ec117a3e136e Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 21 Jul 2022 17:48:50 +0800 Subject: [PATCH 02/15] run getCurrentIOInfo in auto_tune_thread Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 71 +++++++++++-------------- dbms/src/Encryption/RateLimiter.h | 80 ++++++++++++++--------------- 2 files changed, 69 insertions(+), 82 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 38fd8468341..a686c5f2275 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -292,42 +292,22 @@ ReadLimiter::ReadLimiter( std::function getIOStatistic_, Int64 rate_limit_per_sec_, LimiterType type_, - Int64 get_io_stat_period_us, UInt64 refill_period_ms_) : WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_) - , getIOStatistic(std::move(getIOStatistic_)) - , last_stat_bytes(getIOStatistic()) - , last_stat_time(now()) + , get_io_statistic(std::move(getIOStatistic_)) + , last_stat_bytes(get_io_statistic()) , log(&Poco::Logger::get("ReadLimiter")) - , get_io_statistic_period_us(get_io_stat_period_us) {} Int64 ReadLimiter::getAvailableBalance() { - TimePoint us = now(); - // Not call getIOStatisctics() every time for performance. - // If the clock back, elapsed_us could be negative. - Int64 elapsed_us = std::chrono::duration_cast(us - last_stat_time).count(); - if (get_io_statistic_period_us != 0 && elapsed_us < get_io_statistic_period_us) - { - return available_balance; - } - - return refreshAvailableBalance(); -} - -Int64 ReadLimiter::refreshAvailableBalance() -{ - TimePoint us = now(); - Int64 bytes = getIOStatistic(); + Int64 bytes = get_io_statistic(); if (bytes < last_stat_bytes) { LOG_FMT_WARNING( log, - "last_stat {}:{} current_stat {}:{}", - last_stat_time.time_since_epoch().count(), + "last_stat: {} current_stat: {}", last_stat_bytes, - us.time_since_epoch().count(), bytes); } else @@ -338,7 +318,6 @@ Int64 ReadLimiter::refreshAvailableBalance() alloc_bytes += real_alloc_bytes; } last_stat_bytes = bytes; - last_stat_time = us; return available_balance; } @@ -384,14 +363,15 @@ void ReadLimiter::refillAndAlloc() IORateLimiter::IORateLimiter() : log(&Poco::Logger::get("IORateLimiter")) , stop(false) + , update_io_stat_period_ms(200) // 200ms {} IORateLimiter::~IORateLimiter() { stop.store(true, std::memory_order_relaxed); - if (auto_tune_thread.joinable()) + if (auto_tune_and_get_io_info_thread.joinable()) { - auto_tune_thread.join(); + auto_tune_and_get_io_info_thread.join(); } } @@ -409,13 +389,13 @@ extern thread_local bool is_background_thread; WriteLimiterPtr IORateLimiter::getWriteLimiter() { - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx); return is_background_thread ? bg_write_limiter : fg_write_limiter; } ReadLimiterPtr IORateLimiter::getReadLimiter() { - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx); return is_background_thread ? bg_read_limiter : fg_read_limiter; } @@ -426,7 +406,7 @@ void IORateLimiter::updateConfig(Poco::Util::AbstractConfiguration & config_) { return; } - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx); updateReadLimiter(io_config.getBgReadMaxBytesPerSec(), io_config.getFgReadMaxBytesPerSec()); updateWriteLimiter(io_config.getBgWriteMaxBytesPerSec(), io_config.getFgWriteMaxBytesPerSec()); } @@ -455,10 +435,9 @@ void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes) { LOG_FMT_INFO(log, "updateReadLimiter: bg_bytes {} fg_bytes {}", bg_bytes, fg_bytes); auto get_bg_read_io_statistic = [&]() { - return getCurrentIOInfo().bg_read_bytes; + return io_info.bg_read_bytes; }; auto get_fg_read_io_statistic = [&]() { - auto io_info = getCurrentIOInfo(); return std::max(0, io_info.total_read_bytes - io_info.bg_read_bytes); }; @@ -575,7 +554,7 @@ std::pair IORateLimiter::getReadWriteBytes(const std::string & fna #endif } -IORateLimiter::IOInfo IORateLimiter::getCurrentIOInfo() +IOInfo IORateLimiter::getCurrentIOInfo() { static const pid_t pid = getpid(); IOInfo io_info; @@ -599,7 +578,7 @@ IORateLimiter::IOInfo IORateLimiter::getCurrentIOInfo() void IORateLimiter::setStop() { - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx); if (bg_write_limiter != nullptr) { auto sz = bg_write_limiter->setStop(); @@ -624,17 +603,27 @@ void IORateLimiter::setStop() void IORateLimiter::runAutoTune() { - auto auto_tune_worker = [&]() { + auto auto_tune_and_get_io_info_worker = [&]() { + using time_point = std::chrono::time_point; + using clock = std::chrono::system_clock; + time_point auot_tune_time = clock::now(); + time_point update_io_stat_time = clock::now(); while (!stop.load(std::memory_order_relaxed)) { - ::sleep(io_config.auto_tune_sec > 0 ? io_config.auto_tune_sec : 1); - if (io_config.auto_tune_sec > 0) + auto now_time_point = clock::now(); + if ((io_config.auto_tune_sec > 0) && (now_time_point - auot_tune_time > std::chrono::seconds(io_config.auto_tune_sec))) { autoTune(); + auot_tune_time = now_time_point; + } + if ((bg_read_limiter || fg_read_limiter) && (now_time_point - update_io_stat_time > std::chrono::milliseconds(update_io_stat_period_ms))) + { + io_info = getCurrentIOInfo(); + update_io_stat_time = now_time_point; } } }; - auto_tune_thread = std::thread(auto_tune_worker); + auto_tune_and_get_io_info_thread = std::thread(auto_tune_and_get_io_info_worker); } std::unique_ptr IORateLimiter::createIOLimitTuner() @@ -643,7 +632,7 @@ std::unique_ptr IORateLimiter::createIOLimitTuner() ReadLimiterPtr bg_read, fg_read; StorageIORateLimitConfig t_io_config; { - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx); bg_write = bg_write_limiter; fg_write = fg_write_limiter; bg_read = bg_read_limiter; @@ -666,12 +655,12 @@ void IORateLimiter::autoTune() auto tune_result = tuner->tune(); if (tune_result.read_tuned) { - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx); updateReadLimiter(tune_result.max_bg_read_bytes_per_sec, tune_result.max_fg_read_bytes_per_sec); } if (tune_result.write_tuned) { - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx); updateWriteLimiter(tune_result.max_bg_write_bytes_per_sec, tune_result.max_fg_write_bytes_per_sec); } } diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index f44beeb8ed7..1b1055dc033 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -46,6 +46,37 @@ enum class LimiterType BG_READ = 4, }; +/// IOInfo is used to store IO information. +/// total_write_bytes is the total bytes of the write. +/// total_read_bytes is the total bytes of the read. +/// bg_write_bytes is the bytes of the background write. +/// bg_read_bytes is the bytes of the background read. +/// update_time is the time of the last update. +struct IOInfo +{ + Int64 total_write_bytes; + Int64 total_read_bytes; + Int64 bg_write_bytes; + Int64 bg_read_bytes; + std::chrono::time_point update_time; + + IOInfo() + : total_write_bytes(0) + , total_read_bytes(0) + , bg_write_bytes(0) + , bg_read_bytes(0) + {} + + std::string toString() const + { + return fmt::format("total_write_bytes {} total_read_bytes {} bg_write_bytes {} bg_read_bytes {}", + total_write_bytes, + total_read_bytes, + bg_write_bytes, + bg_read_bytes); + } +}; + // WriteLimiter is to control write rate (bytes per second). // Because of the storage engine is append-only, the amount of data written by the storage engine // is equal to the amount of data written to the disk by the operating system. So, WriteLimiter @@ -159,35 +190,25 @@ class ReadLimiter : public WriteLimiter std::function getIOStatistic_, Int64 rate_limit_per_sec_, LimiterType type_, - Int64 get_io_stat_period_us = 2000, UInt64 refill_period_ms_ = 100); #ifndef DBMS_PUBLIC_GTEST protected: #endif - virtual void refillAndAlloc() override; - virtual void consumeBytes(Int64 bytes) override; - virtual bool canGrant(Int64 bytes) override; + void refillAndAlloc() override; + void consumeBytes(Int64 bytes) override; + bool canGrant(Int64 bytes) override; #ifndef DBMS_PUBLIC_GTEST private: #endif Int64 getAvailableBalance(); - Int64 refreshAvailableBalance(); - std::function getIOStatistic; + std::function get_io_statistic; Int64 last_stat_bytes; - using TimePoint = std::chrono::time_point; - static TimePoint now() - { - return std::chrono::time_point_cast(std::chrono::system_clock::now()); - } - TimePoint last_stat_time; Poco::Logger * log; - - Int64 get_io_statistic_period_us; }; using ReadLimiterPtr = std::shared_ptr; @@ -209,31 +230,6 @@ class IORateLimiter void setStop(); - struct IOInfo - { - Int64 total_write_bytes; - Int64 total_read_bytes; - Int64 bg_write_bytes; - Int64 bg_read_bytes; - std::chrono::time_point update_time; - - IOInfo() - : total_write_bytes(0) - , total_read_bytes(0) - , bg_write_bytes(0) - , bg_read_bytes(0) - {} - - std::string toString() const - { - return fmt::format("total_write_bytes {} total_read_bytes {} bg_write_bytes {} bg_read_bytes {}", - total_write_bytes, - total_read_bytes, - bg_write_bytes, - bg_read_bytes); - } - }; - #ifndef DBMS_PUBLIC_GTEST private: #endif @@ -254,7 +250,7 @@ class IORateLimiter WriteLimiterPtr fg_write_limiter; ReadLimiterPtr bg_read_limiter; ReadLimiterPtr fg_read_limiter; - std::mutex mtx_; + std::mutex mtx; std::mutex bg_thread_ids_mtx; std::vector bg_thread_ids; @@ -263,7 +259,9 @@ class IORateLimiter Poco::Logger * log; std::atomic stop; - std::thread auto_tune_thread; + std::thread auto_tune_and_get_io_info_thread; + IOInfo io_info; + const UInt64 update_io_stat_period_ms; // Noncopyable and nonmovable. DISALLOW_COPY_AND_MOVE(IORateLimiter); From 8fa8611b649568d31fa4c21b8ab2b5de8b61c23e Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 22 Jul 2022 15:19:37 +0800 Subject: [PATCH 03/15] fix unit test Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 4 +- dbms/src/Encryption/RateLimiter.h | 10 +++-- .../Encryption/tests/gtest_rate_limiter.cpp | 39 ++++++++----------- dbms/src/TestUtils/MockReadLimiter.h | 5 +-- 4 files changed, 27 insertions(+), 31 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index a686c5f2275..44ef26fb3e2 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -360,10 +360,10 @@ void ReadLimiter::refillAndAlloc() } } -IORateLimiter::IORateLimiter() +IORateLimiter::IORateLimiter(UInt64 update_io_stat_period_ms_) : log(&Poco::Logger::get("IORateLimiter")) , stop(false) - , update_io_stat_period_ms(200) // 200ms + , update_io_stat_period_ms(update_io_stat_period_ms_) // 200ms {} IORateLimiter::~IORateLimiter() diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index 1b1055dc033..012999e7a01 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -178,9 +178,7 @@ using WriteLimiterPtr = std::shared_ptr; // // Constructor parameters: // -// `getIOStatistic_` is the function that obtain the amount of data read from /proc. -// -// `get_io_stat_period_us` is the interval between calling getIOStatistic_. +// `getIOStatistic_` is the function that obtain the amount of data from `getCurrentIOInfo()` which read from /proc filesystem. // // Other parameters are the same as WriteLimiter. class ReadLimiter : public WriteLimiter @@ -215,10 +213,14 @@ using ReadLimiterPtr = std::shared_ptr; // IORateLimiter is the wrapper of WriteLimiter and ReadLimiter. // Currently, It supports four limiter type: background write, foreground write, background read and foreground read. +// +// Constructor parameters: +// +// `update_io_stat_period_ms` is the interval between calling getCurrentIOInfo. Default is 200ms. class IORateLimiter { public: - IORateLimiter(); + explicit IORateLimiter(UInt64 update_io_stat_period_ms = 200); ~IORateLimiter(); WriteLimiterPtr getWriteLimiter(); diff --git a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp index 51984b86460..149c4446a13 100644 --- a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp +++ b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -177,7 +178,7 @@ TEST(WriteLimiterTest, LimiterStat) ASSERT_EQ(stat.pct(), static_cast(alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString(); } -TEST(ReadLimiterTest, GetIOStatPeroid2000us) +TEST(ReadLimiterTest, GetIOStatPeroid200ms) { Int64 consumed = 0; auto get_stat = [&consumed]() { @@ -187,41 +188,31 @@ TEST(ReadLimiterTest, GetIOStatPeroid2000us) limiter.request(bytes); consumed += bytes; }; - Int64 get_io_stat_period_us = 2000; - auto wait_refresh = [&]() { - std::chrono::microseconds sleep_time(get_io_stat_period_us + 1); - std::this_thread::sleep_for(sleep_time); - }; using TimePointMS = std::chrono::time_point; Int64 bytes_per_sec = 1000; UInt64 refill_period_ms = 20; - ReadLimiter limiter(get_stat, bytes_per_sec, LimiterType::UNKNOW, get_io_stat_period_us, refill_period_ms); + ReadLimiter limiter(get_stat, bytes_per_sec, LimiterType::UNKNOW, refill_period_ms); TimePointMS t0 = std::chrono::time_point_cast(std::chrono::system_clock::now()); // Refill 20 every 20ms. ASSERT_EQ(limiter.getAvailableBalance(), 20); request(limiter, 1); - ASSERT_EQ(limiter.getAvailableBalance(), 20); - ASSERT_EQ(limiter.refreshAvailableBalance(), 19); - request(limiter, 9); ASSERT_EQ(limiter.getAvailableBalance(), 19); - wait_refresh(); - ASSERT_EQ(limiter.getAvailableBalance(), 10); - request(limiter, 11); - wait_refresh(); + request(limiter, 20); ASSERT_EQ(limiter.getAvailableBalance(), -1); request(limiter, 50); TimePointMS t1 = std::chrono::time_point_cast(std::chrono::system_clock::now()); UInt64 elasped = std::chrono::duration_cast(t1 - t0).count(); ASSERT_GE(elasped, refill_period_ms); - ASSERT_EQ(limiter.getAvailableBalance(), 19); - wait_refresh(); ASSERT_EQ(limiter.getAvailableBalance(), -31); request(limiter, 1); TimePointMS t2 = std::chrono::time_point_cast(std::chrono::system_clock::now()); elasped = std::chrono::duration_cast(t2 - t1).count(); ASSERT_GE(elasped, 2 * refill_period_ms); + ASSERT_EQ(limiter.getAvailableBalance(), 8); + request(limiter, 9); + ASSERT_EQ(limiter.getAvailableBalance(), -1); } void testSetStop(int blocked_thread_cnt) @@ -278,8 +269,12 @@ TEST(ReadLimiterTest, LimiterStat) limiter.request(bytes); consumed += bytes; }; - Int64 get_io_stat_period_us = 2000; - ReadLimiter read_limiter(get_stat, 1000, LimiterType::UNKNOW, get_io_stat_period_us, 100); + + Int64 bytes_per_sec = 1000; + UInt64 refill_period_ms = 100; + ReadLimiter read_limiter(get_stat, bytes_per_sec, LimiterType::UNKNOW, refill_period_ms); + ASSERT_EQ(read_limiter.getAvailableBalance(), 100); + try { read_limiter.getStat(); @@ -314,7 +309,7 @@ TEST(ReadLimiterTest, LimiterStat) request(read_limiter, 100); std::this_thread::sleep_for(100ms); - read_limiter.refreshAvailableBalance(); + ASSERT_EQ(read_limiter.getAvailableBalance(), 0); stat = read_limiter.getStat(); ASSERT_EQ(stat.alloc_bytes, 100ul); @@ -344,7 +339,7 @@ TEST(ReadLimiterTest, LimiterStat) } std::this_thread::sleep_for(100ms); - read_limiter.refreshAvailableBalance(); + ASSERT_EQ(read_limiter.getAvailableBalance(), -947); stat = read_limiter.getStat(); ASSERT_EQ(stat.alloc_bytes, alloc_bytes); @@ -376,7 +371,7 @@ TEST(IORateLimiterTest, IOStat) int buf_size = 4096; int ret = ::posix_memalign(&buf, buf_size, buf_size); ASSERT_EQ(ret, 0) << strerror(errno); - std::unique_ptr> defer_free(buf, [](void * p) { ::free(p); }); + std::unique_ptr> defer_free(buf, [](void * p) { ::free(p); }); // NOLINT(cppcoreguidelines-no-malloc) ssize_t n = ::pwrite(fd, buf, buf_size, 0); ASSERT_EQ(n, buf_size) << strerror(errno); @@ -418,7 +413,7 @@ TEST(IORateLimiterTest, IOStatMultiThread) void * buf = nullptr; int ret = ::posix_memalign(&buf, buf_size, buf_size); - std::unique_ptr> auto_free(buf, [](void * p) { free(p); }); + std::unique_ptr> auto_free(buf, [](void * p) { free(p); }); // NOLINT(cppcoreguidelines-no-malloc) ASSERT_EQ(ret, 0) << strerror(errno); ssize_t n = ::pwrite(fd, buf, buf_size, 0); diff --git a/dbms/src/TestUtils/MockReadLimiter.h b/dbms/src/TestUtils/MockReadLimiter.h index 8acc96371e3..0bb69145a67 100644 --- a/dbms/src/TestUtils/MockReadLimiter.h +++ b/dbms/src/TestUtils/MockReadLimiter.h @@ -23,9 +23,8 @@ class MockReadLimiter final : public ReadLimiter std::function getIOStatistic_, Int64 rate_limit_per_sec_, LimiterType type_ = LimiterType::UNKNOW, - Int64 get_io_stat_period_us = 2000, UInt64 refill_period_ms_ = 100) - : ReadLimiter(getIOStatistic_, rate_limit_per_sec_, type_, get_io_stat_period_us, refill_period_ms_) + : ReadLimiter(getIOStatistic_, rate_limit_per_sec_, type_, refill_period_ms_) { } @@ -33,7 +32,7 @@ class MockReadLimiter final : public ReadLimiter void consumeBytes(Int64 bytes) override { // Need soft limit here. - WriteLimiter::consumeBytes(bytes); + WriteLimiter::consumeBytes(bytes); // NOLINT(bugprone-parent-virtual-call) } }; From 1e946eef0045130bd9fb37da313c03db2d6faa65 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 25 Jul 2022 19:28:24 +0800 Subject: [PATCH 04/15] address comments Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 25 ++++++++++++++++--------- dbms/src/Encryption/RateLimiter.h | 6 ++++-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 44ef26fb3e2..ce095e6273d 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -289,19 +290,19 @@ void WriteLimiter::updateMaxBytesPerSec(Int64 max_bytes_per_sec) } ReadLimiter::ReadLimiter( - std::function getIOStatistic_, + std::function get_read_bytes_, Int64 rate_limit_per_sec_, LimiterType type_, UInt64 refill_period_ms_) : WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_) - , get_io_statistic(std::move(getIOStatistic_)) - , last_stat_bytes(get_io_statistic()) + , get_read_bytes(std::move(get_read_bytes_)) + , last_stat_bytes(get_read_bytes()) , log(&Poco::Logger::get("ReadLimiter")) {} Int64 ReadLimiter::getAvailableBalance() { - Int64 bytes = get_io_statistic(); + Int64 bytes = get_read_bytes(); if (bytes < last_stat_bytes) { LOG_FMT_WARNING( @@ -363,7 +364,7 @@ void ReadLimiter::refillAndAlloc() IORateLimiter::IORateLimiter(UInt64 update_io_stat_period_ms_) : log(&Poco::Logger::get("IORateLimiter")) , stop(false) - , update_io_stat_period_ms(update_io_stat_period_ms_) // 200ms + , update_io_stat_period_ms(update_io_stat_period_ms_) {} IORateLimiter::~IORateLimiter() @@ -435,9 +436,11 @@ void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes) { LOG_FMT_INFO(log, "updateReadLimiter: bg_bytes {} fg_bytes {}", bg_bytes, fg_bytes); auto get_bg_read_io_statistic = [&]() { + std::shared_lock lock(io_info_mtx); return io_info.bg_read_bytes; }; auto get_fg_read_io_statistic = [&]() { + std::shared_lock lock(io_info_mtx); return std::max(0, io_info.total_read_bytes - io_info.bg_read_bytes); }; @@ -606,20 +609,24 @@ void IORateLimiter::runAutoTune() auto auto_tune_and_get_io_info_worker = [&]() { using time_point = std::chrono::time_point; using clock = std::chrono::system_clock; - time_point auot_tune_time = clock::now(); + time_point auto_tune_time = clock::now(); time_point update_io_stat_time = clock::now(); while (!stop.load(std::memory_order_relaxed)) { auto now_time_point = clock::now(); - if ((io_config.auto_tune_sec > 0) && (now_time_point - auot_tune_time > std::chrono::seconds(io_config.auto_tune_sec))) + if ((io_config.auto_tune_sec > 0) && (now_time_point - auto_tune_time > std::chrono::seconds(io_config.auto_tune_sec))) { autoTune(); - auot_tune_time = now_time_point; + auto_tune_time = now_time_point; } if ((bg_read_limiter || fg_read_limiter) && (now_time_point - update_io_stat_time > std::chrono::milliseconds(update_io_stat_period_ms))) { - io_info = getCurrentIOInfo(); + IOInfo io_info_tmp = getCurrentIOInfo(); update_io_stat_time = now_time_point; + io_info_mtx.lock(); + io_info = io_info_tmp; + io_info_mtx.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(update_io_stat_period_ms - 1)); } } }; diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index 012999e7a01..7cfedcc03d3 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -26,6 +26,7 @@ #include #include #include +#include // TODO: separate IO utility(i.e. FileProvider, RateLimiter) from Encryption directory namespace Poco::Util @@ -185,7 +186,7 @@ class ReadLimiter : public WriteLimiter { public: ReadLimiter( - std::function getIOStatistic_, + std::function get_read_bytes_, Int64 rate_limit_per_sec_, LimiterType type_, UInt64 refill_period_ms_ = 100); @@ -204,7 +205,7 @@ class ReadLimiter : public WriteLimiter Int64 getAvailableBalance(); - std::function get_io_statistic; + std::function get_read_bytes; Int64 last_stat_bytes; Poco::Logger * log; }; @@ -263,6 +264,7 @@ class IORateLimiter std::atomic stop; std::thread auto_tune_and_get_io_info_thread; IOInfo io_info; + mutable std::shared_mutex io_info_mtx; const UInt64 update_io_stat_period_ms; // Noncopyable and nonmovable. From faee37a0beaa1dc332045eabe4aa9b8f55dea9a9 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 25 Jul 2022 19:31:11 +0800 Subject: [PATCH 05/15] format Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 2 -- dbms/src/Encryption/RateLimiter.h | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index ce095e6273d..d30939893b7 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -19,8 +19,6 @@ #include #include #include -#include - #include #include #include diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index 7cfedcc03d3..3a28c4bb88f 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -25,8 +25,8 @@ #include #include #include -#include #include +#include // TODO: separate IO utility(i.e. FileProvider, RateLimiter) from Encryption directory namespace Poco::Util From 183ff8b1be40794715768407247afe1cfb0f8bd6 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 28 Jul 2022 14:57:51 +0800 Subject: [PATCH 06/15] make variables of read_info atomic variables Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 47 +++++++++++------------------ dbms/src/Encryption/RateLimiter.h | 42 +++++++++++--------------- 2 files changed, 34 insertions(+), 55 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index d30939893b7..957c8614761 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -19,6 +19,7 @@ #include #include #include + #include #include #include @@ -434,12 +435,10 @@ void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes) { LOG_FMT_INFO(log, "updateReadLimiter: bg_bytes {} fg_bytes {}", bg_bytes, fg_bytes); auto get_bg_read_io_statistic = [&]() { - std::shared_lock lock(io_info_mtx); - return io_info.bg_read_bytes; + return read_info.bg_read_bytes.load(); }; auto get_fg_read_io_statistic = [&]() { - std::shared_lock lock(io_info_mtx); - return std::max(0, io_info.total_read_bytes - io_info.bg_read_bytes); + return std::max(0, read_info.total_read_bytes - read_info.bg_read_bytes); }; if (bg_bytes == 0) @@ -506,7 +505,7 @@ void IORateLimiter::setBackgroundThreadIds(std::vector thread_ids) LOG_FMT_INFO(log, "bg_thread_ids {} => {}", bg_thread_ids.size(), bg_thread_ids); } -std::pair IORateLimiter::getReadWriteBytes(const std::string & fname [[maybe_unused]]) +Int64 IORateLimiter::getReadBytes(const std::string & fname [[maybe_unused]]) { #if __linux__ std::ifstream ifs(fname); @@ -518,7 +517,6 @@ std::pair IORateLimiter::getReadWriteBytes(const std::string & fna } std::string s; Int64 read_bytes = -1; - Int64 write_bytes = -1; while (std::getline(ifs, s)) { if (s.empty()) @@ -537,44 +535,36 @@ std::pair IORateLimiter::getReadWriteBytes(const std::string & fna boost::algorithm::trim(values[1]); read_bytes = std::stoll(values[1]); } - else if (values[0] == "write_bytes") - { - boost::algorithm::trim(values[1]); - write_bytes = std::stoll(values[1]); - } } - if (read_bytes == -1 || write_bytes == -1) + if (read_bytes == -1) { - auto msg = fmt::format("read_bytes: {} write_bytes: {} Invalid result.", read_bytes, write_bytes); + auto msg = fmt::format("read_bytes: {}. Invalid result.", read_bytes); LOG_ERROR(log, msg); throw Exception(msg, ErrorCodes::UNKNOWN_EXCEPTION); } - return {read_bytes, write_bytes}; + return read_bytes; #else - return {0, 0}; + return {0}; #endif } -IOInfo IORateLimiter::getCurrentIOInfo() +void IORateLimiter::getCurrentIOInfo() { static const pid_t pid = getpid(); - IOInfo io_info; + read_info.reset(); - // Read I/O info of each background threads. + // Read read info of each background threads. for (pid_t tid : bg_thread_ids) { const std::string thread_io_fname = fmt::format("/proc/{}/task/{}/io", pid, tid); - Int64 read_bytes, write_bytes; - std::tie(read_bytes, write_bytes) = getReadWriteBytes(thread_io_fname); - io_info.bg_read_bytes += read_bytes; - io_info.bg_write_bytes += write_bytes; + Int64 read_bytes; + read_bytes = getReadBytes(thread_io_fname); + read_info.bg_read_bytes += read_bytes; } - // Read I/O info of this process. + // Read read info of this process. static const std::string proc_io_fname = fmt::format("/proc/{}/io", pid); - std::tie(io_info.total_read_bytes, io_info.total_write_bytes) = getReadWriteBytes(proc_io_fname); - io_info.update_time = std::chrono::system_clock::now(); - return io_info; + read_info.total_read_bytes = getReadBytes(proc_io_fname); } void IORateLimiter::setStop() @@ -619,11 +609,8 @@ void IORateLimiter::runAutoTune() } if ((bg_read_limiter || fg_read_limiter) && (now_time_point - update_io_stat_time > std::chrono::milliseconds(update_io_stat_period_ms))) { - IOInfo io_info_tmp = getCurrentIOInfo(); + getCurrentIOInfo(); update_io_stat_time = now_time_point; - io_info_mtx.lock(); - io_info = io_info_tmp; - io_info_mtx.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(update_io_stat_period_ms - 1)); } } diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index 3a28c4bb88f..41d93230973 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -47,34 +47,28 @@ enum class LimiterType BG_READ = 4, }; -/// IOInfo is used to store IO information. -/// total_write_bytes is the total bytes of the write. +/// ReadInfo is used to store IO information. /// total_read_bytes is the total bytes of the read. -/// bg_write_bytes is the bytes of the background write. /// bg_read_bytes is the bytes of the background read. -/// update_time is the time of the last update. -struct IOInfo +struct ReadInfo { - Int64 total_write_bytes; - Int64 total_read_bytes; - Int64 bg_write_bytes; - Int64 bg_read_bytes; - std::chrono::time_point update_time; - - IOInfo() - : total_write_bytes(0) - , total_read_bytes(0) - , bg_write_bytes(0) + std::atomic total_read_bytes; + std::atomic bg_read_bytes; + + ReadInfo() + : total_read_bytes(0) , bg_read_bytes(0) {} + void reset() + { + total_read_bytes = 0; + bg_read_bytes = 0; + } + std::string toString() const { - return fmt::format("total_write_bytes {} total_read_bytes {} bg_write_bytes {} bg_read_bytes {}", - total_write_bytes, - total_read_bytes, - bg_write_bytes, - bg_read_bytes); + return fmt::format("total_read_bytes {} bg_read_bytes {}", total_read_bytes, bg_read_bytes); } }; @@ -237,8 +231,8 @@ class IORateLimiter private: #endif - std::pair getReadWriteBytes(const std::string & fname); - IOInfo getCurrentIOInfo(); + Int64 getReadBytes(const std::string & fname); + void getCurrentIOInfo(); std::unique_ptr createIOLimitTuner(); void autoTune(); @@ -257,14 +251,12 @@ class IORateLimiter std::mutex bg_thread_ids_mtx; std::vector bg_thread_ids; - IOInfo last_io_info; Poco::Logger * log; std::atomic stop; std::thread auto_tune_and_get_io_info_thread; - IOInfo io_info; - mutable std::shared_mutex io_info_mtx; + ReadInfo read_info; const UInt64 update_io_stat_period_ms; // Noncopyable and nonmovable. From 688f5301791ca6fbef24d021df1954ad243f3f37 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 28 Jul 2022 15:35:27 +0800 Subject: [PATCH 07/15] reslove conflict Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 6 +++--- dbms/src/Encryption/RateLimiter.h | 6 +++--- dbms/src/Server/StorageConfigParser.cpp | 10 +++++----- dbms/src/Server/StorageConfigParser.h | 11 ++++++----- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 957c8614761..0a4fcbef298 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -296,7 +296,7 @@ ReadLimiter::ReadLimiter( : WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_) , get_read_bytes(std::move(get_read_bytes_)) , last_stat_bytes(get_read_bytes()) - , log(&Poco::Logger::get("ReadLimiter")) + , log(Logger::get("ReadLimiter")) {} Int64 ReadLimiter::getAvailableBalance() @@ -361,7 +361,7 @@ void ReadLimiter::refillAndAlloc() } IORateLimiter::IORateLimiter(UInt64 update_io_stat_period_ms_) - : log(&Poco::Logger::get("IORateLimiter")) + : log(Logger::get("IORateLimiter")) , stop(false) , update_io_stat_period_ms(update_io_stat_period_ms_) {} @@ -674,7 +674,7 @@ IOLimitTuner::IOLimitTuner( , bg_read_stat(std::move(bg_read_stat_)) , fg_read_stat(std::move(fg_read_stat_)) , io_config(io_config_) - , log(&Poco::Logger::get("IOLimitTuner")) + , log(Logger::get("IOLimitTuner")) {} IOLimitTuner::TuneResult IOLimitTuner::tune() const diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index 41d93230973..c09e33917c8 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -201,7 +201,7 @@ class ReadLimiter : public WriteLimiter std::function get_read_bytes; Int64 last_stat_bytes; - Poco::Logger * log; + LoggerPtr log; }; using ReadLimiterPtr = std::shared_ptr; @@ -252,7 +252,7 @@ class IORateLimiter std::mutex bg_thread_ids_mtx; std::vector bg_thread_ids; - Poco::Logger * log; + LoggerPtr log; std::atomic stop; std::thread auto_tune_and_get_io_info_thread; @@ -469,6 +469,6 @@ class IOLimitTuner LimiterStatUPtr bg_read_stat; LimiterStatUPtr fg_read_stat; StorageIORateLimitConfig io_config; - Poco::Logger * log; + LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index e8db575912c..1079935a318 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -61,7 +61,7 @@ static String getNormalizedPath(const String & s) return getCanonicalPath(Poco::Path{s}.toString()); } -void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger * log) +void TiFlashStorageConfig::parseStoragePath(const String & storage, const LoggerPtr & log) { std::istringstream ss(storage); cpptoml::parser p(ss); @@ -181,7 +181,7 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger } } -void TiFlashStorageConfig::parseMisc(const String & storage_section, Poco::Logger * log) +void TiFlashStorageConfig::parseMisc(const String & storage_section, const LoggerPtr & log) { std::istringstream ss(storage_section); cpptoml::parser p(ss); @@ -233,7 +233,7 @@ Strings TiFlashStorageConfig::getAllNormalPaths() const return all_normal_path; } -bool TiFlashStorageConfig::parseFromDeprecatedConfiguration(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) +bool TiFlashStorageConfig::parseFromDeprecatedConfiguration(Poco::Util::LayeredConfiguration & config, const LoggerPtr & log) { if (!config.has("path")) return false; @@ -302,7 +302,7 @@ bool TiFlashStorageConfig::parseFromDeprecatedConfiguration(Poco::Util::LayeredC return true; } -std::tuple TiFlashStorageConfig::parseSettings(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) +std::tuple TiFlashStorageConfig::parseSettings(Poco::Util::LayeredConfiguration & config, const LoggerPtr & log) { size_t global_capacity_quota = 0; // "0" by default, means no quota, use the whole disk capacity. TiFlashStorageConfig storage_config; @@ -379,7 +379,7 @@ std::tuple TiFlashStorageConfig::parseSettings(Poc return std::make_tuple(global_capacity_quota, storage_config); } -void StorageIORateLimitConfig::parse(const String & storage_io_rate_limit, Poco::Logger * log) +void StorageIORateLimitConfig::parse(const String & storage_io_rate_limit, const LoggerPtr & log) { std::istringstream ss(storage_io_rate_limit); cpptoml::parser p(ss); diff --git a/dbms/src/Server/StorageConfigParser.h b/dbms/src/Server/StorageConfigParser.h index 4efc5637634..f3a779d2f63 100644 --- a/dbms/src/Server/StorageConfigParser.h +++ b/dbms/src/Server/StorageConfigParser.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -75,7 +76,7 @@ struct StorageIORateLimitConfig , auto_tune_sec(5) {} - void parse(const String & storage_io_rate_limit, Poco::Logger * log); + void parse(const String & storage_io_rate_limit, const LoggerPtr & log); std::string toString() const; @@ -109,14 +110,14 @@ struct TiFlashStorageConfig Strings getAllNormalPaths() const; - static std::tuple parseSettings(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); + static std::tuple parseSettings(Poco::Util::LayeredConfiguration & config, const LoggerPtr & log); private: - void parseStoragePath(const String & storage_section, Poco::Logger * log); + void parseStoragePath(const String & storage_section, const LoggerPtr & log); - bool parseFromDeprecatedConfiguration(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); + bool parseFromDeprecatedConfiguration(Poco::Util::LayeredConfiguration & config, const LoggerPtr & log); - void parseMisc(const String & storage_section, Poco::Logger * log); + void parseMisc(const String & storage_section, const LoggerPtr & log); }; From 3883fbcfd372e351fbff32dff89ca8372e863c43 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 28 Jul 2022 16:17:51 +0800 Subject: [PATCH 08/15] fix Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 7 ++++--- dbms/src/Encryption/RateLimiter.h | 7 ------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 0a4fcbef298..2149d31664e 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -551,20 +551,21 @@ Int64 IORateLimiter::getReadBytes(const std::string & fname [[maybe_unused]]) void IORateLimiter::getCurrentIOInfo() { static const pid_t pid = getpid(); - read_info.reset(); // Read read info of each background threads. + Int64 bg_read_bytes_tmp{0}; for (pid_t tid : bg_thread_ids) { const std::string thread_io_fname = fmt::format("/proc/{}/task/{}/io", pid, tid); Int64 read_bytes; read_bytes = getReadBytes(thread_io_fname); - read_info.bg_read_bytes += read_bytes; + bg_read_bytes_tmp += read_bytes; } + read_info.bg_read_bytes.store(bg_read_bytes_tmp); // Read read info of this process. static const std::string proc_io_fname = fmt::format("/proc/{}/io", pid); - read_info.total_read_bytes = getReadBytes(proc_io_fname); + read_info.total_read_bytes.store(getReadBytes(proc_io_fname)); } void IORateLimiter::setStop() diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index c09e33917c8..c30031b3ad9 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -25,7 +25,6 @@ #include #include #include -#include #include // TODO: separate IO utility(i.e. FileProvider, RateLimiter) from Encryption directory @@ -60,12 +59,6 @@ struct ReadInfo , bg_read_bytes(0) {} - void reset() - { - total_read_bytes = 0; - bg_read_bytes = 0; - } - std::string toString() const { return fmt::format("total_read_bytes {} bg_read_bytes {}", total_read_bytes, bg_read_bytes); From 1fc3b2577882070737f7a246ccabcf25b760d07d Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Thu, 28 Jul 2022 16:27:08 +0800 Subject: [PATCH 09/15] Update dbms/src/Encryption/RateLimiter.h --- dbms/src/Encryption/RateLimiter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index 066d3355f55..dfcb4ac5a7f 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -166,7 +166,7 @@ using WriteLimiterPtr = std::shared_ptr; // // Constructor parameters: // -// `getIOStatistic_` is the function that obtain the amount of data from `getCurrentIOInfo()` which read from /proc filesystem. +// `get_read_bytes_` is the function that obtain the amount of data from `getCurrentIOInfo()` which read from /proc filesystem. // // Other parameters are the same as WriteLimiter. class ReadLimiter : public WriteLimiter From 568a08d5757754fff51d8651aaa7fdf1fd331f3f Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 28 Jul 2022 18:30:31 +0800 Subject: [PATCH 10/15] remove total_read_bytes and add fg_read_bytes Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 9 +++++---- dbms/src/Encryption/RateLimiter.h | 10 +++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 2149d31664e..f3d350477a3 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -438,7 +438,7 @@ void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes) return read_info.bg_read_bytes.load(); }; auto get_fg_read_io_statistic = [&]() { - return std::max(0, read_info.total_read_bytes - read_info.bg_read_bytes); + return read_info.fg_read_bytes.load(); }; if (bg_bytes == 0) @@ -544,7 +544,7 @@ Int64 IORateLimiter::getReadBytes(const std::string & fname [[maybe_unused]]) } return read_bytes; #else - return {0}; + return 0; #endif } @@ -561,11 +561,12 @@ void IORateLimiter::getCurrentIOInfo() read_bytes = getReadBytes(thread_io_fname); bg_read_bytes_tmp += read_bytes; } - read_info.bg_read_bytes.store(bg_read_bytes_tmp); + read_info.bg_read_bytes.store(bg_read_bytes_tmp, std::memory_order_relaxed); // Read read info of this process. static const std::string proc_io_fname = fmt::format("/proc/{}/io", pid); - read_info.total_read_bytes.store(getReadBytes(proc_io_fname)); + Int64 fg_read_bytes_tmp{getReadBytes(proc_io_fname) - bg_read_bytes_tmp}; + read_info.fg_read_bytes.store(std::min(0, fg_read_bytes_tmp), std::memory_order_relaxed); } void IORateLimiter::setStop() diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index dfcb4ac5a7f..ee951254d22 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -47,21 +47,21 @@ enum class LimiterType }; /// ReadInfo is used to store IO information. -/// total_read_bytes is the total bytes of the read. /// bg_read_bytes is the bytes of the background read. +/// fg_read_bytes is the bytes of the foreground read. struct ReadInfo { - std::atomic total_read_bytes; std::atomic bg_read_bytes; + std::atomic fg_read_bytes; ReadInfo() - : total_read_bytes(0) - , bg_read_bytes(0) + : bg_read_bytes(0) + , fg_read_bytes(0) {} std::string toString() const { - return fmt::format("total_read_bytes {} bg_read_bytes {}", total_read_bytes, bg_read_bytes); + return fmt::format("fg_read_bytes {} bg_read_bytes {}", fg_read_bytes, bg_read_bytes); } }; From 48e291fb71683b86f1403136228cf6b8822099c2 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 28 Jul 2022 18:34:41 +0800 Subject: [PATCH 11/15] load with memory_order_relaxed Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index f3d350477a3..9cf75a8d505 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -435,10 +435,10 @@ void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes) { LOG_FMT_INFO(log, "updateReadLimiter: bg_bytes {} fg_bytes {}", bg_bytes, fg_bytes); auto get_bg_read_io_statistic = [&]() { - return read_info.bg_read_bytes.load(); + return read_info.bg_read_bytes.load(std::memory_order_relaxed); }; auto get_fg_read_io_statistic = [&]() { - return read_info.fg_read_bytes.load(); + return read_info.fg_read_bytes.load(std::memory_order_relaxed); }; if (bg_bytes == 0) From 11bec268f2493d31c801cbca106544022596614d Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 29 Jul 2022 14:03:34 +0800 Subject: [PATCH 12/15] update the sleep position Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 9cf75a8d505..4775a77438c 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -600,9 +600,10 @@ void IORateLimiter::runAutoTune() using time_point = std::chrono::time_point; using clock = std::chrono::system_clock; time_point auto_tune_time = clock::now(); - time_point update_io_stat_time = clock::now(); + time_point update_io_stat_time = auto_tune_time; while (!stop.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(std::chrono::milliseconds(update_io_stat_period_ms)); auto now_time_point = clock::now(); if ((io_config.auto_tune_sec > 0) && (now_time_point - auto_tune_time > std::chrono::seconds(io_config.auto_tune_sec))) { @@ -613,7 +614,6 @@ void IORateLimiter::runAutoTune() { getCurrentIOInfo(); update_io_stat_time = now_time_point; - std::this_thread::sleep_for(std::chrono::milliseconds(update_io_stat_period_ms - 1)); } } }; From 35dd172706244f71e9b56cea9fc5c243be9fcf1b Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 1 Aug 2022 10:49:42 +0800 Subject: [PATCH 13/15] fix ut Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 2 +- .../Encryption/tests/gtest_rate_limiter.cpp | 24 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 4775a77438c..9a15ab9ea76 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -566,7 +566,7 @@ void IORateLimiter::getCurrentIOInfo() // Read read info of this process. static const std::string proc_io_fname = fmt::format("/proc/{}/io", pid); Int64 fg_read_bytes_tmp{getReadBytes(proc_io_fname) - bg_read_bytes_tmp}; - read_info.fg_read_bytes.store(std::min(0, fg_read_bytes_tmp), std::memory_order_relaxed); + read_info.fg_read_bytes.store(std::max(0, fg_read_bytes_tmp), std::memory_order_relaxed); } void IORateLimiter::setStop() diff --git a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp index 149c4446a13..3b053a2355a 100644 --- a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp +++ b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp @@ -19,9 +19,11 @@ #include #include +#include #include #include #include +#include "common/types.h" #ifdef __linux__ #include @@ -379,12 +381,10 @@ TEST(IORateLimiterTest, IOStat) n = ::pread(fd, buf, buf_size, 0); ASSERT_EQ(n, buf_size) << strerror(errno); - //int ret = ::fsync(fd); - //ASSERT_EQ(ret, 0) << strerror(errno); - - auto io_info = io_rate_limiter.getCurrentIOInfo(); - ASSERT_GE(io_info.total_write_bytes, buf_size); - ASSERT_GE(io_info.total_read_bytes, buf_size); + io_rate_limiter.getCurrentIOInfo(); + Int64 bg_read_bytes = io_rate_limiter.read_info.bg_read_bytes.load(std::memory_order_relaxed); + Int64 fg_read_bytes = io_rate_limiter.read_info.fg_read_bytes.load(std::memory_order_relaxed); + ASSERT_GE(bg_read_bytes + fg_read_bytes, buf_size); } TEST(IORateLimiterTest, IOStatMultiThread) @@ -448,12 +448,12 @@ TEST(IORateLimiterTest, IOStatMultiThread) IORateLimiter io_rate_limiter; io_rate_limiter.setBackgroundThreadIds(bg_pids); - auto io_info = io_rate_limiter.getCurrentIOInfo(); - std::cout << io_info.toString() << std::endl; - ASSERT_GE(io_info.total_read_bytes, buf_size * (bg_thread_count + fg_thread_count)); - ASSERT_GE(io_info.total_write_bytes, buf_size * (bg_thread_count + fg_thread_count)); - ASSERT_GE(io_info.bg_read_bytes, buf_size * bg_thread_count); - ASSERT_GE(io_info.bg_write_bytes, buf_size * bg_thread_count); + + io_rate_limiter.getCurrentIOInfo(); + Int64 bg_read_bytes = io_rate_limiter.read_info.bg_read_bytes.load(std::memory_order_relaxed); + Int64 fg_read_bytes = io_rate_limiter.read_info.fg_read_bytes.load(std::memory_order_relaxed); + ASSERT_GE(fg_read_bytes, buf_size * fg_thread_count); + ASSERT_GE(bg_read_bytes, buf_size * bg_thread_count); stop.store(true); for (auto & t : threads) From e304dea993ed37db193ead9d8cec0559cc7891fc Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 1 Aug 2022 11:33:49 +0800 Subject: [PATCH 14/15] avoid meanless calculation Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 4 ++++ dbms/src/Encryption/tests/gtest_rate_limiter.cpp | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 9a15ab9ea76..e2777e4bdcf 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -310,6 +310,10 @@ Int64 ReadLimiter::getAvailableBalance() last_stat_bytes, bytes); } + else if (bytes == last_stat_bytes) + { + return available_balance; + } else { Int64 real_alloc_bytes = bytes - last_stat_bytes; diff --git a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp index 3b053a2355a..854473bae44 100644 --- a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp +++ b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp @@ -18,11 +18,10 @@ #include #include -#include -#include #include #include #include + #include "common/types.h" #ifdef __linux__ From 820766bd4b0a5c95846868fa725867fad561e530 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Wed, 3 Aug 2022 16:32:23 +0800 Subject: [PATCH 15/15] rename some variables and change default value to 30 Signed-off-by: Lloyd-Pottiger --- dbms/src/Encryption/RateLimiter.cpp | 26 +++++++++++++------------- dbms/src/Encryption/RateLimiter.h | 8 ++++---- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index e2777e4bdcf..053dc5a816b 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -302,7 +302,7 @@ ReadLimiter::ReadLimiter( Int64 ReadLimiter::getAvailableBalance() { Int64 bytes = get_read_bytes(); - if (bytes < last_stat_bytes) + if (unlikely(bytes < last_stat_bytes)) { LOG_FMT_WARNING( log, @@ -310,7 +310,7 @@ Int64 ReadLimiter::getAvailableBalance() last_stat_bytes, bytes); } - else if (bytes == last_stat_bytes) + else if (likely(bytes == last_stat_bytes)) { return available_balance; } @@ -364,18 +364,18 @@ void ReadLimiter::refillAndAlloc() } } -IORateLimiter::IORateLimiter(UInt64 update_io_stat_period_ms_) +IORateLimiter::IORateLimiter(UInt64 update_read_info_period_ms_) : log(Logger::get("IORateLimiter")) , stop(false) - , update_io_stat_period_ms(update_io_stat_period_ms_) + , update_read_info_period_ms(update_read_info_period_ms_) {} IORateLimiter::~IORateLimiter() { stop.store(true, std::memory_order_relaxed); - if (auto_tune_and_get_io_info_thread.joinable()) + if (auto_tune_and_get_read_info_thread.joinable()) { - auto_tune_and_get_io_info_thread.join(); + auto_tune_and_get_read_info_thread.join(); } } @@ -600,28 +600,28 @@ void IORateLimiter::setStop() void IORateLimiter::runAutoTune() { - auto auto_tune_and_get_io_info_worker = [&]() { + auto auto_tune_and_get_read_info_worker = [&]() { using time_point = std::chrono::time_point; using clock = std::chrono::system_clock; time_point auto_tune_time = clock::now(); - time_point update_io_stat_time = auto_tune_time; + time_point update_read_info_time = auto_tune_time; while (!stop.load(std::memory_order_relaxed)) { - std::this_thread::sleep_for(std::chrono::milliseconds(update_io_stat_period_ms)); + std::this_thread::sleep_for(std::chrono::milliseconds(update_read_info_period_ms)); auto now_time_point = clock::now(); - if ((io_config.auto_tune_sec > 0) && (now_time_point - auto_tune_time > std::chrono::seconds(io_config.auto_tune_sec))) + if ((io_config.auto_tune_sec > 0) && (now_time_point - auto_tune_time >= std::chrono::seconds(io_config.auto_tune_sec))) { autoTune(); auto_tune_time = now_time_point; } - if ((bg_read_limiter || fg_read_limiter) && (now_time_point - update_io_stat_time > std::chrono::milliseconds(update_io_stat_period_ms))) + if ((bg_read_limiter || fg_read_limiter) && likely(now_time_point - update_read_info_time >= std::chrono::milliseconds(update_read_info_period_ms))) { getCurrentIOInfo(); - update_io_stat_time = now_time_point; + update_read_info_time = now_time_point; } } }; - auto_tune_and_get_io_info_thread = std::thread(auto_tune_and_get_io_info_worker); + auto_tune_and_get_read_info_thread = std::thread(auto_tune_and_get_read_info_worker); } std::unique_ptr IORateLimiter::createIOLimitTuner() diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index ee951254d22..4e7846b9a64 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -204,11 +204,11 @@ using ReadLimiterPtr = std::shared_ptr; // // Constructor parameters: // -// `update_io_stat_period_ms` is the interval between calling getCurrentIOInfo. Default is 200ms. +// `update_read_info_period_ms` is the interval between calling getCurrentIOInfo. Default is 30ms. class IORateLimiter { public: - explicit IORateLimiter(UInt64 update_io_stat_period_ms = 200); + explicit IORateLimiter(UInt64 update_read_info_period_ms_ = 30); ~IORateLimiter(); WriteLimiterPtr getWriteLimiter(); @@ -248,9 +248,9 @@ class IORateLimiter LoggerPtr log; std::atomic stop; - std::thread auto_tune_and_get_io_info_thread; + std::thread auto_tune_and_get_read_info_thread; ReadInfo read_info; - const UInt64 update_io_stat_period_ms; + const UInt64 update_read_info_period_ms; // Noncopyable and nonmovable. DISALLOW_COPY_AND_MOVE(IORateLimiter);