From d60dc70a7d10356274c715b76545a84c14aae5d3 Mon Sep 17 00:00:00 2001 From: Abhinav Sharma Date: Wed, 17 Jul 2024 17:03:04 -0700 Subject: [PATCH] Fixing missed notification in wait_for_all_committing_trxs_to_finish() Summary: There were two problems with the existing code that caused a missed notification such that `wait_for_all_committing_trxs_to_finish()` waited forever: 1. In group commit the follower threads were incrementing the committing trxs counter without holding LOCK_log, which meant that the counter could increment after going down to 0, and since the counter doesn't use a mutex (it's atomic) there can be a race such that the predicate check while waiting fails and we never notify afterwards, so we're stuck. Another problem with this was that the counter can go to 0 if only the leader thread decrements it before the followers have the chance to increment it. This will cause us to wake up before all trxs are actually finished. 2. Since we don't hold a mutex while incrementing and decrementing the counter, there can be a race such that the waiting thread fails the predicate on a non-zero counter, then before we go into waiting, the counter is decremented to 0 and we notify. But since the notification happened before we went into waiting, we miss it completely and wait forever. To fix #1 we make sure that we increment the counter only while holding LOCK_log. To fix #2 we take a mutex before notifying when the counter goes to 0; this way the notification will happen either entirely before or entirely after the wait. Out of paranoia, also added a global variable (include_applied_opid_in_snapshot_info) to skip waiting altogether and converted the wait() to a wait_for(). Test Plan: Ran mysqlslap with high concurrency on both primary and replica and created snapshots in a loop. Also ran the existing mtr tests for functional testing. 
Reviewers: yoshinori, yichenshen Reviewed By: yichenshen Differential Revision: https://phabricator.intern.facebook.com/D59882151 --- mysql-test/r/mysqld--help-notwin.result | 4 +++ ...applied_opid_in_snapshot_info_basic.result | 19 +++++++++++ ...e_applied_opid_in_snapshot_info_basic.test | 17 ++++++++++ sql/binlog.cc | 34 ++++++++++--------- sql/binlog.h | 12 ++++--- sql/mysqld.cc | 1 + sql/mysqld.h | 2 ++ sql/sys_vars.cc | 6 ++++ 8 files changed, 75 insertions(+), 20 deletions(-) create mode 100644 mysql-test/suite/sys_vars/r/include_applied_opid_in_snapshot_info_basic.result create mode 100644 mysql-test/suite/sys_vars/t/include_applied_opid_in_snapshot_info_basic.test diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index f3b33e5ffaad..2932612c1066 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -789,6 +789,9 @@ The following options may be given as the first argument: Include the table name in the error text when receiving a duplicate key error and log the query into a new duplicate key query log file. + --include-applied-opid-in-snapshot-info + Include applied opid in consistent snapshot info + (Defaults to on; use --skip-include-applied-opid-in-snapshot-info to disable.) --index-stats-control=name Control the collection of index statistics. 
The data is exposed via the index_statistics table in performance @@ -3539,6 +3542,7 @@ hlc-upper-bound-delta 0 host-cache-size 279 hostname my-hostname improved-dup-key-error FALSE +include-applied-opid-in-snapshot-info TRUE index-stats-control OFF_HARD information-schema-engine TempTable information-schema-stats-expiry 86400 diff --git a/mysql-test/suite/sys_vars/r/include_applied_opid_in_snapshot_info_basic.result b/mysql-test/suite/sys_vars/r/include_applied_opid_in_snapshot_info_basic.result new file mode 100644 index 000000000000..993f7dd3e66c --- /dev/null +++ b/mysql-test/suite/sys_vars/r/include_applied_opid_in_snapshot_info_basic.result @@ -0,0 +1,19 @@ +Default value is true +SELECT @@global.include_applied_opid_in_snapshot_info; +@@global.include_applied_opid_in_snapshot_info +1 +SELECT @@session.include_applied_opid_in_snapshot_info; +ERROR HY000: Variable 'include_applied_opid_in_snapshot_info' is a GLOBAL variable +Expected error 'Variable is a GLOBAL variable' +SET @@global.include_applied_opid_in_snapshot_info = true; +SELECT @@global.include_applied_opid_in_snapshot_info; +@@global.include_applied_opid_in_snapshot_info +1 +SET @@global.include_applied_opid_in_snapshot_info = false; +SELECT @@global.include_applied_opid_in_snapshot_info; +@@global.include_applied_opid_in_snapshot_info +0 +SET @@global.include_applied_opid_in_snapshot_info = default; +SELECT @@global.include_applied_opid_in_snapshot_info; +@@global.include_applied_opid_in_snapshot_info +1 diff --git a/mysql-test/suite/sys_vars/t/include_applied_opid_in_snapshot_info_basic.test b/mysql-test/suite/sys_vars/t/include_applied_opid_in_snapshot_info_basic.test new file mode 100644 index 000000000000..2bff1d2377d2 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/include_applied_opid_in_snapshot_info_basic.test @@ -0,0 +1,17 @@ +-- source include/load_sysvars.inc + +--echo Default value is true +SELECT @@global.include_applied_opid_in_snapshot_info; + +--Error ER_INCORRECT_GLOBAL_LOCAL_VAR 
+SELECT @@session.include_applied_opid_in_snapshot_info; +--echo Expected error 'Variable is a GLOBAL variable' + +SET @@global.include_applied_opid_in_snapshot_info = true; +SELECT @@global.include_applied_opid_in_snapshot_info; + +SET @@global.include_applied_opid_in_snapshot_info = false; +SELECT @@global.include_applied_opid_in_snapshot_info; + +SET @@global.include_applied_opid_in_snapshot_info = default; +SELECT @@global.include_applied_opid_in_snapshot_info; diff --git a/sql/binlog.cc b/sql/binlog.cc index a2c06f6e2e90..2dfe3b98061e 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -9817,12 +9817,14 @@ void MYSQL_BIN_LOG::lock_commits(snapshot_info_st *ss_info) { global_sid_lock->unlock(); - // unlike drain_committing_trxs() this waits for the entire pipeline to finish - // i.e. after_commit hook etc. - wait_for_all_committing_trxs_to_finish(); + if (include_applied_opid_in_snapshot_info) { + // unlike drain_committing_trxs() this waits for the entire pipeline to + // finish i.e. after_commit hook etc. 
+ wait_for_all_committing_trxs_to_finish(); - if (!get_applied_opid_set(&ss_info->applied_opid_set)) { - ss_info->applied_opid_set = "-1:-1"; + if (!get_applied_opid_set(&ss_info->applied_opid_set)) { + ss_info->applied_opid_set = "-1:-1"; + } } ss_info->dbtids = get_dbtids_str(); @@ -11174,6 +11176,7 @@ int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var, assert(thd_count > 0); DBUG_PRINT("info", ("Number of threads in group commit %llu", thd_count)); counter_histogram_increment(&histogram_binlog_group_commit, thd_count); + inc_num_committing_trxs(thd_count); *out_queue_var = first_seen; *total_bytes_var = total_bytes; @@ -11659,6 +11662,10 @@ int MYSQL_BIN_LOG::finish_commit(THD *thd) { DBUG_TRACE; DEBUG_SYNC(thd, "reached_finish_commit"); + // decrement num committing trxs after this method is done + raii::Sentry<> num_committing_trxs_guard{ + [&]() -> void { dec_num_committing_trxs(); }}; + /* In some unlikely situations, it can happen that binary log is closed before the thread flushes it's cache. 
@@ -12115,19 +12122,9 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) { &LOCK_log)) { DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(), thd->commit_error)); - // incrementing num committing trxs after holding LOCK_log - inc_num_committing_trxs(); - const int ret = finish_commit(thd); - dec_num_committing_trxs(); - return ret; + return finish_commit(thd); } - // incrementing num committing trxs after holding LOCK_log - inc_num_committing_trxs(); - // decrement num committing trxs after this method is done - raii::Sentry<> num_committing_trxs_guard{ - [&]() -> void { dec_num_committing_trxs(); }}; - if (enable_raft_plugin) { enable_raft_plugin_save = enable_raft_plugin; check_and_register_log_entities(thd); @@ -12139,6 +12136,11 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) { bool update_binlog_end_pos_after_sync; if (unlikely(!is_open())) { final_queue = fetch_and_process_flush_stage_queue(true); + uint32_t num_trxs = 0; + for (THD *head = final_queue; head; head = head->next_to_commit) { + ++num_trxs; + } + inc_num_committing_trxs(num_trxs); leave_mutex_before_commit_stage = &LOCK_log; /* binary log is closed, flush stage and sync stage should be diff --git a/sql/binlog.h b/sql/binlog.h index 35cbded97a98..cc3cae18e6f8 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -566,12 +566,14 @@ class MYSQL_BIN_LOG : public TC_LOG { std::mutex m_num_committing_trxs_lock; std::condition_variable m_num_committing_trxs_cv; - void inc_num_committing_trxs() { - ++m_num_committing_trxs; + void inc_num_committing_trxs(uint32_t inc_by = 1) { + mysql_mutex_assert_owner(&LOCK_log); + m_num_committing_trxs += inc_by; } void dec_num_committing_trxs() { if (--m_num_committing_trxs == 0) { + std::unique_lock lock(m_num_committing_trxs_lock); m_num_committing_trxs_cv.notify_all(); } } @@ -614,9 +616,11 @@ class MYSQL_BIN_LOG : public TC_LOG { public: void wait_for_all_committing_trxs_to_finish() { + 
mysql_mutex_assert_owner(&LOCK_log); std::unique_lock lock(m_num_committing_trxs_lock); - m_num_committing_trxs_cv.wait(lock, - [&] { return m_num_committing_trxs == 0; }); + while (m_num_committing_trxs) { + m_num_committing_trxs_cv.wait_for(lock, std::chrono::milliseconds(5)); + } } /* diff --git a/sql/mysqld.cc b/sql/mysqld.cc index e7fd2e46aae2..d160f67c4011 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -1584,6 +1584,7 @@ bool opt_core_file = false; bool skip_core_dump_on_error = false; bool show_query_digest = false; bool skip_sys_tables_engine_check = false; +bool include_applied_opid_in_snapshot_info = true; /* write_control_level: * Global variable to control write throttling for short running writes */ diff --git a/sql/mysqld.h b/sql/mysqld.h index c35baaf0df18..103c86c59489 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -794,6 +794,8 @@ extern PSI_file_key key_file_binlog_index_cache; extern bool skip_sys_tables_engine_check; +extern bool include_applied_opid_in_snapshot_info; + #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_LOCK_tc; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index a65f5c3d7640..5a180dfa9758 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -10540,3 +10540,9 @@ static Sys_var_dbtids Sys_dbtids("dbtids", "DB transaction IDs", NON_PERSIST GLOBAL_VAR(dbtids), NO_CMD_LINE, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(nullptr)); + +static Sys_var_bool Sys_include_applied_opid_in_snapshot_info( + "include_applied_opid_in_snapshot_info", + "Include applied opid in consistent snapshot info", + GLOBAL_VAR(include_applied_opid_in_snapshot_info), CMD_LINE(OPT_ARG), + DEFAULT(true));