Skip to content

Commit

Permalink
Fixing missed notification in wait_for_all_committing_trxs_to_finish()
Browse files Browse the repository at this point in the history
Summary:
There were two problems with the existing code that caused
missed notification such that `wait_for_all_committing_trxs_to_finish()`
waited forever:
1. In group commit the follower threads were incrementing the committing
   trxs counter without holding LOCK_log, which meant that the counter
   could be incremented after it had already gone down to 0. Since the
   counter is not protected by a mutex (it's atomic), there can be a race
   such that the waiter's predicate check fails and no notification is
   ever sent afterwards, so the waiter is stuck. Another problem with
   this was that the counter can go to 0 if only the leader thread
   decrements it before the followers have had the chance to increment
   it. This causes us to wake up before all trxs are actually finished.
2. Since we don't hold a mutex while incrementing and decrementing the
   counter, there can be a race such that the waiting thread fails the
   predicate on a non-zero counter, then, before it goes into waiting,
   the counter is decremented to 0 and we notify. But since the
   notification happened before the waiter went into waiting, it misses
   the notification completely and waits forever.

To fix problem 1 we make sure that we increment the counter only while
holding LOCK_log. To fix problem 2 we take a mutex before notifying when
the counter goes to 0; this way the notification happens either entirely
before or entirely after the wait.

Out of paranoia, also added a global variable
(include_applied_opid_in_snapshot_info) to skip waiting altogether and
converted the wait() to a wait_for().

Test Plan:
Ran mysqlslap with high concurrency on both primary and
replica and created snapshots in a loop. Also ran the existing mtr tests
for functional testing.

Reviewers: yoshinori, yichenshen

Reviewed By: yichenshen

Differential Revision: https://phabricator.intern.facebook.com/D59882151
  • Loading branch information
abhinav04sharma committed Jul 18, 2024
1 parent 1d51c17 commit d60dc70
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 20 deletions.
4 changes: 4 additions & 0 deletions mysql-test/r/mysqld--help-notwin.result
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ The following options may be given as the first argument:
Include the table name in the error text when receiving a
duplicate key error and log the query into a new
duplicate key query log file.
--include-applied-opid-in-snapshot-info
Include applied opid in consistent snapshot info
(Defaults to on; use --skip-include-applied-opid-in-snapshot-info to disable.)
--index-stats-control=name
Control the collection of index statistics. The data is
exposed via the index_statistics table in performance
Expand Down Expand Up @@ -3539,6 +3542,7 @@ hlc-upper-bound-delta 0
host-cache-size 279
hostname my-hostname
improved-dup-key-error FALSE
include-applied-opid-in-snapshot-info TRUE
index-stats-control OFF_HARD
information-schema-engine TempTable
information-schema-stats-expiry 86400
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Default value is true
SELECT @@global.include_applied_opid_in_snapshot_info;
@@global.include_applied_opid_in_snapshot_info
1
SELECT @@session.include_applied_opid_in_snapshot_info;
ERROR HY000: Variable 'include_applied_opid_in_snapshot_info' is a GLOBAL variable
Expected error 'Variable is a GLOBAL variable'
SET @@global.include_applied_opid_in_snapshot_info = true;
SELECT @@global.include_applied_opid_in_snapshot_info;
@@global.include_applied_opid_in_snapshot_info
1
SET @@global.include_applied_opid_in_snapshot_info = false;
SELECT @@global.include_applied_opid_in_snapshot_info;
@@global.include_applied_opid_in_snapshot_info
0
SET @@global.include_applied_opid_in_snapshot_info = default;
SELECT @@global.include_applied_opid_in_snapshot_info;
@@global.include_applied_opid_in_snapshot_info
1
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Basic sysvar test for include_applied_opid_in_snapshot_info: checks the
# default value, that the variable is GLOBAL-only, and that it can be set to
# true, false and back to its default.
-- source include/load_sysvars.inc

# The variable defaults to ON (1).
--echo Default value is true
SELECT @@global.include_applied_opid_in_snapshot_info;

# Reading it with session scope must fail because it is a GLOBAL variable.
--Error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@session.include_applied_opid_in_snapshot_info;
--echo Expected error 'Variable is a GLOBAL variable'

# Explicitly setting it to true keeps the value at 1.
SET @@global.include_applied_opid_in_snapshot_info = true;
SELECT @@global.include_applied_opid_in_snapshot_info;

# Setting it to false switches the value to 0.
SET @@global.include_applied_opid_in_snapshot_info = false;
SELECT @@global.include_applied_opid_in_snapshot_info;

# Restoring the default brings the value back to 1.
SET @@global.include_applied_opid_in_snapshot_info = default;
SELECT @@global.include_applied_opid_in_snapshot_info;
34 changes: 18 additions & 16 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9817,12 +9817,14 @@ void MYSQL_BIN_LOG::lock_commits(snapshot_info_st *ss_info) {

global_sid_lock->unlock();

// unlike drain_committing_trxs() this waits for the entire pipeline to finish
// i.e. after_commit hook etc.
wait_for_all_committing_trxs_to_finish();
if (include_applied_opid_in_snapshot_info) {
// unlike drain_committing_trxs() this waits for the entire pipeline to
// finish i.e. after_commit hook etc.
wait_for_all_committing_trxs_to_finish();

if (!get_applied_opid_set(&ss_info->applied_opid_set)) {
ss_info->applied_opid_set = "-1:-1";
if (!get_applied_opid_set(&ss_info->applied_opid_set)) {
ss_info->applied_opid_set = "-1:-1";
}
}

ss_info->dbtids = get_dbtids_str();
Expand Down Expand Up @@ -11174,6 +11176,7 @@ int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
assert(thd_count > 0);
DBUG_PRINT("info", ("Number of threads in group commit %llu", thd_count));
counter_histogram_increment(&histogram_binlog_group_commit, thd_count);
inc_num_committing_trxs(thd_count);

*out_queue_var = first_seen;
*total_bytes_var = total_bytes;
Expand Down Expand Up @@ -11659,6 +11662,10 @@ int MYSQL_BIN_LOG::finish_commit(THD *thd) {
DBUG_TRACE;
DEBUG_SYNC(thd, "reached_finish_commit");

// decrement num committing trxs after this method is done
raii::Sentry<> num_committing_trxs_guard{
[&]() -> void { dec_num_committing_trxs(); }};

/*
In some unlikely situations, it can happen that binary
log is closed before the thread flushes it's cache.
Expand Down Expand Up @@ -12115,19 +12122,9 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
&LOCK_log)) {
DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
thd->commit_error));
// incrementing num committing trxs after holding LOCK_log
inc_num_committing_trxs();
const int ret = finish_commit(thd);
dec_num_committing_trxs();
return ret;
return finish_commit(thd);
}

// incrementing num committing trxs after holding LOCK_log
inc_num_committing_trxs();
// decrement num committing trxs after this method is done
raii::Sentry<> num_committing_trxs_guard{
[&]() -> void { dec_num_committing_trxs(); }};

if (enable_raft_plugin) {
enable_raft_plugin_save = enable_raft_plugin;
check_and_register_log_entities(thd);
Expand All @@ -12139,6 +12136,11 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
bool update_binlog_end_pos_after_sync;
if (unlikely(!is_open())) {
final_queue = fetch_and_process_flush_stage_queue(true);
uint32_t num_trxs = 0;
for (THD *head = final_queue; head; head = head->next_to_commit) {
++num_trxs;
}
inc_num_committing_trxs(num_trxs);
leave_mutex_before_commit_stage = &LOCK_log;
/*
binary log is closed, flush stage and sync stage should be
Expand Down
12 changes: 8 additions & 4 deletions sql/binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -566,12 +566,14 @@ class MYSQL_BIN_LOG : public TC_LOG {
std::mutex m_num_committing_trxs_lock;
std::condition_variable m_num_committing_trxs_cv;

void inc_num_committing_trxs() {
++m_num_committing_trxs;
void inc_num_committing_trxs(uint32_t inc_by = 1) {
mysql_mutex_assert_owner(&LOCK_log);
m_num_committing_trxs += inc_by;
}

// Decrements the count of committing transactions. When the count reaches
// zero, takes m_num_committing_trxs_lock and wakes every thread waiting on
// m_num_committing_trxs_cv.
void dec_num_committing_trxs() {
  const bool reached_zero = (--m_num_committing_trxs == 0);
  if (!reached_zero) {
    return;
  }
  // Notify while holding the mutex so the wake-up lands either entirely
  // before or entirely after a waiter's check-then-wait sequence.
  std::unique_lock<std::mutex> guard(m_num_committing_trxs_lock);
  m_num_committing_trxs_cv.notify_all();
}
Expand Down Expand Up @@ -614,9 +616,11 @@ class MYSQL_BIN_LOG : public TC_LOG {

public:
void wait_for_all_committing_trxs_to_finish() {
mysql_mutex_assert_owner(&LOCK_log);
std::unique_lock<std::mutex> lock(m_num_committing_trxs_lock);
m_num_committing_trxs_cv.wait(lock,
[&] { return m_num_committing_trxs == 0; });
while (m_num_committing_trxs) {
m_num_committing_trxs_cv.wait_for(lock, std::chrono::milliseconds(5));
}
}

/*
Expand Down
1 change: 1 addition & 0 deletions sql/mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,7 @@ bool opt_core_file = false;
bool skip_core_dump_on_error = false;
bool show_query_digest = false;
bool skip_sys_tables_engine_check = false;
bool include_applied_opid_in_snapshot_info = true;
/* write_control_level:
* Global variable to control write throttling for short running writes
*/
Expand Down
2 changes: 2 additions & 0 deletions sql/mysqld.h
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,8 @@ extern PSI_file_key key_file_binlog_index_cache;

extern bool skip_sys_tables_engine_check;

extern bool include_applied_opid_in_snapshot_info;

#ifdef HAVE_PSI_INTERFACE

extern PSI_mutex_key key_LOCK_tc;
Expand Down
6 changes: 6 additions & 0 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10540,3 +10540,9 @@ static Sys_var_dbtids Sys_dbtids("dbtids", "DB transaction IDs",
NON_PERSIST GLOBAL_VAR(dbtids), NO_CMD_LINE,
DEFAULT(nullptr), NO_MUTEX_GUARD,
NOT_IN_BINLOG, ON_CHECK(nullptr));

// Global boolean system variable `include_applied_opid_in_snapshot_info`,
// backed by the global of the same name, also exposed as a command-line
// option, and defaulting to true. MYSQL_BIN_LOG::lock_commits() checks it
// before waiting for all committing trxs to finish and filling in the
// snapshot's applied_opid_set.
static Sys_var_bool Sys_include_applied_opid_in_snapshot_info(
"include_applied_opid_in_snapshot_info",
"Include applied opid in consistent snapshot info",
GLOBAL_VAR(include_applied_opid_in_snapshot_info), CMD_LINE(OPT_ARG),
DEFAULT(true));

0 comments on commit d60dc70

Please sign in to comment.