From dd1d62c9709308370d0e277449022324da99744f Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 9 Jan 2025 15:35:41 +0100 Subject: [PATCH] Redesigned k8s peer discovery Rather than querying the Kubernetes API, just check the local node name and try to connect to the pod with `-0` suffix (or configured `ordinal_start` value). Only the pod with the lowest ordinal can form a new cluster - all other pods will wait forever. This should prevent any race conditions and incorrectly formed clusters. --- .github/workflows/oci-make.yaml | 2 +- deps/rabbit/src/rabbit_peer_discovery.erl | 66 ++-- .../src/rabbit_peer_discovery_backend.erl | 4 +- deps/rabbitmq_peer_discovery_k8s/Makefile | 2 +- deps/rabbitmq_peer_discovery_k8s/README.md | 79 +++-- deps/rabbitmq_peer_discovery_k8s/app.bzl | 9 - .../include/rabbit_peer_discovery_k8s.hrl | 63 ---- .../schema/rabbitmq_peer_discovery_k8s.schema | 32 ++ .../src/rabbit_peer_discovery_k8s.erl | 298 ++++-------------- .../src/rabbitmq_peer_discovery_k8s.erl | 30 +- .../src/rabbitmq_peer_discovery_k8s_app.erl | 21 -- ...bbitmq_peer_discovery_k8s_node_monitor.erl | 49 --- .../src/rabbitmq_peer_discovery_k8s_sup.erl | 38 --- .../rabbitmq_peer_discovery_k8s.snippets | 64 ++++ .../rabbitmq_peer_discovery_k8s_SUITE.erl | 216 +++++-------- 15 files changed, 343 insertions(+), 630 deletions(-) delete mode 100644 deps/rabbitmq_peer_discovery_k8s/include/rabbit_peer_discovery_k8s.hrl delete mode 100644 deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_app.erl delete mode 100644 deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_node_monitor.erl delete mode 100644 deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_sup.erl diff --git a/.github/workflows/oci-make.yaml b/.github/workflows/oci-make.yaml index 18e169ae5537..fa70891568f9 100644 --- a/.github/workflows/oci-make.yaml +++ b/.github/workflows/oci-make.yaml @@ -43,7 +43,7 @@ jobs: make package-generic-unix PROJECT_VERSION=4.1.0-alpha.1 - 
name: Upload package-generic-unix if: steps.authorized.outputs.authorized == 'true' - uses: actions/upload-artifact@v4.3.1 + uses: actions/upload-artifact@v4 with: name: package-generic-unix path: PACKAGES/rabbitmq-server-*.tar.xz diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index 0b196873c058..e5207c87ae0c 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -108,29 +108,22 @@ maybe_init() -> %% node, even if the configuration changed in between. persistent_term:put(?PT_PEER_DISC_BACKEND, Backend), - _ = code:ensure_loaded(Backend), - case erlang:function_exported(Backend, init, 0) of - true -> - ?LOG_DEBUG( - "Peer discovery: backend supports initialisation", + case catch Backend:init() of + ok -> + ?LOG_INFO( + "Peer discovery: backend initialisation succeeded", #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - case Backend:init() of - ok -> - ?LOG_DEBUG( - "Peer discovery: backend initialisation succeeded", - #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - ok; - {error, _Reason} = Error -> - ?LOG_WARNING( - "Peer discovery: backend initialisation failed: ~tp.", - [Error], - #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - ok - end; - false -> + ok; + {error, _Reason} = Error -> + ?LOG_ERROR( + "Peer discovery: backend initialisation failed: ~tp", + [Error], + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + ok; + {'EXIT', {undef, _}} -> ?LOG_DEBUG( "Peer discovery: backend does not support initialisation", - #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok end. @@ -153,13 +146,13 @@ sync_desired_cluster() -> %% We handle retries at the top level: steps are followed sequentially and %% if one of them fails, we retry the whole process. - {Retries, RetryDelay} = discovery_retries(), + {Retries, RetryDelay} = discovery_retries(Backend), sync_desired_cluster(Backend, Retries, RetryDelay). 
-spec sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when Backend :: backend(), - RetriesLeft :: non_neg_integer(), + RetriesLeft :: non_neg_integer() | unlimited, RetryDelay :: non_neg_integer(). %% @private @@ -240,10 +233,18 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> -spec retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when Backend :: backend(), - RetriesLeft :: non_neg_integer(), + RetriesLeft :: non_neg_integer() | unlimited, RetryDelay :: non_neg_integer(). %% @private +retry_sync_desired_cluster(Backend, unlimited, RetryDelay) -> + ?LOG_DEBUG( + "Peer discovery: retrying to create/sync cluster in ~b ms " + "(will retry forever)", + [RetryDelay], + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + timer:sleep(RetryDelay), + sync_desired_cluster(Backend, unlimited, RetryDelay); retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay) when RetriesLeft > 0 -> RetriesLeft1 = RetriesLeft - 1, @@ -1007,11 +1008,24 @@ maybe_unregister() -> ok end. --spec discovery_retries() -> {Retries, RetryDelay} when - Retries :: non_neg_integer(), +-spec discovery_retries(Backend) -> {Retries, RetryDelay} when + Backend :: backend(), + Retries :: non_neg_integer() | unlimited, RetryDelay :: non_neg_integer(). -discovery_retries() -> +discovery_retries(Backend) -> + {_Retries, RetryDelay} = RetryConfig = discovery_retries_from_config(), + case catch Backend:retry_strategy() of + unlimited -> + {unlimited, RetryDelay}; + _ -> + RetryConfig + end. + +-spec discovery_retries_from_config() -> {Retries, RetryDelay} when + Retries :: non_neg_integer(), + RetryDelay :: non_neg_integer(). 
+discovery_retries_from_config() -> case application:get_env(rabbit, cluster_formation) of {ok, Proplist} -> Retries = proplists:get_value(discovery_retry_limit, Proplist, ?DEFAULT_DISCOVERY_RETRY_COUNT), diff --git a/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl b/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl index fdf917eb2156..56def7be9567 100644 --- a/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl +++ b/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl @@ -54,7 +54,9 @@ -callback unlock(Data :: term()) -> ok. --optional_callbacks([init/0]). +-callback retry_strategy() -> limited | unlimited. + +-optional_callbacks([init/0, retry_strategy/0]). -export([api_version/0]). diff --git a/deps/rabbitmq_peer_discovery_k8s/Makefile b/deps/rabbitmq_peer_discovery_k8s/Makefile index 01a791551cf9..8de21011f38b 100644 --- a/deps/rabbitmq_peer_discovery_k8s/Makefile +++ b/deps/rabbitmq_peer_discovery_k8s/Makefile @@ -3,7 +3,7 @@ PROJECT_DESCRIPTION = Kubernetes-based RabbitMQ peer discovery backend PROJECT_MOD = rabbitmq_peer_discovery_k8s_app DEPS = rabbit_common rabbitmq_peer_discovery_common rabbit -TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers ct_helper meck +TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck dep_ct_helper = git https://github.com/extend/ct_helper.git master DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk diff --git a/deps/rabbitmq_peer_discovery_k8s/README.md b/deps/rabbitmq_peer_discovery_k8s/README.md index 96217cbee488..73c4fc6cf191 100644 --- a/deps/rabbitmq_peer_discovery_k8s/README.md +++ b/deps/rabbitmq_peer_discovery_k8s/README.md @@ -2,55 +2,78 @@ ## Overview -This is an implementation of RabbitMQ [peer discovery interface](https://www.rabbitmq.com/blog/2018/02/12/peer-discovery-subsystem-in-rabbitmq-3-7/) -for Kubernetes. +This is an implementation of RabbitMQ peer discovery interface for Kubernetes. 
-This plugin only performs peer discovery using Kubernetes API as the source of data on running cluster pods. -Please get familiar with [RabbitMQ clustering fundamentals](https://rabbitmq.com/clustering.html) before attempting -to use it. +On Kubernetes, RabbitMQ should be deployed as a StatefulSet. Each Pod in a StatefulSet has +a name made up of the StatefulSet name and an ordinal index. The ordinal index values almost +always start with 0, although [this is configurable](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#ordinal-index). -Cluster provisioning and most of Day 2 operations such as [proper monitoring](https://rabbitmq.com/monitoring.html) -are not in scope for this plugin. +This plugin only allows the node with the lowest ordinal index (generally the pod with the `-0` suffix) to form a new cluster. +This node is referred to as the seed node. -For a more comprehensive open source RabbitMQ on Kubernetes deployment solution, -see the [RabbitMQ Cluster Operator for Kubernetes](https://www.rabbitmq.com/kubernetes/operator/operator-overview.html). -The Operator is developed [on GitHub](https://github.com/rabbitmq/cluster-operator/) and contains its -own [set of examples](https://github.com/rabbitmq/cluster-operator/tree/main/docs/examples). +All other nodes will join the seed node or, if they cannot reach it, will keep trying to join it forever. +In the most common scenario, this means that: +* the pod with `-0` suffix will start immediately, effectively forming a new single-node cluster +* any other pod will join the pod with `-0` suffix and synchronize the cluster metadata with it -## Supported RabbitMQ Versions - -This plugin ships with RabbitMQ 3.7.0 or later. 
+## Configuration +**In most cases, no configuration should be necessary beyond enabling this plugin.** -## Installation +If you use the [RabbitMQ Cluster Operator](https://www.rabbitmq.com/kubernetes/operator/operator-overview) +or the [Bitnami Helm Chart](https://github.com/bitnami/charts/tree/main/bitnami/rabbitmq), this plugin is enabled by default, +so you don't have to do anything. -This plugin ships with [supported RabbitMQ versions](https://www.rabbitmq.com/versions.html). -There is no need to install it separately. +### Advanced Configuration -As with any [plugin](https://rabbitmq.com/plugins.html), it must be enabled before it -can be used. For peer discovery plugins it means they must be [enabled](https://rabbitmq.com//plugins.html#basics) or [preconfigured](https://rabbitmq.com//plugins.html#enabled-plugins-file) -before first node boot: +If you use a different ordinal start value in your StatefulSet, you have to configure this plugin to use it: +``` +cluster_formation.k8s.ordinal_start = N +``` +where `N` matches the `.spec.ordinals.start` value of the StatefulSet. +If the plugin doesn't work for any reason (a very unusual Kubernetes configuration or issues with hostname resolution) +and you have to force RabbitMQ to use a different seed node than it would automatically, you can do this: ``` -rabbitmq-plugins --offline enable rabbitmq_peer_discovery_k8s +cluster_formation.k8s.seed_node = rabbit@seed-node-hostname ``` +If `cluster_formation.k8s.seed_node` is configured, this plugin will just use this value as the seed node. +If you do this, please open a GitHub issue and explain why the plugin didn't work for you, so we can improve it. + +### Historical Notes + +This implementation (version 2) of the plugin was introduced in RabbitMQ 4.1 and has little to do with the original design, +which was included in RabbitMQ from version 3.7.0 until 4.0. 
Nevertheless, backwards compatibility is maintained, +by simply ignoring the configuration options of the original implementation. + +The original implementation of this plugin performed peer discovery by querying the Kubernetes API for the list of endpoints +serving as the backends of a Kubernetes Service. However, this approach had a few issues: +1. The query was not necessary, given that the Pod names are predictable +2. The query could fail or require configuration to work (e.g., TLS settings had to be adjusted in some environments) +3. To perform the query, access to the Kubernetes API was required (unnecessary privileges) +4. In some environments, the plugin was prone to race conditions and could form more than one cluster + +The new design solves all those problems: +1. It doesn't query the Kubernetes API at all +2. Only one node is allowed to form a cluster + +## Supported RabbitMQ Versions + +Version 2 was first included in RabbitMQ 4.1. + +Version 1 of this plugin (which queried the Kubernetes API) was included from RabbitMQ 3.7.0 until 4.0. + ## Documentation See [RabbitMQ Cluster Formation guide](https://www.rabbitmq.com/cluster-formation.html) for an overview of the peer discovery subsystem, general and Kubernetes-specific configurable values and troubleshooting tips. -Example deployments that use this plugin can be found in an [RabbitMQ on Kubernetes examples repository](https://github.com/rabbitmq/diy-kubernetes-examples). -Note that they are just that, examples, and won't be optimal for every use case or cover a lot of important production -system concerns such as monitoring, persistent volume settings, access control, sizing, and so on. - - ## Contributing See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview](https://www.rabbitmq.com/github.html). - ## License [Licensed under the MPL](LICENSE-MPL-RabbitMQ), same as RabbitMQ server. 
@@ -58,4 +81,4 @@ See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview]( ## Copyright -(c) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +(c) 2007-2025 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. diff --git a/deps/rabbitmq_peer_discovery_k8s/app.bzl b/deps/rabbitmq_peer_discovery_k8s/app.bzl index 7ad24c5c5c95..30256fea81ab 100644 --- a/deps/rabbitmq_peer_discovery_k8s/app.bzl +++ b/deps/rabbitmq_peer_discovery_k8s/app.bzl @@ -11,9 +11,6 @@ def all_beam_files(name = "all_beam_files"): srcs = [ "src/rabbit_peer_discovery_k8s.erl", "src/rabbitmq_peer_discovery_k8s.erl", - "src/rabbitmq_peer_discovery_k8s_app.erl", - "src/rabbitmq_peer_discovery_k8s_node_monitor.erl", - "src/rabbitmq_peer_discovery_k8s_sup.erl", ], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_peer_discovery_k8s", @@ -37,9 +34,6 @@ def all_test_beam_files(name = "all_test_beam_files"): srcs = [ "src/rabbit_peer_discovery_k8s.erl", "src/rabbitmq_peer_discovery_k8s.erl", - "src/rabbitmq_peer_discovery_k8s_app.erl", - "src/rabbitmq_peer_discovery_k8s_node_monitor.erl", - "src/rabbitmq_peer_discovery_k8s_sup.erl", ], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_peer_discovery_k8s", @@ -73,9 +67,6 @@ def all_srcs(name = "all_srcs"): srcs = [ "src/rabbit_peer_discovery_k8s.erl", "src/rabbitmq_peer_discovery_k8s.erl", - "src/rabbitmq_peer_discovery_k8s_app.erl", - "src/rabbitmq_peer_discovery_k8s_node_monitor.erl", - "src/rabbitmq_peer_discovery_k8s_sup.erl", ], ) filegroup( diff --git a/deps/rabbitmq_peer_discovery_k8s/include/rabbit_peer_discovery_k8s.hrl b/deps/rabbitmq_peer_discovery_k8s/include/rabbit_peer_discovery_k8s.hrl deleted file mode 100644 index 39622ad42578..000000000000 --- a/deps/rabbitmq_peer_discovery_k8s/include/rabbit_peer_discovery_k8s.hrl +++ /dev/null @@ -1,63 +0,0 @@ -%% This Source Code Form is subject to the terms 
of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --define(CONFIG_MODULE, rabbit_peer_discovery_config). --define(UTIL_MODULE, rabbit_peer_discovery_util). --define(HTTPC_MODULE, rabbit_peer_discovery_httpc). - --define(BACKEND_CONFIG_KEY, peer_discovery_k8s). - --define(K8S_EVENT_SOURCE_DESCRIPTION, "rabbitmq_peer_discovery"). - --define(CONFIG_MAPPING, - #{ - k8s_scheme => #peer_discovery_config_entry_meta{ - type = string, - env_variable = "K8S_SCHEME", - default_value = "https" - }, - k8s_host => #peer_discovery_config_entry_meta{ - type = string, - env_variable = "K8S_HOST", - default_value = "kubernetes.default.svc.cluster.local" - }, - k8s_port => #peer_discovery_config_entry_meta{ - type = integer, - env_variable = "K8S_PORT", - default_value = 443 - }, - k8s_token_path => #peer_discovery_config_entry_meta{ - type = string, - env_variable = "K8S_TOKEN_PATH", - default_value = "/var/run/secrets/kubernetes.io/serviceaccount/token" - }, - k8s_cert_path => #peer_discovery_config_entry_meta{ - type = string, - env_variable = "K8S_CERT_PATH", - default_value = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" - }, - k8s_namespace_path => #peer_discovery_config_entry_meta{ - type = string, - env_variable = "K8S_NAMESPACE_PATH", - default_value = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" - }, - k8s_service_name => #peer_discovery_config_entry_meta{ - type = string, - env_variable = "K8S_SERVICE_NAME", - default_value = "rabbitmq" - }, - k8s_address_type => #peer_discovery_config_entry_meta{ - type = string, - env_variable = "K8S_ADDRESS_TYPE", - default_value = "ip" - }, - k8s_hostname_suffix => #peer_discovery_config_entry_meta{ - type = string, - env_variable = 
"K8S_HOSTNAME_SUFFIX", - default_value = "" - } - }). diff --git a/deps/rabbitmq_peer_discovery_k8s/priv/schema/rabbitmq_peer_discovery_k8s.schema b/deps/rabbitmq_peer_discovery_k8s/priv/schema/rabbitmq_peer_discovery_k8s.schema index bfba9b670120..68a818d06884 100644 --- a/deps/rabbitmq_peer_discovery_k8s/priv/schema/rabbitmq_peer_discovery_k8s.schema +++ b/deps/rabbitmq_peer_discovery_k8s/priv/schema/rabbitmq_peer_discovery_k8s.schema @@ -5,6 +5,38 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% +{mapping, "cluster_formation.k8s.ordinal_start", "rabbit.cluster_formation.peer_discovery_k8s.ordinal_start", [ + {datatype, integer} +]}. + +{translation, "rabbit.cluster_formation.peer_discovery_k8s.ordinal_start", +fun(Conf) -> + case cuttlefish:conf_get("cluster_formation.k8s.ordinal_start", Conf, undefined) of + undefined -> cuttlefish:unset(); + Value -> Value + end +end}. + +{mapping, "cluster_formation.k8s.seed_node", "rabbit.cluster_formation.peer_discovery_k8s.seed_node", [ + {datatype, string} +]}. + +{translation, "rabbit.cluster_formation.peer_discovery_k8s.seed_node", +fun(Conf) -> + case cuttlefish:conf_get("cluster_formation.k8s.seed_node", Conf, undefined) of + undefined -> cuttlefish:unset(); + Value -> Value + end +end}. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% +%% All configuration options below are deprecated as of version 2 of this plugin (first shipped in 4.1). +%% Test cases are kept to ensure we still accept the old config, but it is completely ignored. 
+%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% Kubernetes host {mapping, "cluster_formation.k8s.host", "rabbit.cluster_formation.peer_discovery_k8s.k8s_host", [ diff --git a/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl b/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl index c05962edbeba..0242b8eb852f 100644 --- a/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl +++ b/deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl @@ -2,248 +2,84 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% The Initial Developer of the Original Code is AWeber Communications. -%% Copyright (c) 2015-2016 AWeber Communications -%% Copyright (c) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. All rights reserved. +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_peer_discovery_k8s). -behaviour(rabbit_peer_discovery_backend). --include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl"). --include("rabbit_peer_discovery_k8s.hrl"). +-export([init/0, list_nodes/0, supports_registration/0, register/0, + unregister/0, post_registration/0, lock/1, unlock/1, node/0, + retry_strategy/0]). - --export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0, - post_registration/0, lock/1, unlock/1, send_event/3, generate_v1_event/7]). +-include_lib("kernel/include/logger.hrl"). +-include_lib("rabbit_common/include/logging.hrl"). -ifdef(TEST). --compile(export_all). +-compile([node/0]). -endif. 
-%% -%% API -%% - init() -> - rabbit_log:debug("Peer discovery Kubernetes: initialising..."), - ok = application:ensure_started(inets), - %% we cannot start this plugin yet since it depends on the rabbit app, - %% which is in the process of being started by the time this function is called - _ = application:load(rabbitmq_peer_discovery_common), - rabbit_peer_discovery_httpc:maybe_configure_proxy(), - rabbit_peer_discovery_httpc:maybe_configure_inet6(). - --spec list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}. - -list_nodes() -> - case make_request() of - {ok, Response} -> - Addresses = extract_node_list(Response), - Nodes = lists:map(fun node_name/1, Addresses), - {ok, {Nodes, disc}}; - {error, Reason} -> - Details = io_lib:format("Failed to fetch a list of nodes from Kubernetes API: ~ts", [Reason]), - rabbit_log:error(Details), - _ = send_event("Warning", "Failed", Details), - {error, Reason} - end. - --spec supports_registration() -> boolean(). - -supports_registration() -> - true. %% to send event in post_registration/0 - --spec post_registration() -> ok | {error, Reason :: string()}. -post_registration() -> - Details = io_lib:format("Node ~ts is registered", [node()]), - send_event("Normal", "Created", Details). - --spec register() -> ok. -register() -> - ok. - --spec unregister() -> ok. -unregister() -> - ok. - --spec lock(Nodes :: [node()]) -> - {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} | - {error, Reason :: string()}. 
- -lock(Nodes) -> - Node = node(), - case lists:member(Node, Nodes) of - true -> - rabbit_log:info("Will try to lock connecting to nodes ~tp", [Nodes]), - LockId = rabbit_nodes:lock_id(Node), - Retries = rabbit_nodes:lock_retries(), - case global:set_lock(LockId, Nodes, Retries) of - true -> - {ok, {LockId, Nodes}}; - false -> - {error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])} - end; - false -> - %% Don't try to acquire the global lock when local node is not discoverable by peers. - %% This branch is just an additional safety check. We should never run into this branch - %% because the local Pod is in state 'Running' and we listed both ready and not-ready addresses. - {error, lists:flatten(io_lib:format("Local node ~ts is not part of discovered nodes ~tp", [Node, Nodes]))} - end. - --spec unlock({{ResourceId :: string(), LockRequestedId :: atom()}, Nodes :: [atom()]}) -> 'ok'. -unlock({LockId, Nodes}) -> - global:del_lock(LockId, Nodes), - ok. - -%% -%% Implementation -%% - --spec get_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()}) - -> peer_discovery_config_value(). - -get_config_key(Key, Map) -> - ?CONFIG_MODULE:get(Key, ?CONFIG_MAPPING, Map). - -%% @doc Perform a HTTP GET request to K8s -%% @end -%% --spec make_request() -> {ok, map() | list() | term()} | {error, term()}. 
- -make_request() -> - M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - {ok, Token} = rabbit_misc:raw_read_file(get_config_key(k8s_token_path, M)), - Token1 = binary:replace(Token, <<"\n">>, <<>>), - - rabbit_log:debug("Will issue a Kubernetes API request client with the following settings: ~tp", [M]), - - TLSClientOpts0 = maps:get(ssl_options, M, []), - LegacyCACertfilePath = get_config_key(k8s_cert_path, M), - %% merge legacy CA certificate file argument if TLSClientOpts does not have its modern counterpart set - TLSClientOpts = case proplists:get_value(cacertfile, TLSClientOpts0, undefined) of - undefined -> - [{cacertfile, LegacyCACertfilePath} | TLSClientOpts0]; - _Other -> - TLSClientOpts0 + Formation = application:get_env(rabbit, cluster_formation, []), + Opts = proplists:get_value(peer_discovery_k8s, Formation, []), + DeprecatedOpts = [ {K, V} || {K, V} <- Opts, not lists:member(K, configuration_options()) ], + case DeprecatedOpts of + [] -> ok; + _ -> ?LOG_WARNING("Peer discovery: ignoring deprecated configuration options: ~w", + [proplists:get_keys(DeprecatedOpts)], + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + ok end, + case proplists:get_value(discovery_retry_limit, Formation, undefined) of + undefined -> ok; + _ -> ?LOG_WARNING("Peer discovery: ignoring cluster_formation.discovery_retry_limit option " + "(will retry forever)", + [], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + ok + end, + ok. - rabbit_log:debug("Will issue a Kubernetes API request client with the following TLS options: ~tp", [TLSClientOpts]), - - ?HTTPC_MODULE:get( - get_config_key(k8s_scheme, M), - get_config_key(k8s_host, M), - get_config_key(k8s_port, M), - base_path(endpoints, get_config_key(k8s_service_name, M)), - [], - [{"Authorization", "Bearer " ++ binary_to_list(Token1)}], - [{ssl, TLSClientOpts}]). 
- -%% @spec node_name(k8s_endpoint) -> list() -%% @doc Return a full rabbit node name, appending hostname suffix -%% @end -%% -node_name(Address) -> - M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - ?UTIL_MODULE:node_name( - ?UTIL_MODULE:as_string(Address) ++ get_config_key(k8s_hostname_suffix, M)). - -%% @spec address(k8s_subsets()) -> list() -%% @doc Return a list of both ready and not-ready nodes. -%% For the purpose of peer discovery, consider both ready and not-ready addresses. -%% Discover peers as quickly as possible not waiting for their readiness check to succeed. -%% @end -%% --spec address(map()) -> list(). -address(Subset) -> - maps:get(<<"notReadyAddresses">>, Subset, []) ++ - maps:get(<<"addresses">>, Subset, []). - -%% @doc Return a list of nodes -%% "The set of all endpoints is the union of all subsets." -%% https://kubernetes.io/docs/reference/kubernetes-api/service-resources/endpoints-v1/ -%% @end -%% --spec extract_node_list(map()) -> list(). - -extract_node_list(Response) -> - IpLists = [[get_address(Address) - || Address <- address(Subset)] || Subset <- maps:get(<<"subsets">>, Response, [])], - sets:to_list(sets:union(lists:map(fun sets:from_list/1, IpLists))). - - -%% @doc Return a list of path segments that are the base path for k8s key actions -%% @end -%% --spec base_path(events | endpoints, term()) -> string(). -base_path(Type, Args) -> - M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - {ok, Namespace} = rabbit_misc:raw_read_file(get_config_key(k8s_namespace_path, M)), - NameSpace1 = binary:replace(Namespace, <<"\n">>, <<>>), - rabbit_peer_discovery_httpc:build_path([api, v1, namespaces, NameSpace1, Type, Args]). - -%% get_config_key(k8s_service_name, M) - -get_address(Address) -> - M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - maps:get(list_to_binary(get_config_key(k8s_address_type, M)), Address). 
- - -generate_v1_event(Namespace, Type, Reason, Message) -> - {ok, HostName} = inet:gethostname(), - Name = - io_lib:format(HostName ++ ".~B",[os:system_time(millisecond)]), - TimeInSeconds = calendar:system_time_to_rfc3339(erlang:system_time(second)), - generate_v1_event(Namespace, Name, Type, Reason, Message, TimeInSeconds, HostName). - - -generate_v1_event(Namespace, Name, Type, Reason, Message, Timestamp, HostName) -> - #{ - metadata => #{ - namespace => rabbit_data_coercion:to_binary(Namespace), - name => rabbit_data_coercion:to_binary(Name) - }, - type => rabbit_data_coercion:to_binary(Type), - reason => rabbit_data_coercion:to_binary(Reason), - message => rabbit_data_coercion:to_binary(Message), - count => 1, - lastTimestamp => rabbit_data_coercion:to_binary(Timestamp), - involvedObject => #{ - apiVersion => <<"v1">>, - kind => <<"RabbitMQ">>, - name => rabbit_data_coercion:to_binary("pod/" ++ HostName), - namespace => rabbit_data_coercion:to_binary(Namespace) - }, - source => #{ - component => rabbit_data_coercion:to_binary(HostName ++ "/" ++ - ?K8S_EVENT_SOURCE_DESCRIPTION), - host => rabbit_data_coercion:to_binary(HostName) - } - }. - - -%% @doc Perform a HTTP POST request to K8s to send and k8s v1.Event -%% @end -%% --spec send_event(term(),term(), term()) -> {ok, term()} | {error, term()}. -send_event(Type, Reason, Message) -> - M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - {ok, Token} = rabbit_misc:raw_read_file(get_config_key(k8s_token_path, M)), - Token1 = binary:replace(Token, <<"\n">>, <<>>), - {ok, NameSpace} = rabbit_misc:raw_read_file( - get_config_key(k8s_namespace_path, M)), - NameSpace1 = binary:replace(NameSpace, <<"\n">>, <<>>), - - V1Event = generate_v1_event(NameSpace1, Type, Reason, Message), +-spec list_nodes() -> {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}. 
- Body = rabbit_data_coercion:to_list(rabbit_json:encode(V1Event)), - ?HTTPC_MODULE:post( - get_config_key(k8s_scheme, M), - get_config_key(k8s_host, M), - get_config_key(k8s_port, M), - base_path(events,""), - [], - [{"Authorization", "Bearer " ++ rabbit_data_coercion:to_list(Token1)}], - [{ssl, [{cacertfile, get_config_key(k8s_cert_path, M)}]}], - Body - ). +list_nodes() -> + Formation = application:get_env(rabbit, cluster_formation, []), + Opts = proplists:get_value(peer_discovery_k8s, Formation, []), + SeedNode = proplists:get_value(seed_node, Opts, undefined), + SeedNodeOrdinal = integer_to_list(proplists:get_value(ordinal_start, Opts, 0)), + seed_node(SeedNode, SeedNodeOrdinal). + +seed_node(undefined, SeedNodeOrdinal) -> + Nodename = atom_to_list(?MODULE:node()), + try + [[], Prefix, StatefulSetName, MyPodId, Domain] = re:split( + Nodename, + "^([^@]+@)([^.]*-)([0-9]+)", + [{return, list}]), + _ = list_to_integer(MyPodId), + SeedNode = list_to_atom(lists:flatten(Prefix ++ StatefulSetName ++ SeedNodeOrdinal ++ Domain)), + {ok, {SeedNode, disc}} + catch error:_ -> + ?LOG_WARNING("Peer discovery: Failed to parse my node (~s). " + "Perhaps you are trying to deploy RabbitMQ without a StatefulSet?", + [Nodename], + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + {error, lists:flatten(io_lib:format("my nodename (~s) doesn't seem to be have an -ID suffix " + "like StatefulSet pods should", [?MODULE:node()]))} + end; +seed_node(SeedNode, _SeedNodeOrdinal) -> + % elp:ignore atoms_exhaustion + {ok, {list_to_atom(SeedNode), disc}}. + +node() -> + erlang:node(). + +supports_registration() -> false. +register() -> ok. +unregister() -> ok. +post_registration() -> ok. +lock(_) -> not_supported. +unlock(_) -> ok. +retry_strategy() -> unlimited. + +configuration_options() -> + [ordinal_start, seed_node]. 
diff --git a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl b/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl index 335b6165e531..4470a9191a6d 100644 --- a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl +++ b/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s.erl @@ -2,7 +2,7 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% %% This module exists as an alias for rabbit_peer_discovery_k8s. @@ -12,7 +12,7 @@ -behaviour(rabbit_peer_discovery_backend). -export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0, - post_registration/0, lock/1, unlock/1, send_event/3, generate_v1_event/7]). + post_registration/0, lock/1, unlock/1, retry_strategy/0]). -define(DELEGATE, rabbit_peer_discovery_k8s). @@ -20,11 +20,12 @@ %% API %% +-spec init() -> ok | {error, Reason :: string()}. init() -> ?DELEGATE:init(). --spec list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | - {error, Reason :: string()}. +-spec list_nodes() -> {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}. + list_nodes() -> ?DELEGATE:list_nodes(). @@ -32,32 +33,27 @@ list_nodes() -> supports_registration() -> ?DELEGATE:supports_registration(). --spec register() -> ok. +-spec register() -> ok | {error, Reason :: string()}. register() -> ?DELEGATE:register(). --spec unregister() -> ok. +-spec unregister() -> ok | {error, Reason :: string()}. unregister() -> ?DELEGATE:unregister(). --spec post_registration() -> ok | {error, Reason :: string()}. 
+-spec post_registration() -> ok | {error, Reason :: string()}. post_registration() -> ?DELEGATE:post_registration(). --spec lock(Nodes :: [node()]) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. +-spec lock(Nodes :: [node()]) -> {ok, Data :: term()} | not_supported | {error, Reason :: string()}. lock(Node) -> ?DELEGATE:lock(Node). --spec unlock({{ResourceId :: string(), LockRequestedId :: atom()}, Nodes :: [atom()]}) -> 'ok'. +-spec unlock(Data :: term()) -> ok. unlock(Data) -> ?DELEGATE:unlock(Data). -generate_v1_event(Namespace, Name, Type, Message, Reason, Timestamp, HostName) -> - ?DELEGATE:generate_v1_event(Namespace, Name, Type, Message, Reason, Timestamp, HostName). +-spec retry_strategy() -> limited | unlimited. +retry_strategy() -> + ?DELEGATE:retry_strategy(). -%% @doc Perform a HTTP POST request to K8s to send and k8s v1.Event -%% @end -%% --spec send_event(term(),term(), term()) -> {ok, term()} | {error, term()}. -send_event(Type, Reason, Message) -> - ?DELEGATE:send_event(Type, Reason, Message). diff --git a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_app.erl b/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_app.erl deleted file mode 100644 index 803f5ccb0667..000000000000 --- a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_app.erl +++ /dev/null @@ -1,21 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbitmq_peer_discovery_k8s_app). - -%% -%% API -%% - --behaviour(application). --export([start/2, stop/1]). - -start(_Type, _StartArgs) -> - rabbitmq_peer_discovery_k8s_sup:start_link(). - -stop(_State) -> - ok. 
diff --git a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_node_monitor.erl b/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_node_monitor.erl deleted file mode 100644 index 94e2d9423d6b..000000000000 --- a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_node_monitor.erl +++ /dev/null @@ -1,49 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - -%% This gen_server receives node monitoring events from net_kernel -%% and forwards them to the Kubernetes API. - --module(rabbitmq_peer_discovery_k8s_node_monitor). - --behaviour(gen_server). - --export([start_link/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - -%% -%% API -%% - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -init([]) -> - _ = net_kernel:monitor_nodes(true, []), - {ok, #{}}. - -handle_call(_Msg, _From, State) -> - {reply, not_understood, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({nodeup, Node}, State) -> - Details = io_lib:format("Node ~ts is up ", [Node]), - _ = rabbit_peer_discovery_k8s:send_event("Normal", "NodeUp", Details), - {noreply, State}; -handle_info({nodedown, Node}, State) -> - Details = io_lib:format("Node ~ts is down or disconnected ", [Node]), - _ = rabbit_peer_discovery_k8s:send_event("Warning", "NodeDown", Details), - {noreply, State}. - -terminate(_Arg, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. 
diff --git a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_sup.erl b/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_sup.erl deleted file mode 100644 index da4a96d39b4d..000000000000 --- a/deps/rabbitmq_peer_discovery_k8s/src/rabbitmq_peer_discovery_k8s_sup.erl +++ /dev/null @@ -1,38 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbitmq_peer_discovery_k8s_sup). - --behaviour(supervisor). - --export([init/1, start_link/0]). - --include_lib("rabbit_common/include/rabbit.hrl"). --include("rabbit_peer_discovery_k8s.hrl"). - -%% -%% API -%% - -init([]) -> - Flags = #{strategy => one_for_one, intensity => 1, period => 1}, - Fun0 = fun() -> {ok, {Flags, []}} end, - Fun1 = fun() -> {ok, {Flags, []}} end, - Fun2 = fun(_) -> - Specs = [#{id => rabbitmq_peer_discovery_k8s_node_monitor, - start => {rabbitmq_peer_discovery_k8s_node_monitor, start_link, []}, - restart => permanent, - shutdown => ?SUPERVISOR_WAIT, - type => worker, - modules => [rabbitmq_peer_discovery_k8s] - }], - {ok, {Flags, Specs}} - end, - rabbit_peer_discovery_util:maybe_backend_configured(?BACKEND_CONFIG_KEY, Fun0, Fun1, Fun2). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). 
diff --git a/deps/rabbitmq_peer_discovery_k8s/test/config_schema_SUITE_data/rabbitmq_peer_discovery_k8s.snippets b/deps/rabbitmq_peer_discovery_k8s/test/config_schema_SUITE_data/rabbitmq_peer_discovery_k8s.snippets index 9d88ee8078ba..26341146b840 100644 --- a/deps/rabbitmq_peer_discovery_k8s/test/config_schema_SUITE_data/rabbitmq_peer_discovery_k8s.snippets +++ b/deps/rabbitmq_peer_discovery_k8s/test/config_schema_SUITE_data/rabbitmq_peer_discovery_k8s.snippets @@ -6,6 +6,70 @@ %% [ + + {k8s_discovery_mechanism_as_module, + "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s + cluster_formation.k8s.ordinal_start = 42", [ + {rabbit, [ + {cluster_formation, [ + {peer_discovery_backend, rabbit_peer_discovery_k8s}, + {peer_discovery_k8s, [ + {ordinal_start, 42} + ]} + ]} + ]} + ], [rabbitmq_peer_discovery_k8s] + }, + + {k8s_discovery_mechanism_as_alias1, + "cluster_formation.peer_discovery_backend = k8s + cluster_formation.k8s.ordinal_start = 42", [ + {rabbit, [ + {cluster_formation, [ + {peer_discovery_backend, rabbit_peer_discovery_k8s}, + {peer_discovery_k8s, [ + {ordinal_start, 42} + ]} + ]} + ]} + ], [rabbitmq_peer_discovery_k8s] + }, + + {k8s_discovery_mechanism_as_alias2, + "cluster_formation.peer_discovery_backend = kubernetes + cluster_formation.k8s.ordinal_start = 42", [ + {rabbit, [ + {cluster_formation, [ + {peer_discovery_backend, rabbit_peer_discovery_k8s}, + {peer_discovery_k8s, [ + {ordinal_start, 42} + ]} + ]} + ]} + ], [rabbitmq_peer_discovery_k8s] + }, + + {k8s_discovery_mechanism_as_module, + "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s + cluster_formation.k8s.seed_node = seed-node-hostname", [ + {rabbit, [ + {cluster_formation, [ + {peer_discovery_backend, rabbit_peer_discovery_k8s}, + {peer_discovery_k8s, [ + {seed_node, "seed-node-hostname"} + ]} + ]} + ]} + ], [rabbitmq_peer_discovery_k8s] + }, + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 
+%% +%% All configuration options below are deprecated as of version 2 of this plugin (first shipped in 4.1). +%% Test cases are kept to ensure we still accept the old config, but it is completely ignored. +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + {k8s_discovery_mechanism_as_module, "cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s cluster_formation.k8s.host = k8s.eng.megacorp.local", [ diff --git a/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl b/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl index 376f3f4e8b9f..1702446fbdf1 100644 --- a/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_k8s/test/rabbitmq_peer_discovery_k8s_SUITE.erl @@ -2,167 +2,93 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% The Initial Developer of the Original Code is AWeber Communications. -%% Copyright (c) 2015-2016 AWeber Communications -%% Copyright (c) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. All rights reserved. +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbitmq_peer_discovery_k8s_SUITE). +-compile(nowarn_export_all). -compile(export_all). --include_lib("eunit/include/eunit.hrl"). - -%% rabbitmq/cluster-operator contains an implicit integration test -%% for the rabbitmq_peer_discovery_k8s plugin added by -%% https://github.com/rabbitmq/cluster-operator/pull/704 +-include_lib("stdlib/include/assert.hrl"). all() -> [ - {group, unit}, - {group, lock} + {group, unit} ]. 
groups() -> [ {unit, [], [ - extract_node_list_long_test, - extract_node_list_short_test, - extract_node_list_hostname_short_test, - extract_node_list_real_test, - extract_node_list_with_not_ready_addresses_test, - node_name_empty_test, - node_name_suffix_test, - registration_support, - event_v1_test - ]}, - {lock, [], [ - lock_single_node, - lock_multiple_nodes, - lock_local_node_not_discovered + returns_node_0_by_default, + ordinal_start_is_configurable, + seed_node_can_be_explicitly_configured ]} ]. -init_per_testcase(T, Config) when T == node_name_empty_test; - T == node_name_suffix_test -> - meck:new(net_kernel, [passthrough, unstick]), - meck:expect(net_kernel, longnames, fun() -> true end), - Config; -init_per_testcase(_, Config) -> Config. +returns_node_0_by_default(_Config) -> + meck:new(rabbit_peer_discovery_k8s, [passthrough]), + + Cases = #{ + 'rabbit@foo-server-0.foo-nodes.default' => {ok, {'rabbit@foo-server-0.foo-nodes.default', disc} }, + 'rabbit@foo-server-10.foo-nodes.default' => {ok, {'rabbit@foo-server-0.foo-nodes.default', disc} }, + 'rabbit@foo-0-bar-1.foo-0-bar-nodes.default' => {ok, {'rabbit@foo-0-bar-0.foo-0-bar-nodes.default', disc} }, + 'rabbit@foo--0-bar--1.foo0.default' => {ok, {'rabbit@foo--0-bar--0.foo0.default', disc} }, + 'bunny@hop' => {error, "my nodename (bunny@hop) doesn't seem to be have an -ID suffix like StatefulSet pods should"} + }, + + [begin + meck:expect(rabbit_peer_discovery_k8s, node, fun() -> Nodename end), + ?assertEqual(Result, rabbitmq_peer_discovery_k8s:list_nodes()) + end || Nodename := Result <- Cases ], + + meck:unload([rabbit_peer_discovery_k8s]). 
+ +ordinal_start_is_configurable(_Config) -> + meck:new(rabbit_peer_discovery_k8s, [passthrough]), + + application:set_env(rabbit, cluster_formation, + [ + {peer_discovery_backend, rabbit_peer_discovery_k8s}, + {peer_discovery_k8s, [ + {ordinal_start, 123} + ]} + ]), + + Cases = #{ + 'rabbit@foo-server-0.foo-nodes.default' => {ok, {'rabbit@foo-server-123.foo-nodes.default', disc} }, + 'rabbit@foo-server-10.foo-nodes.default' => {ok, {'rabbit@foo-server-123.foo-nodes.default', disc} }, + 'rabbit@foo-0-bar-1.foo-0-bar-nodes.default' => {ok, {'rabbit@foo-0-bar-123.foo-0-bar-nodes.default', disc} }, + 'rabbit@foo--0-bar--1.foo0.default' => {ok, {'rabbit@foo--0-bar--123.foo0.default', disc} }, + 'bunny@hop' => {error, "my nodename (bunny@hop) doesn't seem to be have an -ID suffix like StatefulSet pods should"} + }, + + [begin + meck:expect(rabbit_peer_discovery_k8s, node, fun() -> Nodename end), + ?assertEqual(Result, rabbitmq_peer_discovery_k8s:list_nodes()) + end || Nodename := Result <- Cases ], -end_per_testcase(_, _Config) -> - meck:unload(), application:unset_env(rabbit, cluster_formation), - [os:unsetenv(Var) || Var <- ["K8S_HOSTNAME_SUFFIX", - "K8S_ADDRESS_TYPE"]]. - -%%% -%%% Testcases -%%% - -registration_support(_Config) -> - ?assertEqual(true, rabbit_peer_discovery_k8s:supports_registration()). - -extract_node_list_long_test(_Config) -> - {ok, Response} = - rabbit_json:try_decode( - rabbit_data_coercion:to_binary( - "{\"name\": \"mysvc\",\n\"subsets\": [\n{\n\"addresses\": [{\"ip\": \"10.10.1.1\"}, {\"ip\": \"10.10.2.2\"}],\n\"ports\": [{\"name\": \"a\", \"port\": 8675}, {\"name\": \"b\", \"port\": 309}]\n},\n{\n\"addresses\": [{\"ip\": \"10.10.3.3\"}],\n\"ports\": [{\"name\": \"a\", \"port\": 93},{\"name\": \"b\", \"port\": 76}]\n}]}")), - Expectation = [<<"10.10.1.1">>, <<"10.10.2.2">>, <<"10.10.3.3">>], - ?assertEqual(Expectation, rabbit_peer_discovery_k8s:extract_node_list(Response)). 
- -extract_node_list_short_test(_Config) -> - {ok, Response} = - rabbit_json:try_decode( - rabbit_data_coercion:to_binary( - "{\"name\": \"mysvc\",\n\"subsets\": [\n{\n\"addresses\": [{\"ip\": \"10.10.1.1\"}, {\"ip\": \"10.10.2.2\"}],\n\"ports\": [{\"name\": \"a\", \"port\": 8675}, {\"name\": \"b\", \"port\": 309}]\n}]}")), - Expectation = [<<"10.10.1.1">>, <<"10.10.2.2">>], - ?assertEqual(Expectation, rabbit_peer_discovery_k8s:extract_node_list(Response)). - -extract_node_list_hostname_short_test(_Config) -> - os:putenv("K8S_ADDRESS_TYPE", "hostname"), - {ok, Response} = - rabbit_json:try_decode( - rabbit_data_coercion:to_binary( - "{\"name\": \"mysvc\",\n\"subsets\": [\n{\n\"addresses\": [{\"ip\": \"10.10.1.1\", \"hostname\": \"rabbitmq-1\"}, {\"ip\": \"10.10.2.2\", \"hostname\": \"rabbitmq-2\"}],\n\"ports\": [{\"name\": \"a\", \"port\": 8675}, {\"name\": \"b\", \"port\": 309}]\n}]}")), - Expectation = [<<"rabbitmq-1">>, <<"rabbitmq-2">>], - ?assertEqual(Expectation, rabbit_peer_discovery_k8s:extract_node_list(Response)). 
- -extract_node_list_real_test(_Config) -> - {ok, Response} = - rabbit_json:try_decode( - rabbit_data_coercion:to_binary( - "{\"kind\":\"Endpoints\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"galera\",\"namespace\":\"default\",\"selfLink\":\"/api/v1/namespaces/default/endpoints/galera\",\"uid\":\"646f8305-3491-11e6-8c20-ecf4bbd91e6c\",\"resourceVersion\":\"17373568\",\"creationTimestamp\":\"2016-06-17T13:42:54Z\",\"labels\":{\"app\":\"mysqla\"}},\"subsets\":[{\"addresses\":[{\"ip\":\"10.1.29.8\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"default\",\"name\":\"mariadb-tco7k\",\"uid\":\"fb59cc71-558c-11e6-86e9-ecf4bbd91e6c\",\"resourceVersion\":\"13034802\"}},{\"ip\":\"10.1.47.2\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"default\",\"name\":\"mariadb-izgp8\",\"uid\":\"fb484ab3-558c-11e6-86e9-ecf4bbd91e6c\",\"resourceVersion\":\"13035747\"}},{\"ip\":\"10.1.47.3\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"default\",\"name\":\"mariadb-init-ffrsz\",\"uid\":\"fb12e1d3-558c-11e6-86e9-ecf4bbd91e6c\",\"resourceVersion\":\"13032722\"}},{\"ip\":\"10.1.94.2\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"default\",\"name\":\"mariadb-zcc0o\",\"uid\":\"fb31ce6e-558c-11e6-86e9-ecf4bbd91e6c\",\"resourceVersion\":\"13034771\"}}],\"ports\":[{\"name\":\"mysql\",\"port\":3306,\"protocol\":\"TCP\"}]}]}")), - Expectation = [<<"10.1.94.2">>, <<"10.1.47.3">>, <<"10.1.47.2">>, - <<"10.1.29.8">>], - ?assertEqual(Expectation, rabbit_peer_discovery_k8s:extract_node_list(Response)). 
- -extract_node_list_with_not_ready_addresses_test(_Config) -> - {ok, Response} = - rabbit_json:try_decode( - rabbit_data_coercion:to_binary( - "{\"kind\":\"Endpoints\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"rabbitmq\",\"namespace\":\"test-rabbitmq\",\"selfLink\":\"\/api\/v1\/namespaces\/test-rabbitmq\/endpoints\/rabbitmq\",\"uid\":\"4ff733b8-3ad2-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170098\",\"creationTimestamp\":\"2017-05-17T07:27:41Z\",\"labels\":{\"app\":\"rabbitmq\",\"type\":\"LoadBalancer\"}},\"subsets\":[{\"addresses\":[{\"ip\":\"10.1.29.8\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"default\",\"name\":\"mariadb-tco7k\",\"uid\":\"fb59cc71-558c-11e6-86e9-ecf4bbd91e6c\",\"resourceVersion\":\"13034802\"}}],\"ports\":[{\"name\":\"mysql\",\"port\":3306,\"protocol\":\"TCP\"}]},{\"notReadyAddresses\":[{\"ip\":\"172.17.0.2\",\"hostname\":\"rabbitmq-0\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-0\",\"uid\":\"e980fe5a-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170044\"}},{\"ip\":\"172.17.0.4\",\"hostname\":\"rabbitmq-1\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-1\",\"uid\":\"f6285603-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170071\"}},{\"ip\":\"172.17.0.5\",\"hostname\":\"rabbitmq-2\",\"nodeName\":\"minikube\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"test-rabbitmq\",\"name\":\"rabbitmq-2\",\"uid\":\"fd5a86dc-3afd-11e7-a40d-080027cbdcae\",\"resourceVersion\":\"170096\"}}],\"ports\":[{\"name\":\"amqp\",\"port\":5672,\"protocol\":\"TCP\"},{\"name\":\"http\",\"port\":15672,\"protocol\":\"TCP\"}]}]}")), - Expectation = [<<"10.1.29.8">>, - <<"172.17.0.2">>, <<"172.17.0.4">>, <<"172.17.0.5">>], - ?assertEqual(Expectation, lists:sort(rabbit_peer_discovery_k8s:extract_node_list(Response))). 
- -node_name_empty_test(_Config) -> - Expectation = 'rabbit@rabbitmq-0', - ?assertEqual(Expectation, rabbit_peer_discovery_k8s:node_name(<<"rabbitmq-0">>)). - -node_name_suffix_test(_Config) -> - os:putenv("K8S_HOSTNAME_SUFFIX", ".rabbitmq.default.svc.cluster.local"), - Expectation = 'rabbit@rabbitmq-0.rabbitmq.default.svc.cluster.local', - ?assertEqual(Expectation, rabbit_peer_discovery_k8s:node_name(<<"rabbitmq-0">>)). - -event_v1_test(_Config) -> - Expectation = #{ - count => 1, - type => <<"Normal">>, - lastTimestamp => <<"2019-12-06T15:10:23+00:00">>, - reason => <<"Reason">>, - message => <<"MyMessage">>, - metadata =>#{ - name => <<"test">> , - namespace => <<"namespace">> - }, - involvedObject =>#{ - apiVersion => <<"v1">>, - kind => <<"RabbitMQ">>, - name => <<"pod/MyHostName">>, - namespace => <<"namespace">> - }, - source =>#{ - component => <<"MyHostName/rabbitmq_peer_discovery">>, - host => <<"MyHostName">> - } - }, - ?assertEqual(Expectation, - rabbit_peer_discovery_k8s:generate_v1_event(<<"namespace">>, "test", - "Normal", "Reason", "MyMessage", "2019-12-06T15:10:23+00:00", "MyHostName")). - -lock_single_node(_Config) -> - LocalNode = node(), - Nodes = [LocalNode], - - {ok, {LockId, Nodes}} = rabbit_peer_discovery_k8s:lock([LocalNode]), - ?assertEqual(ok, rabbit_peer_discovery_k8s:unlock({LockId, Nodes})). 
-
-lock_multiple_nodes(_Config) ->
- application:set_env(rabbit, cluster_formation, [{internal_lock_retries, 2}]),
- LocalNode = node(),
- OtherNodeA = a@host,
- OtherNodeB = b@host,
-
- meck:expect(rabbit_nodes, lock_id, 1, {rabbit_nodes:cookie_hash(), OtherNodeA}),
- {ok, {{LockResourceId, OtherNodeA}, [LocalNode, OtherNodeA]}} = rabbit_peer_discovery_k8s:lock([LocalNode, OtherNodeA]),
- meck:expect(rabbit_nodes, lock_id, 1, {rabbit_nodes:cookie_hash(), OtherNodeB}),
- ?assertEqual({error, "Acquiring lock taking too long, bailing out after 2 retries"}, rabbit_peer_discovery_k8s:lock([LocalNode, OtherNodeB])),
- ?assertEqual(ok, rabbit_peer_discovery_k8s:unlock({{LockResourceId, OtherNodeA}, [LocalNode, OtherNodeA]})),
- ?assertEqual({ok, {{LockResourceId, OtherNodeB}, [LocalNode, OtherNodeB]}}, rabbit_peer_discovery_k8s:lock([LocalNode, OtherNodeB])),
- ?assertEqual(ok, rabbit_peer_discovery_k8s:unlock({{LockResourceId, OtherNodeB}, [LocalNode, OtherNodeB]})),
- meck:unload(rabbit_nodes).
-
-lock_local_node_not_discovered(_Config) ->
- Expectation = {error, "Local node " ++ atom_to_list(node()) ++ " is not part of discovered nodes [me@host]"},
- ?assertEqual(Expectation, rabbit_peer_discovery_k8s:lock([me@host])).
+    meck:unload([rabbit_peer_discovery_k8s]).
+
+seed_node_can_be_explicitly_configured(_Config) ->
+    %% An explicitly configured seed_node is returned whatever the local nodename is.
+    meck:new(rabbit_peer_discovery_k8s, [passthrough]),
+    application:set_env(rabbit, cluster_formation,
+        [
+            {peer_discovery_backend, rabbit_peer_discovery_k8s},
+            {peer_discovery_k8s, [
+                {seed_node, "foo@seed-node"}
+            ]}
+        ]),
+    Cases = #{
+        'rabbit@foo-server-0.foo-nodes.default' => {ok, {'foo@seed-node', disc} },
+        'bunny@hop' => {ok, {'foo@seed-node', disc} }
+    },
+    [begin
+         meck:expect(rabbit_peer_discovery_k8s, node, fun() -> Nodename end),
+         ?assertEqual(Result, rabbitmq_peer_discovery_k8s:list_nodes())
+     end || Nodename := Result <- Cases ],
+    %% Reset the env, as ordinal_start_is_configurable does, so seed_node cannot leak into other cases.
+    application:unset_env(rabbit, cluster_formation),
+    meck:unload([rabbit_peer_discovery_k8s]).