diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 63ebda9a0fb3..099952cbfe2b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -128,7 +128,7 @@ -define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT -define(TICK_TIMEOUT, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). --define(ADD_MEMBER_TIMEOUT, 5000). +-define(MEMBER_CHANGE_TIMEOUT, 20_000). -define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096 -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra @@ -1201,7 +1201,7 @@ add_member(Q, Node) -> add_member(Q, Node, promotable). add_member(Q, Node, Membership) -> - add_member(Q, Node, Membership, ?ADD_MEMBER_TIMEOUT). + add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT). add_member(VHost, Name, Node, Timeout) when is_binary(VHost) -> %% NOTE needed to pass mixed cluster tests. @@ -1278,8 +1278,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% deleting the last member is not allowed {error, last_node}; Members -> - case ra:remove_member(Members, ServerId) of - {ok, _, _Leader} -> + case ra:remove_member(Members, ServerId, ?MEMBER_CHANGE_TIMEOUT) of + Res when element(1, Res) == ok orelse + Res == {error, not_member} -> + %% if not a member we can still proceed with updating the + %% mnesia record and clean up server if still running Fun = fun(Q1) -> update_type_state( Q1, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 60b2f2f2ccdb..43962bdbf558 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -59,6 +59,7 @@ groups() -> delete_member_queue_not_found, delete_member, delete_member_not_a_member, + delete_member_member_already_deleted, node_removal_is_quorum_critical] ++ memory_tests()}, {cluster_size_3, [], [ @@ -1954,6 +1955,32 @@ delete_member_not_a_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). +delete_member_member_already_deleted(Config) -> + [Server, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + NServers = length(Servers), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), + ServerId = {RaName, Server}, + ServerId2 = {RaName, Server2}, + %% use are APU directory to simulate situation where the ra:remove_server/2 + %% call timed out but later succeeded + ?assertMatch(ok, + rpc:call(Server2, ra, leave_and_terminate, + [quorum_queues, ServerId, ServerId2])), + + %% idempotent by design + ?assertEqual(ok, + rpc:call(Server, rabbit_quorum_queue, delete_member, + [<<"/">>, QQ, Server2])), + {ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [QQ, <<"/">>]), + #{nodes := Nodes} = amqqueue:get_type_state(Q), + ?assertEqual(1, length(Nodes)), + ok. + delete_member_during_node_down(Config) -> [Server, DownServer, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs( Config, nodename),