diff --git a/bin/yugabyted b/bin/yugabyted index f69ddd703e8..0ccec490492 100755 --- a/bin/yugabyted +++ b/bin/yugabyted @@ -153,10 +153,13 @@ PREFIX = { 'configure_read_replica modify': "", 'configure_read_replica delete': "", 'xcluster' : "Setup and manage xCluster Replication.", - 'xcluster checkpoint': "", + 'xcluster create_checkpoint': "", + 'xcluster add_to_checkpoint': "", 'xcluster set_up': "", + 'xcluster add_to_replication': "", 'xcluster status': "", - 'xcluster delete': "", + 'xcluster delete_replication': "", + 'xcluster remove_database_from_replication': "", 'backup': "", 'restore': "", 'connect' : "", @@ -188,10 +191,14 @@ USAGE = { 'configure_read_replica modify': "yugabyted configure_read_replica modify [flags]", 'configure_read_replica delete' : "yugabyted configure_read_replica delete", 'xcluster' : "yugabyted xcluster [command] [flags]", - 'xcluster checkpoint': "yugabyted xcluster checkpoint [flags]", + 'xcluster create_checkpoint': "yugabyted xcluster create_checkpoint [flags]", + 'xcluster add_to_checkpoint': "yugabyted xcluster add_to_checkpoint [flags]", 'xcluster set_up': "yugabyted xcluster set_up [flags]", - 'xcluster status': "yugabyted status set_up [flags]", - 'xcluster delete': "yugabyted status set_up [flags]", + 'xcluster add_to_replication': "yugabyted xcluster add_to_replication [flags]", + 'xcluster status': "yugabyted xcluster status [flags]", + 'xcluster delete_replication': "yugabyted xcluster delete_replication [flags]", + 'xcluster remove_database_from_replication': "yugabyted xcluster " + + "remove_database_from_replication [flags]", 'backup' : "yugabyted backup [flags]", 'restore' : "yugabyted restore [flags]", 'connect' : "yugabyted connect [command] [flags]", @@ -291,30 +298,52 @@ EXAMPLE = { " the YSQL catalog upgrade:\n" + "yugabyted finalize_upgrade --upgrade_ysql_timeout \n\n", 'xcluster' : "# Checkpoint a new xcluster replication:\n" + - "yugabyted xcluster checkpoint --replication_id= " + + "yugabyted xcluster create_checkpoint --replication_id= " + "--databases=\n\n" + "# Set-up xcluster replication:\n" + "yugabyted xcluster set_up --replication_id= " + "--target_address=\n\n" + "# Show status of outbound and inbound replications:\n" + "yugabyted xcluster status --replication_id=\n\n" + + "# Add a database to existing xcluster replication:\n" + + "## First add the database to the checkpoint:\n" + "yugabyted xcluster add_to_checkpoint --replication_id= " + + "--databases=\n\n" + + "## Then add the database to the replication:\n" + + "yugabyted xcluster add_to_replication --replication_id= " + + "--databases= " + + "--target_address=\n\n" "# Delete a xcluster replication:\n" + "yugabyted xcluster status --replication_id= " + + "--target_address=\n\n" + + "# Delete databases from xcluster replication:\n" + + "yugabyted xcluster remove_database_from_replication " + + "--replication_id= " + + "--databases= " + "--target_address=\n\n", - "xcluster_checkpoint" : "# Checkpoint a new xcluster replication:\n" + - "yugabyted xcluster checkpoint --replication_id= " + + "xcluster_create_checkpoint" : "# Checkpoint a new xcluster replication:\n" + + "yugabyted xcluster create_checkpoint --replication_id= " + + "--databases=\n\n", + "xcluster_add_to_checkpoint" : "# Add databases to checkpoint:\n" + + "yugabyted xcluster add_to_checkpoint --replication_id= " + "--databases=\n\n", "xcluster_set_up" : "# Set-up xcluster replication:\n" + "yugabyted xcluster set_up --replication_id= " + - "--target_address=\n\n" + - "# Set-up xcluster replication (after completing bootstrap):\n" + - "yugabyted xcluster set_up --replication_id= " + + "--target_address= --bootstrap_done\n\n", + "xcluster_add_to_replication" : "# Add database to xcluster replication:\n" + + "yugabyted xcluster add_to_replication --replication_id= " + + "--database= " + "--target_address= --bootstrap_done\n\n", "xcluster_status" : "# Show status of outbound and inbound replications:\n" + "yugabyted xcluster status --replication_id=\n\n", - "xcluster_delete" : "# Delete a xcluster replication:\n" + + "xcluster_delete_replication" : "# Delete a xcluster replication:\n" + "yugabyted xcluster status --replication_id= " + "--target_address=\n\n", + "xcluster_remove_database_from_replication": "# Delete databases from xcluster replication:\n" + + "yugabyted xcluster remove_database_from_replication " + + "--replication_id= " + + "--databases= " + + "--target_address=\n\n", } @@ -1414,23 +1443,7 @@ class ControlScript(object): else: self.display_pitr(pitr_object, pitr_yb_api_type, schedules) - def get_xcluster_error_msg(self, err, dbs_to_be_bootstrapped=None, - dbs_not_to_be_bootstrapped=None, - replication_id=None): - - # variable naming format: ___err - checkpoint_empty_db_yb_admin_err = "Database should have at least one table in order to " +\ - "be part of xCluster replication" - checkpoint_empty_db_ybd_err = "couldn't set xcluster checkpoint. " +\ - "All provided databases should have atleast 1 table in them." - checkpoint_rep_id_in_use_yb_admin_err = "xClusterOutboundReplicationGroup " +\ - "{} already exists".format(replication_id) - checkpoint_rep_id_in_use_ybd_err = "xCluster Replication with replication ID: " +\ - "'{}' already exists. Please use another replication ID".format(replication_id) - - set_up_db_not_found_yb_admin_err = "YSQL keyspace name not found" - set_up_object_not_found_yb_admin_err = "Could not find matching" - + def get_complete_bootstrap_msg(self, dbs_to_be_bootstrapped, dbs_not_to_be_bootstrapped): complete_bootstrap_err_msg = "" if dbs_to_be_bootstrapped and len(dbs_to_be_bootstrapped) != 0: complete_bootstrap_err_msg += "Please complete bootstrapping for " @@ -1449,25 +1462,70 @@ class ControlScript(object): complete_bootstrap_err_msg += "`{}` on target cluster.".format( ','.join(dbs_not_to_be_bootstrapped)) + return complete_bootstrap_err_msg - set_up_db_not_found_ybd_err = "The database(s) is missing from the target cluster. " +\ - complete_bootstrap_err_msg - set_up_object_not_found_ybd_err = "All of the SQL Objects are not present in the " +\ - "target database. " + complete_bootstrap_err_msg - - delete_replication_id_not_found_yb_admin_err = "xClusterOutboundReplicationGroup " +\ - "{} not found".format(replication_id) - delete_replication_id_not_found_ybd_err = "No outbound replication " +\ - "found with replication_id: {}".format(replication_id) - + def get_xcluster_error_msg(self, err, command, dbs_to_be_bootstrapped=None, + dbs_not_to_be_bootstrapped=None, + replication_id=None): + # variable naming format: <__err # Map format: : - yb_admin_err_msg_to_ybd_err_msg = { - checkpoint_empty_db_yb_admin_err: checkpoint_empty_db_ybd_err, - checkpoint_rep_id_in_use_yb_admin_err: checkpoint_rep_id_in_use_ybd_err, - set_up_db_not_found_yb_admin_err: set_up_db_not_found_ybd_err, - set_up_object_not_found_yb_admin_err: set_up_object_not_found_ybd_err, - delete_replication_id_not_found_yb_admin_err: delete_replication_id_not_found_ybd_err, - } + yb_admin_err_msg_to_ybd_err_msg = dict() + if command == "create_checkpoint": + empty_db_yb_admin_err = "Database should have at least one table in " +\ + "order to be part of xCluster replication" + empty_db_ybd_err = "couldn't create xcluster checkpoint. " +\ + "All provided databases should have atleast 1 table in them." + rep_id_in_use_yb_admin_err = "xClusterOutboundReplicationGroup " +\ + "{} already exists".format(replication_id) + rep_id_in_use_ybd_err = "xCluster Replication with replication ID: " +\ + "'{}' already exists. Please use another replication ID".format(replication_id) + yb_admin_err_msg_to_ybd_err_msg = { + empty_db_yb_admin_err: empty_db_ybd_err, + rep_id_in_use_yb_admin_err: rep_id_in_use_ybd_err, + } + elif command == "add_to_checkpoint": + empty_db_yb_admin_err = "Database should have at least one table in " +\ + "order to be part of xCluster replication" + empty_db_ybd_err = "couldn't add to xcluster checkpoint. " +\ + "Provided database should have atleast 1 table in it." + yb_admin_err_msg_to_ybd_err_msg = { + empty_db_yb_admin_err: empty_db_ybd_err, + } + elif command == "set_up": + db_not_found_yb_admin_err = "YSQL keyspace name not found" + object_not_found_yb_admin_err = "Could not find matching" + + complete_bootstrap_err_msg = self.get_complete_bootstrap_msg(dbs_to_be_bootstrapped, + dbs_not_to_be_bootstrapped) + + db_not_found_ybd_err = "The database(s) is missing from the target cluster. " +\ + complete_bootstrap_err_msg + object_not_found_ybd_err = "All of the SQL Objects are not present in the " +\ + "target database. " + complete_bootstrap_err_msg + yb_admin_err_msg_to_ybd_err_msg = { + db_not_found_yb_admin_err: db_not_found_ybd_err, + object_not_found_yb_admin_err: object_not_found_ybd_err, + } + elif command == "add_to_replication": + db_not_found_yb_admin_err = "YSQL keyspace name not found" + object_not_found_yb_admin_err = "Could not find matching" + db_already_replicated_yb_admin_err = "xCluster ReplicationGroup already contains " +\ + "all requested tables" + + complete_bootstrap_err_msg = self.get_complete_bootstrap_msg(dbs_to_be_bootstrapped, + dbs_not_to_be_bootstrapped) + + db_not_found_ybd_err = "The database is missing from the target cluster. " +\ + complete_bootstrap_err_msg + object_not_found_ybd_err = "All of the SQL Objects are not present in the " +\ + "target database. " + complete_bootstrap_err_msg + db_already_replicated_ybd_err = "The database is already added to the replication." + + yb_admin_err_msg_to_ybd_err_msg = { + db_not_found_yb_admin_err: db_not_found_ybd_err, + object_not_found_yb_admin_err: object_not_found_ybd_err, + db_already_replicated_yb_admin_err: db_already_replicated_ybd_err, + } for yb_admin_err, ybd_err in yb_admin_err_msg_to_ybd_err_msg.items(): if yb_admin_err in err: @@ -1476,26 +1534,27 @@ class ControlScript(object): return "" # Checkpoint xcluster replication - def xcluster_checkpoint(self): + def xcluster_create_checkpoint(self): if not self.script.is_running(): Output.log_error_and_exit(Output.make_red("ERROR") + ": No YugabyteDB node " + "is running in the data_dir {}".format(self.configs.saved_data.get("data_dir"))) - master_addrs = self.get_current_masters_from_api(self.advertise_ip()) + master_addrs = self.configs.saved_data.get("current_masters") if master_addrs == '': Output.log_error_and_exit(Output.make_red("Error") + ": cannot retreive the masters.") replication_id = self.configs.temp_data.get("xcluster_replication_id") databases = self.configs.temp_data.get("xcluster_databases") - out, err, ret_code = YBAdminProxy.checkpoint_xcluster(master_addrs, replication_id, + out, err, ret_code = YBAdminProxy.create_checkpoint_xcluster(master_addrs, replication_id, databases) is_bootstrap_required = False if ret_code: - err_msg = self.get_xcluster_error_msg(err, replication_id=replication_id) + err_msg = self.get_xcluster_error_msg(err, command = "create_checkpoint", + replication_id=replication_id) if not err_msg: - err_msg = "couldn't set xcluster checkpoint." + err_msg = "couldn't create xcluster checkpoint." Output.log_error_and_exit(Output.make_red("Error") + ": {}".format(err_msg)) else: @@ -1507,7 +1566,7 @@ class ControlScript(object): status_display_info = {} status_details = [ - (Output.make_yellow("Status"), "xCluster checkpoint success.") + (Output.make_yellow("Status"), "xCluster create checkpoint success.") ] all_databases = databases.split(',') @@ -1580,8 +1639,132 @@ class ControlScript(object): if is_bootstrap_required: Output.print_out(bootstrap_help_msg) - # TODO: Maybe add info that table creation is required on target cluster even if - # bootstrap is not required. + # Add to checkpoint xcluster replication + def xcluster_add_to_checkpoint(self): + if not self.script.is_running(): + Output.log_error_and_exit(Output.make_red("ERROR") + ": No YugabyteDB node " + + "is running in the data_dir {}".format(self.configs.saved_data.get("data_dir"))) + + master_addrs = self.configs.saved_data.get("current_masters") + if master_addrs == '': + Output.log_error_and_exit(Output.make_red("Error") + ": cannot retreive the masters.") + + replication_id = self.configs.temp_data.get("xcluster_replication_id") + databases = self.configs.temp_data.get("xcluster_databases") + databases = databases.split(',') + + # Format for not_checkpointed_dbs map: : + not_checkpointed_dbs = dict() + + databases_to_be_bootstrapped = [] + databases_not_to_be_bootstrapped = [] + + for database in databases: + out, err, ret_code = YBAdminProxy.add_to_checkpoint_xcluster(master_addrs, + replication_id, database) + if ret_code: + err_msg = self.get_xcluster_error_msg(err, command = "add_to_checkpoint", + replication_id=replication_id) + if not err_msg: + err_msg = "couldn't add to xcluster checkpoint." + + not_checkpointed_dbs[database] = err_msg + continue + else: + Output.log("Database {} added to checkpoint.".format(database)) + bootstrap_required_msg = "Bootstrap is required" + if bootstrap_required_msg in out: + databases_to_be_bootstrapped.append(database) + else: + databases_not_to_be_bootstrapped.append(database) + + status_details = [] + status_display_info = {} + + if len(not_checkpointed_dbs) == 0: + status_details = [ + (Output.make_yellow("Status"), "xCluster add database(s) to checkpoint success.") + ] + elif len(not_checkpointed_dbs) == len(databases): + status_details = [ + (Output.make_yellow("Status"), "xCluster add database(s) to checkpoint failed."), + (Output.make_yellow("Errors"), "Following are the errors for each database:") + ] + + for database, error in not_checkpointed_dbs.items(): + status_details.extend( + [(Output.make_yellow(""), "{}: {}".format(database, error))] + ) + Output.print_out(self.get_status_string_common(status_details, status_display_info)) + sys.exit(1) + else: + status_details = [ + (Output.make_yellow("Status"), "xCluster add database(s) to checkpoint " + + "partially succesful."), + (Output.make_yellow("Errors"), "Following are the errors for database(s) " + + "failed to checkpoint:") + ] + + for database, error in not_checkpointed_dbs.items(): + status_details.extend( + [(Output.make_yellow(""), "{}: {}".format(database, error))] + ) + + bootstrap_msg = "" + schema_creation_msg = "" + + if len(databases_to_be_bootstrapped) != 0: + if len(databases_to_be_bootstrapped) > 1: + bootstrap_msg = "Bootstrap is required for databases " + \ + "`{}`.".format(','.join(databases_to_be_bootstrapped)) + else: + bootstrap_msg = "Bootstrap is required for database " + \ + "`{}`.".format(databases_to_be_bootstrapped[0]) + + bootstrap_help_msg = "For each database which requires bootstrap run the following " +\ + "commands to perform a backup and restore.\n" + bootstrap_help_msg += "# Run on source:\n" + bootstrap_help_msg += "./yugabyted backup " +\ + "--cloud_storage_uri {} ".format(Output.make_cyan( + "")) +\ + " --database {}".format(Output.make_cyan("")) +\ + " --base_dir {}\n".format(Output.make_cyan("")) + bootstrap_help_msg += "# Run on target:\n" + bootstrap_help_msg += "./yugabyted restore " +\ + "--cloud_storage_uri {} ".format(Output.make_cyan( + "")) +\ + " --database {}".format(Output.make_cyan("")) +\ + " --base_dir {}\n".format(Output.make_cyan("")) + + if len(databases_not_to_be_bootstrapped) != 0: + if len(databases_not_to_be_bootstrapped) > 1: + schema_creation_msg = "Before running the xcluster setup command, Databases " +\ + "`{}` and schema needs to be applied on the target cluster.".format( + ','.join(databases_not_to_be_bootstrapped)) + else: + schema_creation_msg = "Before running the xcluster setup command, Database " +\ + "`{}` and schema needs to be applied on the target cluster.".format( + databases_not_to_be_bootstrapped[0]) + + + if bootstrap_msg and schema_creation_msg: + status_details.extend([ + (Output.make_yellow("Bootstrapping"), bootstrap_msg), + (Output.make_yellow(""), schema_creation_msg), + ]) + elif bootstrap_msg: + status_details.extend([ + (Output.make_yellow("Bootstrapping"), bootstrap_msg), + ]) + elif schema_creation_msg: + status_details.extend([ + (Output.make_yellow("Bootstrapping"), schema_creation_msg), + ]) + + Output.print_out(self.get_status_string_common(status_details, status_display_info)) + + if len(databases_to_be_bootstrapped) != 0: + Output.print_out(bootstrap_help_msg) # Set-up xcluster replication def xcluster_set_up(self): @@ -1589,14 +1772,13 @@ class ControlScript(object): Output.log_error_and_exit(Output.make_red("ERROR") + ": No YugabyteDB node " + "is running in the data_dir {}".format(self.configs.saved_data.get("data_dir"))) - master_addresses, leader_master = self.get_current_masters_and_leader_from_api( - self.advertise_ip()) - if master_addresses == '' or leader_master == '': + master_addresses = self.configs.saved_data.get("current_masters") + if master_addresses == '': Output.log_error_and_exit(Output.make_red("Error") + ": cannot retreive the masters.") replication_id = self.configs.temp_data.get("xcluster_replication_id") - replication_id_with_dbs = self.get_databases_for_xcluster(leader_master, replication_id) + replication_id_with_dbs = self.get_databases_data_for_xcluster(replication_id) if len(replication_id_with_dbs) == 0: Output.log_error_and_exit(Output.make_red("Error") + ": cannot retreive the " + "databases for replication_id {}.".format(replication_id)) @@ -1624,16 +1806,6 @@ class ControlScript(object): bootstrap_not_required_dbs = [database for database in databases_list \ if database not in bootstrap_required_databases] - if len(bootstrap_required_databases) != 0: - if not self.configs.temp_data.get("xcluster_bootstrap_done"): - # TODO: Maybe add the steps for bootstrap for backup/restore - err_msg = "Bootstrap required for following databases. Please do a backup of " + \ - "the databases and restore them to target database the rerun the " + \ - "`yugabyted xcluster set_up` command with --bootstrap_done flag." - for database in bootstrap_required_databases: - err_msg += "\n* " + database - Output.print_out(err_msg) - sys.exit(1) Output.init_animation("Setting up xCluster replication....") out, err, ret_code = YBAdminProxy.set_up_xcluster(master_addresses, replication_id, @@ -1648,7 +1820,7 @@ class ControlScript(object): ] Output.print_out(self.get_status_string_common(status_details, status_display_info)) else: - err_msg = self.get_xcluster_error_msg(err, + err_msg = self.get_xcluster_error_msg(err, command = "set_up", dbs_to_be_bootstrapped=bootstrap_required_databases, dbs_not_to_be_bootstrapped=bootstrap_not_required_dbs) if not err_msg: @@ -1656,6 +1828,91 @@ class ControlScript(object): Output.log_error_and_exit(Output.make_red("Error") + ": {}".format(err_msg)) + # Set-up xcluster replication + def xcluster_add_to_replication(self): + if not self.script.is_running(): + Output.log_error_and_exit(Output.make_red("ERROR") + ": No YugabyteDB node " + + "is running in the data_dir {}".format(self.configs.saved_data.get("data_dir"))) + + master_addresses = self.configs.saved_data.get("current_masters") + if master_addresses == '': + Output.log_error_and_exit(Output.make_red("Error") + ": cannot retreive the masters.") + + replication_id = self.configs.temp_data.get("xcluster_replication_id") + databases = self.configs.temp_data.get("xcluster_databases") + databases = databases.split(',') + + dbs_failed_to_add = dict() + + Output.init_animation("Adding database(s) to xCluster replication....") + + for database in databases: + out, ret_code = YBAdminProxy.xcluster_is_bootstrap_required(master_addresses, + replication_id, + database) + is_bootstrap_required = False + if ret_code: + Output.log(Output.make_red("Error") + ": couldn't check if bootstrap is " + + "required or not for database {}".format(database)) + else: + bootstrap_check_msg = "Bootstrap is required for setting up xCluster replication" + if bootstrap_check_msg in out: + is_bootstrap_required = True + + out, err, ret_code = YBAdminProxy.add_to_replication_xcluster(master_addresses, + replication_id, database, + self.configs.temp_data.get("xcluster_target_addresses")) + + if ret_code: + err_msg = None + if is_bootstrap_required: + err_msg = self.get_xcluster_error_msg(err, command = "add_to_replication", + dbs_to_be_bootstrapped=[database], + dbs_not_to_be_bootstrapped=[]) + else: + err_msg = self.get_xcluster_error_msg(err, command = "add_to_replication", + dbs_to_be_bootstrapped=[], + dbs_not_to_be_bootstrapped=[database]) + if not err_msg: + err_msg = "Add to xCluster replication not successful." + + dbs_failed_to_add[database] = err_msg + + Output.update_animation("", Output.ANIMATION_STOP) + status_details = [] + status_display_info = {} + + if len(dbs_failed_to_add) == 0: + status_details = [ + (Output.make_yellow("Status"), "Add database(s) to xCluster replication successful.") + ] + elif len(dbs_failed_to_add) == len(databases): + status_details = [ + (Output.make_yellow("Status"), "Add database(s) to xCluster replication failed."), + (Output.make_yellow("Errors"), "Following are the errors for each database:") + ] + + for database, error in dbs_failed_to_add.items(): + status_details.extend( + [(Output.make_yellow(""), "{}: {}".format(database, error))] + ) + Output.print_out(self.get_status_string_common(status_details, status_display_info)) + sys.exit(1) + else: + status_details = [ + (Output.make_yellow("Status"), "xCluster add database(s) to replication " + + "partially succesful."), + (Output.make_yellow("Errors"), "Following are the errors for database(s) " + + "failed to checkpoint:") + ] + + for database, error in dbs_failed_to_add.items(): + status_details.extend( + [(Output.make_yellow(""), "{}: {}".format(database, error))] + ) + + Output.print_out(self.get_status_string_common(status_details, status_display_info)) + # Show Outbound and Inbound replications status def xcluster_status(self): if not self.script.is_running(): @@ -1670,9 +1927,11 @@ class ControlScript(object): replication_id = self.configs.temp_data.get("xcluster_replication_id") xcluster_info = self.get_xcluster_info(leader_master) + if len(xcluster_info) == 0: + Output.log_error_and_exit("Couldn't fetch xcluster info.") outbound_replication_status = self.get_outbound_xcluster_replication_statuses(xcluster_info, - leader_master, replication_id if replication_id != "" else None) + replication_id if replication_id != "" else None) inbound_replication_status = self.get_inbound_xcluster_replication_statuses(xcluster_info, master_addresses, replication_id if replication_id != "" else None) @@ -1699,12 +1958,12 @@ class ControlScript(object): Output.log_error_and_exit("No inbound and Outbound replication groups.") # Delete xcluster replication - def xcluster_delete(self): + def xcluster_delete_replication(self): if not self.script.is_running(): Output.log_error_and_exit(Output.make_red("ERROR") + ": No YugabyteDB node " + "is running in the data_dir {}".format(self.configs.saved_data.get("data_dir"))) - master_addresses = self.get_current_masters_from_api(self.advertise_ip()) + master_addresses = self.configs.saved_data.get("current_masters") if master_addresses == '': Output.log_error_and_exit(Output.make_red("Error") + ": cannot retreive the masters.") @@ -1720,7 +1979,8 @@ class ControlScript(object): replication_id, target_addresses=target_master_addresses) if ret_code: - err_msg = self.get_xcluster_error_msg(err, replication_id=replication_id) + err_msg = self.get_xcluster_error_msg(err, command = "delete_replication", + replication_id=replication_id) if not err_msg: err_msg = "Couldn't delete the replication." Output.log_error_and_exit(Output.make_red("Error") + @@ -1752,6 +2012,79 @@ class ControlScript(object): Output.make_cyan("delete_universe_replication ")) Output.print_out(msg) + # Delete database from xcluster replication + def xcluster_remove_database_from_replication(self): + if not self.script.is_running(): + Output.log_error_and_exit(Output.make_red("ERROR") + ": No YugabyteDB node " + + "is running in the data_dir {}".format(self.configs.saved_data.get("data_dir"))) + + master_addresses = self.configs.saved_data.get("current_masters") + if master_addresses == '': + Output.log_error_and_exit(Output.make_red("Error") + ": cannot retreive the masters.") + + target_master_addresses = self.get_current_masters_from_api( + self.configs.temp_data.get("xcluster_target_addresses")) + + replication_id = self.configs.temp_data.get("xcluster_replication_id") + databases = self.configs.temp_data.get("xcluster_databases") + + not_deleted_dbs = dict() + + for database in databases.split(','): + err, ret_code = YBAdminProxy.delete_from_xcluster_replication(master_addresses, + replication_id, database, target_addresses=target_master_addresses) + if ret_code: + err_msg = self.get_xcluster_error_msg(err, + command = "remove_database_from_replication", + replication_id=replication_id) + if not err_msg: + err_msg = "couldn't delete database from xcluster replication." + + not_deleted_dbs[database] = err_msg + continue + else: + if target_master_addresses == '': + Output.log("Database {} deleted from replication ".format(database) + + "from source only.") + else: + Output.log("Database {} deleted from replication.".format(database)) + + status_details = [] + status_display_info = {} + + if len(not_deleted_dbs) == 0: + status_details = [ + (Output.make_yellow("Status"), "xCluster delete database(s) from " + + "replication success.") + ] + elif len(not_deleted_dbs) == len(databases): + status_details = [ + (Output.make_yellow("Status"), "xCluster delete database(s) from " + + "replication failed."), + (Output.make_yellow("Errors"), "Following are the errors for each database:") + ] + + for database, error in not_deleted_dbs.items(): + status_details.extend( + [(Output.make_yellow(""), "{}: {}".format(database, error))] + ) + Output.print_out(self.get_status_string_common(status_details, status_display_info)) + sys.exit(1) + else: + status_details = [ + (Output.make_yellow("Status"), "xCluster delete database(s) from " + + "replication partially succesful."), + (Output.make_yellow("Errors"), "Following are the errors for database(s) " + + "failed to get delete:") + ] + + for database, error in not_deleted_dbs.items(): + status_details.extend( + [(Output.make_yellow(""), "{}: {}".format(database, error))] + ) + + Output.print_out(self.get_status_string_common(status_details, status_display_info)) + # Configuring the primary cluster data placement policy def configure_data_placement(self): if not self.script.is_running(): @@ -5006,8 +5339,8 @@ class ControlScript(object): len(invalid_num_replica_locations) > 0 else None # Get Outbound xcluster replications' statuses - def get_outbound_xcluster_replication_statuses(self, xcluster_api_response, leader_master, - replication_id = None): + def get_outbound_xcluster_replication_statuses(self, xcluster_api_response, + replication_id = None): outbound_replication_groups = xcluster_api_response.get("outbound_replication_groups") if len(outbound_replication_groups) == 0: Output.log("No outbound replication found for target cluster.") @@ -5016,7 +5349,7 @@ class ControlScript(object): status_details = [] status_display_info = {} - replication_id_with_databases = self.get_databases_for_xcluster(leader_master) + replication_id_with_databases = self.get_databases_data_for_xcluster() if len(replication_id_with_databases) == 0: Output.log("Cannot retreive the databases for " + "replication_id {}.".format(replication_id)) @@ -5107,8 +5440,8 @@ class ControlScript(object): if len(xcluster_safe_time_statuses) == 0: Output.log("No inbound xcluster replication found. Set-up xcluster " + - "replication using `xcluster checkpoint` and `xcluster set_up`, " + - "then run `xcluster status` on the target node to get status of " + + "replication using `xcluster create_checkpoint` and `xcluster set_up`, " + + "then run `xcluster status` on the target node to get status of " + "replication.") return "" @@ -5747,104 +6080,71 @@ class ControlScript(object): return False - def get_databases_for_xcluster(self, masterIP, group_name = None): - if not masterIP: - Output.log("Empty masterIP passed to get_databases_for_xcluster") - return [] - master_addr = "{}:{}".format(masterIP, - self.configs.saved_data.get("master_webserver_port")) - try: - timeout = 30 - masterXClusterHttpPage = "http://{}/xcluster".format(master_addr) - Output.log("Trying to parse xcluster html page from {}".format(masterXClusterHttpPage) + - " (Timeout={})".format(timeout)) - response = urlopen(Request(masterXClusterHttpPage), timeout=timeout) - Output.log("Got the response from {}. ".format(masterXClusterHttpPage) + - "response: {}".format(response)) - - # Find the HTML code of Outbound Replication Groups - outbound_replication_group_reg = re.compile(r'(?ms)Outbound Replication ' + - r'Groups.*?

') - outbound_replication_groups_match = outbound_replication_group_reg.search( - response.read().decode('utf-8')) - if not outbound_replication_groups_match: - Output.log("No Outbound Replication Groups found. " + - "Was xcluster checkpoint initialised?") - return [] + def get_replication_groups_for_xcluster(self): + current_masters = self.configs.saved_data.get("current_masters") - outbound_replication_groups_html = outbound_replication_groups_match.group(0) + outbound_replication_groups = YBAdminProxy.get_source_xcluster_replication_ids( + current_masters) + if outbound_replication_groups == None: + Output.log_error_and_exit(Output.make_red("Error") + ": Couldn't retrieve Outbound " + + "Replication Groups.") - group_htmls = [] - # Find the HTML code of Group of the provided group name - if group_name: - group_regex = re.compile(r'(?ms)Group: {}.*?

'.format(group_name)) - group_match = group_regex.search(outbound_replication_groups_html) - if not group_match: - Output.log("No Outbound Replication Group with name(replication_id) " + - "{} was found. Was xcluster checkpoint ".format(group_name) + - "for this group initialised?") - return [] + Output.log("Found Outbound Replication Groups: {}".format(outbound_replication_groups)) - group_html = group_match.group(0) + return outbound_replication_groups - if group_html.count('Group') > 1: - group_regex = re.compile(r'(?ms)Group: {}.*?Group'.format(group_name)) - group_match = group_regex.search(outbound_replication_groups_html) - group_html = group_match.group(0) + def does_replication_id_exists(self, replication_id): + replication_groups = self.get_replication_groups_for_xcluster() + return replication_id in replication_groups - group_htmls.append(group_html) - else: - all_group_positions = [match.start() for match in re.finditer( - r'Group:', outbound_replication_groups_html)] + def get_databases_data_for_xcluster(self, group_name = None): + current_masters = self.configs.saved_data.get("current_masters") - # If no occurrences of "Group:" means no replication groups - if not all_group_positions: - return [] + # response format: + # [{ + # "replication_id": , + # "databases": [{ + # "name": , + # "id" : }] + # }, + # { + # "replication_id": , + # "databases": [{ + # "name": , + # "id" : }] + # },.....] + response = list() + + if group_name: + response.append(self.get_replication_group_info(group_name, current_masters)) + else: + replication_groups = self.get_replication_groups_for_xcluster() - # Add the position of the last character of the string - all_group_positions.append(len(outbound_replication_groups_html)) + for replication_group in replication_groups: + response.append(self.get_replication_group_info(replication_group, current_masters)) - # Iterate over the positions to split the string - for i in range(len(all_group_positions) - 1): - start = all_group_positions[i] - end = all_group_positions[i + 1] - group_htmls.append(outbound_replication_groups_html[start:end].strip()) + Output.log("Replication group infos: {}".format(response)) + return response - response = [] - for group_html in group_htmls: - replication_id_regex = re.compile(r'Group: \s*(.*?)\s*') - replication_id_match = replication_id_regex.search(group_html) - pattern = r'Namespace: (.*?).*?NamespaceId: (\w+).*?status' + def get_replication_group_info(self, group_name, current_masters): + databases_data = YBAdminProxy.get_source_xcluster_databases(current_masters, group_name) - # Find all matches in the HTML content - matches = re.findall(pattern, group_html, re.DOTALL) - group_info = {} - group_info["replication_id"] = replication_id_match.group(1) - group_info["databases"] = [] - for match in matches: - namespace_name, namespace_id = match - group_info["databases"].append({"name": namespace_name.strip(), - "id": namespace_id.strip()}) + rep_group_info = dict() + rep_group_info["replication_id"] = group_name + rep_group_info["databases"] = list() + for db_name, db_id in databases_data.items(): + rep_group_info["databases"].append({"name": db_name, + "id": db_id}) - response.append(group_info) + return rep_group_info - Output.log("replication_id with their databases info: {} from ".format(response) + - "{} http page".format(masterXClusterHttpPage)) + def does_database_exist_in_replication(self, database, group_name): + databases_data = self.get_databases_data_for_xcluster(group_name=group_name) - return response + rep_group_info = databases_data[0] - except HTTPError as http_err: - Output.log('HTTP error occurred while fetching current' + - 'masters from tserver: {}', http_err) - return [] - except URLError as url_err: - Output.log('URL error occurred while fetching current' + - 'masters from tserver: {}', url_err) - return [] - except Exception as err: - Output.log('Other error occurred while fetching current' + - 'masters from tserver: {}', err) - return [] + databases = [database_data.get("name") for database_data in rep_group_info["databases"]] + return database in databases def get_xcluster_info(self, masterIP): if not masterIP: @@ -5858,26 +6158,26 @@ class ControlScript(object): Output.log("Trying to parse xcluster info from {}".format(masterXClusterApi) + " (Timeout={})".format(timeout)) response = urlopen(Request(masterXClusterApi), timeout=timeout) - Output.log("Got the response from {}. ".format(masterXClusterApi) + - "response: {}".format(response)) jsonResponse = json.load(response) if not jsonResponse: Output.log("Empty response from {}.".format(masterXClusterApi)) return [] + Output.log("Got the response from {}. ".format(masterXClusterApi) + + "response: {}".format(jsonResponse)) return jsonResponse except HTTPError as http_err: Output.log('HTTP error occurred while fetching current' + - 'masters from tserver: {}', http_err) + 'masters from tserver: {}'.format(http_err)) return [] except URLError as url_err: Output.log('URL error occurred while fetching current' + - 'masters from tserver: {}', url_err) + 'masters from tserver: {}'.format(url_err)) return [] except Exception as err: Output.log('Other error occurred while fetching current' + - 'masters from tserver: {}', err) + 'masters from tserver: {}'.format(err)) return [] @@ -6400,8 +6700,9 @@ class ControlScript(object): def flags_not_provided(self, args, required_flags): flags_not_provided_list = list() + provided_flags = [provided_flag.split('=')[0] for provided_flag in sys.argv] for flag in required_flags: - if getattr(args, flag) is None: + if "--" + flag not in provided_flags: flags_not_provided_list.append(flag) return flags_not_provided_list @@ -6414,13 +6715,45 @@ class ControlScript(object): if self.configs.saved_data.get("database_password") is not None: self.setup_env_init.update_passwords(self.configs.saved_data.get("database_password")) - if args.parser == "checkpoint": + if args.parser == "create_checkpoint": + required_flags = ["replication_id", "databases"] + flags_not_provided_list = self.flags_not_provided(args, required_flags) + + if len(flags_not_provided_list) != 0: + err_msg = Output.make_red("Error") + ": The following flags are not provided. " +\ + "Please provide them while running `yugabyted xcluster create_checkpoint`." + for flag in flags_not_provided_list: + err_msg += "\n- " + flag + Output.print_out(err_msg) + sys.exit(1) + + ysql_proxy = YsqlProxy(self.advertise_ip(), self.configs.saved_data.get("ysql_port")) + databases = args.databases.split(",") + db_not_exist_list = list() + for db in databases: + if not ysql_proxy.db_exists(db): + db_not_exist_list.append(db) + + if len(db_not_exist_list): + if len(db_not_exist_list) == 1: + err_msg = "database {}".format(db_not_exist_list[0]) + else: + err_msg = "databases {}".format(','.join(db_not_exist_list)) + Output.log_error_and_exit(Output.make_red("Error") + ": " + err_msg + + " does not exists in this cluster. Please create " + + "the schema for the database to be replicated before running " + + "`yugabyted xcluster create_checkpoint`.") + + self.configs.temp_data["xcluster_replication_id"] = args.replication_id + self.configs.temp_data["xcluster_databases"] = args.databases + + if args.parser == "add_to_checkpoint": required_flags = ["replication_id", "databases"] flags_not_provided_list = self.flags_not_provided(args, required_flags) if len(flags_not_provided_list) != 0: - err_msg = Output.make_red("Error") + ": Following flags are not provided. " +\ - "Please provide them while running `yugabyted xcluster checkpoint`." + err_msg = Output.make_red("Error") + ": The following flags are not provided. " +\ + "Please provide them while running `yugabyted xcluster add_to_checkpoint`." for flag in flags_not_provided_list: err_msg += "\n- " + flag Output.print_out(err_msg) @@ -6429,9 +6762,17 @@ class ControlScript(object): ysql_proxy = YsqlProxy(self.advertise_ip(), self.configs.saved_data.get("ysql_port")) databases = args.databases.split(",") db_not_exist_list = list() + db_exist_in_replication = list() for db in databases: if not ysql_proxy.db_exists(db): db_not_exist_list.append(db) + if self.does_database_exist_in_replication(db, args.replication_id): + db_exist_in_replication.append(db) + + if not self.does_replication_id_exists(args.replication_id): + Output.log_error_and_exit(Output.make_red("Error") + ": --replication_id" + + " {} provided is not present. ".format(args.replication_id) + + "Was the xcluster replication with this replication_id set-up?") if len(db_not_exist_list): if len(db_not_exist_list) == 1: @@ -6441,23 +6782,99 @@ class ControlScript(object): Output.log_error_and_exit(Output.make_red("Error") + ": " + err_msg + " does not exists in this cluster. Please create " + "the schema for the database to be replicated before running " + - "`yugabyted xcluster checkpoint`.") + "`yugabyted xcluster create_checkpoint`.") + + if len(db_exist_in_replication): + if len(db_exist_in_replication) == 1: + err_msg = "database {}".format(db_exist_in_replication[0]) + else: + err_msg = "databases {}".format(','.join(db_exist_in_replication)) + Output.log_error_and_exit(Output.make_red("Error") + ": " + err_msg + + " already exists in the replication {}. ".format(args.replication_id) + + "Cannot add these again.") self.configs.temp_data["xcluster_replication_id"] = args.replication_id self.configs.temp_data["xcluster_databases"] = args.databases if args.parser == "set_up": - required_flags = ["replication_id", "target_address"] + required_flags = ["replication_id", "target_address", "bootstrap_done"] flags_not_provided_list = self.flags_not_provided(args, required_flags) if len(flags_not_provided_list) != 0: - err_msg = Output.make_red("Error") + ": Following flags are not provided. " +\ + err_msg = Output.make_red("Error") + ": The following flags are not provided. " +\ "Please provide them while running `yugabyted xcluster set_up`." for flag in flags_not_provided_list: err_msg += "\n- " + flag Output.print_out(err_msg) sys.exit(1) + if not self.does_replication_id_exists(args.replication_id): + Output.log_error_and_exit(Output.make_red("Error") + ": --replication_id" + + " {} provided is not present. ".format(args.replication_id) + + "Was the `yugabyted xcluster create_checkpoint` command run?") + + if self.validate_hostname_ip(args.target_address): + target_master_addresses = self.get_current_masters_from_api(args.target_address) + if target_master_addresses == '': + Output.log_error_and_exit(Output.make_red("Error") + ": cannot reach node at " + + "provided target IP {}. Is target cluster running?".format(args.target_address)) + else: + self.configs.temp_data["xcluster_target_addresses"] = target_master_addresses + else: + Output.log_error_and_exit(Output.make_red("Error") + ": --target_address" + + " {} provided is not a valid address. ".format(args.target_address) + + "Please try again with a valid IPV4, IPV6 or DNS.") + + self.configs.temp_data["xcluster_replication_id"] = args.replication_id + self.configs.temp_data["xcluster_bootstrap_done"] = args.bootstrap_done + + if args.parser == "add_to_replication": + required_flags = ["replication_id", "target_address", "bootstrap_done", "databases"] + flags_not_provided_list = self.flags_not_provided(args, required_flags) + + if len(flags_not_provided_list) != 0: + err_msg = Output.make_red("Error") + ": The following flags are not provided. " +\ + "Please provide them while running `yugabyted xcluster add_to_replication`." + for flag in flags_not_provided_list: + err_msg += "\n- " + flag + Output.print_out(err_msg) + sys.exit(1) + + if not self.does_replication_id_exists(args.replication_id): + Output.log_error_and_exit(Output.make_red("Error") + ": --replication_id" + + " {} provided is not present. ".format(args.replication_id) + + "Was the xcluster replication with this replication_id set-up?") + + ysql_proxy = YsqlProxy(self.advertise_ip(), self.configs.saved_data.get("ysql_port")) + databases = args.databases.split(",") + db_not_exist_list = list() + db_not_exist_in_replication = list() + for db in databases: + if not ysql_proxy.db_exists(db): + db_not_exist_list.append(db) + if not self.does_database_exist_in_replication(db, args.replication_id): + db_not_exist_in_replication.append(db) + + if len(db_not_exist_list): + if len(db_not_exist_list) == 1: + err_msg = "database {}".format(db_not_exist_list[0]) + else: + err_msg = "databases {}".format(','.join(db_not_exist_list)) + Output.log_error_and_exit(Output.make_red("Error") + ": " + err_msg + + " does not exists in this cluster. Please create " + + "the schema for the database(s) to be added to the replication and run " + + "`yugabyted xcluster add_to_checkpoint` to checkpoint the database(s) before " + + "running `yugabyted xcluster add_to_replication`") + + if len(db_not_exist_in_replication): + if len(db_not_exist_in_replication) == 1: + err_msg = "database {}".format(db_not_exist_in_replication[0]) + else: + err_msg = "databases {}".format(','.join(db_not_exist_in_replication)) + Output.log_error_and_exit(Output.make_red("Error") + ": " + err_msg + + " does not exist in the replication {}. ".format(args.replication_id) + + "Was `yuagbyted xcluster add_to_checkpoint` run for this database(s)?") + if self.validate_hostname_ip(args.target_address): target_master_addresses = self.get_current_masters_from_api(args.target_address) if target_master_addresses == '': @@ -6472,14 +6889,15 @@ class ControlScript(object): self.configs.temp_data["xcluster_replication_id"] = args.replication_id self.configs.temp_data["xcluster_bootstrap_done"] = args.bootstrap_done + self.configs.temp_data["xcluster_databases"] = args.databases - if args.parser == "delete" and sys.argv[1] == "xcluster": + if args.parser == "delete_replication": required_flags = ["replication_id", "target_address"] flags_not_provided_list = self.flags_not_provided(args, required_flags) if len(flags_not_provided_list) != 0: - err_msg = Output.make_red("Error") + ": Following flags are not provided. " +\ - "Please provide them while running `yugabyted xcluster delete`." + err_msg = Output.make_red("Error") + ": The following flags are not provided. " +\ + "Please provide them while running `yugabyted xcluster delete_replication`." for flag in flags_not_provided_list: err_msg += "\n- " + flag Output.print_out(err_msg) @@ -6492,6 +6910,39 @@ class ControlScript(object): " {} provided is not a valid address. ".format(args.target_address) + "Please try again with a valid IPV4, IPV6 or DNS.") + if not self.does_replication_id_exists(args.replication_id): + Output.log_error_and_exit(Output.make_red("Error") + ": --replication_id" + + " {} provided is not present. ".format(args.replication_id) + + "Was the xcluster replication with this replication_id set-up?") + + self.configs.temp_data["xcluster_replication_id"] = args.replication_id + + if args.parser == "remove_database_from_replication": + required_flags = ["replication_id", "target_address", "databases"] + flags_not_provided_list = self.flags_not_provided(args, required_flags) + + if len(flags_not_provided_list) != 0: + err_msg = Output.make_red("Error") + ": The following flags are not provided. " +\ + "Please provide them while running " +\ + "`yugabyted xcluster remove_database_from_replication`." + for flag in flags_not_provided_list: + err_msg += "\n- " + flag + Output.print_out(err_msg) + sys.exit(1) + + if self.validate_hostname_ip(args.target_address): + self.configs.temp_data["xcluster_target_addresses"] = args.target_address + else: + Output.log_error_and_exit(Output.make_red("Error") + ": --target_address" + + " {} provided is not a valid address. ".format(args.target_address) + + "Please try again with a valid IPV4, IPV6 or DNS.") + + if not self.does_replication_id_exists(args.replication_id): + Output.log_error_and_exit(Output.make_red("Error") + ": --replication_id" + + " {} provided is not present. ".format(args.replication_id) + + "Was the xcluster replication with this replication_id set-up?") + + self.configs.temp_data["xcluster_databases"] = args.databases self.configs.temp_data["xcluster_replication_id"] = args.replication_id if args.parser == "status" and sys.argv[1] == "xcluster": @@ -7568,14 +8019,23 @@ class ControlScript(object): xcluster_subparsers = xcluster_parser.add_subparsers(dest="parser", metavar="") xcluster_subparsers.required = True for cmd, description in ( - ("checkpoint", "Initialise a xcluster replication. " + + ("create_checkpoint", "Initialise a xcluster replication. " + + "Needs to be run from source cluster."), + ("add_to_checkpoint", "Checkpoint database which is to be added to an existing " + + "xcluster replication. " + "Needs to be run from source cluster."), ("set_up", "Setup xcluster replication. " + "Needs to be run from source cluster."), + ("add_to_replication", "Add database to xcluster replication. " + + "Needs to be run from source cluster."), ("status", "Displays the status of any replications running " + "from or to this cluster." + "Can be run from either source or target cluster."), - ("delete", "Deletes the specified replication. Needs to run from source cluster"),): + ("delete_replication", "Deletes the specified replication. " + + "Needs to run from source cluster"), + ("remove_database_from_replication", "Deletes the specified database " + + "from the replication." + + " Needs to run from source cluster"),): parser_name = "xcluster_" + cmd example = "" if EXAMPLE.get(parser_name): @@ -7586,8 +8046,8 @@ class ControlScript(object): subparser.set_defaults(func=func) all_parsers[parser_name] = subparser - # Flags for sub-command: checkpoint - for cmd in ("xcluster_checkpoint",): + # Flags for sub-command: create_checkpoint + for cmd in ("xcluster_create_checkpoint",): cur_parser = all_parsers[cmd] cur_parser.add_argument( "--replication_id", help="Unique string to assign to a replication.", @@ -7596,6 +8056,17 @@ class ControlScript(object): "--databases", help="Name of database to be replicated.", metavar="") + # Flags for sub-command: create_checkpoint + for cmd in ("xcluster_add_to_checkpoint",): + cur_parser = all_parsers[cmd] + cur_parser.add_argument( + "--replication_id", help="Replication id of the xcluster replication to which " + + "database is to be added.", + metavar="") + cur_parser.add_argument( + "--databases", help="Name of databases to be added.", + metavar="") + # Flags for sub-command: set_up for cmd in ("xcluster_set_up",): cur_parser = all_parsers[cmd] @@ -7605,11 +8076,29 @@ class ControlScript(object): metavar="") cur_parser.add_argument( "--replication_id", - help="Unique string to assigned to a replication during checkpoint.", + help="Unique string to assigned to a replication during create_checkpoint.", + metavar="") + cur_parser.add_argument( + "--bootstrap_done", + help="Use this flag to indicate bootstrapping has been completed. Mandatory flag", + action="store_true") + + for cmd in ("xcluster_add_to_replication",): + cur_parser = all_parsers[cmd] + cur_parser.add_argument( + "--target_address", + help="IP of any target cluster node.", + metavar="") + cur_parser.add_argument( + "--replication_id", + help="Replication id of the xcluster replication to which database is to be added.", + metavar="") + cur_parser.add_argument( + "--databases", help="Name of databases to be added.", metavar="") cur_parser.add_argument( "--bootstrap_done", - help="Use this flag if bootstrap was required and has been completed.", + help="Use this flag to indicate bootstrapping has been completed. Mandatory flag", action="store_true") # Flags for sub-command: set_up @@ -7620,8 +8109,8 @@ class ControlScript(object): help="Replication id of the xcluster replication whose status is to be displayed.", metavar="") - # Flags for sub-command: delete - for cmd in ("xcluster_delete",): + # Flags for sub-command: delete_replication + for cmd in ("xcluster_delete_replication",): cur_parser = all_parsers[cmd] cur_parser.add_argument( "--target_address", @@ -7629,7 +8118,22 @@ class ControlScript(object): metavar="") cur_parser.add_argument( "--replication_id", - help="Unique string to assigned to a replication during checkpoint.", + help="Unique string to assigned to a replication during create_checkpoint.", + metavar="") + + # Flags for sub-command: remove_database_from_replication + for cmd in ("xcluster_remove_database_from_replication",): + cur_parser = all_parsers[cmd] + cur_parser.add_argument( + "--target_address", + help="IP of any target cluster node.", + metavar="") + cur_parser.add_argument( + "--replication_id", + help="Unique string to assigned to a replication during create_checkpoint.", + metavar="") + cur_parser.add_argument( + "--databases", help="Name of databases to be removed.", metavar="") # Commands that can alter configuration file. @@ -8778,12 +9282,50 @@ class YBAdminProxy(object): return (ret_code == 0) @staticmethod - def checkpoint_xcluster(master_addrs, replication_id, databases, timeout=10): + def create_checkpoint_xcluster(master_addrs, replication_id, databases, timeout=10): cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, "create_xcluster_checkpoint", replication_id, databases] out, err, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) return (out, err, ret_code) + @staticmethod + def add_to_checkpoint_xcluster(master_addrs, replication_id, database, timeout=10): + cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, + "add_namespace_to_xcluster_checkpoint", + replication_id, database] + out, err, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) + return (out, err, ret_code) + + @staticmethod + def get_source_xcluster_replication_ids(master_addrs, timeout=10): + cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, + "list_xcluster_outbound_replication_groups"] + out, _, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) + if ret_code: + return None + + matches = re.findall(r'\[(.*?)\]', out) + replication_ids = matches[0].split(',') + + return replication_ids + + @staticmethod + def get_source_xcluster_databases(master_addrs, replication_id, timeout=10): + cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, + "get_xcluster_outbound_replication_group_info", + replication_id] + out, _, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) + if ret_code: + return None + + databases_with_ids = re.findall(r'Namespace ID:\s*(\w+)\s*Namespace name:\s*(\w+)', out) + # databases_data is a map with : as the format + databases_data = dict() + for database_id, database in databases_with_ids: + databases_data[database] = database_id + + return (databases_data) + @staticmethod def xcluster_is_bootstrap_required(master_addrs, replication_id, databases, timeout=10): cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, @@ -8799,6 +9341,15 @@ class YBAdminProxy(object): out, err, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) return (out, err, ret_code) + @staticmethod + def add_to_replication_xcluster(master_addrs, replication_id, database, target_addresses, + timeout=10): + cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, + "add_namespace_to_xcluster_replication", replication_id, + database, target_addresses] + out, err, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) + return (out, err, ret_code) + @staticmethod def get_xcluster_safe_time(master_addrs, timeout=10): cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, @@ -8823,6 +9374,21 @@ class YBAdminProxy(object): _, err, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) return (err, ret_code) + @staticmethod + def delete_from_xcluster_replication(master_addrs, replication_id, database, + target_addresses = '', timeout=10): + if target_addresses == '': + cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, + "remove_namespace_from_xcluster_replication", + replication_id, database] + else: + cmd = YBAdminProxy.cmd_args + ["-master_addresses", master_addrs, + "remove_namespace_from_xcluster_replication", + replication_id, database, target_addresses] + + _, err, ret_code = run_process(cmd, timeout=timeout, log_cmd=True) + return (err, ret_code) + # Passthrough method for all the yb-admin commands # @staticmethod # def call_yb_admin_command(master_addrss, command, timeout=10): @@ -8886,7 +9452,7 @@ class YsqlProxy(object): def db_exists(self, db): cmd = self.cmd_with_password + ["-q", "-c", "\\t", "-c", "select datname from pg_catalog.pg_database where datname='{}'".format(db)] - return run_process_checked(cmd=cmd, env_vars=self.env).strip() == db + return db in run_process_checked(cmd=cmd, env_vars=self.env).strip() # Creates specified db. def create_db(self, db):