From c3bd1c6fdad506527f5e26ad5e9866fa90c00a6c Mon Sep 17 00:00:00 2001 From: Santosh Praneeth Banda Date: Mon, 11 Mar 2019 09:45:56 -0700 Subject: [PATCH] FB8-80: Display row information in processlist for rbr threads (#953) Summary: Jira ticket: https://jira.percona.com/browse/FB8-80 Reference Patch: https://github.com/facebook/mysql-5.6/commit/564824a Set row_query using the fields unpacked from the row event. Use row_query in show processlist. Pull Request resolved: https://github.com/facebook/mysql-5.6/pull/953 Reviewed By: lth Differential Revision: D14137212 Pulled By: lth fbshipit-source-id: ba5ad2d6819 --- ...em_access_with_rows_query_log_event.result | 2 +- .../rpl/r/rpl_multi_source_row_stages.result | 36 +++++----- mysql-test/suite/rpl/r/rpl_row_stages.result | 66 +++++++++---------- .../rpl/t/rpl_multi_source_row_stages.test | 6 +- mysql-test/suite/rpl/t/rpl_row_stages.test | 19 ++++-- sql/log_event.cc | 41 +++++++++++- sql/rpl_record.cc | 45 +++++++++++-- sql/rpl_record.h | 4 +- sql/sql_class.h | 2 + sql/sql_show.cc | 26 ++++++-- 10 files changed, 174 insertions(+), 73 deletions(-) diff --git a/mysql-test/suite/rpl/r/rpl_invalid_mem_access_with_rows_query_log_event.result b/mysql-test/suite/rpl/r/rpl_invalid_mem_access_with_rows_query_log_event.result index 0f9e9d79cedb..b40658aa70eb 100644 --- a/mysql-test/suite/rpl/r/rpl_invalid_mem_access_with_rows_query_log_event.result +++ b/mysql-test/suite/rpl/r/rpl_invalid_mem_access_with_rows_query_log_event.result @@ -31,7 +31,7 @@ INSERT INTO t1 VALUES(1); SET DEBUG_SYNC= "now WAIT_FOR deleted_rows_query_ev"; SELECT state,info FROM information_schema.processlist WHERE state like 'debug%';; state debug sync point: now -info NULL +info INSERT INTO test.t1 (1) SET DEBUG_SYNC = "now SIGNAL go_ahead"; # # Step 6) Cleanup diff --git a/mysql-test/suite/rpl/r/rpl_multi_source_row_stages.result b/mysql-test/suite/rpl/r/rpl_multi_source_row_stages.result index 9d6f37398156..198af1a53927 100644 --- a/mysql-test/suite/rpl/r/rpl_multi_source_row_stages.result +++ b/mysql-test/suite/rpl/r/rpl_multi_source_row_stages.result @@ -44,36 +44,36 @@ Commit; check on channel_1 include/start_slave_sql.inc [FOR CHANNEL 'channel_1'] SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=test, info LIKE "INSERT INTO t1 VALUES (1)", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] -[SHOW PROCESSLIST reports: db=test, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=test, info LIKE "INSERT INTO test.t1 (1)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "INSERT INTO test.t1 (1)", command=Query] +[SHOW PROCESSLIST reports: db=test, info LIKE "INSERT INTO test.t1 (1)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=test, info LIKE "UPDATE t1 SET a= 2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "UPDATE t1 SET a= 2", command=Query] -[SHOW PROCESSLIST reports: db=test, info LIKE "UPDATE t1 SET a= 2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=test, info LIKE "UPDATE test.t1 (1)(2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "UPDATE test.t1 (1)(2)", command=Query] +[SHOW PROCESSLIST reports: db=test, info LIKE "UPDATE test.t1 (1)(2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=test, info LIKE "DELETE FROM t1 WHERE a=2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "DELETE FROM t1 WHERE a=2", command=Query] -[SHOW PROCESSLIST reports: db=test, info LIKE "DELETE FROM t1 WHERE a=2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=test, info LIKE "DELETE FROM test.t1 (2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "DELETE FROM test.t1 (2)", command=Query] +[SHOW PROCESSLIST reports: db=test, info LIKE "DELETE FROM test.t1 (2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; check on channel_3 include/start_slave_sql.inc [FOR CHANNEL 'channel_3'] SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=test, info LIKE "INSERT INTO t1 VALUES (1)", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] -[SHOW PROCESSLIST reports: db=test, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=test, info LIKE "INSERT INTO test.t1 (1)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "INSERT INTO test.t1 (1)", command=Query] +[SHOW PROCESSLIST reports: db=test, info LIKE "INSERT INTO test.t1 (1)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=test, info LIKE "UPDATE t1 SET a= 2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "UPDATE t1 SET a= 2", command=Query] -[SHOW PROCESSLIST reports: db=test, info LIKE "UPDATE t1 SET a= 2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=test, info LIKE "UPDATE test.t1 (1)(2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "UPDATE test.t1 (1)(2)", command=Query] +[SHOW PROCESSLIST reports: db=test, info LIKE "UPDATE test.t1 (1)(2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=test, info LIKE "DELETE FROM t1 WHERE a=2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "DELETE FROM t1 WHERE a=2", command=Query] -[SHOW PROCESSLIST reports: db=test, info LIKE "DELETE FROM t1 WHERE a=2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=test, info LIKE "DELETE FROM test.t1 (2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=test, info LIKE "DELETE FROM test.t1 (2)", command=Query] +[SHOW PROCESSLIST reports: db=test, info LIKE "DELETE FROM test.t1 (2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; # Removing debug point 'dbug.rpl_apply_sync_barrier' from @@GLOBAL.debug [connection server_1] diff --git a/mysql-test/suite/rpl/r/rpl_row_stages.result b/mysql-test/suite/rpl/r/rpl_row_stages.result index eef076374fec..d41ac8bc2bbe 100644 --- a/mysql-test/suite/rpl/r/rpl_row_stages.result +++ b/mysql-test/suite/rpl/r/rpl_row_stages.result @@ -20,19 +20,19 @@ COMMIT; # Adding debug point 'dbug.rpl_apply_sync_barrier' to @@GLOBAL.debug include/start_slave_sql.inc SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db_wl7364, info LIKE "INSERT INTO t1 VALUES (1)", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] -[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", command=Query] +[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db_wl7364, info LIKE "UPDATE t1 SET c1= 2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "UPDATE t1 SET c1= 2", command=Query] -[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "UPDATE t1 SET c1= 2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", command=Query] +[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db_wl7364, info LIKE "DELETE FROM t1 WHERE c1=2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "DELETE FROM t1 WHERE c1=2", command=Query] -[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "DELETE FROM t1 WHERE c1=2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", command=Query] +[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; [connection master] SET @@SESSION.BINLOG_ROWS_QUERY_LOG_EVENTS=0; @@ -50,19 +50,19 @@ COMMIT; # Adding debug point 'dbug.rpl_apply_sync_barrier' to @@GLOBAL.debug include/start_slave_sql.inc SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db_wl7364, info IS NULL, Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info IS NULL, command=Query] -[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE 'NULL', command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", command=Query] +[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db_wl7364, info IS NULL, Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info IS NULL, command=Query] -[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE 'NULL', command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", command=Query] +[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db_wl7364, info IS NULL, Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info IS NULL, command=Query] -[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE 'NULL', command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", command=Query] +[SHOW PROCESSLIST reports: db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; # Removing debug point 'dbug.rpl_apply_sync_barrier' from @@GLOBAL.debug [connection master] @@ -80,22 +80,22 @@ AYzz6oU= BINLOG 'mSKWVhMBAAAAMgAAAOYCAAAAAPAAAAAAAAEACWRiX3dsNzM2NAACdDEAAQMAAfg9wnk=mSKWVh4BAAAAKAAAAA4DAAAAAPAAAAAAAAEAAgAB//4BAAAAnirPSw=='; [connection master] SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db_wl7364, info LIKE "BINLOG %", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "BINLOG %", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "INSERT INTO db_wl7364.t1 (1)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; [connection master1] BINLOG 'mSKWVhMBAAAAMgAAAGsDAAAAAPAAAAAAAAEACWRiX3dsNzM2NAACdDEAAQMAAWFoKHE=mSKWVh8BAAAALgAAAJkDAAAAAPAAAAAAAAEAAgAB///+AQAAAP4CAAAAuiHqhw=='; [connection master] SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db_wl7364, info LIKE "BINLOG %", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "BINLOG %", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "UPDATE db_wl7364.t1 (1)(2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; [connection master1] BINLOG 'mSKWVhMBAAAAMgAAAPwDAAAAAPAAAAAAAAEACWRiX3dsNzM2NAACdDEAAQMAAd+b01A=mSKWViABAAAAKAAAACQEAAAAAPAAAAAAAAEAAgAB//4CAAAAN7fZXg== '; [connection master] SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db_wl7364, info LIKE "BINLOG %", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "BINLOG %", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db_wl7364, info LIKE "DELETE FROM db_wl7364.t1 (2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; [connection master1] COMMIT; @@ -220,19 +220,19 @@ COMMIT; # Adding debug point 'dbug.rpl_apply_sync_barrier' to @@GLOBAL.debug include/start_slave_sql.inc SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db2, info LIKE "INSERT INTO t1 VALUES (1)", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db2, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] -[SHOW PROCESSLIST reports: db=db2, info LIKE "INSERT INTO t1 VALUES (1)", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (write), db=db2, info LIKE "INSERT INTO db2.t1 (1)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db2, info LIKE "INSERT INTO db2.t1 (1)", command=Query] +[SHOW PROCESSLIST reports: db=db2, info LIKE "INSERT INTO db2.t1 (1)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db2, info LIKE "UPDATE t1 SET c1= 2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db2, info LIKE "UPDATE t1 SET c1= 2", command=Query] -[SHOW PROCESSLIST reports: db=db2, info LIKE "UPDATE t1 SET c1= 2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (update), db=db2, info LIKE "UPDATE db2.t1 (1)(2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db2, info LIKE "UPDATE db2.t1 (1)(2)", command=Query] +[SHOW PROCESSLIST reports: db=db2, info LIKE "UPDATE db2.t1 (1)(2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; -include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db2, info LIKE "DELETE FROM t1 WHERE c1=2", Query] -include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db2, info LIKE "DELETE FROM t1 WHERE c1=2", command=Query] -[SHOW PROCESSLIST reports: db=db2, info LIKE "DELETE FROM t1 WHERE c1=2", command=Query] +include/assert.inc [PERFORMANCE_SCHEMA.threads reports: state=Applying batch of row changes (delete), db=db2, info LIKE "DELETE FROM db2.t1 (2)", Query] +include/assert.inc [INFORMATION_SCHEMA.processlist reports: db=db2, info LIKE "DELETE FROM db2.t1 (2)", command=Query] +[SHOW PROCESSLIST reports: db=db2, info LIKE "DELETE FROM db2.t1 (2)", command=Query] SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; [connection slave] # Removing debug point 'dbug.rpl_apply_sync_barrier' from @@GLOBAL.debug diff --git a/mysql-test/suite/rpl/t/rpl_multi_source_row_stages.test b/mysql-test/suite/rpl/t/rpl_multi_source_row_stages.test index 6ac93848b94e..b56dedf9ebf7 100644 --- a/mysql-test/suite/rpl/t/rpl_multi_source_row_stages.test +++ b/mysql-test/suite/rpl/t/rpl_multi_source_row_stages.test @@ -114,17 +114,17 @@ while ($j) if ($i == 3) { --let $state_progress= Applying batch of row changes (write) - --let $info= INSERT INTO t1 VALUES (1) + --let $info= INSERT INTO test.t1 (1) } if ($i == 2) { --let $state_progress= Applying batch of row changes (update) - --let $info= UPDATE t1 SET a= 2 + --let $info= UPDATE test.t1 (1)(2) } if ($i == 1) { --let $state_progress= Applying batch of row changes (delete) - --let $info= DELETE FROM t1 WHERE a=2 + --let $info= DELETE FROM test.t1 (2) } # this session waits for the progress to be updated diff --git a/mysql-test/suite/rpl/t/rpl_row_stages.test b/mysql-test/suite/rpl/t/rpl_row_stages.test index b6140fe18e6c..84fc15b11dae 100644 --- a/mysql-test/suite/rpl/t/rpl_row_stages.test +++ b/mysql-test/suite/rpl/t/rpl_row_stages.test @@ -60,19 +60,19 @@ while($i) if ($i == 3) { --let $state_progress= Applying batch of row changes (write) - --let $info= INSERT INTO t1 VALUES (1) + --let $info= INSERT INTO db_wl7364.t1 (1) } if ($i == 2) { --let $state_progress= Applying batch of row changes (update) - --let $info= UPDATE t1 SET c1= 2 + --let $info= UPDATE db_wl7364.t1 (1)(2) } if ($i == 1) { --let $state_progress= Applying batch of row changes (delete) - --let $info= DELETE FROM t1 WHERE c1=2 + --let $info= DELETE FROM db_wl7364.t1 (2) } # this session waits for the progress to be updated @@ -135,16 +135,19 @@ while($i) if ($i == 3) { --let $state_progress= Applying batch of row changes (write) + --let $info= INSERT INTO db_wl7364.t1 (1) } if ($i == 2) { --let $state_progress= Applying batch of row changes (update) + --let $info= UPDATE db_wl7364.t1 (1)(2) } if ($i == 1) { --let $state_progress= Applying batch of row changes (delete) + --let $info= DELETE FROM db_wl7364.t1 (2) } SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; @@ -207,7 +210,6 @@ AYzz6oU= --let $state_skip_show_processlist=1 --let $state_thd_name= thread/sql/one_connection --let $state_command= Query ---let $state_info= BINLOG % --let $state_db= $db --source include/rpl_connection_master.inc @@ -217,6 +219,7 @@ AYzz6oU= SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; --let $state_stage= Applying batch of row changes (write) +--let $state_info= INSERT INTO db_wl7364.t1 (1) --source ../include/rpl_row_stages_validate.inc SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; @@ -234,6 +237,7 @@ SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; --let $state_stage= Applying batch of row changes (update) +--let $state_info= UPDATE db_wl7364.t1 (1)(2) --source ../include/rpl_row_stages_validate.inc SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; @@ -251,6 +255,7 @@ SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; SET debug_sync= 'now WAIT_FOR signal.rpl_row_apply_progress_updated'; --let $state_stage= Applying batch of row changes (delete) +--let $state_info= DELETE FROM db_wl7364.t1 (2) --source ../include/rpl_row_stages_validate.inc SET debug_sync= 'now SIGNAL signal.rpl_row_apply_process_next_row'; @@ -493,19 +498,19 @@ while($i) if ($i == 3) { --let $state_progress= Applying batch of row changes (write) - --let $info= INSERT INTO t1 VALUES (1) + --let $info= INSERT INTO db2.t1 (1) } if ($i == 2) { --let $state_progress= Applying batch of row changes (update) - --let $info= UPDATE t1 SET c1= 2 + --let $info= UPDATE db2.t1 (1)(2) } if ($i == 1) { --let $state_progress= Applying batch of row changes (delete) - --let $info= DELETE FROM t1 WHERE c1=2 + --let $info= DELETE FROM db2.t1 (2) } # this session waits for the progress to be updated diff --git a/sql/log_event.cc b/sql/log_event.cc index 785a2f4f183d..78716fedc27a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -7599,6 +7599,35 @@ int Rows_log_event::unpack_current_row(const Relay_log_info *const rli, bool is_after_image, bool only_seek) { DBUG_ASSERT(m_table); + mysql_mutex_lock(&thd->LOCK_thd_query); + // For a update event we should not clear the query when unpacking after + // image. This can be checked using the cols parameter. + if (cols != &m_cols_ai) thd->row_query.clear(); + + Log_event_type type = get_type_code(); + if (type == Log_event_type::WRITE_ROWS_EVENT || + type == Log_event_type::WRITE_ROWS_EVENT_V1) + thd->row_query = "INSERT INTO "; + else if (type == Log_event_type::DELETE_ROWS_EVENT || + type == Log_event_type::DELETE_ROWS_EVENT_V1) + thd->row_query = "DELETE FROM "; + else { + DBUG_ASSERT(type == Log_event_type::UPDATE_ROWS_EVENT || + type == Log_event_type::UPDATE_ROWS_EVENT_V1 || + type == Log_event_type::PARTIAL_UPDATE_ROWS_EVENT); + if (cols != &m_cols_ai) { + thd->row_query = "UPDATE "; + } + } + + if (cols != &m_cols_ai) { + thd->row_query.append(m_table->s->db.str, m_table->s->db.length); + thd->row_query.append("."); + thd->row_query.append(m_table->s->table_name.str, + m_table->s->table_name.length); + thd->row_query.append(" "); + } + enum_row_image_type row_image_type; if (is_after_image) { DBUG_ASSERT(get_general_type_code() != binary_log::DELETE_ROWS_EVENT); @@ -7615,11 +7644,18 @@ int Rows_log_event::unpack_current_row(const Relay_log_info *const rli, (get_type_code() == binary_log::PARTIAL_UPDATE_ROWS_EVENT); ASSERT_OR_RETURN_ERROR(m_curr_row <= m_rows_end, HA_ERR_CORRUPT_EVENT); if (::unpack_row(rli, m_table, m_width, m_curr_row, cols, &m_curr_row_end, - m_rows_end, row_image_type, has_value_options, only_seek)) { + m_rows_end, row_image_type, has_value_options, only_seek, + thd->row_query)) { int error = thd->get_stmt_da()->mysql_errno(); DBUG_ASSERT(error); + mysql_mutex_unlock(&thd->LOCK_thd_query); return error; } +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_info) + (thd->row_query.c_str(), thd->row_query.size()); +#endif + mysql_mutex_unlock(&thd->LOCK_thd_query); // After the row is unpacked, we need to update all hidden generated columns // for functional indexes since those values are not included in the binlog @@ -9781,6 +9817,9 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) { in dispatch_command() after command execution is completed. */ if (thd->slave_thread) free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC)); + mysql_mutex_lock(&thd->LOCK_thd_query); + thd->row_query.clear(); + mysql_mutex_unlock(&thd->LOCK_thd_query); } DBUG_RETURN(error); diff --git a/sql/rpl_record.cc b/sql/rpl_record.cc index 690aef140bc8..0fac62901b83 100644 --- a/sql/rpl_record.cc +++ b/sql/rpl_record.cc @@ -400,6 +400,36 @@ size_t pack_row(TABLE *table, MY_BITMAP const *columns_in_image, DBUG_RETURN(static_cast(pack_ptr - row_data)); } +static void insert_row_fields(std::string &row_query, TABLE *table) { + static constexpr size_t max_field_len = 100; + row_query.append("("); + String buf; +#ifndef DBUG_OFF + uint index = 0; +#endif + for (Field **ptr = table->field; *ptr; ptr++) { +#ifndef DBUG_OFF + // Set the read bit to avoid assertions in val_str(). + bool flip = false; + if (table->read_set && !bitmap_is_set(table->read_set, index)) { + flip = true; + bitmap_flip_bit(table->read_set, index); + } +#endif + String *s = (*ptr)->val_str(&buf, &buf); + if (!s) + row_query.append("NULL"); + else + row_query.append(s->ptr(), min(max_field_len, s->length())); + row_query.append(","); +#ifndef DBUG_OFF + if (flip) bitmap_flip_bit(table->read_set, index); + ++index; +#endif + } + if (row_query.back() == ',') row_query.back() = ')'; +} + /** Read the value_options from a Partial_update_rows_log_event, and if value_options has any bit set, also read partial_bits. @@ -457,7 +487,8 @@ bool unpack_row_with_column_info(TABLE *table, uchar const *const row_data, MY_BITMAP const *column_image, uchar const **const row_image_end_p, uchar const *const event_end, bool only_seek, - table_def *tabledef, TABLE *conv_table) { + table_def *tabledef, TABLE *conv_table, + std::string &row_query) { DBUG_ENTER("unpack_row_with_column_info"); uint image_column_count = bitmap_bits_set(column_image); @@ -522,6 +553,7 @@ bool unpack_row_with_column_info(TABLE *table, uchar const *const row_data, } } } + insert_row_fields(row_query, table); *row_image_end_p = pack_ptr; DBUG_RETURN(false); } @@ -657,7 +689,8 @@ bool unpack_row(Relay_log_info const *rli, TABLE *table, uchar const **const row_image_end_p, uchar const *const event_end, enum_row_image_type row_image_type, - bool event_has_value_options, bool only_seek) { + bool event_has_value_options, bool only_seek, + std::string &row_query) { DBUG_ENTER("unpack_row"); DBUG_ASSERT(rli != nullptr); DBUG_ASSERT(table != nullptr); @@ -684,9 +717,9 @@ bool unpack_row(Relay_log_info const *rli, TABLE *table, my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0)); DBUG_RETURN(true); } - DBUG_RETURN(unpack_row_with_column_info(table, row_data, column_image, - row_image_end_p, event_end, - only_seek, tabledef, conv_table)); + DBUG_RETURN(unpack_row_with_column_info( + table, row_data, column_image, row_image_end_p, event_end, only_seek, + tabledef, conv_table, row_query)); } uint image_column_count = bitmap_bits_set(column_image); @@ -943,6 +976,8 @@ bool unpack_row(Relay_log_info const *rli, TABLE *table, } } + insert_row_fields(row_query, table); + // We have read all the null bits. DBUG_ASSERT(null_bits.tell() == image_column_count); diff --git a/sql/rpl_record.h b/sql/rpl_record.h index da5e7478acec..bf6c43f6800c 100644 --- a/sql/rpl_record.h +++ b/sql/rpl_record.h @@ -25,6 +25,7 @@ #include #include +#include #include "my_inttypes.h" @@ -46,7 +47,8 @@ bool unpack_row(Relay_log_info const *rli, TABLE *table, uchar const **const row_image_end_p, uchar const *const event_end, enum_row_image_type row_image_type, - bool event_has_value_options, bool only_seek); + bool event_has_value_options, bool only_seek, + std::string &row_query); // Fill table's record[0] with default values. int prepare_record(TABLE *const table, const MY_BITMAP *cols, const bool check); diff --git a/sql/sql_class.h b/sql/sql_class.h index b2f808a74a6a..895db7553fc1 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3850,6 +3850,8 @@ class THD : public MDL_context_owner, public: /* connection timeout error message */ char *conn_timeout_err_msg; + /* protected by LOCK_thd_query */ + std::string row_query; /** Initialize the optimizer cost model. diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 0d00e9663b27..7fffbb4e6a91 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -2099,8 +2099,17 @@ class List_process_list : public Do_THD_Impl { /* INFO */ mysql_mutex_lock(&inspect_thd->LOCK_thd_query); { - const char *query_str = inspect_thd->query().str; - size_t query_length = inspect_thd->query().length; + const char *query_str; + size_t query_length; + + if (!inspect_thd->row_query.empty()) { + query_str = inspect_thd->row_query.c_str(); + query_length = inspect_thd->row_query.size(); + } else { + query_str = inspect_thd->query().str; + query_length = inspect_thd->query().length; + } + String buf; if (inspect_thd->is_a_srv_session()) { buf.append(query_length ? "PLUGIN: " : "PLUGIN"); @@ -2286,8 +2295,17 @@ class Fill_process_list : public Do_THD_Impl { /* INFO */ mysql_mutex_lock(&inspect_thd->LOCK_thd_query); { - const char *query_str = inspect_thd->query().str; - size_t query_length = inspect_thd->query().length; + const char *query_str; + size_t query_length; + + if (!inspect_thd->row_query.empty()) { + query_str = inspect_thd->row_query.c_str(); + query_length = inspect_thd->row_query.size(); + } else { + query_str = inspect_thd->query().str; + query_length = inspect_thd->query().length; + } + String buf; if (inspect_thd->is_a_srv_session()) { buf.append(query_length ? "PLUGIN: " : "PLUGIN");