Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ctdb: add ctdb-monitor-nodes command #127

Merged
merged 8 commits into from
Jul 18, 2024
110 changes: 90 additions & 20 deletions sambacc/commands/ctdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#

import argparse
import contextlib
import logging
import os
import socket
Expand All @@ -28,7 +29,7 @@
from sambacc import samba_cmds
from sambacc.simple_waiter import Sleeper, Waiter

from .cli import best_waiter, commands, Context, Fail
from .cli import best_leader_locator, best_waiter, commands, Context, Fail

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -302,45 +303,74 @@ def ctdb_set_node(ctx: Context) -> None:

@commands.command(name="ctdb-manage-nodes", arg_func=_ctdb_general_node_args)
def ctdb_manage_nodes(ctx: Context) -> None:
    """Run a long lived process to manage the cluster metadata. It can add new
    nodes. When a new node is found, if the current node is in the correct
    state, this node will add it to CTDB.
    """
    _ctdb_ok()
    np = NodeParams(ctx)
    # a missing node number defaults to pnn 0
    expected_pnn = np.node_number or 0
    waiter = np.cluster_meta_waiter()

    # ErrorLimiter tolerates up to 10 consecutive failures, pausing
    # between retries, before letting the exception propagate
    limiter = ErrorLimiter("ctdb_manage_nodes", 10, pause_func=waiter.wait)
    while True:
        with limiter.catch():
            ctdb.manage_cluster_meta_updates(
                cmeta=np.cluster_meta(),
                pnn=expected_pnn,
                real_path=np.persistent_path,
                pause_func=waiter.wait,
            )


def _ctdb_monitor_nodes_args(parser: argparse.ArgumentParser) -> None:
    """Add the ctdb-monitor-nodes command line options to *parser*."""
    # start from the common node arguments shared with other ctdb commands
    _ctdb_must_have_node_args(parser)
    parser.add_argument(
        "--reload",
        default="leader",
        choices=("leader", "never", "all"),
        help="Specify which nodes can command CTDB to reload nodes",
    )


@commands.command(name="ctdb-monitor-nodes", arg_func=_ctdb_monitor_nodes_args)
def ctdb_monitor_nodes(ctx: Context) -> None:
    """Run a long lived process to monitor the cluster metadata.
    Unlike ctdb_manage_nodes this function assumes that the node state
    file is externally managed and primarily exists to reflect any changes
    to the cluster meta into CTDB.
    """
    _ctdb_ok()
    np = NodeParams(ctx)
    waiter = np.cluster_meta_waiter()
    # only use a leader locator when reloads are restricted to the leader
    leader_locator = None
    if ctx.cli.reload == "leader":
        leader_locator = best_leader_locator(ctx.instance_config)
    reload_all = ctx.cli.reload == "all"
    # only write a ctdb nodes file if the user asked for it
    nodes_file_path = np.persistent_path if ctx.cli.write_nodes else None

    _logger.info("monitoring cluster meta changes")
    _logger.debug(
        "reload_all=%s leader_locator=%r", reload_all, leader_locator
    )
    # tolerate up to 10 consecutive failures before giving up
    limiter = ErrorLimiter("ctdb_monitor_nodes", 10, pause_func=waiter.wait)
    while True:
        with limiter.catch():
            ctdb.monitor_cluster_meta_changes(
                cmeta=np.cluster_meta(),
                pause_func=waiter.wait,
                nodes_file_path=nodes_file_path,
                leader_locator=leader_locator,
                reload_all=reload_all,
            )


def _ctdb_must_have_node_args(parser: argparse.ArgumentParser) -> None:
    """Add the common node arguments plus the --write-nodes flag used by
    commands that require externally managed node metadata.
    """
    _ctdb_general_node_args(parser)
    parser.add_argument(
        "--write-nodes",
        action="store_true",
        help="Write ctdb nodes file based on cluster meta contents",
    )


Expand Down Expand Up @@ -411,3 +441,43 @@ def ctdb_rados_mutex(ctx: Context) -> None:
cmd = cmd["-n", namespace]
_logger.debug("executing command: %r", cmd)
samba_cmds.execute(cmd) # replaces process


