From 78b1f67e7503f220de42fbe4523918a3dfd7c7bb Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 2 Nov 2020 09:54:37 -0800 Subject: [PATCH 1/2] Document the Timeout parameter to wait_for_confirms This parameter is in seconds. Also remove superfluous `CALL_TIMEOUT` macro. cc @dumbbell Also see rabbitmq/rabbitmq-server#2490 --- deps/amqp_client/include/amqp_client_internal.hrl | 4 ---- deps/amqp_client/src/amqp_channel.erl | 10 ++++++++++ deps/amqp_client/src/amqp_util.erl | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/deps/amqp_client/include/amqp_client_internal.hrl b/deps/amqp_client/include/amqp_client_internal.hrl index 36fcdacd79f3..960c4a52ff1a 100644 --- a/deps/amqp_client/include/amqp_client_internal.hrl +++ b/deps/amqp_client/include/amqp_client_internal.hrl @@ -26,7 +26,3 @@ {<<"consumer_cancel_notify">>, bool, true}, {<<"connection.blocked">>, bool, true}, {<<"authentication_failure_close">>, bool, true}]). - --define(CALL_TIMEOUT, rabbit_misc:get_env(amqp_client, gen_server_call_timeout, - 60000)). - diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index 4b5386a05d33..c00e4f61f16b 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -215,6 +215,8 @@ next_publish_seqno(Channel) -> %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker. Note, when called on a %% non-Confirm channel, waitForConfirms returns an error. +%% @param Channel: the channel on which to wait. +%% @end wait_for_confirms(Channel) -> wait_for_confirms(Channel, amqp_util:call_timeout()). @@ -226,6 +228,9 @@ wait_for_confirms(Channel) -> %% been either ack'd or nack'd by the broker or the timeout expires. %% Note, when called on a non-Confirm channel, waitForConfirms throws %% an exception. +%% @param Channel: the channel on which to wait. +%% @param Timeout: the wait timeout in seconds. +%% @end wait_for_confirms(Channel, Timeout) -> case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of {error, Reason} -> throw(Reason); @@ -238,6 +243,8 @@ wait_for_confirms(Channel, Timeout) -> %% @doc Behaves the same as wait_for_confirms/1, but if a nack is %% received, the calling process is immediately sent an %% exit(nack_received). +%% @param Channel: the channel on which to wait. +%% @end wait_for_confirms_or_die(Channel) -> wait_for_confirms_or_die(Channel, amqp_util:call_timeout()). @@ -249,6 +256,9 @@ wait_for_confirms_or_die(Channel) -> %% received, the calling process is immediately sent an %% exit(nack_received). If the timeout expires, the calling process is %% sent an exit(timeout). +%% @param Channel: the channel on which to wait. +%% @param Timeout: the wait timeout in seconds. +%% @end wait_for_confirms_or_die(Channel, Timeout) -> case wait_for_confirms(Channel, Timeout) of timeout -> close(Channel, 200, <<"Confirm Timeout">>), diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl index c44ebeb08d2b..df7ce3066289 100644 --- a/deps/amqp_client/src/amqp_util.erl +++ b/deps/amqp_client/src/amqp_util.erl @@ -9,7 +9,7 @@ call_timeout() -> undefined -> Timeout = rabbit_misc:get_env(amqp_client, gen_server_call_timeout, - ?CALL_TIMEOUT), + 60000), put(gen_server_call_timeout, Timeout), Timeout; Timeout -> From 482907e1fab94ae246d2938b2adcc29828a4ce48 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 3 Nov 2020 09:24:01 -0800 Subject: [PATCH 2/2] No longer use amqp_util:call_timeout/0 for wait_for_confirms. Allow specifying integer dimension via a tuple --- .../include/amqp_client_internal.hrl | 2 ++ deps/amqp_client/src/amqp_channel.erl | 27 ++++++++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/deps/amqp_client/include/amqp_client_internal.hrl b/deps/amqp_client/include/amqp_client_internal.hrl index 960c4a52ff1a..01e099097e82 100644 --- a/deps/amqp_client/include/amqp_client_internal.hrl +++ b/deps/amqp_client/include/amqp_client_internal.hrl @@ -26,3 +26,5 @@ {<<"consumer_cancel_notify">>, bool, true}, {<<"connection.blocked">>, bool, true}, {<<"authentication_failure_close">>, bool, true}]). + +-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}). diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index c00e4f61f16b..9e95df4fe325 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -218,12 +218,12 @@ next_publish_seqno(Channel) -> %% @param Channel: the channel on which to wait. %% @end wait_for_confirms(Channel) -> - wait_for_confirms(Channel, amqp_util:call_timeout()). + wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT). %% @spec (Channel, Timeout) -> boolean() | 'timeout' %% where %% Channel = pid() -%% Timeout = non_neg_integer() | 'infinity' +%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity' %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker or the timeout expires. %% Note, when called on a non-Confirm channel, waitForConfirms throws @@ -231,11 +231,12 @@ wait_for_confirms(Channel) -> %% @param Channel: the channel on which to wait. %% @param Timeout: the wait timeout in seconds. %% @end +wait_for_confirms(Channel, {Timeout, second}) -> + do_wait_for_confirms(Channel, second_to_millisecond(Timeout)); +wait_for_confirms(Channel, {Timeout, millisecond}) -> + do_wait_for_confirms(Channel, Timeout); wait_for_confirms(Channel, Timeout) -> - case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of - {error, Reason} -> throw(Reason); - Other -> Other - end. + do_wait_for_confirms(Channel, second_to_millisecond(Timeout)). %% @spec (Channel) -> true %% where @@ -246,12 +247,12 @@ wait_for_confirms(Channel, Timeout) -> %% @param Channel: the channel on which to wait. %% @end wait_for_confirms_or_die(Channel) -> - wait_for_confirms_or_die(Channel, amqp_util:call_timeout()). + wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT). %% @spec (Channel, Timeout) -> true %% where %% Channel = pid() -%% Timeout = non_neg_integer() | 'infinity' +%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity' %% @doc Behaves the same as wait_for_confirms/1, but if a nack is %% received, the calling process is immediately sent an %% exit(nack_received). If the timeout expires, the calling process is @@ -973,6 +974,12 @@ notify_confirm_waiters(State = #state{waiting_set = WSet, State#state{waiting_set = gb_trees:empty(), only_acks_received = true}. +do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) -> + case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of + {error, Reason} -> throw(Reason); + Other -> Other + end. + handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) -> {reply, {error, not_in_confirm_mode}, State}; handle_wait_for_confirms(From, Timeout, @@ -983,7 +990,7 @@ handle_wait_for_confirms(From, Timeout, false -> TRef = case Timeout of infinity -> undefined; _ -> erlang:send_after( - Timeout * 1000, self(), + Timeout, self(), {confirm_timeout, From}) end, {noreply, @@ -999,3 +1006,5 @@ call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) -> safe_cancel_timer(undefined) -> ok; safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef). +second_to_millisecond(Timeout) -> + Timeout * 1000.