[PLAT-15726][PLAT-15735] Avoid use of multi-statement queries in node health script + recheck in DDL atomicity check

Summary:
Recent DB versions do not allow multi-statement query execution via ysqlsh -c, so the write read test has to use a single query.
Also, ongoing DDL operations can race with the check and trigger false-positive errors.
We address that by re-checking the particular tables for which errors were found in the first pass.
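For illustration, a sketch of the two probes side by side, written the way the template builds them (the key value 101 is a stand-in for str(self.tserver_index * 100 + 1)):

    key = 101  # illustrative; the template derives the key from the tserver index

    # Old probe: three statements in a single ysqlsh -c call, which recent
    # DB versions reject.
    old_statement = ("insert into write_read_test values ({0}) on conflict do nothing; "
                     "select from write_read_test where id = {0}; "
                     "delete from write_read_test where id = {0};").format(key)

    # New probe: one upsert that writes the row and reads it back via RETURNING,
    # so a single statement still exercises both the write and the read path.
    new_statement = ("insert into write_read_test values ({0}) on conflict (id) "
                     "do update set id = {0} returning id;").format(key)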

Test Plan:
Create universe with build 2.20.0.0_b50
Make sure DDL atomicity check passes.
Force DDL atomicity issues.
Make sure DDL atomicity check fails with proper error messages.
Make sure write read test metric is exported properly.

Reviewers: vbansal

Reviewed By: vbansal

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D39550
anmalysh-yb committed Oct 31, 2024
1 parent eec6d50 commit da7ba1b
Showing 1 changed file with 46 additions and 26 deletions.
72 changes: 46 additions & 26 deletions managed/src/main/resources/health/node_health.py.template
@@ -1497,17 +1497,16 @@ class NodeChecker():
 
         key = str(self.tserver_index * 100 + 1)
         output = []
-        statement = "insert into write_read_test values ({}) on conflict do nothing; \
-            select from write_read_test where id = {}; \
-            delete from write_read_test where id = {};".format(key, key, key)
+        statement = "insert into write_read_test values ({}) on conflict (id) \
+            do update set id = {} returning id;".format(key, key)
         cmd = "{} -c \"{}\"".format(ysqlsh_cmd, statement)
         output = self._check_output(cmd).strip()
 
         if 'relation "write_read_test" does not exist' in output:
             # Old system without read write table
             # It's metric-only check, hence we do set has_error=False to avoid retries
             return e.fill_and_return_entry(["Test table does not exist"], has_error=False)
-        if 'DELETE 1' in output:
+        if 'INSERT 0 1' in output:
             metric.add_value(1)
             return e.fill_and_return_entry([], has_error=False, metrics=[metric])
 
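Editorial note, not part of the commit: PostgreSQL reports the command tag INSERT 0 1 for an INSERT ... ON CONFLICT ... DO UPDATE statement whether the row was newly inserted or updated on conflict, so matching on 'INSERT 0 1' succeeds both on the first probe and on every later probe that hits the existing key.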
@@ -1583,22 +1582,35 @@ class NodeChecker():
 
     def check_ddl_atomicity(self):
         logging.info("Checking DDL atomicity on node {}".format(self.node))
+
+        result, recheck_table_uuids = self.check_ddl_atomicity_internal([])
+
+        if not recheck_table_uuids:
+            return result
+
+        result, recheck_table_uuids = self.check_ddl_atomicity_internal(recheck_table_uuids)
+        return result
+
+
+    def check_ddl_atomicity_internal(self, recheck_table_uuids):
+        logging.info("Checking DDL atomicity on node {}".format(self.node))
         e = self._new_entry("DDL atomicity")
         metric = Metric.from_definition(YB_DDL_ATOMICITY_CHECK)
+        tables_with_errors = []
 
         tserver_pid = self.get_process_pid_by_name(TSERVER)
         if tserver_pid is None:
             metric.add_value(0)
-            return e.fill_and_return_entry(["TServer is not running on this node"],
+            return (e.fill_and_return_entry(["TServer is not running on this node"],
                                            has_error=True,
-                                           metrics=[metric])
+                                           metrics=[metric]), [])
 
         try:
             ysqlsh_cmd = self.create_ysqlsh_command(
                 "", True if self.enable_connection_pooling else False)
         except RuntimeError as re:
             metric.add_value(0)
-            return e.fill_and_return_entry([str(re)], has_error=True, metrics=[metric])
+            return (e.fill_and_return_entry([str(re)], has_error=True, metrics=[metric]), [])
 
         errors = []
         try:
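Editorial note on the two-pass shape introduced above: a DDL that runs while the checker compares the master's table list against pg_catalog can make a healthy table look broken for a moment, so an error now has to survive a second, targeted pass before it is reported. A minimal standalone sketch of the pattern, with scan as a hypothetical stand-in for check_ddl_atomicity_internal:

    def scan(only_uuids):
        """Hypothetical stand-in for check_ddl_atomicity_internal.
        Returns (entry, suspect_uuids); an empty only_uuids means scan all tables."""
        raise NotImplementedError

    def check():
        entry, suspects = scan([])   # first pass over every table
        if not suspects:
            return entry             # nothing suspicious: report as-is
        entry, _ = scan(suspects)    # second pass over the suspects only;
        return entry                 # transient DDL races wash out here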
@@ -1607,19 +1619,19 @@ class NodeChecker():
                 "{}/api/v1/tables".format(self.master_leader_url))
             if not tables_response:
                 metric.add_value(0)
-                return e.fill_and_return_entry(
+                return (e.fill_and_return_entry(
                     ['Master Leader HTTP endpoint is not running'],
                     has_error=True,
-                    metrics=[metric])
+                    metrics=[metric]), [])
             try:
                 tables_output = json.loads(tables_response)
             except Exception as ex:
                 logging.warning("Tables HTTP API response is not a valid json: %s", tables_response)
                 metric.add_value(0)
-                return e.fill_and_return_entry(
+                return (e.fill_and_return_entry(
                     ['Tables HTTP API response is not a valid json'],
                     has_error=True,
-                    metrics=[metric])
+                    metrics=[metric]), [])
             table_data_json = tables_output["user"]
             table_data_json += tables_output["index"]
 
@@ -1654,20 +1666,20 @@ class NodeChecker():
 
             if check_for_errors(pg_class_output):
                 metric.add_value(0)
-                return e.fill_and_return_entry(
+                return (e.fill_and_return_entry(
                     ["Failed to retrieve pg_class info: {}".format(pg_class_output)],
                     has_error=True,
-                    metrics=[metric])
+                    metrics=[metric]), [])
             try:
                 pg_class_json = json.loads(pg_class_output)
             except Exception as ex:
                 logging.warning("pg_class query returned invalid json: %s",
                                 pg_class_output)
                 metric.add_value(0)
-                return e.fill_and_return_entry(
+                return (e.fill_and_return_entry(
                     ['pg_class query returned invalid json'],
                     has_error=True,
-                    metrics=[metric])
+                    metrics=[metric]), [])
             pg_class_oid_tableinfo_dict = {}
             # Use relfilenode if it exists (as the table may be rewritten)
             for table in pg_class_json:
@@ -1684,32 +1696,37 @@ class NodeChecker():
 
             if check_for_errors(pg_attribute_output):
                 metric.add_value(0)
-                return e.fill_and_return_entry(
+                return (e.fill_and_return_entry(
                     ["Failed to retrieve pg_attribute info: {}".format(pg_attribute_output)],
                     has_error=True,
-                    metrics=[metric])
+                    metrics=[metric]), [])
 
             try:
                 pg_attribute_json = json.loads(pg_attribute_output)
             except Exception as ex:
                 logging.warning("pg_attribute query returned invalid json: %s",
                                 pg_attribute_output)
                 metric.add_value(0)
-                return e.fill_and_return_entry(
+                return (e.fill_and_return_entry(
                     ['pg_attribute query returned invalid json'],
                     has_error=True,
-                    metrics=[metric])
+                    metrics=[metric]), [])
             pg_attribute_attrelid_attnames_dict = defaultdict(list)
             for attribute in pg_attribute_json:
                 (pg_attribute_attrelid_attnames_dict[attribute['attrelid']]
                     .append(attribute['attname']))
 
             # Iterate through each table
             for tablename, pg_oid, yb_pg_table_oid, tableid in tables:
+                if recheck_table_uuids and tableid not in recheck_table_uuids:
+                    # This is a recheck in case DDL happened during the previous check.
+                    # Just test particular tables.
+                    continue
                 # Check if the table exists in pg_class
                 if yb_pg_table_oid not in pg_class_oid_tableinfo_dict:
                     # Note: on versions older than 2024.1, the oid in this log
                     # will refer to the relfilenode for materialized views.
+                    tables_with_errors.append(tableid)
                     errors.append(("Table {} with oid {} and uuid {} does not exist in "
                                    "database {} - ORPHANED TABLE NEEDS TO BE DROPPED")
                                   .format(tablename, pg_oid, tableid, dbname))
@@ -1722,6 +1739,7 @@ class NodeChecker():
                     pg_oid = pg_class_entry['oid']
 
                 if tablename != pg_class_entry['relname']:
+                    tables_with_errors.append(tableid)
                     errors.append(("Table {} with oid {} and uuid {} exists in {} but has a "
                                    "mismatched table name - TABLE NAME NEEDS TO BE FIXED")
                                   .format(tablename, pg_oid, tableid, dbname))
@@ -1732,21 +1750,21 @@ class NodeChecker():
                     "{}/api/v1/table?id={}".format(self.master_leader_url, tableid))
                 if not table_schema_response:
                     metric.add_value(0)
-                    return e.fill_and_return_entry(
+                    return (e.fill_and_return_entry(
                         ['Master Leader HTTP endpoint is not running'],
                         has_error=True,
-                        metrics=[metric])
+                        metrics=[metric]), [])
 
                 try:
                     table_schema_json = json.loads(table_schema_response)
                 except Exception as ex:
                     logging.warning("Table HTTP API response is not a valid json: %s",
                                     table_schema_response)
                     metric.add_value(0)
-                    return e.fill_and_return_entry(
+                    return (e.fill_and_return_entry(
                         ['Table HTTP API response is not a valid json'],
                         has_error=True,
-                        metrics=[metric])
+                        metrics=[metric]), [])
                 columns = [html.unescape(
                     column['column']) for column in table_schema_json["columns"]]
                 # Check if each column exists in pg_attribute
@@ -1755,17 +1773,18 @@ class NodeChecker():
                         or column == "ybidxbasectid"):
                         continue
                     if column not in pg_attribute_attrelid_attnames_dict[pg_oid]:
+                        tables_with_errors.append(tableid)
                         errors.append(("Column {} does not exist in table {} in database {} - "
                                        "ORPHANED COLUMN NEEDS TO BE DROPPED")
                                       .format(column, tablename, dbname))
                         continue
         except Exception as ex:
             logging.exception('Got exception on while performing DDL Atomicity check')
             metric.add_value(0)
-            return e.fill_and_return_entry(
+            return (e.fill_and_return_entry(
                 ["Unexpected error occurred"],
                 has_error=True,
-                metrics=[metric])
+                metrics=[metric]), [])
 
         has_errors = len(errors) > 0
         if has_errors:
@@ -1776,7 +1795,8 @@ class NodeChecker():
         else:
             msgs = ["No errors found"]
         metric.add_value(0 if has_errors else 1)
-        return e.fill_and_return_entry(msgs, has_error=has_errors, metrics=[metric])
+        return (e.fill_and_return_entry(msgs, has_error=has_errors, metrics=[metric]),
+                tables_with_errors)
 
     def check_openssl_availability(self):
         cmd = "which openssl &>/dev/null; echo $?"