class ErrorLimiter:
    """Count errors raised by a repeatedly-executed operation and re-raise
    once a limit is exceeded. Wrap each attempt in the ``catch()`` context
    manager; ``KeyboardInterrupt`` always propagates immediately.
    """

    def __init__(
        self,
        name: str,
        limit: int,
        *,
        pause_func: typing.Optional[typing.Callable] = None,
    ) -> None:
        # name: label used in log messages to identify the operation
        self.name = name
        # limit: number of errors tolerated before re-raising
        self.limit = limit
        # errors: running count of caught exceptions
        self.errors = 0
        # pause_func: optional callable invoked after each tolerated error
        self.pause_func = pause_func

    def post_catch(self):
        # Hook run after a tolerated (non-fatal) error; by default it
        # pauses via pause_func, if one was supplied.
        if self.pause_func is not None:
            self.pause_func()

    @contextlib.contextmanager
    def catch(self) -> typing.Iterator[None]:
        try:
            _logger.debug(
                "error limiter proceeding: %s: errors=%r",
                self.name,
                self.errors,
            )
            yield
        except KeyboardInterrupt:
            # never swallow a user interrupt
            raise
        except Exception as err:
            _logger.error(
                f"error during {self.name}: {err}, count={self.errors}",
                exc_info=True,
            )
            self.errors += 1
            # once the limit is exceeded, propagate the current exception
            if self.errors > self.limit:
                _logger.error(f"too many retries ({self.errors}). giving up")
                raise
            self.post_catch()
120 changes: 104 additions & 16 deletions sambacc/ctdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import typing

