CRF: rename passive to adaptive
* Also fixed a bug where a pg message was not finished
* Add comments, refine logging
fantix committed Sep 13, 2021
1 parent 93ccdd4 commit 7ce7430
Showing 6 changed files with 85 additions and 30 deletions.
6 changes: 3 additions & 3 deletions edb/server/args.py
@@ -63,7 +63,7 @@ class ServerConfig(NamedTuple):
insecure: bool
data_dir: pathlib.Path
backend_dsn: str
backend_passive_ha: bool
backend_adaptive_ha: bool
tenant_id: Optional[str]
ignore_other_tenants: bool
log_level: str
@@ -242,8 +242,8 @@ def _writer(status: str) -> None:
'Also supports HA clusters, for example: stolon+consul+http://'
'localhost:8500/test_cluster'),
click.option(
'--enable-backend-passive-ha', 'backend_passive_ha', is_flag=True,
help='If backend passive HA is enabled, the EdgeDB server will '
'--enable-backend-adaptive-ha', 'backend_adaptive_ha', is_flag=True,
help='If backend adaptive HA is enabled, the EdgeDB server will '
'monitor the health of the backend cluster and shut down all '
'backend connections if the threshold is reached, until reconnected '
'again using the same DSN (HA should have updated the DNS '
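For orientation, the renamed flag is a plain click boolean option that lands in the ServerConfig tuple and is later handed to the server. The snippet below is a simplified, hypothetical stand-in for that plumbing, not the actual edb/server/args.py module (which has many more options):

# Hypothetical, simplified sketch of how a click flag like
# --enable-backend-adaptive-ha can populate a NamedTuple config.
from typing import NamedTuple, Optional

import click


class MiniServerConfig(NamedTuple):
    backend_dsn: Optional[str]
    backend_adaptive_ha: bool


@click.command()
@click.option('--backend-dsn', default=None)
@click.option(
    '--enable-backend-adaptive-ha', 'backend_adaptive_ha', is_flag=True,
    help='Monitor backend health and drop all backend connections when '
         'the failure threshold is reached.')
def main(backend_dsn, backend_adaptive_ha):
    config = MiniServerConfig(backend_dsn, backend_adaptive_ha)
    click.echo(f'adaptive HA enabled: {config.backend_adaptive_ha}')


if __name__ == '__main__':
    main()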
61 changes: 56 additions & 5 deletions edb/server/ha/passive.py → edb/server/ha/adaptive.py
@@ -37,7 +37,56 @@ class State(enum.Enum):
FAILOVER = 3


class PassiveHASupport:
class AdaptiveHASupport:
# Adaptive HA support detects HA backends that do not actively send
# clear failover signals to EdgeDB. It can be enabled with the
# command-line argument --enable-backend-adaptive-ha.
#
# This class maps events on the backend connection pool onto 3 states
# representing the status of the backend:
#
# * Healthy - all is good
# * Unhealthy - a staging state before failover
# * Failover - backend failover is in progress
#
# When entering the Unhealthy state, we start counting events against a
# threshold; once it is reached, we switch to the Failover state - that
# means we actively disconnect all backend connections and wait for
# sys_pgcon to reconnect. Client connections are never dropped in any of
# the 3 states. Whether clients can issue queries is independent of the
# 3 states - `BackendUnavailableError` or `BackendInFailoverError` is
# only raised if sys_pgcon is broken. That said,
# `BackendUnavailableError` is only ever seen in Unhealthy (and not
# always), while Failover always means `BackendInFailoverError` for any
# query.
#
# Rules of state switches:
#
# Unhealthy -> Healthy
# * Successfully connected to a non-hot-standby backend.
# * Data received from any pgcon (not implemented).
#
# Unhealthy -> Failover
# * More than 60% (UNEXPECTED_DISCONNECTS_THRESHOLD) of existing pgcons
# are "unexpectedly disconnected" (the number of existing pgcons is
# captured at the moment we switch to the Unhealthy state, and is also
# updated on "expected disconnects").
# * (and) In Unhealthy state for more than UNHEALTHY_MIN_TIME seconds.
# * (and) sys_pgcon is down.
# * (or) Postgres shutdown/hot-standby notification received.
#
# Healthy -> Unhealthy
# * Any unexpected disconnect.
# * (or) Failed to connect due to ConnectionError (not implemented).
# * (or) Last active time is more than 10 seconds ago (depends on the
# sys_pgcon idle-poll interval) (not implemented).
#
# Healthy -> Failover
# * Postgres shutdown/hot-standby notification received.
#
# Failover -> Healthy
# * Successfully connected to a non-hot-standby backend.
# * (and) sys_pgcon is healthy.

_state: State
_unhealthy_timer_handle: Optional[asyncio.TimerHandle]

@@ -51,7 +100,7 @@ def __init__(self, cluster_protocol: base.ClusterProtocol):

def set_state_failover(self):
self._state = State.FAILOVER
logger.critical("Passive HA failover detected")
logger.critical("adaptive: HA failover detected")
self._reset()
self._cluster_protocol.on_switch_over()

@@ -70,7 +119,7 @@ def on_pgcon_broken(self, is_sys_pgcon: bool):
self._cluster_protocol.get_active_pgcon_num(), 0
) + 1
logger.warning(
"Passive HA cluster is unhealthy. "
"adaptive: Backend HA cluster is unhealthy. "
"Captured number of pgcons: %d",
self._pgcon_count,
)
@@ -94,12 +143,14 @@ def on_pgcon_made(self, is_sys_pgcon: bool):
self._sys_pgcon_healthy = True
if self._state == State.UNHEALTHY:
self._state = State.HEALTHY
logger.info("Passive HA cluster is healthy")
logger.info("adaptive: Backend HA cluster is healthy")
self._reset()
elif self._state == State.FAILOVER:
if self._sys_pgcon_healthy:
self._state = State.HEALTHY
logger.info("Passive HA cluster has recovered from failover")
logger.info(
"adaptive: Backend HA cluster has recovered from failover"
)

def _reset(self):
self._pgcon_count = 0
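The transition rules in the comment block above can be read as a small state machine. Below is a condensed illustration under simplifying assumptions: the UNHEALTHY_MIN_TIME timer, the asyncio plumbing, and the real pool wiring of AdaptiveHASupport are omitted, and the class and method shapes are hypothetical.

# Condensed, hypothetical sketch of the documented state transitions.
# This is not the real AdaptiveHASupport; timers and sys_pgcon handling
# are deliberately simplified.
import enum

UNEXPECTED_DISCONNECTS_THRESHOLD = 0.6  # "more than 60% of existing pgcons"


class State(enum.Enum):
    HEALTHY = 1
    UNHEALTHY = 2
    FAILOVER = 3


class AdaptiveHASketch:
    def __init__(self) -> None:
        self.state = State.HEALTHY
        self.pgcon_count = 0        # captured when entering UNHEALTHY
        self.broken = 0             # unexpected disconnects since then
        self.sys_pgcon_healthy = True

    def on_pgcon_broken(self, active_pgcons: int, is_sys_pgcon: bool) -> None:
        if is_sys_pgcon:
            self.sys_pgcon_healthy = False
        if self.state is State.HEALTHY:
            # Healthy -> Unhealthy: any unexpected disconnect.
            self.state = State.UNHEALTHY
            self.pgcon_count = max(active_pgcons, 0) + 1
        if self.state is State.UNHEALTHY:
            self.broken += 1
            ratio = self.broken / max(self.pgcon_count, 1)
            if (ratio > UNEXPECTED_DISCONNECTS_THRESHOLD
                    and not self.sys_pgcon_healthy):
                # Unhealthy -> Failover (UNHEALTHY_MIN_TIME check omitted).
                self.state = State.FAILOVER

    def on_pgcon_made(self, is_sys_pgcon: bool) -> None:
        if is_sys_pgcon:
            self.sys_pgcon_healthy = True
        if self.state is State.UNHEALTHY:
            # Unhealthy -> Healthy: connected to a non-hot-standby backend.
            self.state = State.HEALTHY
            self.pgcon_count = self.broken = 0
        elif self.state is State.FAILOVER and self.sys_pgcon_healthy:
            # Failover -> Healthy: reconnected and sys_pgcon is healthy.
            self.state = State.HEALTHY
            self.pgcon_count = self.broken = 0

    def on_hot_standby_or_shutdown(self) -> None:
        # Healthy/Unhealthy -> Failover on a Postgres shutdown or
        # hot-standby notification.
        self.state = State.FAILOVER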
7 changes: 4 additions & 3 deletions edb/server/ha/stolon.py
@@ -88,7 +88,7 @@ def on_cluster_data(self, data):
master_db = cluster_status.get("master")
cluster_phase = cluster_status.get("phase")
if cluster_phase != "normal":
logger.debug("Stolon cluster phase: %s", cluster_phase)
logger.debug("Stolon cluster phase: %r", cluster_phase)

if not master_db:
return
@@ -108,10 +108,10 @@ def on_cluster_data(self, data):
master_addr = master_host, int(master_port)
if master_addr != self._master_addr:
if self._master_addr is None:
logger.info("Discovered master Postgres at %s", master_addr)
logger.info("Discovered master Postgres at %r", master_addr)
else:
logger.critical(
f"Switching over the master Postgres from %s to %s",
f"Switching over the master Postgres from %r to %r",
self._master_addr,
master_addr,
)
@@ -160,6 +160,7 @@ def connection_lost(self, exc):

def on_status(self, status: bytes):
if self._parser.get_status_code() != 200:
logger.debug("Consul is returning non-200 responses")
self._transport.close()

def on_body(self, body: bytes):
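A note on the %s -> %r changes in this file: %r keeps repr() detail such as quotes, so an actual None can no longer be confused with the string 'None' in the log output. A tiny standalone illustration with stdlib logging (not the Stolon resolver itself):

# Illustration only: compare %s and %r rendering of a cluster phase value.
import logging

logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)

for cluster_phase in (None, 'normal', 'None'):
    logger.info("phase via %%s: %s", cluster_phase)  # None / normal / None
    logger.info("phase via %%r: %r", cluster_phase)  # None / 'normal' / 'None'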
2 changes: 1 addition & 1 deletion edb/server/main.py
@@ -197,7 +197,7 @@ async def _run_server(
startup_script=args.startup_script,
allow_insecure_binary_clients=args.allow_insecure_binary_clients,
allow_insecure_http_clients=args.allow_insecure_http_clients,
backend_passive_ha=args.backend_passive_ha,
backend_adaptive_ha=args.backend_adaptive_ha,
)
await sc.wait_for(ss.init())

5 changes: 4 additions & 1 deletion edb/server/pgcon/pgcon.pyx
@@ -306,7 +306,7 @@ async def connect(connargs, dbname, tenant_id):
)

if 'in_hot_standby' in pgcon.parameter_status:
# in_hot_standby is present in Postgres 14 and above
# in_hot_standby is always present in Postgres 14 and above
if pgcon.parameter_status['in_hot_standby'] == 'on':
# Abort if we're connecting to a hot standby
pgcon.terminate()
@@ -1842,6 +1842,9 @@ cdef class PGConnection:
assert self.buffer.get_message_type() == b'S'
name = self.buffer.read_null_str().decode()
value = self.buffer.read_null_str().decode()
self.buffer.finish_message()
if self.debug:
self.debug_print('PARAMETER STATUS MSG', name, value)
return name, value

cdef make_clean_stmt_message(self, bytes stmt_name):
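Context for the finish_message() fix: a PostgreSQL ParameterStatus message is the type byte 'S', an Int32 length (which counts itself but not the type byte), and two null-terminated strings for the parameter name and value; the reader must consume the message to its end before returning. A minimal plain-Python sketch of that wire layout (illustration only; pgcon.pyx uses its own Cython ReadBuffer rather than this code):

# Hypothetical standalone parser for a ParameterStatus ('S') message.
import struct


def parse_parameter_status(msg: bytes) -> tuple[str, str]:
    assert msg[0:1] == b'S'
    (length,) = struct.unpack('!I', msg[1:5])   # big-endian Int32 length
    body = msg[5:1 + length]                    # bytes after the length field
    name, value, rest = body.split(b'\x00', 2)  # two null-terminated strings
    assert rest == b'', 'message must be consumed fully (finish_message)'
    return name.decode(), value.decode()


payload = b'in_hot_standby\x00on\x00'
raw = b'S' + struct.pack('!I', 4 + len(payload)) + payload
print(parse_parameter_status(raw))  # ('in_hot_standby', 'on')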
34 changes: 17 additions & 17 deletions edb/server/server.py
@@ -52,7 +52,7 @@
from edb.server import defines
from edb.server import protocol
from edb.server.ha import base as ha_base
from edb.server.ha import passive
from edb.server.ha import adaptive as adaptive_ha
from edb.server.protocol import binary # type: ignore
from edb.server import pgcon
from edb.server.pgcon import errors as pgcon_errors
@@ -94,7 +94,7 @@ class Server(ha_base.ClusterProtocol):

_task_group: Optional[taskgroup.TaskGroup]
_binary_conns: Set[binary.EdgeConnection]
_backend_passive_ha: Optional[passive.PassiveHASupport]
_backend_adaptive_ha: Optional[adaptive_ha.AdaptiveHASupport]

def __init__(
self,
@@ -112,15 +112,15 @@ def __init__(
echo_runtime_info: bool = False,
status_sink: Optional[Callable[[str], None]] = None,
startup_script: Optional[srvargs.StartupScript] = None,
backend_passive_ha: bool = False,
backend_adaptive_ha: bool = False,
):

self._loop = asyncio.get_running_loop()

# Used to tag PG notifications to later disambiguate them.
self._server_id = str(uuid.uuid4())

# Increase-only counter to reject outdated connects
# Increase-only counter to reject outdated attempts to connect
self._ha_master_serial = 0

self._serving = False
@@ -195,10 +195,10 @@ def __init__(

self._allow_insecure_binary_clients = allow_insecure_binary_clients
self._allow_insecure_http_clients = allow_insecure_http_clients
if backend_passive_ha:
self._backend_passive_ha = passive.PassiveHASupport(self)
if backend_adaptive_ha:
self._backend_adaptive_ha = adaptive_ha.AdaptiveHASupport(self)
else:
self._backend_passive_ha = None
self._backend_adaptive_ha = None

async def _request_stats_logger(self):
last_seen = -1
@@ -268,8 +268,8 @@ async def _pg_connect(self, dbname):
self._get_pgaddr(), pg_dbname, self._tenant_id)
if ha_serial == self._ha_master_serial:
rv.set_server(self)
if self._backend_passive_ha is not None:
self._backend_passive_ha.on_pgcon_made(
if self._backend_adaptive_ha is not None:
self._backend_adaptive_ha.on_pgcon_made(
dbname == defines.EDGEDB_SYSTEM_DB
)
return rv
@@ -925,24 +925,24 @@ def _on_sys_pgcon_parameter_status_updated(self, name, value):
self._on_sys_pgcon_failover_signal()

def _on_sys_pgcon_failover_signal(self):
if self._backend_passive_ha is not None:
# Switch to FAILOVER if passive HA is enabled
self._backend_passive_ha.set_state_failover()
if self._backend_adaptive_ha is not None:
# Switch to FAILOVER if adaptive HA is enabled
self._backend_adaptive_ha.set_state_failover()
elif getattr(self._cluster, '_ha_backend', None) is None:
# If the server is not using an HA backend, nor has enabled the
# passive HA monitoring, we still tries to "switch over" by
# adaptive HA monitoring, we still try to "switch over" by
# disconnecting all pgcons if a failover signal is received, allowing
# reconnection to happen sooner.
self.on_switch_over()
# Else, the HA backend should take care of calling on_switch_over()

def _on_pgcon_broken(self, is_sys_pgcon=False):
if self._backend_passive_ha:
self._backend_passive_ha.on_pgcon_broken(is_sys_pgcon)
if self._backend_adaptive_ha:
self._backend_adaptive_ha.on_pgcon_broken(is_sys_pgcon)

def _on_pgcon_lost(self):
if self._backend_passive_ha:
self._backend_passive_ha.on_pgcon_lost()
if self._backend_adaptive_ha:
self._backend_adaptive_ha.on_pgcon_lost()

async def _reconnect_sys_pgcon(self):
try:
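The server side only creates the monitor when the flag is set and then forwards pool events to it, as the hunks above show. A stripped-down, hypothetical rendering of that wiring (not the real edb.server.server module):

# Hypothetical minimal wiring between a server and the adaptive HA monitor.
from typing import Optional


class AdaptiveHAStub:
    # Stand-in for edb.server.ha.adaptive.AdaptiveHASupport.
    def on_pgcon_made(self, is_sys_pgcon: bool) -> None: ...
    def on_pgcon_broken(self, is_sys_pgcon: bool) -> None: ...
    def on_pgcon_lost(self) -> None: ...
    def set_state_failover(self) -> None: ...


class MiniServer:
    def __init__(self, backend_adaptive_ha: bool = False) -> None:
        # The monitor exists only when --enable-backend-adaptive-ha was given.
        self._backend_adaptive_ha: Optional[AdaptiveHAStub] = (
            AdaptiveHAStub() if backend_adaptive_ha else None
        )

    def _on_pgcon_broken(self, is_sys_pgcon: bool = False) -> None:
        if self._backend_adaptive_ha:
            self._backend_adaptive_ha.on_pgcon_broken(is_sys_pgcon)

    def _on_pgcon_lost(self) -> None:
        if self._backend_adaptive_ha:
            self._backend_adaptive_ha.on_pgcon_lost()

    def _on_sys_pgcon_failover_signal(self) -> None:
        if self._backend_adaptive_ha is not None:
            # Switch to FAILOVER only when adaptive HA is enabled; otherwise
            # the server (or an active HA backend) handles the switch-over.
            self._backend_adaptive_ha.set_state_failover()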
