From a4472640cb32b39f4d3f4acf5c4eb8d6ccc49840 Mon Sep 17 00:00:00 2001 From: leipeng Date: Tue, 28 Feb 2023 11:27:06 +0800 Subject: [PATCH 1/2] rdb_sst_info: commit sst_file in dedicated threads --- storage/rocksdb/rdb_sst_info.cc | 20 ++++++++++++++++++++ storage/rocksdb/rdb_sst_info.h | 3 +++ 2 files changed, 23 insertions(+) diff --git a/storage/rocksdb/rdb_sst_info.cc b/storage/rocksdb/rdb_sst_info.cc index dd4f111d1a06..a9f9f7131279 100644 --- a/storage/rocksdb/rdb_sst_info.cc +++ b/storage/rocksdb/rdb_sst_info.cc @@ -355,6 +355,7 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename, Rdb_sst_info::~Rdb_sst_info() { assert(m_sst_file == nullptr); + SHIP_ASSERT(m_commiting_threads.empty()); for (const auto &sst_file : m_committed_files) { // In case something went wrong attempt to delete the temporary file. @@ -392,13 +393,23 @@ int Rdb_sst_info::open_new_sst_file() { } void Rdb_sst_info::commit_sst_file(Rdb_sst_file_ordered *sst_file) { + m_commiting_threads_mutex.lock(); + m_commiting_threads.emplace_back( + &Rdb_sst_info::commit_sst_file_func, this, sst_file); + m_commiting_threads_mutex.unlock(); +} + +void Rdb_sst_info::commit_sst_file_func(Rdb_sst_file_ordered* sst_file) { const rocksdb::Status s = sst_file->commit(); + + m_commiting_threads_mutex.lock(); if (!s.ok()) { set_error_msg(sst_file->get_name(), s); set_background_error(HA_ERR_ROCKSDB_BULK_LOAD); } m_committed_files.push_back(sst_file->get_name()); + m_commiting_threads_mutex.unlock(); delete sst_file; } @@ -472,11 +483,20 @@ int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info, return ret; } + auto join_commiting_threads = [this]() { + for (auto& thr : m_commiting_threads) { + thr.join(); + } + m_commiting_threads.clear(); + }; + join_commiting_threads(); + m_print_client_error = print_client_error; if (m_curr_size > 0) { // Close out any existing files close_curr_sst_file(); + join_commiting_threads(); } // This checks out the list of files so that the caller can collect/group diff --git a/storage/rocksdb/rdb_sst_info.h b/storage/rocksdb/rdb_sst_info.h index 8d7351f58294..1195619b3c2f 100644 --- a/storage/rocksdb/rdb_sst_info.h +++ b/storage/rocksdb/rdb_sst_info.h @@ -132,6 +132,8 @@ class Rdb_sst_info { // List of committed SST files - we'll ingest them later in one single batch std::vector m_committed_files; + std::vector m_commiting_threads; + std::mutex m_commiting_threads_mutex; const bool m_tracing; bool m_print_client_error; @@ -139,6 +141,7 @@ class Rdb_sst_info { int open_new_sst_file(); void close_curr_sst_file(); void commit_sst_file(Rdb_sst_file_ordered *sst_file); + void commit_sst_file_func(Rdb_sst_file_ordered*); void set_error_msg(const std::string &sst_file_name, const rocksdb::Status &s); From 33787bb63388a23c200f74c495580b28b585b029 Mon Sep 17 00:00:00 2001 From: leipeng Date: Wed, 1 Mar 2023 12:30:15 +0800 Subject: [PATCH 2/2] rdb_sst_info: commit sst_file in dedicated threads: simplify --- storage/rocksdb/rdb_sst_info.cc | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/storage/rocksdb/rdb_sst_info.cc b/storage/rocksdb/rdb_sst_info.cc index a9f9f7131279..007cfbbdc718 100644 --- a/storage/rocksdb/rdb_sst_info.cc +++ b/storage/rocksdb/rdb_sst_info.cc @@ -483,22 +483,18 @@ int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info, return ret; } - auto join_commiting_threads = [this]() { - for (auto& thr : m_commiting_threads) { - thr.join(); - } - m_commiting_threads.clear(); - }; - join_commiting_threads(); - m_print_client_error = print_client_error; if (m_curr_size > 0) { // Close out any existing files close_curr_sst_file(); - join_commiting_threads(); } + for (auto& thr : m_commiting_threads) { + thr.join(); + } + m_commiting_threads.clear(); + // This checks out the list of files so that the caller can collect/group // them and ingest them all in one go, and any racing calls to commit // won't see them at all