-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make rabbitmqctl delete_queue more robust #9324
Make rabbitmqctl delete_queue more robust #9324
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//deps/rabbit:dialyze
//deps/rabbit_common:dialyze
//deps/rabbit_common:xref
//deps/rabbitmq_cli:check_formatted
all repeatedly fail.
In the latter case you need to run mix format
from deps/rabbitmq_cli
and that's it.
Curiously the function is there and it is exported. |
updates build metadata but that in turn has detected a cyclical dependency:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rabbit_amqqueue:pid_or_crashed/2
is used from rabbit_common
. rabbit_common
must not depend on rabbit
functions or modules.
Looks like await_new_pid/3
needs a new home outside of rabbit_common
.
@@ -1631,3 +1640,11 @@ maps_put_falsy(K, false, M) -> | |||
maps:put(K, false, M); | |||
maps_put_falsy(_K, _V, M) -> | |||
M. | |||
|
|||
-spec remote_sup_child(node(), supervisor:sup_ref()) -> rabbit_types:ok_or_error2(supervisor:child(), no_child | no_sup). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These types are not exported, at least not in OTP 26.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This required a whole bunch of changes to almost get it to pass xref, dialyzer and compilation under Bazel:
mix format
in deps/rabbitmq_cli
- Moving
await_new_pid/3
and friends to rabbit_amqqueue
- Copying
sup_ref/0
and child/0
types from supervisor
to rabbit_misc
- Updating
crashing_queues_SUITE
accordingly
and now Dialyzer detects that kill_queue/3
exits (produces a non-local return) while kill_queue_hard/3
matches on a returned value of crashed
.
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index c55e631bd0..96d77cbecc 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -54,6 +54,7 @@
delete_crashed/2,
delete_crashed_internal/2]).
-export([delete_with/4, delete_with/6]).
+-export([await_new_pid/3, await_state/3, await_state/4]).
-export([pid_of/1, pid_of/2]).
-export([pid_or_crashed/2]).
-export([mark_local_durable_queues_stopped/1]).
@@ -88,6 +89,11 @@
-define(IS_CLASSIC(QPid), is_pid(QPid)).
-define(IS_QUORUM(QPid), is_tuple(QPid)).
+
+-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000).
+-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10).
+-define(AWAIT_STATE_DELAY_INTERVAL, 100).
+-define(AWAIT_STATE_DELAY_TIME_DELTA, 100).
%%----------------------------------------------------------------------------
-export_type([name/0, qmsg/0, absent_reason/0]).
@@ -1629,10 +1635,10 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
fun (not_found) ->
{ok, 0};
({absent, Q, crashed}) ->
- _ = rabbit_classic_queue:delete_crashed(Q, Username),
+ _ = delete_crashed(Q, Username),
{ok, 0};
({absent, Q, stopped}) ->
- _ = rabbit_classic_queue:delete_crashed(Q, Username),
+ _ = delete_crashed(Q, Username),
{ok, 0};
({absent, Q, Reason}) ->
absent(Q, Reason)
@@ -2132,12 +2138,12 @@ kill_queue(Node, QRes = #resource{kind = queue}) ->
kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) ->
Pid1 = pid_or_crashed(Node, QRes),
exit(Pid1, Reason),
- rabbit_control_misc:await_state(Node, QRes, stopped),
+ await_state(Node, QRes, stopped),
stopped;
kill_queue(Node, QRes = #resource{kind = queue}, Reason) ->
Pid1 = pid_or_crashed(Node, QRes),
exit(Pid1, Reason),
- rabbit_control_misc:await_new_pid(Node, QRes, Pid1).
+ await_new_pid(Node, QRes, Pid1).
-spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()).
pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
@@ -2160,3 +2166,41 @@ pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
Error = {error, _} -> Error;
Reason -> {error, Reason}
end.
+
+-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid().
+await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) ->
+ case rabbit_amqqueue:pid_or_crashed(Node, QRes) of
+ OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL),
+ await_new_pid(Node, QRes, OldPid);
+ New -> New
+ end.
+
+-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'.
+-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'.
+await_state(Node, QName, State) when is_binary(QName) ->
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
+ await_state(Node, QRes, State);
+await_state(Node, QRes = #resource{kind = queue}, State) ->
+ await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT).
+
+await_state(Node, QName, State, Time) when is_binary(QName) ->
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
+ await_state(Node, QRes, State, Time);
+await_state(Node, QRes = #resource{kind = queue}, State, Time) ->
+ case state(Node, QRes) of
+ State ->
+ ok;
+ Other ->
+ case Time of
+ 0 -> exit({timeout_awaiting_state, State, Other});
+ _ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL),
+ await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA)
+ end
+ end.
+
+state(Node, QRes = #resource{virtual_host = VHost}) ->
+ Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]),
+ case Infos of
+ [] -> undefined;
+ [[{name, QRes}, {state, State}]] -> State
+ end.
\ No newline at end of file
diff --git a/deps/rabbit/test/crashing_queues_SUITE.erl b/deps/rabbit/test/crashing_queues_SUITE.erl
index 4bf4bf3397..9ec5727d90 100644
--- a/deps/rabbit/test/crashing_queues_SUITE.erl
+++ b/deps/rabbit/test/crashing_queues_SUITE.erl
@@ -113,22 +113,22 @@ give_up_after_repeated_crashes(Config) ->
amqp_channel:call(ChA, #'confirm.select'{}),
amqp_channel:call(ChA, #'queue.declare'{queue = QName,
durable = true}),
- rabbit_control_misc:await_state(A, QName, running),
+ rabbit_amqqueue:await_state(A, QName, running),
publish(ChA, QName, durable),
QRes = rabbit_misc:r(<<"/">>, queue, QName),
rabbit_amqqueue:kill_queue_hard(A, QRes),
{'EXIT', _} = (catch amqp_channel:call(
ChA, #'queue.declare'{queue = QName,
durable = true})),
- rabbit_control_misc:await_state(A, QName, crashed),
+ rabbit_amqqueue:await_state(A, QName, crashed),
amqp_channel:call(ChB, #'queue.delete'{queue = QName}),
amqp_channel:call(ChB, #'queue.declare'{queue = QName,
durable = true}),
- rabbit_control_misc:await_state(A, QName, running),
+ rabbit_amqqueue:await_state(A, QName, running),
%% Since it's convenient, also test absent queue status here.
rabbit_ct_broker_helpers:stop_node(Config, B),
- rabbit_control_misc:await_state(A, QName, down),
+ rabbit_amqqueue:await_state(A, QName, down),
ok.
diff --git a/deps/rabbit_common/src/rabbit_control_misc.erl b/deps/rabbit_common/src/rabbit_control_misc.erl
index 9df4fd99be..6b7b09c64d 100644
--- a/deps/rabbit_common/src/rabbit_control_misc.erl
+++ b/deps/rabbit_common/src/rabbit_control_misc.erl
@@ -7,12 +7,9 @@
-module(rabbit_control_misc).
--include_lib("rabbit_common/include/resource.hrl").
-
-export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4,
emitting_map_with_exit_handler/5, wait_for_info_messages/6,
spawn_emitter_caller/7, await_emitters_termination/1,
- await_new_pid/3, await_state/3, await_state/4,
print_cmd_result/2]).
-spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'.
@@ -28,17 +25,9 @@
InitialAcc :: term(), Acc :: term(), OK :: {ok, Acc}, Err :: {error, term()}.
-spec spawn_emitter_caller(node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'.
-spec await_emitters_termination([pid()]) -> 'ok'.
--spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid().
--spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'.
--spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'.
-spec print_cmd_result(atom(), term()) -> 'ok'.
--define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000).
--define(AWAIT_NEW_PID_DELAY_INTERVAL, 10).
--define(AWAIT_STATE_DELAY_INTERVAL, 100).
--define(AWAIT_STATE_DELAY_TIME_DELTA, 100).
-
emitting_map(AggregatorPid, Ref, Fun, List) ->
emitting_map(AggregatorPid, Ref, Fun, List, continue),
AggregatorPid ! {Ref, finished},
@@ -188,38 +177,3 @@ notify_if_timeout(Pid, Ref, Timeout) ->
print_cmd_result(authenticate_user, _Result) -> io:format("Success~n");
print_cmd_result(join_cluster, already_member) -> io:format("The node is already a member of this cluster~n").
-
-await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) ->
- case rabbit_amqqueue:pid_or_crashed(Node, QRes) of
- OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL),
- await_new_pid(Node, QRes, OldPid);
- New -> New
- end.
-
-await_state(Node, QName, State) when is_binary(QName) ->
- QRes = rabbit_misc:r(<<"/">>, queue, QName),
- await_state(Node, QRes, State);
-await_state(Node, QRes = #resource{kind = queue}, State) ->
- await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT).
-
-await_state(Node, QName, State, Time) when is_binary(QName) ->
- QRes = rabbit_misc:r(<<"/">>, queue, QName),
- await_state(Node, QRes, State, Time);
-await_state(Node, QRes = #resource{kind = queue}, State, Time) ->
- case state(Node, QRes) of
- State ->
- ok;
- Other ->
- case Time of
- 0 -> exit({timeout_awaiting_state, State, Other});
- _ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL),
- await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA)
- end
- end.
-
-state(Node, QRes = #resource{virtual_host = VHost}) ->
- Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]),
- case Infos of
- [] -> undefined;
- [[{name, QRes}, {state, State}]] -> State
- end.
diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl
index a1908c5077..f63d03b70a 100644
--- a/deps/rabbit_common/src/rabbit_misc.erl
+++ b/deps/rabbit_common/src/rabbit_misc.erl
@@ -1641,7 +1641,15 @@ maps_put_falsy(K, false, M) ->
maps_put_falsy(_K, _V, M) ->
M.
--spec remote_sup_child(node(), supervisor:sup_ref()) -> rabbit_types:ok_or_error2(supervisor:child(), no_child | no_sup).
+-type sup_ref() :: (Name :: atom())
+ | {Name :: atom(), Node :: node()}
+ | {'global', Name :: term()}
+ | {'via', Module :: module(), Name :: any()}
+ | pid().
+
+-type child() :: 'undefined' | pid().
+
+-spec remote_sup_child(node(), sup_ref()) -> rabbit_types:ok_or_error2(child(), no_child | no_sup).
remote_sup_child(Node, Sup) ->
case rpc:call(Node, supervisor, which_children, [Sup]) of
[{_, Child, _, _}] -> {ok, Child};
diff --git a/deps/rabbitmq_cli/lib/rabbitmqctl.ex b/deps/rabbitmq_cli/lib/rabbitmqctl.ex
index 42ce92c567..fc9fd2321f 100644
--- a/deps/rabbitmq_cli/lib/rabbitmqctl.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmqctl.ex
@@ -636,7 +636,7 @@ defmodule RabbitMQCtl do
## {:fun, fun} - run a custom function to enable distribution.
## custom mode is usefult for commands which should have specific node name.
## Runs code if distribution is successful, or not needed.
- @spec maybe_with_distribution(module(), options(), (() -> command_result())) :: command_result()
+ @spec maybe_with_distribution(module(), options(), (-> command_result())) :: command_result()
defp maybe_with_distribution(command, options, code) do
try do
maybe_with_distribution_without_catch(command, options, code) should get you to the stage described in the above comment. |
ah, my bad, I made sure relevant CT tests pass, will do all the bazel checks locally next time. Regarding your suggested diff and changes above (I don't agree with most of these):
So I can't use your suggested diff and attached zip. It only focuses on clearing bazel issues, but isn't solving the main problem of developers blindly piling up functions in I'll push totally separate updates shortly. Thanks for review so far! 👍 |
hmm, unrelated |
|
I'm calling these in the CLI tests, so I need them on the running node to execute the Elixir tests against (hence I put them in rabbit / initially in rabbit_common). Also didn't want to rewrite this same logic that's already implemented in Btw, I can't spot the root cause of latest bazel failures. |
The force push above was a rebase. |
@Ayanda-D I think the output of Bazel may be confusing. Right now |
@Ayanda-D if we manually fix |
yeah those |
Re-running Why this is the case I'm not 100% sure, but it may have been a case of having run gazelle after make has left some new files in place. If gazelle detects an existing In fact, checking out 77dddb7 and running gazelle produces exactly the same diff as 4db0d07 |
folks i'm not sure how |
…ue name arg types it accepts
…i and update spec
…fix bazel issues raised in MK's review
ok, that worked! 👍 |
Make rabbitmqctl delete_queue more robust (backport #9324)
Proposed Changes
This fixes/makes
rabbitmqctl delete_queue
more robust, and use the same code path as queue deletions from the management-ui (which mimics and uses the AMQP
queue.delete
operation). The latter deletions are more thorough and handle different queue states on
rabbit_queue_type:delete/4
execution, e.g. when queue has crashed or stopped.
This follows an incident where deletions of crashed classic queues using
rabbitmqctl delete_queue
were consistently failing, until an attempt from the management-ui succeeded (rare case though).
Part of this change:
rabbit_amqqueue:delete_with
API shared by the channel and cli
kill_queue
apis from crashing_queues test suite reusable, using them in the cli test in this pr
rabbit_control_misc
also from crashing_queues suite (I couldn't think of a better place to add these 👀)
cc: @adamncasey
Types of Changes
What types of changes does your code introduce to this project?
Put an
x
in the boxes that apply
Checklist
Put an
x
in the boxes that apply. You can also fill these out after creating the PR.
If you're unsure about any of them, don't hesitate to ask on the mailing list.
We're here to help!
This is simply a reminder of what we are going to look for before merging your code.
CONTRIBUTING.md
document
Further Comments
If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution you did and what alternatives you considered, etc.