diff --git a/sambacc/commands/ctdb.py b/sambacc/commands/ctdb.py index a3847c1..abbfe78 100644 --- a/sambacc/commands/ctdb.py +++ b/sambacc/commands/ctdb.py @@ -17,6 +17,7 @@ # import argparse +import contextlib import logging import os import socket @@ -28,7 +29,7 @@ from sambacc import samba_cmds from sambacc.simple_waiter import Sleeper, Waiter -from .cli import best_waiter, commands, Context, Fail +from .cli import best_leader_locator, best_waiter, commands, Context, Fail _logger = logging.getLogger(__name__) @@ -302,37 +303,66 @@ def ctdb_set_node(ctx: Context) -> None: @commands.command(name="ctdb-manage-nodes", arg_func=_ctdb_general_node_args) def ctdb_manage_nodes(ctx: Context) -> None: - """Run a long lived procees to monitor the node state file for new nodes. - When a new node is found, if the current node is in the correct state, this - node will add it to CTDB. + """Run a long lived process to manage the cluster metadata. It can add new + nodes. When a new node is found, if the current node is in the correct + state, this node will add it to CTDB. """ _ctdb_ok() np = NodeParams(ctx) expected_pnn = np.node_number or 0 waiter = np.cluster_meta_waiter() - errors = 0 + limiter = ErrorLimiter("ctdb_manage_nodes", 10, pause_func=waiter.wait) while True: - try: - ctdb.monitor_cluster_meta_updates( + with limiter.catch(): + ctdb.manage_cluster_meta_updates( cmeta=np.cluster_meta(), pnn=expected_pnn, real_path=np.persistent_path, pause_func=waiter.wait, ) - errors = 0 - except KeyboardInterrupt: - raise - except Exception as err: - _logger.error( - f"error during manage_nodes: {err}, count={errors}", - exc_info=True, + + +def _ctdb_monitor_nodes_args(parser: argparse.ArgumentParser) -> None: + _ctdb_must_have_node_args(parser) + parser.add_argument( + "--reload", + choices=("leader", "never", "all"), + default="leader", + help="Specify which nodes can command CTDB to reload nodes", + ) + + +@commands.command(name="ctdb-monitor-nodes", arg_func=_ctdb_monitor_nodes_args) +def ctdb_monitor_nodes(ctx: Context) -> None: + """Run a long lived process to monitor the cluster metadata. + Unlike ctdb_manage_nodes this function assumes that the node state + file is externally managed and primarily exists to reflect any changes + to the cluster meta into CTDB. + """ + _ctdb_ok() + np = NodeParams(ctx) + waiter = np.cluster_meta_waiter() + leader_locator = None + if ctx.cli.reload == "leader": + leader_locator = best_leader_locator(ctx.instance_config) + reload_all = ctx.cli.reload == "all" + nodes_file_path = np.persistent_path if ctx.cli.write_nodes else None + + _logger.info("monitoring cluster meta changes") + _logger.debug( + "reload_all=%s leader_locator=%r", reload_all, leader_locator + ) + limiter = ErrorLimiter("ctdb_monitor_nodes", 10, pause_func=waiter.wait) + while True: + with limiter.catch(): + ctdb.monitor_cluster_meta_changes( + cmeta=np.cluster_meta(), + pause_func=waiter.wait, + nodes_file_path=nodes_file_path, + leader_locator=leader_locator, + reload_all=reload_all, ) - errors += 1 - if errors > 10: - _logger.error(f"too many retries ({errors}). giving up") - raise - waiter.wait() def _ctdb_must_have_node_args(parser: argparse.ArgumentParser) -> None: @@ -340,7 +370,7 @@ def _ctdb_must_have_node_args(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--write-nodes", action="store_true", - help="Specify node by IP", + help="Write ctdb nodes file based on cluster meta contents", ) @@ -411,3 +441,43 @@ def ctdb_rados_mutex(ctx: Context) -> None: cmd = cmd["-n", namespace] _logger.debug("executing command: %r", cmd) samba_cmds.execute(cmd) # replaces process + + +class ErrorLimiter: + def __init__( + self, + name: str, + limit: int, + *, + pause_func: typing.Optional[typing.Callable] = None, + ) -> None: + self.name = name + self.limit = limit + self.errors = 0 + self.pause_func = pause_func + + def post_catch(self): + if self.pause_func is not None: + self.pause_func() + + @contextlib.contextmanager + def catch(self) -> typing.Iterator[None]: + try: + _logger.debug( + "error limiter proceeding: %s: errors=%r", + self.name, + self.errors, + ) + yield + except KeyboardInterrupt: + raise + except Exception as err: + _logger.error( + f"error during {self.name}: {err}, count={self.errors}", + exc_info=True, + ) + self.errors += 1 + if self.errors > self.limit: + _logger.error(f"too many retries ({self.errors}). giving up") + raise + self.post_catch() diff --git a/sambacc/ctdb.py b/sambacc/ctdb.py index bfb2a0a..095033a 100644 --- a/sambacc/ctdb.py +++ b/sambacc/ctdb.py @@ -23,6 +23,7 @@ import typing from sambacc import config +from sambacc import leader from sambacc import samba_cmds from sambacc.jfile import ClusterMetaJSONFile from sambacc.netcmd_loader import template_config @@ -338,7 +339,7 @@ def manage_nodes( pause_func: typing.Callable, ) -> None: """Monitor nodes json for updates, reflecting those changes into ctdb.""" - monitor_cluster_meta_updates( + manage_cluster_meta_updates( ClusterMetaJSONFile(nodes_json), pnn, real_path, @@ -346,7 +347,7 @@ def manage_nodes( ) -def monitor_cluster_meta_updates( +def manage_cluster_meta_updates( cmeta: ClusterMeta, pnn: int, real_path: str, @@ -464,10 +465,7 @@ def _node_update(cmeta: ClusterMeta, real_path: str) -> bool: new_ctdb_nodes.append(expected_line) else: new_ctdb_nodes[pnn] = expected_line - with open(real_path, "w") as nffh: - write_nodes_file(nffh, new_ctdb_nodes) - nffh.flush() - os.fsync(nffh) + _save_nodes(real_path, new_ctdb_nodes) _logger.info("running: ctdb reloadnodes") subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"])) for entry in need_reload: @@ -490,17 +488,107 @@ def cluster_meta_to_nodes(cmeta: ClusterMeta, real_path: str) -> None: json_data = cmo.load() nodes = json_data.get("nodes", []) _logger.info("Found node metadata: %r", nodes) - pnn_max = max(n["pnn"] for n in nodes) + 1 # pnn is zero indexed - ctdb_nodes: list[str] = [""] * pnn_max - for entry in nodes: - pnn = entry["pnn"] - expected_line = _entry_to_node(ctdb_nodes, entry) - ctdb_nodes[pnn] = expected_line + ctdb_nodes = _cluster_meta_to_ctdb_nodes(nodes) _logger.info("Will write nodes: %s", ctdb_nodes) - with open(real_path, "w") as nffh: - write_nodes_file(nffh, ctdb_nodes) - nffh.flush() - os.fsync(nffh) + _save_nodes(real_path, ctdb_nodes) + + +def _cluster_meta_to_ctdb_nodes(nodes: list[dict]) -> list[str]: + pnn_max = max(n["pnn"] for n in nodes) + 1 # pnn is zero indexed + ctdb_nodes: list[str] = [""] * pnn_max + for entry in nodes: + pnn = entry["pnn"] + # overwrite the pnn indexed entry with expected value + ctdb_nodes[pnn] = _entry_to_node(ctdb_nodes, entry) + return ctdb_nodes + + +def _save_nodes(path: str, ctdb_nodes: list[str]) -> None: + with open(path, "w") as nffh: + write_nodes_file(nffh, ctdb_nodes) + nffh.flush() + os.fsync(nffh) + + +def monitor_cluster_meta_changes( + cmeta: ClusterMeta, + pause_func: typing.Callable, + *, + nodes_file_path: typing.Optional[str] = None, + reload_all: bool = False, + leader_locator: typing.Optional[leader.LeaderLocator] = None, +) -> None: + """Monitor cluster meta for changes, reflecting those changes into ctdb. + + Unlike manage_cluster_meta_updates this function never changes the + contents of the nodes list in the cluster meta and takes those values + as a given, assuming some external agent has the correct global view of + the cluster and is updating it correctly. This function exists to + translate that content into something ctdb can understand. + """ + prev_meta: dict[str, typing.Any] = {} + if nodes_file_path: + prev_nodes = read_ctdb_nodes(nodes_file_path) + else: + with cmeta.open(locked=True) as cmo: + meta1 = cmo.load() + prev_nodes = _cluster_meta_to_ctdb_nodes(meta1.get("nodes", [])) + _logger.debug("initial cluster meta content: %r", prev_meta) + _logger.debug("initial nodes content: %r", prev_nodes) + while True: + pause_func() + with cmeta.open(locked=True) as cmo: + curr_meta = cmo.load() + if curr_meta == prev_meta: + _logger.debug("cluster meta content unchanged: %r", curr_meta) + continue + _logger.info("cluster meta content changed") + _logger.debug( + "cluster meta: previous=%r current=%r", prev_meta, curr_meta + ) + prev_meta = curr_meta + + # maybe some other metadata changed? + expected_nodes = _cluster_meta_to_ctdb_nodes( + curr_meta.get("nodes", []) + ) + if prev_nodes == expected_nodes: + _logger.debug("ctdb nodes list unchanged: %r", expected_nodes) + continue + _logger.info("ctdb nodes list changed") + _logger.debug( + "nodes list: previous=%r current=%r", prev_nodes, expected_nodes + ) + prev_nodes = expected_nodes + + if nodes_file_path: + _logger.info("updating nodes file: %s", nodes_file_path) + _save_nodes(nodes_file_path, expected_nodes) + _maybe_reload_nodes(leader_locator, reload_all=reload_all) + + +def _maybe_reload_nodes( + leader_locator: typing.Optional[leader.LeaderLocator] = None, + reload_all: bool = False, +) -> None: + """Issue a reloadnodes command if leader_locator is available and + node is leader or reload_all is true. + """ + if reload_all: + _logger.info("running: ctdb reloadnodes") + subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"])) + return + if leader_locator is None: + _logger.warning("no leader locator: not calling reloadnodes") + return + # use the leader locator to only issue the reloadnodes command once + # for a change instead of all the nodes "spamming" the cluster + with leader_locator as ll: + if ll.is_leader(): + _logger.info("running: ctdb reloadnodes") + subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"])) + else: + _logger.info("node is not leader. skipping reloadnodes") def ensure_ctdbd_etc_files(