diff --git a/mysql-test/suite/rpl_mts/r/rpl_dep_exceed_worker_queue_size.result b/mysql-test/suite/rpl_mts/r/rpl_dep_exceed_worker_queue_size.result new file mode 100644 index 000000000000..a4a1b65eb371 --- /dev/null +++ b/mysql-test/suite/rpl_mts/r/rpl_dep_exceed_worker_queue_size.result @@ -0,0 +1,23 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +create table t1(a int) engine = innodb; +include/sync_slave_sql_with_master.inc +include/stop_slave.inc +insert into t1 values(1), (2), (3), (4), (5); +set @@global.debug = '+d,dep_wait_for_last_row_prepare'; +set @@global.debug = '+d,slave_worker_queue_size'; +set @@global.debug = '+d,after_clear_current_group_events'; +include/start_slave.inc +set debug_sync = "now wait_for clear.reached"; +set debug_sync = "now wait_for prepare.reached"; +set @@global.debug = '-d,dep_wait_for_last_row_prepare'; +set @@global.debug = '-d,after_clear_current_group_events'; +set @@global.debug = '-d,slave_worker_queue_size'; +set debug_sync= 'now signal prepare.done'; +set debug_sync= 'now signal clear.done'; +drop table t1; +include/sync_slave_sql_with_master.inc +include/rpl_end.inc diff --git a/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size-master.opt b/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size-master.opt new file mode 100644 index 000000000000..79ab1772e3e1 --- /dev/null +++ b/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size-master.opt @@ -0,0 +1,2 @@ +--gtid_mode=ON --enforce_gtid_consistency --log_slave_updates +--binlog_rows_event_max_rows=1 diff --git a/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size-slave.opt b/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size-slave.opt new file mode 100644 index 000000000000..463816fa5a9f --- /dev/null +++ b/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size-slave.opt @@ -0,0 +1,2 @@ +--gtid_mode=ON --enforce_gtid_consistency --log_slave_updates +--slave_parallel_workers=8 diff --git a/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size.test b/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size.test new file mode 100644 index 000000000000..0aaeaeb849f4 --- /dev/null +++ b/mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size.test @@ -0,0 +1,44 @@ +source include/master-slave.inc; +source include/have_mts_dependency_replication.inc; +source include/have_debug.inc; + +connection master; +create table t1(a int) engine = innodb; +source include/sync_slave_sql_with_master.inc; + +connection slave; +# Stop the slave +source include/stop_slave.inc; + +connection master; +# Execute a multi-insert trx, this will generate 5 row events tied to the same +# table map event (because binlog_rows_event_max_rows=1 in master.opt) +insert into t1 values(1), (2), (3), (4), (5); + +connection slave; +# This will wait just before preparing the last row event in coordinator thread +set @@global.debug = '+d,dep_wait_for_last_row_prepare'; +# This will initialize the max queue size to 5, which we're guaranteed to exceed +set @@global.debug = '+d,slave_worker_queue_size'; +# Wait for worker queue to be cleared +set @@global.debug = '+d,after_clear_current_group_events'; + +source include/start_slave.inc; + +# Wait till the worker thread has cleared the queue +set debug_sync = "now wait_for clear.reached"; +# Wait till we're just about to prepare the last row event in the coordinator +set debug_sync = "now wait_for prepare.reached"; + +# Now continue both threads +set @@global.debug = '-d,dep_wait_for_last_row_prepare'; +set @@global.debug = '-d,after_clear_current_group_events'; +set @@global.debug = '-d,slave_worker_queue_size'; +set debug_sync= 'now signal prepare.done'; +set debug_sync= 'now signal clear.done'; + +connection master; +drop table t1; +source include/sync_slave_sql_with_master.inc; + +source include/rpl_end.inc; diff --git a/sql/log_event.cc b/sql/log_event.cc index 259f9133d2cb..d1c4484370d2 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3271,6 +3271,17 @@ void Log_event::schedule_dep(Relay_log_info *rli) (rli->dbs_accessed_by_group.size() > 1); } + // when the number of events in a group is greater than max worker queue + // length the worker threads will clear the queue (i.e. destroy all buffered + // events, see @clear_current_group_events()), at that point we should treat + // the group as a sync group because we've lost its lineage and we don't want + // to accidentally reference any previous events to calculate dependencies + if (unlikely( + rli->num_events_in_current_group >= rli->mts_slave_worker_queue_len_max)) + { + rli->dep_sync_group= true; + } + if (unlikely(rli->dep_sync_group)) { wait_for_dep_workers_to_finish(rli, rli->trx_queued); @@ -3346,11 +3357,26 @@ void Log_event::schedule_dep(Relay_log_info *rli) rli->dep_sync_group= false; } +#ifndef DBUG_OFF + // assert: if we're done with an end event (i.e. done with scheduling the + // current trx) we should have reset the sync variable by now, otherwise the + // next trx will also be synced + if (ev->is_end_event) + DBUG_ASSERT(!rli->dep_sync_group); +#endif + + if (likely(!ev->is_end_event)) + ++rli->num_events_in_current_group; + else + rli->num_events_in_current_group= 0; + DBUG_VOID_RETURN; } /** Encapsulation for things to be done for terminal begin and end events + TODO (abhinavsharma): Refactor this, maybe split between start, end and + partition info events */ void Log_event::handle_terminal_dep_event(Relay_log_info *rli, @@ -3362,6 +3388,7 @@ Log_event::handle_terminal_dep_event(Relay_log_info *rli, DBUG_ASSERT(rli->table_map_events.empty()); DBUG_ASSERT(rli->keys_accessed_by_group.empty()); DBUG_ASSERT(!rli->trx_queued); + DBUG_ASSERT(!rli->num_events_in_current_group); // update rli state rli->current_begin_event= ev; @@ -12175,6 +12202,14 @@ void Rows_log_event::prepare_dep(Relay_log_info *rli, { DBUG_ENTER("Rows_log_event::prepare_dep"); + DBUG_EXECUTE_IF("dep_wait_for_last_row_prepare", { + if (get_flags(STMT_END_F)) + { + const char act[]= "now signal prepare.reached wait_for prepare.done"; + DBUG_ASSERT(opt_debug_sync_timeout > 0); + DBUG_ASSERT(!debug_sync_set_action(rli->info_thd, STRING_WITH_LEN(act))); + };}); + DBUG_ASSERT(rli->prev_event != NULL); DBUG_ASSERT(rli->table_map_events.count(get_table_id())); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 81937689c39c..f51fe98bb2a7 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -1160,6 +1160,7 @@ class Relay_log_info : public Rpl_info mysql_cond_t dep_trx_all_done_cond; ulonglong num_in_flight_trx= 0; + ulonglong num_events_in_current_group= 0; // Statistics std::atomic begin_event_waits{0}; @@ -1232,6 +1233,7 @@ class Relay_log_info : public Rpl_info mysql_mutex_unlock(&dep_key_lookup_mutex); trx_queued= false; + num_events_in_current_group= 0; if (need_dep_lock) mysql_mutex_unlock(&dep_lock); diff --git a/sql/rpl_rli_pdb.cc b/sql/rpl_rli_pdb.cc index 0418d203bdad..ee3f1a1a3630 100644 --- a/sql/rpl_rli_pdb.cc +++ b/sql/rpl_rli_pdb.cc @@ -295,6 +295,7 @@ int Slave_worker::init_worker(Relay_log_info * rli, ulong i) jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max; DBUG_EXECUTE_IF("slave_worker_queue_size", { + c_rli->mts_slave_worker_queue_len_max= jobs.entry = jobs.size = 5; } ); @@ -2375,6 +2376,13 @@ void clear_current_group_events(Slave_worker *worker, worker->trans_retries = ULONG_MAX; else worker->trans_retries = 0; + + DBUG_EXECUTE_IF("after_clear_current_group_events", { + const char act[]= "now signal clear.reached wait_for clear.done"; + DBUG_ASSERT(opt_debug_sync_timeout > 0); + DBUG_ASSERT(!debug_sync_set_action(worker->info_thd, + STRING_WITH_LEN(act))); + };); } /**