Skip to content

Commit

Permalink
Merge pull request #138 from rabbitmq/lrb-confirm-timeout-is-seconds
Browse files Browse the repository at this point in the history
Document the Timeout parameter to wait_for_confirms
  • Loading branch information
michaelklishin authored Nov 6, 2020
2 parents 76cb60f + 482907e commit da95a11
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
4 changes: 1 addition & 3 deletions deps/amqp_client/include/amqp_client_internal.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,4 @@
{<<"connection.blocked">>, bool, true},
{<<"authentication_failure_close">>, bool, true}]).

-define(CALL_TIMEOUT, rabbit_misc:get_env(amqp_client, gen_server_call_timeout,
60000)).

-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}).
37 changes: 28 additions & 9 deletions deps/amqp_client/src/amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,40 +215,51 @@ next_publish_seqno(Channel) ->
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker. Note, when called on a
%% non-Confirm channel, waitForConfirms returns an error.
%% @param Channel: the channel on which to wait.
%% @end
wait_for_confirms(Channel) ->
wait_for_confirms(Channel, amqp_util:call_timeout()).
wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).

%% @spec (Channel, Timeout) -> boolean() | 'timeout'
%% where
%% Channel = pid()
%% Timeout = non_neg_integer() | 'infinity'
%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker or the timeout expires.
%% Note, when called on a non-Confirm channel, waitForConfirms throws
%% an exception.
%% @param Channel: the channel on which to wait.
%% @param Timeout: the wait timeout in seconds.
%% @end
wait_for_confirms(Channel, {Timeout, second}) ->
do_wait_for_confirms(Channel, second_to_millisecond(Timeout));
wait_for_confirms(Channel, {Timeout, millisecond}) ->
do_wait_for_confirms(Channel, Timeout);
wait_for_confirms(Channel, Timeout) ->
case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
{error, Reason} -> throw(Reason);
Other -> Other
end.
do_wait_for_confirms(Channel, second_to_millisecond(Timeout)).

%% @spec (Channel) -> true
%% where
%% Channel = pid()
%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
%% received, the calling process is immediately sent an
%% exit(nack_received).
%% @param Channel: the channel on which to wait.
%% @end
wait_for_confirms_or_die(Channel) ->
wait_for_confirms_or_die(Channel, amqp_util:call_timeout()).
wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).

%% @spec (Channel, Timeout) -> true
%% where
%% Channel = pid()
%% Timeout = non_neg_integer() | 'infinity'
%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
%% received, the calling process is immediately sent an
%% exit(nack_received). If the timeout expires, the calling process is
%% sent an exit(timeout).
%% @param Channel: the channel on which to wait.
%% @param Timeout: the wait timeout in seconds.
%% @end
wait_for_confirms_or_die(Channel, Timeout) ->
case wait_for_confirms(Channel, Timeout) of
timeout -> close(Channel, 200, <<"Confirm Timeout">>),
Expand Down Expand Up @@ -963,6 +974,12 @@ notify_confirm_waiters(State = #state{waiting_set = WSet,
State#state{waiting_set = gb_trees:empty(),
only_acks_received = true}.

do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) ->
case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
{error, Reason} -> throw(Reason);
Other -> Other
end.

handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) ->
{reply, {error, not_in_confirm_mode}, State};
handle_wait_for_confirms(From, Timeout,
Expand All @@ -973,7 +990,7 @@ handle_wait_for_confirms(From, Timeout,
false -> TRef = case Timeout of
infinity -> undefined;
_ -> erlang:send_after(
Timeout * 1000, self(),
Timeout, self(),
{confirm_timeout, From})
end,
{noreply,
Expand All @@ -989,3 +1006,5 @@ call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->
safe_cancel_timer(undefined) -> ok;
safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).

second_to_millisecond(Timeout) ->
Timeout * 1000.
2 changes: 1 addition & 1 deletion deps/amqp_client/src/amqp_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ call_timeout() ->
undefined ->
Timeout = rabbit_misc:get_env(amqp_client,
gen_server_call_timeout,
?CALL_TIMEOUT),
60000),
put(gen_server_call_timeout, Timeout),
Timeout;
Timeout ->
Expand Down

0 comments on commit da95a11

Please sign in to comment.