Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream coordinator: fixes to automatic membership changes. #10331

Merged
merged 13 commits into from
Jan 26, 2024
Merged
8 changes: 7 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "message_containers_SUITE",
size = "medium",
additional_beam = [
":test_queue_utils_beam",
],
)

rabbitmq_integration_suite(
Expand Down Expand Up @@ -675,6 +678,9 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "queue_type_SUITE",
size = "medium",
additional_beam = [
":test_queue_utils_beam",
],
)

rabbitmq_integration_suite(
Expand Down Expand Up @@ -829,7 +835,7 @@ rabbitmq_integration_suite(
additional_beam = [
":test_queue_utils_beam",
],
shard_count = 21,
shard_count = 22,
deps = [
"@proper//:erlang_app",
],
Expand Down
12 changes: 8 additions & 4 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([has_synchronised_mirrors_online/1, is_match/2, is_in_virtual_host/2]).
-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, list_local_stream_queues/0,
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
list_local_stream_queues/0, list_stream_queues_on/1,
list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0,
list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
list_local_mirrored_classic_without_synchronised_mirrors/0,
Expand Down Expand Up @@ -1220,9 +1221,12 @@ list_local_quorum_queues() ->

-spec list_local_stream_queues() -> [amqqueue:amqqueue()].
list_local_stream_queues() ->
[ Q || Q <- list_by_type(stream),
amqqueue:get_state(Q) =/= crashed,
lists:member(node(), get_quorum_nodes(Q))].
list_stream_queues_on(node()).

-spec list_stream_queues_on(node()) -> [amqqueue:amqqueue()].
list_stream_queues_on(Node) when is_atom(Node) ->
[Q || Q <- list_by_type(rabbit_stream_queue),
lists:member(Node, get_quorum_nodes(Q))].

-spec list_local_leaders() -> [amqqueue:amqqueue()].
list_local_leaders() ->
Expand Down
5 changes: 1 addition & 4 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,7 @@ members_using_mnesia() ->
rabbit_mnesia:members().

members_using_khepri() ->
case rabbit_khepri:locally_known_nodes() of
[] -> [node()];
Members -> Members
end.
rabbit_khepri:locally_known_nodes().

-spec disc_members() -> Members when
Members :: [node()].
Expand Down
30 changes: 29 additions & 1 deletion deps/rabbit/src/rabbit_queue_type_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
check_auto_delete/1,
check_exclusive/1,
check_non_durable/1,
run_checks/2]).
run_checks/2,
erpc_call/5]).

-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
Expand Down Expand Up @@ -70,3 +71,30 @@ run_checks([C | Checks], Q) ->
Err ->
Err
end.

-spec erpc_call(node(), module(), atom(), list(), non_neg_integer()) ->
term() | {error, term()}.
erpc_call(Node, M, F, A, _Timeout)
when Node =:= node() ->
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
try erpc:call(Node, M, F, A, infinity) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
erpc_call(Node, M, F, A, Timeout) ->
case lists:member(Node, nodes()) of
true ->
try erpc:call(Node, M, F, A, Timeout) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
false ->
{error, noconnection}
end.
40 changes: 9 additions & 31 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
force_all_queues_shrink_member_to_current_member/0]).

-import(rabbit_queue_type_util, [args_policy_lookup/3,
qname_to_internal_name/1]).
qname_to_internal_name/1,
erpc_call/5]).

-include_lib("stdlib/include/qlc.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -505,6 +506,7 @@ handle_tick(QName,
spawn(
fun() ->
try
{ok, Q} = rabbit_amqqueue:lookup(QName),
Reductions = reductions(Name),
rabbit_core_metrics:queue_stats(QName, NumReadyMsgs,
NumCheckedOut, NumMessages,
Expand Down Expand Up @@ -537,18 +539,19 @@ handle_tick(QName,
{single_active_consumer_tag, SacTag},
{single_active_consumer_pid, SacPid},
{leader, node()}
| infos(QName, Keys)],
| info(Q, Keys)],
rabbit_core_metrics:queue_stats(QName, Infos),
ok = repair_leader_record(QName, Self),
ok = repair_leader_record(Q, Self),
ExpectedNodes = rabbit_nodes:list_members(),
case Nodes -- ExpectedNodes of
[] ->
ok;
Stale when length(ExpectedNodes) > 0 ->
%% rabbit_nodes:list_members/0 returns [] when there
%% is an error so we need to handle that case
rabbit_log:debug("~ts: stale nodes detected. Purging ~w",
[rabbit_misc:rs(QName), Stale]),
%% pipeline purge command
{ok, Q} = rabbit_amqqueue:lookup(QName),
ok = ra:pipeline_command(amqqueue:get_pid(Q),
rabbit_fifo:make_purge_nodes(Stale)),

Expand All @@ -564,14 +567,14 @@ handle_tick(QName,
end
end).

repair_leader_record(QName, Self) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
repair_leader_record(Q, Self) ->
Node = node(),
case amqqueue:get_pid(Q) of
{_, Node} ->
%% it's ok - we don't need to do anything
ok;
_ ->
QName = amqqueue:get_name(Q),
rabbit_log:debug("~ts: repairing leader record",
[rabbit_misc:rs(QName)]),
{_, Name} = erlang:process_info(Self, registered_name),
Expand Down Expand Up @@ -1842,31 +1845,6 @@ notify_decorators(QName, F, A) ->
ok
end.

erpc_call(Node, M, F, A, _Timeout)
when Node =:= node() ->
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
try erpc:call(Node, M, F, A, infinity) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
erpc_call(Node, M, F, A, Timeout) ->
case lists:member(Node, nodes()) of
true ->
try erpc:call(Node, M, F, A, Timeout) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
false ->
{error, noconnection}
end.

is_stateful() -> true.

force_shrink_member_to_current_member(VHost, Name) ->
Expand Down
Loading
Loading