Skip to content

Commit

Permalink
Queue federation: improve logging
Browse files Browse the repository at this point in the history
around federated queue link state transitions (pause/unpause).

References #8297.
  • Loading branch information
michaelklishin committed May 24, 2023
1 parent 1c727a4 commit 958043c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
6 changes: 5 additions & 1 deletion deps/rabbitmq_federation/src/rabbit_federation_link_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,13 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
Reason = case E of
{error, Value} -> Value;
Other -> Other
end,
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
[rabbit_federation_upstream:params_to_string(UParams),
E]),
Reason]),
{stop, {shutdown, restart}, State};

connection_error(remote, E, Upstream, UParams, XorQName, State) ->
Expand Down
14 changes: 11 additions & 3 deletions deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ handle_cast(pause, State = #state{run = false}) ->
handle_cast(pause, State = #not_started{}) ->
{noreply, State#not_started{run = false}};

handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream = #upstream{
name = UpName, queue_name = QName
}}) ->
rabbit_log_federation:debug("Federation link of ~s (upstream: '~s'): asked to pause",
[QName, UpName]),
cancel(Ch, Upstream),
{noreply, State#state{run = false}};

Expand Down Expand Up @@ -305,18 +309,22 @@ visit_match(_ ,_) ->
consumer_tag(#upstream{consumer_tag = ConsumerTag}) ->
ConsumerTag.

consume(Ch, Upstream, UQueue) ->
consume(Ch, Upstream = #upstream{name = UpName}, UQueue) ->
ConsumerTag = consumer_tag(Upstream),
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
rabbit_log_federation:debug("Federation link of ~ts: will consume from the upstream '~ts'",
[rabbit_misc:rs(amqqueue:get_name(UQueue)), UpName]),
amqp_channel:cast(
Ch, #'basic.consume'{queue = name(UQueue),
no_ack = NoAck,
nowait = true,
consumer_tag = ConsumerTag,
arguments = [{<<"x-priority">>, long, -1}]}).

cancel(Ch, Upstream) ->
cancel(Ch, Upstream = #upstream{name = UpName, queue_name = QName}) ->
ConsumerTag = consumer_tag(Upstream),
rabbit_log_federation:debug("Federation queue '~ts' link: will cancel consumer '~ts' on upstream '~ts'",
[QName, ConsumerTag, UpName]),
amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
consumer_tag = ConsumerTag}).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ params_table(SafeURI, XorQ) ->

params_to_string(#upstream_params{safe_uri = SafeURI,
x_or_q = XorQ}) ->
print("~ts on ~ts", [rabbit_misc:rs(r(XorQ)), SafeURI]).
print("~ts on '~ts'", [rabbit_misc:rs(r(XorQ)), SafeURI]).

remove_credentials(URI) ->
list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).
Expand Down

0 comments on commit 958043c

Please sign in to comment.