Skip to content

Commit

Permalink
Merge pull request #9777 from rabbitmq/mergify/bp/v3.12.x/pr-9767
Browse files Browse the repository at this point in the history
AMQP 1.0 Erlang client: integrate credentials_obfuscation (backport #9767)
  • Loading branch information
michaelklishin authored Oct 25, 2023
2 parents 5e18d89 + e3facbd commit 152fd1f
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 15 deletions.
5 changes: 4 additions & 1 deletion deps/amqp10_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ rabbitmq_app(
],
license_files = [":license_files"],
priv = [":priv"],
deps = ["//deps/amqp10_common:erlang_app"],
deps = [
"//deps/amqp10_common:erlang_app",
"@credentials_obfuscation//:erlang_app",
],
)

xref(
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ endef
PACKAGES_DIR ?= $(abspath PACKAGES)

BUILD_DEPS = rabbit_common elvis_mk
DEPS = amqp10_common
DEPS = amqp10_common credentials_obfuscation
TEST_DEPS = rabbit rabbitmq_amqp1_0 rabbitmq_ct_helpers
LOCAL_DEPS = ssl inets crypto public_key

Expand Down
12 changes: 10 additions & 2 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
-spec open_connection(inet:socket_address() | inet:hostname(),
inet:port_number()) -> supervisor:startchild_ret().
open_connection(Addr, Port) ->
_ = ensure_started(),
open_connection(#{address => Addr, port => Port, notify => self(),
sasl => anon}).

Expand All @@ -97,14 +98,19 @@ open_connection(Addr, Port) ->
-spec open_connection(connection_config()) ->
supervisor:startchild_ret().
open_connection(ConnectionConfig0) ->
_ = ensure_started(),

Notify = maps:get(notify, ConnectionConfig0, self()),
NotifyWhenOpened = maps:get(notify_when_opened, ConnectionConfig0, self()),
NotifyWhenClosed = maps:get(notify_when_closed, ConnectionConfig0, self()),
amqp10_client_connection:open(ConnectionConfig0#{
ConnectionConfig1 = ConnectionConfig0#{
notify => Notify,
notify_when_opened => NotifyWhenOpened,
notify_when_closed => NotifyWhenClosed
}).
},
Sasl = maps:get(sasl, ConnectionConfig1),
ConnectionConfig2 = ConnectionConfig1#{sasl => amqp10_client_connection:encrypt_sasl(Sasl)},
amqp10_client_connection:open(ConnectionConfig2).

%% @doc Opens a connection using a connection_config map
%% This is asynchronous and will notify completion to the caller using
Expand Down Expand Up @@ -497,6 +503,8 @@ try_to_existing_atom(L) when is_list(L) ->
throw({non_existent_atom, L})
end.

ensure_started() ->
_ = application:ensure_all_started(credentials_obfuscation).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand Down
46 changes: 35 additions & 11 deletions deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
socket_ready/2,
protocol_header_received/5,
begin_session/1,
heartbeat/1]).
heartbeat/1,
encrypt_sasl/1,
decrypt_sasl/1]).

%% gen_fsm callbacks.
-export([init/1,
Expand All @@ -56,6 +58,9 @@

-type address() :: inet:socket_address() | inet:hostname().

-type encrypted_sasl() :: {plaintext, binary()} | {encrypted, binary()}.
-type decrypted_sasl() :: none | anon | {plain, User :: binary(), Pwd :: binary()}.

-type connection_config() ::
#{container_id => binary(), % AMQP container id
hostname => binary(), % the dns name of the target host
Expand All @@ -72,7 +77,9 @@
% set to a negative value to allow a sender to "overshoot" the flow
% control by this margin
transfer_limit_margin => 0 | neg_integer(),
sasl => none | anon | {plain, User :: binary(), Pwd :: binary()},
%% These credentials_obfuscation-wrapped values have the type of
%% decrypted_sasl/0
sasl => encrypted_sasl() | decrypted_sasl(),
notify => pid(),
notify_when_opened => pid() | none,
notify_when_closed => pid() | none
Expand All @@ -92,7 +99,9 @@
}).

-export_type([connection_config/0,
amqp10_socket/0]).
amqp10_socket/0,
encrypted_sasl/0,
decrypted_sasl/0]).

%% -------------------------------------------------------------------
%% Public API.
Expand Down Expand Up @@ -125,6 +134,18 @@ open(Config) ->
close(Pid, Reason) ->
gen_statem:cast(Pid, {close, Reason}).

-spec encrypt_sasl(decrypted_sasl()) -> encrypted_sasl().
encrypt_sasl(none) ->
credentials_obfuscation:encrypt(none);
encrypt_sasl(DecryptedSasl) ->
credentials_obfuscation:encrypt(term_to_binary(DecryptedSasl)).

-spec decrypt_sasl(encrypted_sasl()) -> decrypted_sasl().
decrypt_sasl(none) ->
credentials_obfuscation:decrypt(none);
decrypt_sasl(EncryptedSasl) ->
binary_to_term(credentials_obfuscation:decrypt(EncryptedSasl)).

%% -------------------------------------------------------------------
%% Private API.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -166,8 +187,9 @@ init([Sup, Config0]) ->
expecting_socket(_EvtType, {socket_ready, Socket},
State = #state{config = Cfg}) ->
State1 = State#state{socket = Socket},
case Cfg of
#{sasl := none} ->
Sasl = credentials_obfuscation:decrypt(maps:get(sasl, Cfg)),
case Sasl of
none ->
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
{next_state, hdr_sent, State1};
_ ->
Expand All @@ -193,16 +215,17 @@ sasl_hdr_sent({call, From}, begin_session,

sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{
sasl_server_mechanisms = {array, symbol, Mechs}},
State = #state{config = #{sasl := Sasl}}) ->
SaslBin = {symbol, sasl_to_bin(Sasl)},
State = #state{config = #{sasl := EncryptedSasl}}) ->
DecryptedSasl = decrypt_sasl(EncryptedSasl),
SaslBin = {symbol, decrypted_sasl_to_bin(DecryptedSasl)},
case lists:any(fun(S) when S =:= SaslBin -> true;
(_) -> false
end, Mechs) of
true ->
ok = send_sasl_init(State, Sasl),
ok = send_sasl_init(State, DecryptedSasl),
{next_state, sasl_init_sent, State};
false ->
{stop, {sasl_not_supported, Sasl}, State}
{stop, {sasl_not_supported, DecryptedSasl}, State}
end;
sasl_hdr_rcvds({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
Expand Down Expand Up @@ -522,8 +545,9 @@ translate_err(#'v1_0.error'{condition = Cond, description = Desc}) ->
amqp10_event(Evt) ->
{amqp10_event, {connection, self(), Evt}}.

sasl_to_bin({plain, _, _}) -> <<"PLAIN">>;
sasl_to_bin(anon) -> <<"ANONYMOUS">>.
decrypted_sasl_to_bin({plain, _, _}) -> <<"PLAIN">>;
decrypted_sasl_to_bin(anon) -> <<"ANONYMOUS">>;
decrypted_sasl_to_bin(none) -> <<"ANONYMOUS">>.

config_defaults() ->
#{sasl => none,
Expand Down

0 comments on commit 152fd1f

Please sign in to comment.