From 2539871e3cd4e5de35965e610aeb05d3d98d12bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 13 Oct 2016 18:01:36 +0200 Subject: [PATCH] Make Federation connections (links) specify a name Fixes #39 --- src/rabbit_federation_link_util.erl | 37 ++++++++++++++++++++++------- test/unit_inbroker_SUITE.erl | 26 +++++++++++++++++++- 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/src/rabbit_federation_link_util.erl b/src/rabbit_federation_link_util.erl index 817152c..1061fef 100644 --- a/src/rabbit_federation_link_util.erl +++ b/src/rabbit_federation_link_util.erl @@ -23,7 +23,7 @@ -export([start_conn_ch/5, disposable_channel_call/2, disposable_channel_call/3, disposable_connection_call/3, ensure_connection_closed/1, log_terminate/4, unacked_new/0, ack/3, nack/3, forward/9, - handle_down/6]). + handle_down/6, get_connection_name/2]). %% temp -export([connection_error/6]). @@ -36,7 +36,7 @@ start_conn_ch(Fun, Upstream, UParams, XorQName = #resource{virtual_host = DownVHost}, State) -> - case open_monitor(#amqp_params_direct{virtual_host = DownVHost}) of + case open_monitor(#amqp_params_direct{virtual_host = DownVHost}, get_connection_name(Upstream, UParams)) of {ok, DConn, DCh} -> case Upstream#upstream.ack_mode of 'on-confirm' -> @@ -46,7 +46,7 @@ start_conn_ch(Fun, Upstream, UParams, _ -> ok end, - case open_monitor(UParams#upstream_params.params) of + case open_monitor(UParams#upstream_params.params, get_connection_name(Upstream, UParams)) of {ok, Conn, Ch} -> %% Don't trap exits until we have established %% connections so that if we try to delete @@ -82,15 +82,36 @@ start_conn_ch(Fun, Upstream, UParams, Upstream, UParams, XorQName, State) end. -open_monitor(Params) -> - case open(Params) of +get_connection_name(#upstream{name = UpstreamName}, + #upstream_params{x_or_q = Resource}) when is_record(Resource, exchange)-> + Policy = Resource#exchange.policy, + PolicyName = proplists:get_value(name, Policy), + connection_name(UpstreamName, PolicyName); + +get_connection_name(#upstream{name = UpstreamName}, + #upstream_params{x_or_q = Resource}) when is_record(Resource, amqqueue)-> + Policy = Resource#amqqueue.policy, + PolicyName = proplists:get_value(name, Policy), + connection_name(UpstreamName, PolicyName); + +get_connection_name(_, _) -> + connection_name(undefined, undefined). + +connection_name(Upstream, Policy) when is_binary(Upstream), is_binary(Policy) -> + <<<<"Federation ">>/binary, Upstream/binary, <<" ">>/binary, Policy/binary>>; + +connection_name(_, _) -> + <<"Federation">>. + +open_monitor(Params, Name) -> + case open(Params, Name) of {ok, Conn, Ch} -> erlang:monitor(process, Ch), {ok, Conn, Ch}; E -> E end. -open(Params) -> - case amqp_connection:start(Params) of +open(Params, Source) -> + case amqp_connection:start(Params, Source) of {ok, Conn} -> case amqp_connection:open_channel(Conn) of {ok, Ch} -> {ok, Conn, Ch}; E -> catch amqp_connection:close(Conn), @@ -273,7 +294,7 @@ disposable_channel_call(Conn, Method, ErrFun) -> end. disposable_connection_call(Params, Method, ErrFun) -> - case open(Params) of + case open(Params, undefined) of {ok, Conn, Ch} -> try amqp_channel:call(Ch, Method) diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index e172f4f..82d1f79 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -36,7 +36,8 @@ groups() -> {non_parallel_tests, [], [ serialisation, scratch_space, - remove_credentials + remove_credentials, + get_connection_name ]} ]. @@ -139,6 +140,29 @@ remove_credentials(Config) -> Test(<<"amqps://">>, <<"localhost:5672/%2f">>), ok. +get_connection_name(_Config) -> + <<"Federation my.upstream my.federation.policy">> = rabbit_federation_link_util:get_connection_name( + #upstream{name = <<"my.upstream">>}, + #upstream_params{x_or_q = #amqqueue{policy = [{name, <<"my.federation.policy">>}]}} + ), + <<"Federation my.upstream my.federation.policy">> = rabbit_federation_link_util:get_connection_name( + #upstream{name = <<"my.upstream">>}, + #upstream_params{x_or_q = #exchange{policy = [{name, <<"my.federation.policy">>}]}} + ), + <<"Federation">> = rabbit_federation_link_util:get_connection_name( + #upstream{}, + #upstream_params{x_or_q = #amqqueue{policy = []}} + ), + <<"Federation">> = rabbit_federation_link_util:get_connection_name( + #upstream{}, + #upstream_params{x_or_q = #exchange{policy = []}} + ), + <<"Federation">> = rabbit_federation_link_util:get_connection_name( + whatever, + whatever + ), + ok. + with_exchanges(Fun) -> rabbit_exchange:declare(r(?US_NAME), fanout, false, false, false, []), X = rabbit_exchange:declare(r(?DS_NAME), fanout, false, false, false, []),