from sambacc import config
from sambacc import leader
from sambacc import samba_cmds
from sambacc.jfile import ClusterMetaJSONFile
from sambacc.netcmd_loader import template_config
Expand Down Expand Up @@ -338,15 +339,15 @@ def manage_nodes(
pause_func: typing.Callable,
) -> None:
"""Monitor nodes json for updates, reflecting those changes into ctdb."""
monitor_cluster_meta_updates(
manage_cluster_meta_updates(
ClusterMetaJSONFile(nodes_json),
pnn,
real_path,
pause_func,
)


def monitor_cluster_meta_updates(
def manage_cluster_meta_updates(
cmeta: ClusterMeta,
pnn: int,
real_path: str,
Expand Down Expand Up @@ -464,10 +465,7 @@ def _node_update(cmeta: ClusterMeta, real_path: str) -> bool:
new_ctdb_nodes.append(expected_line)
else:
new_ctdb_nodes[pnn] = expected_line
with open(real_path, "w") as nffh:
write_nodes_file(nffh, new_ctdb_nodes)
nffh.flush()
os.fsync(nffh)
_save_nodes(real_path, new_ctdb_nodes)
_logger.info("running: ctdb reloadnodes")
subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"]))
for entry in need_reload:
Expand All @@ -490,17 +488,107 @@ def cluster_meta_to_nodes(cmeta: ClusterMeta, real_path: str) -> None:
json_data = cmo.load()
nodes = json_data.get("nodes", [])
_logger.info("Found node metadata: %r", nodes)
pnn_max = max(n["pnn"] for n in nodes) + 1 # pnn is zero indexed
ctdb_nodes: list[str] = [""] * pnn_max
for entry in nodes:
pnn = entry["pnn"]
expected_line = _entry_to_node(ctdb_nodes, entry)
ctdb_nodes[pnn] = expected_line
ctdb_nodes = _cluster_meta_to_ctdb_nodes(nodes)
_logger.info("Will write nodes: %s", ctdb_nodes)
with open(real_path, "w") as nffh:
write_nodes_file(nffh, ctdb_nodes)
nffh.flush()
os.fsync(nffh)
_save_nodes(real_path, ctdb_nodes)


def _cluster_meta_to_ctdb_nodes(nodes: list[dict]) -> list[str]:
pnn_max = max(n["pnn"] for n in nodes) + 1 # pnn is zero indexed
ctdb_nodes: list[str] = [""] * pnn_max
for entry in nodes:
pnn = entry["pnn"]
# overwrite the pnn indexed entry with expected value
ctdb_nodes[pnn] = _entry_to_node(ctdb_nodes, entry)
return ctdb_nodes


def _save_nodes(path: str, ctdb_nodes: list[str]) -> None:
    """Write the ctdb nodes file at *path*, flushing and fsyncing so the
    content is durable on disk before returning.
    """
    with open(path, "w") as fh:
        write_nodes_file(fh, ctdb_nodes)
        # make sure the data reaches stable storage before we return
        fh.flush()
        os.fsync(fh)


def monitor_cluster_meta_changes(
    cmeta: ClusterMeta,
    pause_func: typing.Callable,
    *,
    nodes_file_path: typing.Optional[str] = None,
    reload_all: bool = False,
    leader_locator: typing.Optional[leader.LeaderLocator] = None,
) -> None:
    """Monitor cluster meta for changes, reflecting those changes into ctdb.

    Unlike manage_cluster_meta_updates this function never changes the
    contents of the nodes list in the cluster meta and takes those values
    as a given, assuming some external agent has the correct global view of
    the cluster and is updating it correctly. This function exists to
    translate that content into something ctdb can understand.

    Runs forever; pause_func is called before each poll of the cluster
    meta. If nodes_file_path is set the derived nodes list is written
    there on change. reloadnodes is issued according to reload_all /
    leader_locator (see _maybe_reload_nodes).
    """
    prev_meta: dict[str, typing.Any] = {}
    # establish the baseline nodes list: prefer the existing nodes file,
    # otherwise derive it from the current cluster meta contents
    if nodes_file_path:
        prev_nodes = read_ctdb_nodes(nodes_file_path)
    else:
        with cmeta.open(locked=True) as cmo:
            meta1 = cmo.load()
        prev_nodes = _cluster_meta_to_ctdb_nodes(meta1.get("nodes", []))
    # NOTE(review): prev_meta is still {} here, so this first debug line
    # always logs an empty dict — possibly meta1 was intended; confirm
    _logger.debug("initial cluster meta content: %r", prev_meta)
    _logger.debug("initial nodes content: %r", prev_nodes)
    while True:
        pause_func()
        with cmeta.open(locked=True) as cmo:
            curr_meta = cmo.load()
        if curr_meta == prev_meta:
            _logger.debug("cluster meta content unchanged: %r", curr_meta)
            continue
        _logger.info("cluster meta content changed")
        _logger.debug(
            "cluster meta: previous=%r current=%r", prev_meta, curr_meta
        )
        prev_meta = curr_meta

        # maybe some other metadata changed?
        # the meta differed but the derived nodes list may still be the same
        expected_nodes = _cluster_meta_to_ctdb_nodes(
            curr_meta.get("nodes", [])
        )
        if prev_nodes == expected_nodes:
            _logger.debug("ctdb nodes list unchanged: %r", expected_nodes)
            continue
        _logger.info("ctdb nodes list changed")
        _logger.debug(
            "nodes list: previous=%r current=%r", prev_nodes, expected_nodes
        )
        prev_nodes = expected_nodes

        # reflect the new nodes list into the ctdb nodes file (if managed
        # here) and possibly trigger a reloadnodes
        if nodes_file_path:
            _logger.info("updating nodes file: %s", nodes_file_path)
            _save_nodes(nodes_file_path, expected_nodes)
        _maybe_reload_nodes(leader_locator, reload_all=reload_all)


def _maybe_reload_nodes(
    leader_locator: typing.Optional[leader.LeaderLocator] = None,
    reload_all: bool = False,
) -> None:
    """Issue a reloadnodes command if leader_locator is available and
    node is leader or reload_all is true.
    """

    def _reloadnodes() -> None:
        # ask ctdb to re-read its nodes file
        _logger.info("running: ctdb reloadnodes")
        subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"]))

    if reload_all:
        _reloadnodes()
        return
    if leader_locator is None:
        _logger.warning("no leader locator: not calling reloadnodes")
        return
    # use the leader locator to only issue the reloadnodes command once
    # for a change instead of all the nodes "spamming" the cluster
    with leader_locator as ll:
        if not ll.is_leader():
            _logger.info("node is not leader. skipping reloadnodes")
            return
        _reloadnodes()


def ensure_ctdbd_etc_files(
Expand Down
Loading