Skip to content

Commit

Permalink
Closes #8564
Browse files Browse the repository at this point in the history
rabbit_amqqueue:list* functions: take queue type into account when
determining if a queue is local to a set of nodes.
  • Loading branch information
michaelklishin committed Jun 18, 2023
1 parent 8ad0e53 commit 499a7c7
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0,
list_local_quorum_queues_with_name_matching/1,
list_local_quorum_queues_with_name_matching/2]).
-export([is_local_to_node/2, is_local_to_node_set/2]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
-export([delete_immediately_by_resource/1]).
Expand Down Expand Up @@ -1096,7 +1097,7 @@ list() ->
lists:filter(fun (Q) ->
Pid = amqqueue:get_pid(Q),
St = amqqueue:get_state(Q),
St =/= stopped orelse lists:member(node(Pid), NodesRunning)
St =/= stopped orelse is_local_to_node_set(Pid, NodesRunning)
end, All).

-spec count() -> non_neg_integer().
Expand Down Expand Up @@ -1271,6 +1272,11 @@ is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
is_local_to_node(_QPid, _Node) ->
false.

is_local_to_node_set(QPid, Nodes) when is_list(Nodes) ->
lists:any(fun(Node) -> is_local_to_node(QPid, Node) end, Nodes);
is_local_to_node_set(QPid, OneNode) when is_atom(OneNode) ->
is_local_to_node(QPid, OneNode).

is_in_virtual_host(Q, VHostName) ->
VHostName =:= get_resource_vhost_name(amqqueue:get_name(Q)).

Expand All @@ -1281,7 +1287,7 @@ list(VHostPath) ->
lists:filter(fun (Q) ->
Pid = amqqueue:get_pid(Q),
St = amqqueue:get_state(Q),
St =/= stopped orelse lists:member(node(Pid), NodesRunning)
St =/= stopped orelse is_local_to_node_set(Pid, NodesRunning)
end, All).

-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
Expand All @@ -1299,7 +1305,7 @@ list_down(VHostPath) ->
St = amqqueue:get_state(Q),
amqqueue:get_vhost(Q) =:= VHostPath
andalso
((St =:= stopped andalso not lists:member(node(Pid), NodesRunning))
((St =:= stopped andalso not is_local_to_node_set(Pid, NodesRunning))
orelse
(not sets:is_element(N, Alive)))
end)
Expand Down

0 comments on commit 499a7c7

Please sign in to comment.