diff --git a/include/my_sqlcommand.h b/include/my_sqlcommand.h index fe572f8cc883..8f46af99f8f1 100644 --- a/include/my_sqlcommand.h +++ b/include/my_sqlcommand.h @@ -213,6 +213,9 @@ enum enum_sql_command { SQLCOM_PURGE_RAFT_LOG, SQLCOM_PURGE_RAFT_LOG_BEFORE, SQLCOM_SHOW_RAFT_LOGS, + SQLCOM_START_SHARDBEATER, + SQLCOM_STOP_SHARDBEATER, + SQLCOM_SHOW_SHARDBEATER_STAT, /* This should be the last !!! */ SQLCOM_END }; diff --git a/mysql-test/r/information_schema_keywords.result b/mysql-test/r/information_schema_keywords.result index 06874348b079..f20090785f77 100644 --- a/mysql-test/r/information_schema_keywords.result +++ b/mysql-test/r/information_schema_keywords.result @@ -560,6 +560,7 @@ SERIALIZABLE 0 SERVER 0 SESSION 0 SET 1 +SHARDBEATER 0 SHARE 0 SHARED 0 SHOW 1 diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index fef063c1c421..bfaf9215efd0 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -461,6 +461,9 @@ The following options may be given as the first argument: This will control the intrinsic tmp table storage engine. If true then rocksdb intrinsic tmp table will be created. Otherwise default will be innodb intrinsic tmp tables. + --enable-shardbeater + Enables Shardbeater + (Defaults to on; use --skip-enable-shardbeater to disable.) --enable-sql-wsenv Enable dumping/loading file to/from warm storage for SELECT INTO OUTFILE/LOAD DATA. Set true to enable. --enable-super-log-bin-read-only @@ -2273,6 +2276,21 @@ The following options may be given as the first argument: to TRUE, the plugin will flag associated authenticated accounts to be mapped to proxy users when the server option check_proxy_users is enabled. 
+ --shardbeat-blocked-dbs[=name] + List of comma separated database names on which not to + insert shardbeats + --shardbeat-interval-ms[=#] + Interval in milliseconds in which shardbeats are injected + on silent databases + --shardbeat-query-comment-format[=name] + Formatted string to be used for shardbeats insert + --shardbeat-table[=name] + Name of table in which to insert a shardbeat + --shardbeat-user[=name] + Name of user as which to insert regular shardbeats + --shardbeat-vlog-level[=#] + Verbosity level of logging into mysqld error log for + shardbeater --show-binlogs-encryption Scan binlogs to determine encryption property during show binlogs @@ -2808,6 +2826,7 @@ enable-query-checksum FALSE enable-raft-plugin FALSE enable-resultset-checksum FALSE enable-rocksdb-intrinsic-tmp-table FALSE +enable-shardbeater TRUE enable-sql-wsenv FALSE enable-super-log-bin-read-only FALSE enable-user-tables-engine-check FALSE @@ -3061,7 +3080,7 @@ performance-schema-max-socket-classes 10 performance-schema-max-socket-instances -1 performance-schema-max-sql-text-length 1024 performance-schema-max-stage-classes 175 -performance-schema-max-statement-classes 231 +performance-schema-max-statement-classes 234 performance-schema-max-statement-stack 10 performance-schema-max-table-handles -1 performance-schema-max-table-instances -1 @@ -3338,6 +3357,12 @@ session-track-system-variables time_zone,autocommit,character_set_client,charact session-track-transaction-info OFF set-read-only-on-shutdown FALSE sha256-password-proxy-users FALSE +shardbeat-blocked-dbs +shardbeat-interval-ms 60000 +shardbeat-query-comment-format +shardbeat-table blackhole +shardbeat-user +shardbeat-vlog-level 0 show-binlogs-encryption TRUE show-create-table-verbosity FALSE show-old-temporals FALSE diff --git a/mysql-test/suite/rpl_raft/r/rpl_raft_shardbeats.result b/mysql-test/suite/rpl_raft/r/rpl_raft_shardbeats.result new file mode 100644 index 000000000000..90603a0d261e --- /dev/null +++ 
b/mysql-test/suite/rpl_raft/r/rpl_raft_shardbeats.result @@ -0,0 +1,83 @@ +include/raft_3_node.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +include/rpl_connect.inc [creating server_4] +include/rpl_connect.inc [creating server_5] +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role LEADER +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role FOLLOWER +reset master; +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role FOLLOWER +reset master; +create database db1; +create database db2; +show shardbeater status; +ERROR HY000: Shardbeats not ON yet +set global shardbeat_interval_ms = 2000; +set global shardbeat_vlog_level = 1; +start shardbeater; +ERROR HY000: start shardbeater operation is disallowed on when shardbeater user is empty +set global shardbeat_query_comment_format='WH:1 .T:sb .I:{ipaddr} #S:{shard} #rs:{replicaset}'; +create user 'dba_scripts:sys.database'@'%'; +set global shardbeat_user='dba_scripts:sys.database'; +set global shardbeat_table=''; +start shardbeater; +ERROR HY000: start shardbeater operation is disallowed on when shardbeat_table is empty +set global shardbeat_table = default; +start shardbeater; +include/assert.inc [no user facing dbs hence test is skipped as 
well;] +ALTER DATABASE db1 DB_METADATA '{"shard": "50000000", "rs": "1234579"}'; +include/assert.inc [user facing dbs hence test is not skipped;] +include/assert.inc [no permissions so we should fail shardbeats] +include/assert.inc [we should get permission error 1142] +GRANT INSERT ON db1.* to 'dba_scripts:sys.database'@'%'; +include/assert.inc [now that we have grants shardbeats should go through] +use db1; +create table blackhole(i INT); +include/assert.inc [now that we have grants shardbeats should go through] +ALTER DATABASE db2 DB_METADATA '{"shard": "50000001", "rs": "1234579"}'; +GRANT INSERT ON db2.* to 'dba_scripts:sys.database'@'%'; +use db2; +create table blackhole(i INT); +include/assert.inc [now that we have grants shardbeats should go through] +include/assert.inc [the number of shardbeats should be 5] +include/assert.inc [the number of shardbeats should be 5] +show shardbeater status; +ERROR HY000: Shardbeats not ON yet +"Transfering leadership: server_1 -> server_2" +set @@global.rpl_raft_new_leader_uuid = 'uuid2'; +"The leadership has transferred and server_1 is now a FOLLOWER" + +--let = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--sleep 5 +--let = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let = no new shardbeats on follower +--let = "" = "" +--source include/assert.inc +echo "Transfering leadership: server_2 -> server_1" +set @@global.rpl_raft_new_leader_uuid = 'uuid1'; +include/assert.inc [the number of shardbeats should be 5] +include/assert.inc [the number of shardbeats should be 5] +Cleanup +stop shardbeater; +set global shardbeat_interval_ms = default; +set global shardbeat_vlog_level = default; +set global shardbeat_query_comment_format= default; +set global shardbeat_user= default; +set global shardbeat_table= default; +DROP DATABASE db1; +DROP DATABASE db2; +DROP USER IF EXISTS 'dba_scripts:sys.database'@'%'; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/rpl_end.inc diff 
--git a/mysql-test/suite/rpl_raft/t/rpl_raft_shardbeats.test b/mysql-test/suite/rpl_raft/t/rpl_raft_shardbeats.test new file mode 100644 index 000000000000..744bb2d39fa4 --- /dev/null +++ b/mysql-test/suite/rpl_raft/t/rpl_raft_shardbeats.test @@ -0,0 +1,200 @@ +source ../include/raft_3_node.inc; +# --source include/master-slave.inc +--source include/have_binlog_format_row.inc + +connection server_1; +let $uuid1= `select variable_value from performance_schema.global_status where variable_name = 'Rpl_raft_peer_uuid'`; +connection server_2; +let $uuid2= `select variable_value from performance_schema.global_status where variable_name = 'Rpl_raft_peer_uuid'`; +connection server_3; +let $uuid3= `select variable_value from performance_schema.global_status where variable_name = 'Rpl_raft_peer_uuid'`; + + +# connection slave +#connection slave; +#set global read_only=1; + +# create 2 user dbs. Equivalent of shards +connection server_1; +create database db1; +create database db2; + +# Shardbeater is not ON yet. + +--error ER_DISALLOWED_OPERATION +show shardbeater status; + +set global shardbeat_interval_ms = 2000; +set global shardbeat_vlog_level = 1; + +# Shardbeater start should fail since user and table is not populated + +--error ER_DISALLOWED_OPERATION +start shardbeater; +#show shardbeater status; + +set global shardbeat_query_comment_format='WH:1 .T:sb .I:{ipaddr} #S:{shard} #rs:{replicaset}'; + +# Create the user +create user 'dba_scripts:sys.database'@'%'; +set global shardbeat_user='dba_scripts:sys.database'; + +# Shardbeater start should fail as the table name is not present + +set global shardbeat_table=''; +--error ER_DISALLOWED_OPERATION +start shardbeater; + +# Set the value of shardbeat_table to blackhole which is yet to be created. +set global shardbeat_table = default; + +# Shardbeater start should now succeed. 
+ +start shardbeater; +--sleep 5 + +--let $no_db_ok = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $db_name = query_get_value(SHOW SHARDBEATER STATUS, Db, 1) +--let $assert_text = no user facing dbs hence test is skipped as well; +--let $assert_cond = $no_db_ok = 0; +--source include/assert.inc + +ALTER DATABASE db1 DB_METADATA '{"shard": "50000000", "rs": "1234579"}'; +--sleep 5 + +--let $test_db_ok = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $test_db_fail = query_get_value(SHOW SHARDBEATER STATUS, Num_Fail, 1) +--let $last_few_fail = query_get_value(SHOW SHARDBEATER STATUS, Last_few_failures, 1) +--let $assert_text = user facing dbs hence test is not skipped; +--let $assert_cond = $test_db_ok = 0; +--source include/assert.inc + +--let $assert_text = no permissions so we should fail shardbeats +--let $assert_cond = $test_db_fail > 0; +--source include/assert.inc + +--let $assert_text = we should get permission error 1142 +--let $assert_cond = "$last_few_fail" LIKE "ErrCode: 1142%" +--source include/assert.inc + +# Since grants was missing writes would have failed. 
Lets now give +# permissions + +GRANT INSERT ON db1.* to 'dba_scripts:sys.database'@'%'; +--sleep 5 + +--let $test_db_ok = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $assert_text = now that we have grants shardbeats should go through +--let $assert_cond = $test_db_ok = 0; +--source include/assert.inc + +use db1; +create table blackhole(i INT); +--sleep 5 + +--let $test_db_ok = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $assert_text = now that we have grants shardbeats should go through +--let $assert_cond = $test_db_ok > 0; +--source include/assert.inc + +ALTER DATABASE db2 DB_METADATA '{"shard": "50000001", "rs": "1234579"}'; +GRANT INSERT ON db2.* to 'dba_scripts:sys.database'@'%'; +use db2; +create table blackhole(i INT); +--sleep 5 + +--let $test_db_ok1 = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $test_db_ok2 = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 2) +--let $test_db_ok = ($test_db_ok1 * $test_db_ok2) + +--let $assert_text = now that we have grants shardbeats should go through +--let $assert_cond = $test_db_ok > 0; +--source include/assert.inc + +# measure the number of heartbeats in 10 seconds +--let $test_db_ok1_b = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $test_db_ok2_b = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 2) + +--sleep 11 + +--let $test_db_ok1_a = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $test_db_ok2_a = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 2) + +--let $assert_text = the number of shardbeats should be 5 +--let $assert_cond = ($test_db_ok1_a - $test_db_ok1_b) >= 5 +--source include/assert.inc + +--let $assert_text = the number of shardbeats should be 5 +--let $assert_cond = ($test_db_ok2_a - $test_db_ok2_b) >= 5 +--source include/assert.inc + +connection server_2; +--error ER_DISALLOWED_OPERATION +show shardbeater status; + +echo "Transfering leadership: server_1 -> server_2"; +connection server_1; +replace_result $uuid2 uuid2; +eval set 
@@global.rpl_raft_new_leader_uuid = '$uuid2'; + +let $wait_condition= select @@global.read_only = 1; +source include/wait_condition.inc; + +echo "The leadership has transferred and server_1 is now a FOLLOWER"; + +--let $test_db_ok_b = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--sleep 5 +--let $test_db_ok_a = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) + +--let $assert_text = no new shardbeats on follower +--let $assert_cond = "$test_db_ok_b" = "$test_db_ok_a" +--source include/assert.inc + +echo "Transfering leadership: server_2 -> server_1"; +connection server_2; +replace_result $uuid1 uuid1; +eval set @@global.rpl_raft_new_leader_uuid = '$uuid1'; +connection server_1; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; + +# measure the number of heartbeats in 10 seconds +--let $test_db_ok1_b = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $test_db_ok2_b = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 2) + +--sleep 11 + +--let $test_db_ok1_a = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 1) +--let $test_db_ok2_a = query_get_value(SHOW SHARDBEATER STATUS, Num_OK, 2) + +--let $assert_text = the number of shardbeats should be 5 +--let $assert_cond = ($test_db_ok1_a - $test_db_ok1_b) >= 5 +--source include/assert.inc + +--let $assert_text = the number of shardbeats should be 5 +--let $assert_cond = ($test_db_ok2_a - $test_db_ok2_b) >= 5 +--source include/assert.inc + +# ============================================================================= +# Cleanup +# ============================================================================= + +--echo Cleanup + +connection server_1; +stop shardbeater; +set global shardbeat_interval_ms = default; +set global shardbeat_vlog_level = default; +set global shardbeat_query_comment_format= default; +set global shardbeat_user= default; +set global shardbeat_table= default; +DROP DATABASE db1; +DROP DATABASE db2; +DROP USER IF EXISTS 
'dba_scripts:sys.database'@'%'; + +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; + +source include/rpl_end.inc; diff --git a/mysql-test/suite/sys_vars/r/all_vars.result b/mysql-test/suite/sys_vars/r/all_vars.result index fabfb6965377..fee39624d104 100644 --- a/mysql-test/suite/sys_vars/r/all_vars.result +++ b/mysql-test/suite/sys_vars/r/all_vars.result @@ -36,6 +36,8 @@ enable_acl_db_cache enable_acl_db_cache enable_hlc_bound enable_hlc_bound +enable_shardbeater +enable_shardbeater force_pk_for_equality_preds_on_pk force_pk_for_equality_preds_on_pk generated_random_password_length @@ -109,6 +111,10 @@ regexp_stack_limit regexp_time_limit regexp_time_limit resultset_metadata +shardbeat_interval_ms +shardbeat_interval_ms +shardbeat_vlog_level +shardbeat_vlog_level sql_require_primary_key sql_require_primary_key temptable_use_mmap diff --git a/mysql-test/suite/sys_vars/r/shardbeat_blocked_dbs_basic.result b/mysql-test/suite/sys_vars/r/shardbeat_blocked_dbs_basic.result new file mode 100644 index 000000000000..13c38728c341 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/shardbeat_blocked_dbs_basic.result @@ -0,0 +1,21 @@ +set @@global.shardbeat_blocked_dbs='db100'; +select @@global.shardbeat_blocked_dbs; +@@global.shardbeat_blocked_dbs +db100 +set @@global.shardbeat_blocked_dbs = default; +select @@global.shardbeat_blocked_dbs; +@@global.shardbeat_blocked_dbs + +set @saved_shardbeat_blocked_dbs = @@global.shardbeat_blocked_dbs; +set @@global.shardbeat_blocked_dbs='db100, db200'; +set @@global.shardbeat_blocked_dbs='db100, db200, '; +set @@global.shardbeat_blocked_dbs=1; +ERROR 42000: Incorrect argument type to variable 'shardbeat_blocked_dbs' +select @@session.shardbeat_blocked_dbs; +ERROR HY000: Variable 'shardbeat_blocked_dbs' is a GLOBAL variable +set @@session.shardbeat_blocked_dbs='db100, db200'; +ERROR HY000: Variable 
'shardbeat_blocked_dbs' is a GLOBAL variable and should be set with SET GLOBAL +set global shardbeat_blocked_dbs = @saved_shardbeat_blocked_dbs; +select @@global.shardbeat_blocked_dbs; +@@global.shardbeat_blocked_dbs + diff --git a/mysql-test/suite/sys_vars/r/shardbeat_query_comment_format_basic.result b/mysql-test/suite/sys_vars/r/shardbeat_query_comment_format_basic.result new file mode 100644 index 000000000000..c98911ac6a5f --- /dev/null +++ b/mysql-test/suite/sys_vars/r/shardbeat_query_comment_format_basic.result @@ -0,0 +1,19 @@ +set @@global.shardbeat_query_comment_format='WH:1 #.T:sb #.S:{shard} #.I:{ipaddr} #rs:{replicaset}'; +select @@global.shardbeat_query_comment_format; +@@global.shardbeat_query_comment_format +WH:1 #.T:sb #.S:{shard} #.I:{ipaddr} #rs:{replicaset} +set @@global.shardbeat_query_comment_format = default; +select @@global.shardbeat_query_comment_format; +@@global.shardbeat_query_comment_format + +set @saved_shardbeat_query_comment_format = @@global.shardbeat_query_comment_format; +set @@global.shardbeat_query_comment_format=1; +ERROR 42000: Incorrect argument type to variable 'shardbeat_query_comment_format' +select @@session.shardbeat_query_comment_format; +ERROR HY000: Variable 'shardbeat_query_comment_format' is a GLOBAL variable +set @@session.shardbeat_query_comment_format='WH:1 #.T:sb #.S:{shard} #.I:{ipaddr} #rs:{replicaset}'; +ERROR HY000: Variable 'shardbeat_query_comment_format' is a GLOBAL variable and should be set with SET GLOBAL +set global shardbeat_query_comment_format = @saved_shardbeat_query_comment_format; +select @@global.shardbeat_query_comment_format; +@@global.shardbeat_query_comment_format + diff --git a/mysql-test/suite/sys_vars/r/shardbeat_table_basic.result b/mysql-test/suite/sys_vars/r/shardbeat_table_basic.result new file mode 100644 index 000000000000..e85fb35cfd56 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/shardbeat_table_basic.result @@ -0,0 +1,22 @@ +set @@global.shardbeat_table='blackhole'; +select 
@@global.shardbeat_table; +@@global.shardbeat_table +blackhole +set @@global.shardbeat_table = default; +select @@global.shardbeat_table; +@@global.shardbeat_table +blackhole +set @saved_shardbeat_table = @@global.shardbeat_table; +set @@global.shardbeat_table=1; +ERROR 42000: Incorrect argument type to variable 'shardbeat_table' +select @@session.shardbeat_table; +ERROR HY000: Variable 'shardbeat_table' is a GLOBAL variable +set @@session.shardbeat_table='blackhole'; +ERROR HY000: Variable 'shardbeat_table' is a GLOBAL variable and should be set with SET GLOBAL +set global shardbeat_table=""; +start shardbeater; +ERROR HY000: start shardbeater operation is disallowed on when shardbeater user is empty +set global shardbeat_table = @saved_shardbeat_table; +select @@global.shardbeat_table; +@@global.shardbeat_table +blackhole diff --git a/mysql-test/suite/sys_vars/r/shardbeat_user_basic.result b/mysql-test/suite/sys_vars/r/shardbeat_user_basic.result new file mode 100644 index 000000000000..cbd96622404b --- /dev/null +++ b/mysql-test/suite/sys_vars/r/shardbeat_user_basic.result @@ -0,0 +1,23 @@ +set @@global.shardbeat_user='dba_scripts'; +select @@global.shardbeat_user; +@@global.shardbeat_user +dba_scripts +set @@global.shardbeat_user = default; +select @@global.shardbeat_user; +@@global.shardbeat_user + +set @saved_shardbeat_user = @@global.shardbeat_user; +set @@global.shardbeat_user="dba_scripts:sys.database@'%'"; +set @@global.shardbeat_user=1; +ERROR 42000: Incorrect argument type to variable 'shardbeat_user' +select @@session.shardbeat_user; +ERROR HY000: Variable 'shardbeat_user' is a GLOBAL variable +set @@session.shardbeat_user='dba_scripts:sys.database'; +ERROR HY000: Variable 'shardbeat_user' is a GLOBAL variable and should be set with SET GLOBAL +set global shardbeat_user=''; +start shardbeater; +ERROR HY000: start shardbeater operation is disallowed on when shardbeater user is empty +set global shardbeat_user = @saved_shardbeat_user; +select 
@@global.shardbeat_user; +@@global.shardbeat_user + diff --git a/mysql-test/suite/sys_vars/t/shardbeat_blocked_dbs_basic.test b/mysql-test/suite/sys_vars/t/shardbeat_blocked_dbs_basic.test new file mode 100644 index 000000000000..3d43f25ad9d6 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/shardbeat_blocked_dbs_basic.test @@ -0,0 +1,21 @@ +set @@global.shardbeat_blocked_dbs='db100'; +select @@global.shardbeat_blocked_dbs; + +set @@global.shardbeat_blocked_dbs = default; +select @@global.shardbeat_blocked_dbs; +set @saved_shardbeat_blocked_dbs = @@global.shardbeat_blocked_dbs; + +set @@global.shardbeat_blocked_dbs='db100, db200'; + +set @@global.shardbeat_blocked_dbs='db100, db200, '; + +--error ER_WRONG_TYPE_FOR_VAR +set @@global.shardbeat_blocked_dbs=1; + +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +select @@session.shardbeat_blocked_dbs; +--error ER_GLOBAL_VARIABLE +set @@session.shardbeat_blocked_dbs='db100, db200'; + +set global shardbeat_blocked_dbs = @saved_shardbeat_blocked_dbs; +select @@global.shardbeat_blocked_dbs; diff --git a/mysql-test/suite/sys_vars/t/shardbeat_query_comment_format_basic.test b/mysql-test/suite/sys_vars/t/shardbeat_query_comment_format_basic.test new file mode 100644 index 000000000000..53769a968cf4 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/shardbeat_query_comment_format_basic.test @@ -0,0 +1,17 @@ +set @@global.shardbeat_query_comment_format='WH:1 #.T:sb #.S:{shard} #.I:{ipaddr} #rs:{replicaset}'; +select @@global.shardbeat_query_comment_format; + +set @@global.shardbeat_query_comment_format = default; +select @@global.shardbeat_query_comment_format; +set @saved_shardbeat_query_comment_format = @@global.shardbeat_query_comment_format; + +--error ER_WRONG_TYPE_FOR_VAR +set @@global.shardbeat_query_comment_format=1; + +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +select @@session.shardbeat_query_comment_format; +--error ER_GLOBAL_VARIABLE +set @@session.shardbeat_query_comment_format='WH:1 #.T:sb #.S:{shard} #.I:{ipaddr} #rs:{replicaset}'; 
+ +set global shardbeat_query_comment_format = @saved_shardbeat_query_comment_format; +select @@global.shardbeat_query_comment_format; diff --git a/mysql-test/suite/sys_vars/t/shardbeat_table_basic.test b/mysql-test/suite/sys_vars/t/shardbeat_table_basic.test new file mode 100644 index 000000000000..0974884af17c --- /dev/null +++ b/mysql-test/suite/sys_vars/t/shardbeat_table_basic.test @@ -0,0 +1,22 @@ +set @@global.shardbeat_table='blackhole'; +select @@global.shardbeat_table; + +set @@global.shardbeat_table = default; +select @@global.shardbeat_table; +set @saved_shardbeat_table = @@global.shardbeat_table; + +--error ER_WRONG_TYPE_FOR_VAR +set @@global.shardbeat_table=1; + +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +select @@session.shardbeat_table; +--error ER_GLOBAL_VARIABLE +set @@session.shardbeat_table='blackhole'; + +set global shardbeat_table=""; +connection default; +--error ER_DISALLOWED_OPERATION +start shardbeater; + +set global shardbeat_table = @saved_shardbeat_table; +select @@global.shardbeat_table; diff --git a/mysql-test/suite/sys_vars/t/shardbeat_user_basic.test b/mysql-test/suite/sys_vars/t/shardbeat_user_basic.test new file mode 100644 index 000000000000..71799d1e5448 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/shardbeat_user_basic.test @@ -0,0 +1,24 @@ +set @@global.shardbeat_user='dba_scripts'; +select @@global.shardbeat_user; + +set @@global.shardbeat_user = default; +select @@global.shardbeat_user; +set @saved_shardbeat_user = @@global.shardbeat_user; + +set @@global.shardbeat_user="dba_scripts:sys.database@'%'"; + +--error ER_WRONG_TYPE_FOR_VAR +set @@global.shardbeat_user=1; + +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +select @@session.shardbeat_user; +--error ER_GLOBAL_VARIABLE +set @@session.shardbeat_user='dba_scripts:sys.database'; + +set global shardbeat_user=''; +connection default; +--error ER_DISALLOWED_OPERATION +start shardbeater; + +set global shardbeat_user = @saved_shardbeat_user; +select @@global.shardbeat_user; diff --git 
a/sql/CMakeLists.txt b/sql/CMakeLists.txt index be8746d6d12b..67d571558ab7 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -902,7 +902,8 @@ ADD_DEPENDENCIES(binlog GenError) ADD_LIBRARY(rpl STATIC ${RPL_SOURCE}) TARGET_LINK_LIBRARIES(rpl sql_main) -SET (MASTER_SOURCE rpl_master.cc rpl_binlog_sender.cc binlog_reader.cc) +SET (MASTER_SOURCE rpl_master.cc rpl_binlog_sender.cc binlog_reader.cc + rpl_shardbeats.cc) ADD_DEPENDENCIES(rpl GenError) ADD_LIBRARY(master STATIC ${MASTER_SOURCE}) ADD_DEPENDENCIES(master GenError) diff --git a/sql/binlog.cc b/sql/binlog.cc index 248a97dfbc9f..8fded8d48c03 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -117,6 +117,7 @@ #include "sql/rpl_record.h" #include "sql/rpl_rli.h" // Relay_log_info #include "sql/rpl_rli_pdb.h" // Slave_worker +#include "sql/rpl_shardbeats.h" // Shardbeats_manager #include "sql/rpl_slave.h" #include "sql/rpl_slave_commit_order_manager.h" // Commit_order_manager #include "sql/rpl_transaction_ctx.h" @@ -1735,6 +1736,14 @@ bool MYSQL_BIN_LOG::write_transaction(THD *thd, binlog_cache_data *cache_data, */ ulonglong immediate_commit_timestamp = my_micro_time(); + // Use this timestamp to update the map of db trx times for shards + // in order to determine silent shards. + Shardbeats_manager *smgr = Shardbeats_manager::get(); + if (smgr) { + // We don't destroy shardbeats manager once created. 
+ smgr->update_db_trx_times(thd, immediate_commit_timestamp); + } + /* When the original_commit_timestamp session variable is set to a value other than UNDEFINED_COMMIT_TIMESTAMP, it means that either the timestamp diff --git a/sql/lex.h b/sql/lex.h index 04dca1f966f1..7052b8ed0e9a 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -626,6 +626,7 @@ static const SYMBOL symbols[] = { {SYM("SESSION", SESSION_SYM)}, {SYM("SERVER", SERVER_SYM)}, {SYM("SET", SET_SYM)}, + {SYM("SHARDBEATER", SHARDBEATER)}, {SYM("SHARE", SHARE_SYM)}, {SYM("SHARED", SHARED_SYM)}, {SYM("SHOW", SHOW)}, diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d2f6abf6484a..baf9a48b37e2 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -668,6 +668,7 @@ The documentation is based on the source files such as: #include "sql/rpl_mi.h" #include "sql/rpl_msr.h" // Multisource_info #include "sql/rpl_rli.h" // Relay_log_info +#include "sql/rpl_shardbeats.h" // Shardbeats_manager #include "sql/rpl_slave.h" // slave_load_tmpdir #include "sql/rpl_trx_tracking.h" #include "sql/sd_notify.h" // sd_notify_connect @@ -1276,7 +1277,7 @@ const char *binlog_error_action_list[] = {"IGNORE_ERROR", "ABORT_SERVER", uint32 gtid_executed_compression_period = 0; bool opt_log_unsafe_statements; bool opt_log_global_var_changes; -bool is_slave = false; +std::atomic is_slave(false); /* Counter to count the number of slave_stats_daemon threads created. Should be * at most 1. */ std::atomic slave_stats_daemon_thread_counter(0); @@ -2590,6 +2591,8 @@ static void mysqld_exit(int exit_code) { Srv_session::module_deinit(); delete_optimizer_cost_module(); clean_up_mutexes(); + Shardbeats_manager::destroy(); + my_end(opt_endinfo ? 
MY_CHECK_ERROR | MY_GIVE_INFO : 0); destroy_error_log(); #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE @@ -4874,6 +4877,10 @@ SHOW_VAR com_status_vars[] = { (char *)offsetof(System_status_var, com_stat[(uint)SQLCOM_SHOW_SLAVE_STAT]), SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, + {"show_shardbeater_status", + (char *)offsetof(System_status_var, + com_stat[(uint)SQLCOM_SHOW_SHARDBEATER_STAT]), + SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, {"show_status", (char *)offsetof(System_status_var, com_stat[(uint)SQLCOM_SHOW_STATUS]), SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, @@ -4987,6 +4994,14 @@ SHOW_VAR com_status_vars[] = { {"show_raftlogs", (char *)offsetof(System_status_var, com_stat[(uint)SQLCOM_SHOW_RAFT_LOGS]), SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, + {"shardbeater_start", + (char *)offsetof(System_status_var, + com_stat[(uint)SQLCOM_START_SHARDBEATER]), + SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, + {"shardbeater_stop", + (char *)offsetof(System_status_var, + com_stat[(uint)SQLCOM_STOP_SHARDBEATER]), + SHOW_LONG_STATUS, SHOW_SCOPE_ALL}, {NullS, NullS, SHOW_LONG, SHOW_SCOPE_ALL}}; LEX_CSTRING sql_statement_names[(uint)SQLCOM_END + 1]; diff --git a/sql/mysqld.h b/sql/mysqld.h index 8657d2ffd0b7..abfa2f0553c2 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -540,7 +540,7 @@ extern uint write_cpu_limit_milliseconds; */ extern uint write_time_check_batch; -extern bool is_slave; +extern std::atomic is_slave; extern std::atomic slave_stats_daemon_thread_counter; extern bool bypass_write_throttle_admin_check; extern uint write_stats_count; diff --git a/sql/rpl_shardbeats.cc b/sql/rpl_shardbeats.cc new file mode 100644 index 000000000000..8117cfbbd63f --- /dev/null +++ b/sql/rpl_shardbeats.cc @@ -0,0 +1,967 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "sql/rpl_shardbeats.h" + +#include "include/mutex_lock.h" +#include "mysql/com_data.h" + +#include "sql/auth/auth_acls.h" +#include "sql/binlog.h" +#include "sql/dd/cache/dictionary_client.h" // dd::cache::Dictionary_client 
+#include "sql/log.h" +#include "sql/mysqld.h" +#include "sql/mysqld_thd_manager.h" +#include "sql/protocol_classic.h" +#include "sql/sql_class.h" +#include "sql/sql_lex.h" +#include "sql/sql_parse.h" +#include "sql/sql_profile.h" +#include "sql/sys_vars.h" + +#include + +#ifdef HAVE_PSI_INTERFACE +static PSI_cond_key key_COND_shardbeater_run_cond; +static PSI_mutex_key key_LOCK_shardbeater; +#endif /* HAVE_PSI_INTERFACE */ + +// expected table for shardbeats in each database +char *shardbeat_table = nullptr; +// ACL user to be used to inject shardbeats +char *shardbeat_user = nullptr; +// By default inject a shardbeat every minute +uint shardbeat_interval_ms = 60000; +// query comment to prefix the insert. +char *shardbeat_query_comment_format = nullptr; +// list of extra blocked dbs. +char *shardbeat_blocked_dbs = nullptr; +// verbosity level of logging +uint shardbeat_vlog_level = 0; +// enable the shardbeater +bool enable_shardbeater = true; + +static Sys_var_uint Sys_shardbeat_interval_ms( + "shardbeat_interval_ms", + "Interval in milliseconds in which shardbeats are injected on " + "silent databases", + GLOBAL_VAR(shardbeat_interval_ms), CMD_LINE(OPT_ARG), + VALID_RANGE(0, UINT_MAX), DEFAULT(60000), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG, ON_CHECK(NULL), ON_UPDATE(NULL)); + +static Sys_var_charptr Sys_shardbeat_table( + "shardbeat_table", "Name of table in which to insert a shardbeat", + GLOBAL_VAR(shardbeat_table), CMD_LINE(OPT_ARG), IN_FS_CHARSET, + DEFAULT("blackhole"), NO_MUTEX_GUARD, NOT_IN_BINLOG); + +static Sys_var_charptr Sys_shardbeat_user( + "shardbeat_user", "Name of user as which to insert regular shardbeats", + GLOBAL_VAR(shardbeat_user), CMD_LINE(OPT_ARG), IN_FS_CHARSET, DEFAULT(""), + NO_MUTEX_GUARD, NOT_IN_BINLOG); + +static bool shardbeat_comment_str_update(sys_var *, THD *, enum_var_type) { + Shardbeats_manager::get_or_create()->expand_format_comment(); + return false; +} + +static Sys_var_charptr Sys_shardbeat_insert_comment_fs( + 
"shardbeat_query_comment_format", + "Formatted string to be used for shardbeats insert", + GLOBAL_VAR(shardbeat_query_comment_format), CMD_LINE(OPT_ARG), + IN_FS_CHARSET, DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(NULL), + ON_UPDATE(shardbeat_comment_str_update)); + +static bool shardbeat_blocked_dbs_update(sys_var *, THD *, enum_var_type) { + Shardbeats_manager::get_or_create()->create_blocked_dbs(); + return false; +} + +static Sys_var_charptr Sys_shardbeat_blocked_dbs( + "shardbeat_blocked_dbs", + "List of comma separated database names on which not to insert shardbeats", + GLOBAL_VAR(shardbeat_blocked_dbs), CMD_LINE(OPT_ARG), IN_FS_CHARSET, + DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(NULL), + ON_UPDATE(shardbeat_blocked_dbs_update)); + +static Sys_var_uint Sys_shardbeat_vlog_level( + "shardbeat_vlog_level", + "Verbosity level of logging into mysqld error log for shardbeater", + GLOBAL_VAR(shardbeat_vlog_level), CMD_LINE(OPT_ARG), VALID_RANGE(0, 10), + DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(NULL), + ON_UPDATE(NULL)); + +static Sys_var_bool Sys_enable_shardbeater("enable_shardbeater", + "Enables Shardbeater", + GLOBAL_VAR(enable_shardbeater), + CMD_LINE(OPT_ARG), DEFAULT(true), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(NULL), ON_UPDATE(NULL)); + +// The global singleton for shardbeat interactions +std::unique_ptr<Shardbeats_manager> shardbeats_mngr; + +namespace { + +const int INFREQUENT_LOGGING = 200131; + +typedef std::pair<std::string, std::string> shard_rs_t;  // NOTE(review): element types reconstructed (stripped in extraction) - confirm against THD::get_shard_rs_id + +// Get mysql databases and their shard and replicaset metadata +// Skip all databases for which Shardbeats should not be generated +// Skip databases without trx metadata +// Skip databases which are "olm" : "1" +// return 1 on failure, 0 on success +int get_dbs_with_filter(THD *thd, + const std::unordered_set<std::string> &skip_databases, + std::vector<std::pair<std::string, shard_rs_t>> *dbs) { + dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client()); + + std::vector<const dd::Schema *> schema_vector; + if 
(thd->dd_client()->fetch_global_components(&schema_vector)) { + return 1; + } + + for (const dd::Schema *schema : schema_vector) { + std::string dbname = schema->name().c_str(); + + // skip BLOCKED_DBS + if (skip_databases.find(dbname) != skip_databases.end()) { + continue; + } + std::string db_metadata = schema->get_db_metadata().c_str(); + + // convert the db metadata into the shard and replicaset ids. + shard_rs_t tmp_info; + bool olm = false; + if (db_metadata.empty() || + THD::get_shard_rs_id(db_metadata, &tmp_info, &olm) || olm) { + // error in parsing db_metadata + static int error_msg_count = 0; + if ((error_msg_count++ % INFREQUENT_LOGGING == 0 && shardbeat_vlog_level >= 2) || + shardbeat_vlog_level >= 4) { + // NO_LINT_DEBUG + sql_print_error( + "shardbeater db_metadata absent/parsing error/ongoing olm for db: " + "%s " + "metadata:'%s' olm:%d", + dbname.c_str(), db_metadata.c_str(), olm); + } + continue; + } + + dbs->push_back(std::make_pair(std::move(dbname), tmp_info)); + } + + return 0; +} + +const char *EXPECTED_INTF_NAME = "eth0"; + +// Returns a best effort ipv6 address for the local host. +// to be used in query comments for external systems like +// wormhole tailers. Multiple ipv6 addresses can be present +// for same interface e.g. eth0 and this code uses a one time +// getnameinfo/(lookup into /etc/hosts) to determine the +// ipv6 address that is the best to be used. 
+std::string get_ipv6addr() {
+  struct ifaddrs *ifap = nullptr;
+  struct ifaddrs *ifa = nullptr;
+  int retc = getifaddrs(&ifap);
+  if (retc != 0) {
+    // NO_LINT_DEBUG
+    sql_print_error("shardbeater getifaddrs call returned error %d", retc);
+    return "";
+  }
+
+  std::string ret_value;
+  for (ifa = ifap; ifa != NULL; ifa = ifa->ifa_next) {
+    // check it is IPV6. getifaddrs() may return entries whose ifa_addr is
+    // NULL, so that has to be tested before looking at the address family.
+    if (ifa->ifa_addr == nullptr || ifa->ifa_addr->sa_family != AF_INET6) {
+      continue;
+    }
+
+    char addressBuffer[INET6_ADDRSTRLEN];
+    if (inet_ntop(AF_INET6,
+                  &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr,
+                  addressBuffer, INET6_ADDRSTRLEN) == nullptr) {
+      // conversion failed, addressBuffer is not usable
+      continue;
+    }
+
+    // HACK_ALERT
+    // We only look for eth0 interface now.
+    std::string intf_name(ifa->ifa_name);
+    if (intf_name != EXPECTED_INTF_NAME) {
+      continue;
+    }
+
+    // Filter out ipv6 addresses that are not global
+    std::string value(addressBuffer);
+    size_t colon_count = std::count(value.begin(), value.end(), ':');
+    if (colon_count != 7) {
+      continue;
+    }
+
+    char hbuf[NI_MAXHOST];
+    if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in6), hbuf,
+                    sizeof(hbuf), NULL, 0, NI_NAMEREQD))
+      continue;
+
+    // NO_LINT_DEBUG
+    sql_print_information("hostname found = %s ipv6 addr = %s", hbuf,
+                          addressBuffer);
+    ret_value = std::move(value);
+    break;
+  }
+
+  freeifaddrs(ifap);
+  return ret_value;
+}
+
+const char *STD_BLOCKED_DBS[] = {"mysql", "sys", "information_schema",
+                                 "performance_schema", "mtr"};
+}  // namespace
+
+// The main function of the shardbeater thread
+extern "C" void *generate_shardbeats(void *shardbeat_singleton) {
+  // Setup this thread
+  // borrowed from One_thread_connection_handler::add_connection
+  my_thread_init();
+
+  // NO_LINT_DEBUG
+  sql_print_information("Shard level Heartbeater thread started");
+
+  Shardbeats_manager *smgr = (Shardbeats_manager *)shardbeat_singleton;
+  DBUG_ASSERT(smgr != nullptr);
+  // Call the actual execute function.
+  smgr->execute();
+
+  // NO_LINT_DEBUG
+  sql_print_information("Shard level Heartbeater thread finished");
+
+  my_thread_end();
+  pthread_exit(0);
+  return 0;
+}
+
+Shardbeats_manager *Shardbeats_manager::get() { return shardbeats_mngr.get(); }
+
+Shardbeats_manager *Shardbeats_manager::get_or_create() {
+  if (!shardbeats_mngr) {
+    shardbeats_mngr.reset(new Shardbeats_manager);
+  }
+  return shardbeats_mngr.get();
+}
+
+void Shardbeats_manager::destroy() { shardbeats_mngr.reset(); }
+
+Shardbeats_manager::Shardbeats_manager()
+    : sb_thd(nullptr),
+      ro_ON_ts(0),
+      ro_OFF_ts(0),
+      current_ro_state(true),
+      is_slave_ON_ts(0),
+      is_slave_OFF_ts(0),
+      current_is_slave_state(false) {
+  // initialize the mutexes
+  init_mutexes();
+
+  // Save ipv6 address one time for WH comment.
+  ipv6_addr = get_ipv6addr();
+
+  create_blocked_dbs();
+
+  expand_format_comment();
+}
+
+Shardbeats_manager::~Shardbeats_manager() { cleanup_mutexes(); }
+
+// Rebuild format_str_comment from the shardbeat_query_comment_format sysvar,
+// substituting {ipaddr}. The result is published under LOCK_shardbeater.
+void Shardbeats_manager::expand_format_comment() {
+  std::string fs;
+  if (shardbeat_query_comment_format &&
+      !fs.assign(shardbeat_query_comment_format).empty()) {
+    boost::replace_all(fs, "{ipaddr}", ipv6_addr);
+  }
+
+  mysql_mutex_lock(&LOCK_shardbeater);
+  format_str_comment.swap(fs);
+  mysql_mutex_unlock(&LOCK_shardbeater);
+}
+
+// Rebuild skip_databases from the built-in list plus the
+// shardbeat_blocked_dbs sysvar. Only skip_databases is touched here, and only
+// under LOCK_shardbeater; skip_databases_iter belongs to the shardbeater
+// thread, which refreshes it at the start of every loop iteration.
+void Shardbeats_manager::create_blocked_dbs() {
+  std::unordered_set<std::string> skip_databases_tmp;
+  size_t num_elems = sizeof(STD_BLOCKED_DBS) / sizeof(const char *);
+  for (size_t i = 0; i < num_elems; i++) {
+    skip_databases_tmp.insert(STD_BLOCKED_DBS[i]);
+  }
+
+  std::string bdbstr;
+  if (shardbeat_blocked_dbs && !bdbstr.assign(shardbeat_blocked_dbs).empty()) {
+    // assume spaces and other white spaces after the commas
+    std::vector<std::string> db_names;
+    boost::split(db_names, bdbstr, boost::is_any_of("\t ,"),
+                 boost::token_compress_on);
+    for (const std::string &db : db_names) {
+      skip_databases_tmp.insert(db);
+    }
+  }
+
+  mysql_mutex_lock(&LOCK_shardbeater);
+  skip_databases.swap(skip_databases_tmp);
+  mysql_mutex_unlock(&LOCK_shardbeater);
+}
+
+// Log read_only ON/OFF transitions and remember when they happened.
+void Shardbeats_manager::maintain_ro_stats(bool ro,
+                                           unsigned long long current_ts_ms) {
+  if (current_ro_state != ro) {
+    if (ro) {
+      // NO_LINT_DEBUG
+      sql_print_information(
+          "shardbeater: read_only detected as ON. read_only was OFF from "
+          "%llu(ms) "
+          "to "
+          "%llu(ms)",
+          ro_OFF_ts.load(), current_ts_ms);
+      ro_ON_ts = current_ts_ms;
+    } else {
+      // NO_LINT_DEBUG
+      sql_print_information(
+          "shardbeater: read_only detected as OFF. read_only was ON from "
+          "%llu(ms) "
+          "to "
+          "%llu(ms)",
+          ro_ON_ts.load(), current_ts_ms);
+      ro_OFF_ts = current_ts_ms;
+    }
+    current_ro_state = ro;
+    ro_failure_count = 0;
+  }
+
+  if (ro) {
+    ro_failure_count++;
+  }
+}
+
+// Log is_slave ON/OFF transitions and remember when they happened.
+void Shardbeats_manager::maintain_is_slave_stats(
+    bool isslave, unsigned long long current_ts_ms) {
+  if (current_is_slave_state != isslave) {
+    if (isslave) {
+      // NO_LINT_DEBUG
+      sql_print_information(
+          "shardbeater: is_replica detected as ON. is_replica was OFF from "
+          "%llu(ms) "
+          "to "
+          "%llu(ms)",
+          is_slave_OFF_ts.load(), current_ts_ms);
+      is_slave_ON_ts = current_ts_ms;
+    } else {
+      // NO_LINT_DEBUG
+      sql_print_information(
+          "shardbeater: is_replica detected as OFF. is_replica was ON from "
+          "%llu(ms) "
+          "to "
+          "%llu(ms)",
+          is_slave_ON_ts.load(), current_ts_ms);
+      is_slave_OFF_ts = current_ts_ms;
+    }
+    current_is_slave_state = isslave;
+    is_slave_failure_count = 0;
+  }
+
+  if (isslave) {
+    is_slave_failure_count++;
+  }
+}
+
+// Insert one shardbeat into dbinfo.first unless the db is blocked, has no
+// shard/replicaset metadata, or saw a transaction within the last SLA
+// microseconds. The outcome is recorded through post_write().
+void Shardbeats_manager::check_and_insert_shardbeat(
+    const std::pair<std::string, shard_rs_t> &dbinfo, unsigned long long SLA) {
+  // table name should be valid. Paranoid check as table name
+  // could have flipped. A null pointer is rejected as well, mirroring the
+  // null checks done for the other charptr sysvars in this file.
+  const char *table_ptr = shardbeat_table;
+  if (table_ptr == nullptr || table_ptr[0] == '\0') {
+    return;
+  }
+  const std::string table_name(table_ptr);
+
+  const std::string &db(dbinfo.first);
+  if (skip_databases_iter.find(db) != skip_databases_iter.end()) {
+    return;
+  }
+
+  const std::string &shard_id(dbinfo.second.first);
+  const std::string &rs_id(dbinfo.second.second);
+  if (shard_id.empty() || rs_id.empty()) {
+    // skip databases with empty db_metadata
+    static int error_msg_count = 0;
+    if ((error_msg_count++ % INFREQUENT_LOGGING == 0 && shardbeat_vlog_level >= 2) ||
+        shardbeat_vlog_level >= 4) {
+      // NO_LINT_DEBUG
+      sql_print_information(
+          "shardbeater db_metadata empty for db. Will skip: db:'%s' shard:'%s' "
+          "rs:'%s'",
+          db.c_str(), shard_id.c_str(), rs_id.c_str());
+    }
+    return;
+  }
+
+  unsigned long long current_ts = my_micro_time();
+
+  const auto itr = db_last_trx_time.find(db);
+  bool skip = false;
+  if (itr != db_last_trx_time.end()) {
+    unsigned long long last_trx_ts = itr->second;
+    // last trx is in the past and a SLA period has passed.
+    skip = ((last_trx_ts + SLA) > current_ts);
+    if (skip) {
+      if (shardbeat_vlog_level >= 3) {
+        // NO_LINT_DEBUG
+        sql_print_information(
+            "Skipping db %s due to within SLA: %llu(us) %llu(us)", db.c_str(),
+            current_ts, last_trx_ts);
+      }
+      // insert heartbeats for silent shards only
+      return;
+    }
+  }
+
+  int64_t &sb_num = shardbeat_count[db];
+  sb_num++;
+
+  int64_t sb_val = (int64_t)current_ts / 1000000;
+
+  // borrowed from do_command
+  sb_thd->lex->set_current_select(nullptr);
+  sb_thd->clear_error();  // Clear error message
+  sb_thd->get_stmt_da()->reset_diagnostics_area();
+
+  // One typical comment format can be.
+  // shardbeat_query_comment_format =
+  // WH:1 #.T:sb #.S:{shard} #.I:{ipaddr} #rs:{replicaset}
+  //
+  // The statement is assembled in a std::string: db name, table name and
+  // comment have no fixed upper bound, so a fixed-size stack buffer filled
+  // with sprintf could be overrun.
+  std::string query;
+  if (!format_str_comment_iter.empty()) {
+    std::string comment = format_str_comment_iter;
+    boost::replace_all(comment, "{shard}", shard_id);
+    boost::replace_all(comment, "{replicaset}", rs_id);
+    query.append("/* ").append(comment).append(" */ ");
+  }
+  query.append("INSERT INTO ").append(db).append(".").append(table_name);
+  query.append(" VALUES(").append(std::to_string(sb_val)).append(");");
+
+  size_t len = query.length();
+  char *buf = sb_thd->strmake(query.c_str(), len);
+  if (buf == nullptr) {
+    // could not copy the statement into the THD; nothing to dispatch
+    return;
+  }
+
+  if (shardbeat_vlog_level >= 3) {
+    // NO_LINT_DEBUG
+    sql_print_information(
+        "Inserting heartbeat %s %s sb_num:%lld sb_val:%lld %s", db.c_str(),
+        table_name.c_str(), static_cast<long long>(sb_num),
+        static_cast<long long>(sb_val), buf);
+  }
+
+#if defined(ENABLED_PROFILING)
+  sb_thd->profiling->start_new_query();
+  sb_thd->profiling->set_query_source(buf, len);
+#endif
+
+  Protocol_classic *protocol = sb_thd->get_protocol_classic();
+  COM_DATA com_data;
+  protocol->create_command(&com_data, COM_QUERY, (uchar *)buf, len);
+
+  auto start_time = std::chrono::steady_clock::now();
+
+  dispatch_command(sb_thd, &com_data, COM_QUERY);
+
+  uint64_t total_duration_ms =
+      std::chrono::duration_cast<std::chrono::milliseconds>(
+          std::chrono::steady_clock::now() - start_time)
+          .count();
+
+  post_write(db, current_ts);
+
+#if defined(ENABLED_PROFILING)
+  sb_thd->profiling->finish_current_query();
+#endif
+
+  if (sb_thd->is_error()) {
+    if (shardbeat_vlog_level >= 2) {
+      // The 64-bit values are cast so they match the conversion specifiers.
+      // NO_LINT_DEBUG
+      sql_print_error(
+          "Error hit while inserting shardbeat: %s errcode: %u %s "
+          "duration_ms: %llu sb_num:%lld sb_val:%lld",
+          db.c_str(), sb_thd->get_stmt_da()->mysql_errno(),
+          sb_thd->get_stmt_da()->message_text(),
+          static_cast<unsigned long long>(total_duration_ms),
+          static_cast<long long>(sb_num), static_cast<long long>(sb_val));
+    }
+  }
+}
+
+// Body of the shardbeater thread: sets up a THD running as shardbeat_user and
+// then loops, sleeping half an SLA per iteration, until the THD is killed or
+// the state leaves RUNNING.
+void Shardbeats_manager::execute() {
+  // thread_stack has to point into this thread's own stack, so the address
+  // of a local is used instead of the address of the sb_thd member (which
+  // lives inside the manager object, not on this stack).
+  THD *thd = new THD;
+  thd->set_new_thread_id();
+  thd->thread_stack = (char *)&thd;
+  thd->store_globals();
+  sb_thd = thd;
+  pthread_setname_np(pthread_self(), "shardbeats");
+
+  mysql_thread_set_psi_id(sb_thd->thread_id());
+
+  // NO_LINT_DEBUG
+  sql_print_information("shardbeater thread id: %lu",
+                        sb_thd->system_thread_id());
+
+  Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
+  thd_manager->add_thd(sb_thd);
+
+  // update the run state
+  transition_to_state(State::RUNNING);
+
+  Security_context *sctx = sb_thd->security_context();
+  if (acl_getroot(sb_thd, sctx, shardbeat_user, "localhost", "::1", nullptr)) {
+    // NO_LINT_DEBUG
+    sql_print_error(
+        "Failed to get ACL for shardbeat_user. Exit shardbeat thread");
+    cleanup_thread();
+    return;
+  }
+
+  bool is_super = sctx->check_access(SUPER_ACL);
+  // NO_LINT_DEBUG
+  sql_print_information("Shardbeater user %s was resolved has super_priv: %d",
+                        shardbeat_user, is_super);
+
+  // borrowed from thd_prepare_connection
+  lex_start(sb_thd);
+
+  Protocol_classic *protocol = sb_thd->get_protocol_classic();
+  protocol->add_client_capability(CLIENT_MULTI_QUERIES);
+  protocol->set_vio(nullptr);
+  int64_t loop_cnt = 0;
+
+  // in microseconds. Widen before multiplying: the sysvar is a 32-bit uint
+  // and its allowed range goes up to UINT_MAX.
+  unsigned long long SLA_iter =
+      static_cast<unsigned long long>(shardbeat_interval_ms) * 1000;
+  // Allow 1/10th of the SLA to be waited for locks per db.
+  // We are assuming that DDL like RENAME table will likely only
+  // happen for 1 database at a given time.
+  sb_thd->variables.lock_wait_timeout_nsec = (SLA_iter * 100);
+  sb_thd->variables.high_priority_lock_wait_timeout_nsec = (SLA_iter * 100);
+  // NO_LINT_DEBUG
+  sql_print_information("shardbeats SLA is %u milliseconds",
+                        shardbeat_interval_ms);
+
+  while (!sb_thd->killed && is_running()) {
+    // in microseconds.
+    unsigned long long SLA =
+        static_cast<unsigned long long>(shardbeat_interval_ms) * 1000;
+
+    if (SLA != SLA_iter) {
+      sb_thd->variables.lock_wait_timeout_nsec = (SLA * 100);
+      sb_thd->variables.high_priority_lock_wait_timeout_nsec = (SLA * 100);
+      // NO_LINT_DEBUG
+      sql_print_information("shardbeats SLA is %u milliseconds",
+                            shardbeat_interval_ms);
+      SLA_iter = SLA;
+    }
+
+    // Each loop iteration sleeps for half-life of SLA.
+    // nanosleep is used because the interval is normally well above one
+    // second, which usleep() is not required to accept, and because the
+    // 64-bit microsecond value may not fit the usleep() argument.
+    const unsigned long long half_sla_us = SLA / 2;
+    struct timespec half_sla;
+    half_sla.tv_sec = half_sla_us / 1000000;
+    half_sla.tv_nsec = (half_sla_us % 1000000) * 1000;
+    nanosleep(&half_sla, nullptr);
+    loop_cnt++;
+
+    if (!enable_shardbeater) {
+      static int error_msg_count = 0;
+      if (error_msg_count++ % INFREQUENT_LOGGING == 0 ||
+          shardbeat_vlog_level >= 3) {
+        // NO_LINT_DEBUG
+        sql_print_information(
+            "shardbeater is disabled. Skipping sharbeats insertion");
+      }
+      continue;
+    }
+
+    // table name should be valid (non-null and non-empty).
+    if (shardbeat_table == nullptr || shardbeat_table[0] == '\0') {
+      continue;
+    }
+
+    // Early check on read_only. This injects the shardbeat only
+    // on the primary and filters out all the secondaries
+    bool ro_fail = check_readonly(sb_thd, false);
+    unsigned long long current_ts = my_micro_time();
+    maintain_ro_stats(ro_fail, current_ts / 1000 /* milliseconds */);
+    if (ro_fail) {
+      continue;
+    }
+
+    bool is_slave_check = is_slave;
+    maintain_is_slave_stats(is_slave_check,
+                            current_ts / 1000 /* milliseconds */);
+
+    // Do not insert shardbeats on replicas till they become primary
+    if (is_slave_check) {
+      continue;
+    }
+
+    // make a copy of the DB trx time. Assuming this map is
+    // small around 100 entries max. This is to prevent having
+    // to hold LOCK_log
+    get_db_trx_times(&db_last_trx_time);
+
+    // initialize things which are not supposed to change in an iteration
+    // but are allowed to be updated via global sysvar
+    mysql_mutex_lock(&LOCK_shardbeater);
+    format_str_comment_iter = format_str_comment;
+    skip_databases_iter = skip_databases;
+    mysql_mutex_unlock(&LOCK_shardbeater);
+
+    // We fetch dbs each time, as Shard Migrations makes the set dynamic
+    std::vector<std::pair<std::string, shard_rs_t>> shards;
+    get_dbs_with_filter(sb_thd, skip_databases_iter, &shards);
+
+    for (const auto &dbinfo : shards) {
+      check_and_insert_shardbeat(dbinfo, SLA);
+
+      // Watch for shutdown initiation from server
+      if (sb_thd->killed) {
+        break;
+      }
+    }
+  }
+
+  cleanup_thread();
+}
+
+// Mark the shardbeater STOPPED and tear down its THD.
+void Shardbeats_manager::cleanup_thread() {
+  // NO_LINT_DEBUG
+  sql_print_information("Exiting loop of generate_shardbeats");
+
+  transition_to_state(State::STOPPED);
+
+  // Cleanup and exit
+  sb_thd->release_resources();
+
+  Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
+  thd_manager->remove_thd(sb_thd);
+  delete sb_thd;
+  sb_thd = nullptr;
+}
+
+Shardbeats_manager::Shard_stats::Shard_stats()
+    : last_errors(HISTORY_LENGTH),
+      last_hbs(HISTORY_LENGTH),
+      num_shardbeats(0),
+      num_failures(0) {}
+
+// Record the outcome of the statement just run on sb_thd for this db:
+// a success timestamp, or an error (consecutive errors with the same errno
+// only extend the time range of the last entry).
+void Shardbeats_manager::post_write(const std::string &db,
+                                    unsigned long long current_ts) {
+  mysql_mutex_lock(&LOCK_shardbeater);
+  Shard_stats &stats = db_sb_stats[db];
+  if (stats.db.empty()) {
+    stats.db = db;
+  }
+
+  if (sb_thd->is_error()) {
+    if (stats.last_errors.empty() || stats.last_errors.back().mysql_errno !=
+                                         sb_thd->get_stmt_da()->mysql_errno()) {
+      // A new error type needs to be pushed
+      Error_details ed{sb_thd->get_stmt_da()->mysql_errno(), current_ts,
+                       current_ts};
+      stats.last_errors.push_back(std::move(ed));
+    } else {
+      stats.last_errors.back().error_time_last = current_ts;
+    }
+    stats.num_failures++;
+  } else {
+    stats.last_hbs.push_back(current_ts);
+    stats.num_shardbeats++;
+  }
+  mysql_mutex_unlock(&LOCK_shardbeater);
+}
+
+// Formats as "ErrCode: <errno> (<start>, <last>)" with both times divided by
+// 1000000. Built with std::string so no fixed-size buffer can be overrun.
+std::string Shardbeats_manager::Error_details::to_string() const {
+  std::string out("ErrCode: ");
+  out += std::to_string(mysql_errno);
+  out += " (";
+  out += std::to_string(error_time_start / 1000000);
+  out += ", ";
+  out += std::to_string(error_time_last / 1000000);
+  out += ")";
+  return out;
+}
+
+// Each stored timestamp divided by 1000000, every value followed by a ','.
+// std::to_string is used because a 10-digit value plus the terminator does
+// not fit in a 10-byte buffer.
+std::string Shardbeats_manager::Shard_stats::last_hbs_as_str() const {
+  std::string out;
+  for (unsigned long long ts : last_hbs) {
+    out += std::to_string(ts / 1000000);
+    out += ',';
+  }
+  return out;
+}
+
+// Every recorded error rendered with to_string(), each followed by a ','.
+std::string Shardbeats_manager::Shard_stats::last_errors_as_str() const {
+  std::string out;
+  for (const Error_details &ed : last_errors) {
+    out += ed.to_string();
+    out += ',';
+  }
+  return out;
+}
+
+// Send the shardbeater status result set: one row per db in db_sb_stats, or
+// a single placeholder row when there are no per-db stats yet.
+// Returns true on failure, false on success.
+bool Shardbeats_manager::show_status_cmd(THD *thd) {
+  DBUG_ENTER("show_shardbeater_status");
+  List<Item> field_list;
+  Protocol *protocol = thd->get_protocol();
+
+  field_list.push_back(new Item_empty_string("Running", 3));
+  field_list.push_back(new Item_empty_string("User", USERNAME_LENGTH + 1));
+  field_list.push_back(new Item_empty_string("Db", 30));
+  field_list.push_back(new Item_return_int("Num_OK", 10, MYSQL_TYPE_LONGLONG));
+  field_list.push_back(
+      new Item_return_int("Num_Fail", 10, MYSQL_TYPE_LONGLONG));
+
+  field_list.push_back(new Item_empty_string("Last_few_inserts", 50));
+  field_list.push_back(new Item_empty_string("Last_few_failures", 150));
+  field_list.push_back(new Item_return_int("Last Read Only OFF Time(ms)", 10,
+                                           MYSQL_TYPE_LONGLONG));
+  field_list.push_back(new Item_return_int("Last Read Only ON Time(ms)", 10,
+                                           MYSQL_TYPE_LONGLONG));
+  field_list.push_back(new Item_return_int("Last Is Not Replica Time(ms)", 10,
+                                           MYSQL_TYPE_LONGLONG));
+  field_list.push_back(
+      new Item_return_int("Last Is Replica Time(ms)", 10, MYSQL_TYPE_LONGLONG));
+  field_list.push_back(
+      new Item_return_int("Current Time(ms)", 10, MYSQL_TYPE_LONGLONG));
+
+  unsigned long long current_time_ms = my_micro_time() / 1000;
+  if (thd->send_result_metadata(&field_list,
+                                Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) {
+    // LOCK_shardbeater is not held yet, so there is nothing to unlock here.
+    DBUG_RETURN(true);
+  }
+
+  // Writes one row; returns true on failure, like Protocol::end_row().
+  auto send_row = [&](const char *db, unsigned long long num_ok,
+                      unsigned long long num_fail, const std::string &inserts,
+                      const std::string &failures) -> bool {
+    protocol->start_row();
+
+    protocol->store(((state == State::RUNNING) ? "YES" : "NO"),
+                    &my_charset_bin);
+    protocol->store(shardbeat_user, &my_charset_bin);
+    protocol->store(db, &my_charset_bin);
+    protocol->store(num_ok);
+    protocol->store(num_fail);
+    protocol->store(inserts.c_str(), &my_charset_bin);
+    protocol->store(failures.c_str(), &my_charset_bin);
+
+    protocol->store(ro_OFF_ts);
+    protocol->store(ro_ON_ts);
+    protocol->store(is_slave_OFF_ts);
+    protocol->store(is_slave_ON_ts);
+    protocol->store(current_time_ms);
+
+    return protocol->end_row();
+  };
+
+  bool ret = false;
+  mysql_mutex_lock(&LOCK_shardbeater);
+  if (db_sb_stats.empty()) {
+    ret = send_row("", 0ULL, 0ULL, "", "");
+  }
+
+  for (auto &iter : db_sb_stats) {
+    if (ret) {
+      break;
+    }
+    const std::string &db = iter.first;
+    const Shard_stats &stats = iter.second;
+    ret = send_row(db.c_str(), stats.num_shardbeats, stats.num_failures,
+                   stats.last_hbs_as_str(), stats.last_errors_as_str());
+  }
+  mysql_mutex_unlock(&LOCK_shardbeater);
+
+  // Only terminate the result set with EOF when every row was sent.
+  if (!ret) {
+    my_eof(thd);
+  }
+  DBUG_RETURN(ret);
+}
+
+bool Shardbeats_manager::is_running() {
+  mysql_mutex_lock(&LOCK_shardbeater);
+  bool ret = (state == State::RUNNING);
+  mysql_mutex_unlock(&LOCK_shardbeater);
+  return ret;
+}
+
+// Set the run state and wake up anyone waiting on COND_shardbeater_run_cond.
+void Shardbeats_manager::transition_to_state(Shardbeats_manager::State s) {
+  mysql_mutex_lock(&LOCK_shardbeater);
+  state = s;
+  mysql_cond_broadcast(&COND_shardbeater_run_cond);
+  mysql_mutex_unlock(&LOCK_shardbeater);
+}
+
+// Fork the shardbeater thread.
+// Requires SUPER, a non-empty shardbeat_user and shardbeat_table, and the
+// STOPPED state. Creates the thread and waits up to 10 seconds (in 2 second
+// slices) for it to report RUNNING.
+// Returns true on failure (an error has been raised), false on success.
+bool Shardbeats_manager::start_shardbeater_thread(THD *thd) {
+  DBUG_ENTER("start_shardbeater_thread");
+
+  Security_context *sctx = thd->security_context();
+  if (!sctx->check_access(SUPER_ACL)) {
+    my_error(ER_SPECIFIC_ACCESS_DENIED_ERROR, MYF(0), "SUPER");
+    DBUG_RETURN(true);
+  }
+
+  // The charptr sysvars are tested as raw pointers: constructing a
+  // std::string from a null pointer is undefined behaviour.
+  if (shardbeat_user == nullptr || shardbeat_user[0] == '\0') {
+    my_error(ER_DISALLOWED_OPERATION, MYF(0), "start shardbeater",
+             " when shardbeater user is empty");
+    DBUG_RETURN(true);
+  }
+
+  if (shardbeat_table == nullptr || shardbeat_table[0] == '\0') {
+    my_error(ER_DISALLOWED_OPERATION, MYF(0), "start shardbeater",
+             " when shardbeat_table is empty");
+    DBUG_RETURN(true);
+  }
+
+  mysql_mutex_lock(&LOCK_shardbeater);
+  if (state != State::STOPPED) {
+    my_error(ER_DISALLOWED_OPERATION, MYF(0), "start shardbeater",
+             " when shardbeater thread is running");
+    mysql_mutex_unlock(&LOCK_shardbeater);
+    DBUG_RETURN(true);
+  }
+
+  if (is_slave) {
+    // NO_LINT_DEBUG
+    sql_print_warning(
+        "Shardbeater thread started but instance "
+        "is a replica. No shardbeats will be inserted "
+        "till promotion");
+  }
+
+  // Indicate that it is starting
+  state = State::STARTING;
+
+  my_thread_handle th;
+  if ((mysql_thread_create(0, &th, &connection_attrib, generate_shardbeats,
+                           (void *)this))) {
+    // NO_LINT_DEBUG
+    sql_print_error("Could not create shardbeater thread");
+    state = State::STOPPED;
+    my_printf_error(ER_UNKNOWN_ERROR, "Shardbeater Thread Creation Error",
+                    MYF(0));
+    mysql_mutex_unlock(&LOCK_shardbeater);
+    DBUG_RETURN(true);
+  }
+
+  // The new thread reports RUNNING through transition_to_state(), which
+  // broadcasts on COND_shardbeater_run_cond.
+  int start_wait_timeout = 10;
+  while (state != State::RUNNING && start_wait_timeout > 0) {
+    struct timespec abstime;
+    set_timespec(&abstime, 2);
+
+    mysql_cond_timedwait(&COND_shardbeater_run_cond, &LOCK_shardbeater,
+                         &abstime);
+    start_wait_timeout -= 2;
+  }
+
+  bool success = (state == State::RUNNING);
+
+  if (success) {
+    my_ok(thd);
+  } else {
+    my_printf_error(ER_UNKNOWN_ERROR,
+                    "Shardbeater Thread didn't start in 10 seconds", MYF(0));
+  }
+  mysql_mutex_unlock(&LOCK_shardbeater);
+  DBUG_RETURN(!success);
+}
+
+// set the state to STOPPING and wait for the shardbeater
+// thread to exit.
+bool Shardbeats_manager::stop_shardbeater_thread(THD *thd) {
+  DBUG_ENTER("stop_shardbeater_thread");
+
+  // Callers without SUPER are rejected up front.
+  if (!thd->security_context()->check_access(SUPER_ACL)) {
+    my_error(ER_SPECIFIC_ACCESS_DENIED_ERROR, MYF(0), "SUPER");
+    DBUG_RETURN(true);
+  }
+
+  mysql_mutex_lock(&LOCK_shardbeater);
+  if (state != State::RUNNING) {
+    my_error(ER_DISALLOWED_OPERATION, MYF(0), "stop shardbeater",
+             " when shardbeater thread is not running");
+    mysql_mutex_unlock(&LOCK_shardbeater);
+    DBUG_RETURN(true);
+  }
+
+  // Request the stop, then wait for STOPPED in 2 second slices, giving up
+  // once the 10 second budget is used.
+  state = State::STOPPING;
+  for (int secs_left = 10; state != State::STOPPED && secs_left > 0;
+       secs_left -= 2) {
+    struct timespec wake_at;
+    set_timespec(&wake_at, 2);
+
+    mysql_cond_timedwait(&COND_shardbeater_run_cond, &LOCK_shardbeater,
+                         &wake_at);
+  }
+
+  const bool stopped = (state == State::STOPPED);
+  if (!stopped) {
+    my_printf_error(ER_UNKNOWN_ERROR,
+                    "Shardbeater Thread didn't stop in 10 seconds", MYF(0));
+  } else {
+    my_ok(thd);
+  }
+  mysql_mutex_unlock(&LOCK_shardbeater);
+  DBUG_RETURN(!stopped);
+}
+
+// Destroy the mutex and the condition variable created by init_mutexes().
+void Shardbeats_manager::cleanup_mutexes() {
+  mysql_mutex_destroy(&LOCK_shardbeater);
+  mysql_cond_destroy(&COND_shardbeater_run_cond);
+}
+
+// Create LOCK_shardbeater and COND_shardbeater_run_cond.
+void Shardbeats_manager::init_mutexes() {
+  mysql_mutex_init(key_LOCK_shardbeater, &LOCK_shardbeater, MY_MUTEX_INIT_FAST);
+  mysql_cond_init(key_COND_shardbeater_run_cond, &COND_shardbeater_run_cond);
+}
+
+// Make a copy of the map from db -> last_trx_time
+// for the shardbeater, so that LOCK_log does not need
+// to be held for long.
+void Shardbeats_manager::get_db_trx_times(
+    std::unordered_map<std::string, unsigned long long> *db_trx_map) {
+  mysql_mutex_lock(mysql_bin_log.get_log_lock());
+  *db_trx_map = m_last_trx_time;
+  mysql_mutex_unlock(mysql_bin_log.get_log_lock());
+}
+
+// Update the trx time for a particular shard/db
+void Shardbeats_manager::update_db_trx_times(
+    THD *thd, unsigned long long oc_flush_timestamp) {
+  DBUG_ENTER("MYSQL_BIN_LOG::update_db_trx_time(THD *)");
+  mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
+
+  if (!thd->db().str) {
+    DBUG_VOID_RETURN;
+  }
+
+  const auto &db_lex_str = thd->db();
+  std::string db(db_lex_str.str, db_lex_str.length);
+
+  m_last_trx_time[db] = oc_flush_timestamp;
+  DBUG_VOID_RETURN;
+}
diff --git a/sql/rpl_shardbeats.h b/sql/rpl_shardbeats.h
new file mode 100644
index 000000000000..95873cb9010e
--- /dev/null
+++ b/sql/rpl_shardbeats.h
@@ -0,0 +1,233 @@
+#ifndef RPL_SHARDBEATS_H_INCLUDED
+#define RPL_SHARDBEATS_H_INCLUDED
+
+#include <atomic>  // NOTE(review): the six system header names were lost in extraction; inferred from usage - confirm
+#include <boost/circular_buffer.hpp>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+#include "mysql/psi/mysql_cond.h"
+#include "mysql/psi/mysql_mutex.h"
+
+class THD;
+
+// GLOBAL Variables
+// the table that should be present on all shards
+// where shardbeats are injected.
+extern char *shardbeat_table;
+// the user as which to add shardbeats
+extern char *shardbeat_user;
+// interval between shardbeats on silent shards
+extern uint shardbeat_interval_ms;
+
+/**
+ * This class contains the functionality for Shardbeats.
+ * Shardbeats inject simple INSERTs/heartbeats for each user facing database.
+ * There is a designated heartbeat/shardbeat table in each database/shard,
+ * where we can insert an integer. Ideally this table will have BLACKHOLE
+ * Engine. Shardbeats will get replicated to the secondary and will help
+ * downstream tailers distinguish silent shards from failure situations.
+ * These shardbeats are injected on a global SLA that is guaranteed
+ * by the MYSQL service in agreement with downstream services.
+ *
+ * With the expectation that a shardbeat will be injected every
+ * SLA milliseconds, the absence of writes on the tailer side
+ * will lead to this logic.
+ * 1. no writes at all on a shard - Transaction problem or
+ *    READ ONLY SHARD
+ * 2. no writes at all on any shard - Replication broken or
+ *    mysql is unhealthy
+ * 3. Shardbeats but no user writes - Silent Shard
+ */
+class Shardbeats_manager {
+ public:
+  /**
+   * static accessor for singleton which will return
+   * nullptr if shardbeats is not enabled yet.
+   */
+  static Shardbeats_manager *get();
+
+  /**
+   * static accessor for singleton which will never return
+   * nullptr.
+   */
+  static Shardbeats_manager *get_or_create();
+
+  static void destroy();
+
+  ~Shardbeats_manager();
+
+  enum class State { STOPPED, STARTING, RUNNING, STOPPING };
+
+  // A simple struct to keep range of time
+  // that a mysql error consecutively happened
+  // on a db/shard
+  class Error_details {
+   public:
+    uint mysql_errno;
+    unsigned long long error_time_start;
+    unsigned long long error_time_last;
+
+    // Return a diagnostic string for this error
+    std::string to_string() const;
+  };
+
+  class Shard_stats {
+   public:
+    static const size_t HISTORY_LENGTH = 5;
+    Shard_stats();
+    std::string db;
+
+    // Some commonly expected error codes.
+    // ER_OPTION_PREVENTS_STATEMENT (Read only)
+    // ER_NO_SUCH_TABLE (shardbeater table is missing)
+    // ER_LOCK_WAIT_TIMEOUT (during OLM)
+    // ER_DB_READ_ONLY (during cutover phase of OLM)
+    // maintain the last few errors
+    boost::circular_buffer<Error_details> last_errors;
+
+    // Preserve timestamps of last few successful heartbeats
+    boost::circular_buffer<unsigned long long> last_hbs;
+
+    // total successful shardbeats since startup
+    unsigned long long num_shardbeats;
+
+    // total failed shardbeats since startup
+    unsigned long long num_failures;
+
+    // Return a diagnostic string for the last_hbs array
+    std::string last_hbs_as_str() const;
+
+    // Return a diagnostic string for the last_errors array
+    std::string last_errors_as_str() const;
+  };
+
+  // Get a copy of the db_trx_map.
+  // Protected with mysql_bin_log.LOCK_log
+  void get_db_trx_times(
+      std::unordered_map<std::string, unsigned long long> *db_trx_map);
+
+  /**
+   * Update the last transaction time for this database.
+   * If the Database has a trx in ordered commit, it is not silent
+   * for that time + SLA
+   */
+  void update_db_trx_times(THD *thd, unsigned long long oc_flush_timestamp);
+
+  // Start the Thread for Shardbeat injection
+  bool start_shardbeater_thread(THD *thd);
+
+  // Stop the Thread for Shardbeat injection
+  bool stop_shardbeater_thread(THD *thd);
+
+  // Show status of the shardbeater
+  bool show_status_cmd(THD *thd);
+
+  // Used by the shardbeater thread to update run state.
+  void transition_to_state(State s);
+
+  // Is the state of shardbeater = State::RUNNING
+  bool is_running();
+
+  // Actual function which is run inside the shardbeats thread.
+  void execute();
+
+  // format comment on update of global variable.
+  void expand_format_comment();
+
+  // create latest list of blocked dbs on update of global variable.
+  void create_blocked_dbs();
+
+ private:
+  // Private constructor
+  Shardbeats_manager();
+
+  // Once INSERT of shardbeat has finished,
+  // maintain stats for success/failure
+  void post_write(const std::string &db, unsigned long long current_ts);
+
+  void cleanup_thread();
+
+  void maintain_ro_stats(bool ro, unsigned long long current_ts_ms);
+
+  void maintain_is_slave_stats(bool isslave, unsigned long long current_ts_ms);
+
+  // Initialize the mutexes
+  void init_mutexes();
+
+  // Destroy the mutexes
+  void cleanup_mutexes();
+
+  typedef std::pair<std::string, std::string> shard_rs_t;
+  // The core function for checking and inserting
+  // shardbeat into a silent shard
+  void check_and_insert_shardbeat(
+      const std::pair<std::string, shard_rs_t> &dbinfo, unsigned long long SLA);
+
+ private:
+  // The thd of the shardbeater thread
+  THD *sb_thd = nullptr;
+
+  // shard -> shardbeat_stats for reporting
+  std::unordered_map<std::string, Shard_stats> db_sb_stats;
+
+  // The last time in microsecs that a trx for this db
+  // went into ordered commit. protected by mysql_bin_log.LOCK_log
+  std::unordered_map<std::string, unsigned long long> m_last_trx_time;
+
+  // conditional variable which is used to wait
+  // for running state changes
+  mysql_cond_t COND_shardbeater_run_cond;
+
+  // mutex which protects all key datastructures,
+  // e.g. state and db_sb_stats
+  mysql_mutex_t LOCK_shardbeater;
+
+  // Running state of the shardbeater
+  State state = State::STOPPED;
+
+  // list of blocked dbs
+  std::unordered_set<std::string> skip_databases;
+
+  // blocked dbs list created for each iter.
+  std::unordered_set<std::string> skip_databases_iter;
+
+  // formatted string for query comment
+  std::string format_str_comment;
+
+  // formatted string for each iter
+  std::string format_str_comment_iter;
+
+  // read only failure logging helpers
+  uint64_t ro_failure_count = 0;
+  std::atomic<unsigned long long> ro_ON_ts;
+  std::atomic<unsigned long long> ro_OFF_ts;
+  std::atomic<bool> current_ro_state;
+
+  // is_slave (replica) state logging helpers
+  uint64_t is_slave_failure_count = 0;
+  std::atomic<unsigned long long> is_slave_ON_ts;
+  std::atomic<unsigned long long> is_slave_OFF_ts;
+  std::atomic<bool> current_is_slave_state;
+
+  // convenience member to store the
+  // database -> last trx time during each iteration of
+  // shardbeats loop.
+  std::unordered_map<std::string, unsigned long long> db_last_trx_time;
+
+  // To maintain the insertion id of shardbeat for each db.
+  std::unordered_map<std::string, int64_t> shardbeat_count;
+
+  // convenience member to store db -> shard loop for each
+  // iteration of shardbeats loop
+  std::vector<std::pair<std::string, shard_rs_t>> shards;
+
+  // the local ipv6 address which is provided as a comment
+  // in trx.
+  std::string ipv6_addr;
+};
+
+// extern Shardbeats_manager rpl_shardbeats;
+
+#endif
diff --git a/sql/sp.cc b/sql/sp.cc
index f1e974884a15..1f3e8c7c2021 100644
--- a/sql/sp.cc
+++ b/sql/sp.cc
@@ -2255,6 +2255,7 @@ uint sp_get_flags_for_command(LEX *lex) {
     case SQLCOM_SHOW_PROC_CODE:
     case SQLCOM_SHOW_SLAVE_HOSTS:
     case SQLCOM_SHOW_SLAVE_STAT:
+    case SQLCOM_SHOW_SHARDBEATER_STAT:
     case SQLCOM_SHOW_STATUS:
     case SQLCOM_SHOW_STATUS_FUNC:
     case SQLCOM_SHOW_STATUS_PROC:
diff --git a/sql/sql_admission_control.cc b/sql/sql_admission_control.cc
index 7446a53d3597..d1adaf4ae3d5 100644
--- a/sql/sql_admission_control.cc
+++ b/sql/sql_admission_control.cc
@@ -143,6 +143,7 @@ bool filter_command(enum_sql_command sql_command) {
     case SQLCOM_SHOW_PROCESSLIST:
     case SQLCOM_SHOW_MASTER_STAT:
     case SQLCOM_SHOW_SLAVE_STAT:
+    case SQLCOM_SHOW_SHARDBEATER_STAT:
     case SQLCOM_SHOW_GRANTS:
     case SQLCOM_SHOW_CREATE:
     case SQLCOM_SHOW_CHARSETS:
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 37389939f1f0..905ac4184bf8 100644
--- 
a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -622,7 +622,61 @@ void THD::set_transaction(Transaction_ctx *transaction_ctx) {
   m_transaction.reset(transaction_ctx);
 }
 
-static std::string get_shard_id(const std::string &db_metadata) {
+/**
+  Extract the shard id, replicaset id and "olm" flag from db metadata JSON.
+
+  The metadata is expected to be a JSON object whose members, all optional,
+  are strings: {"shard":"...", "rs":"...", "olm":"0|1"}. An absent member
+  yields an empty id (or false for olm).
+
+  @param      db_metadata  JSON text to parse
+  @param[out] shard_rs     (shard id, replicaset id); written on success only
+  @param[out] olm          true iff "olm" equals "1"; written on success only
+
+  @return 0 on success; 1 if the text is not a JSON object, a present member
+          is not a string, or a std::exception was caught.
+*/
+int THD::get_shard_rs_id(const std::string &db_metadata,
+                         std::pair<std::string, std::string> *shard_rs,
+                         bool *olm) {
+  try {
+    rapidjson::Document db_metadata_root;
+    if (db_metadata_root.Parse(db_metadata.c_str()).HasParseError() ||
+        !db_metadata_root.IsObject()) {
+      return 1;
+    }
+
+    // Copy string member `key` into *out if present. The type is checked
+    // first because rapidjson's GetString() asserts (by default) rather than
+    // throws on a non-string value, so the catch below would not help.
+    // Returns false only on a type mismatch.
+    const auto read_string_member = [&db_metadata_root](const char *key,
+                                                        std::string *out) {
+      const auto member = db_metadata_root.FindMember(key);
+      if (member == db_metadata_root.MemberEnd()) return true;
+      if (!member->value.IsString()) return false;
+      *out = member->value.GetString();
+      return true;
+    };
+
+    std::string shard_id;
+    std::string rs_id;
+    std::string olm_val_s;
+    if (!read_string_member("shard", &shard_id) ||
+        !read_string_member("rs", &rs_id) ||
+        !read_string_member("olm", &olm_val_s)) {
+      return 1;
+    }
+
+    *shard_rs = std::make_pair(std::move(shard_id), std::move(rs_id));
+    *olm = (olm_val_s == "1");
+  } catch (const std::exception &) {
+    return 1;
+  }
+  return 0;
+}
+
+std::string THD::get_shard_id(const std::string &db_metadata) {
   try {
     rapidjson::Document db_metadata_root;
     // The local_db_metadata format should be:
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 7ecfeb54708f..23f8be3b9623 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2286,6 +2286,16 @@ class THD : public MDL_context_owner,
   */
   bool release_explicit_snapshot();
 
+  /**
+    Parse db metadata JSON into (shard id, replicaset id) and the "olm" flag.
+    @return 0 on success, 1 if the metadata could not be parsed.
+  */
+  static int get_shard_rs_id(const std::string &db_metadata,
+                             std::pair<std::string, std::string> *shard_rs,
+                             bool *olm);
+
+  static std::string get_shard_id(const std::string &db_metadata);
+
  private:
   USER_CONN *m_user_connect;
 
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index d4c09a5db761..a35eb37e207a 100644
--- a/sql/sql_parse.cc
+++ 
b/sql/sql_parse.cc @@ -134,6 +134,7 @@ #include "sql/rpl_lag_manager.h" #include "sql/rpl_master.h" // register_slave #include "sql/rpl_rli.h" // mysql_show_relaylog_events +#include "sql/rpl_shardbeats.h" // start/stop_shardbeater_thread #include "sql/rpl_slave.h" // change_master_cmd #include "sql/session_tracker.h" #include "sql/set_var.h" @@ -797,6 +798,7 @@ void init_sql_command_flags(void) { sql_command_flags[SQLCOM_SHOW_MASTER_STAT] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_MEMORY_STATUS] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_SLAVE_STAT] = CF_STATUS_COMMAND; + sql_command_flags[SQLCOM_SHOW_SHARDBEATER_STAT] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_CREATE_PROC] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_CREATE_FUNC] = CF_STATUS_COMMAND; sql_command_flags[SQLCOM_SHOW_CREATE_TRIGGER] = CF_STATUS_COMMAND; @@ -1077,6 +1079,7 @@ void init_sql_command_flags(void) { sql_command_flags[SQLCOM_SHOW_PROCESSLIST] |= CF_ALLOW_PROTOCOL_PLUGIN; sql_command_flags[SQLCOM_SHOW_MASTER_STAT] |= CF_ALLOW_PROTOCOL_PLUGIN; sql_command_flags[SQLCOM_SHOW_SLAVE_STAT] |= CF_ALLOW_PROTOCOL_PLUGIN; + sql_command_flags[SQLCOM_SHOW_SHARDBEATER_STAT] |= CF_ALLOW_PROTOCOL_PLUGIN; sql_command_flags[SQLCOM_SHOW_GRANTS] |= CF_ALLOW_PROTOCOL_PLUGIN; sql_command_flags[SQLCOM_SHOW_CREATE] |= CF_ALLOW_PROTOCOL_PLUGIN; sql_command_flags[SQLCOM_SHOW_CHARSETS] |= CF_ALLOW_PROTOCOL_PLUGIN; @@ -3953,6 +3956,19 @@ int mysql_execute_command(THD *thd, bool first_level, ulonglong *last_timer) { res = show_master_status(thd); break; } + case SQLCOM_SHOW_SHARDBEATER_STAT: { + /* Requires the SUPER privilege */ + if (check_global_access(thd, SUPER_ACL)) goto error; + Shardbeats_manager *smgr = Shardbeats_manager::get(); + if (!smgr) { + my_printf_error(ER_DISALLOWED_OPERATION, "Shardbeats not ON yet", + MYF(0)); + goto error; + } + + res = smgr->show_status_cmd(thd); + break; + } case SQLCOM_SHOW_ENGINE_STATUS: { if (check_global_access(thd, PROCESS_ACL)) goto error; res 
= ha_show_status(thd, lex->create_info->db_type, HA_ENGINE_STATUS); @@ -4104,6 +4120,14 @@ int mysql_execute_command(THD *thd, bool first_level, ulonglong *last_timer) { break; } + case SQLCOM_START_SHARDBEATER: { /* NOTE(review): no privilege check is visible in the START/STOP SHARDBEATER cases, unlike SHOW SHARDBEATER STATUS which checks SUPER_ACL; confirm start_shardbeater_thread()/stop_shardbeater_thread() enforce one. */ + res = Shardbeats_manager::get_or_create()->start_shardbeater_thread(thd); + break; + } + case SQLCOM_STOP_SHARDBEATER: { + res = Shardbeats_manager::get_or_create()->stop_shardbeater_thread(thd); + break; + } case SQLCOM_SLAVE_START: { res = start_slave_cmd(thd); break; diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 03b0d7f47f54..ae18de3a188c 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -1248,6 +1248,7 @@ void warn_about_deprecated_binary(THD *thd) %token REQUIRE_TABLE_PRIMARY_KEY_CHECK_SYM 996 /* MYSQL */ %token STREAM_SYM 997 /* MYSQL */ %token OFF_SYM 998 /* SQL-1999-R */ +%token SHARDBEATER 999 /* FB MYSQL */ /* Here is an intentional gap in token numbers. @@ -2159,6 +2160,7 @@ simple_statement: | find { $$= nullptr; } | get_diagnostics { $$= nullptr; } | group_replication { $$= nullptr; } + | shardbeater { $$= nullptr; } | grant { $$= nullptr; } | handler_stmt | help { $$= nullptr; } @@ -8606,6 +8608,19 @@ group_replication: } ; +shardbeater: + START_SYM SHARDBEATER + { + LEX *lex=Lex; + lex->sql_command = SQLCOM_START_SHARDBEATER; + } + | STOP_SYM SHARDBEATER + { + LEX *lex=Lex; + lex->sql_command = SQLCOM_STOP_SHARDBEATER; + } + ; + slave: slave_start start_slave_opts{} | STOP_SYM SLAVE opt_slave_thread_option_list opt_channel @@ -13246,6 +13261,10 @@ show_param: { Lex->sql_command = SQLCOM_SHOW_SLAVE_STAT; } + | SHARDBEATER STATUS_SYM + { + Lex->sql_command = SQLCOM_SHOW_SHARDBEATER_STAT; + } | CREATE PROCEDURE_SYM sp_name { LEX *lex= Lex;