diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 3310320722f7..4b161bb0689c 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -78,9 +78,9 @@ %%-------------------------------------------------------------------------- unpack_from_0_9_1( - {Sock,RecvLen, PendingRecv, Buf, BufLen, ProxySocket, + {Sock,RecvLen, PendingRecv, SupPid, Buf, BufLen, ProxySocket, ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt}, - Parent, ConnectionHelperSupPid, HandshakeTimeout) -> + Parent, HandshakeTimeout) -> #v1{parent = Parent, sock = Sock, callback = handshake, @@ -88,7 +88,7 @@ unpack_from_0_9_1( pending_recv = PendingRecv, connection_state = pre_init, heartbeater = none, - helper_sup = ConnectionHelperSupPid, + helper_sup = SupPid, buf = Buf, buf_len = BufLen, proxy_socket = ProxySocket, @@ -612,13 +612,8 @@ handle_input(Callback, Data, _State) -> init(Mode, PackedState) -> {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), {parent, Parent} = erlang:process_info(self(), parent), - ConnectionHelperSupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1, - auto_shutdown => any_significant}, - {ok, ConnectionHelperSupPid} = rabbit_connection_sup:start_connection_helper_sup( - Parent, ConnectionHelperSupFlags), - State0 = unpack_from_0_9_1(PackedState, Parent, ConnectionHelperSupPid, HandshakeTimeout), + ok = rabbit_connection_sup:remove_connection_helper_sup(Parent, helper_sup_amqp_091), + State0 = unpack_from_0_9_1(PackedState, Parent, HandshakeTimeout), State = start_1_0_connection(Mode, State0), %% By invoking recvloop here we become 1.0. recvloop(sys:debug_options([]), State). @@ -1036,7 +1031,7 @@ socket_info(Get, Select, #v1{sock = Sock}) -> ignore_maintenance({map, Properties}) -> lists:member( - {{symbol, <<"ignore-maintenance">>}, {boolean, true}}, + {{symbol, <<"ignore-maintenance">>}, true}, Properties); ignore_maintenance(_) -> false. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 58e2dc603d14..6b937c4c2fee 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1950,7 +1950,7 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, source_filters_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) -> Key = {symbol, H}, Arg = case keyfind_unpack_described(Key, KVList) of - {_, {boolean, MU}} -> + {_, MU} when is_boolean(MU) -> [{<<"x-stream-match-unfiltered">>, bool, MU}]; _ -> [] diff --git a/deps/rabbit/src/rabbit_connection_sup.erl b/deps/rabbit/src/rabbit_connection_sup.erl index 92ddaaf7784e..00c003c8d136 100644 --- a/deps/rabbit/src/rabbit_connection_sup.erl +++ b/deps/rabbit/src/rabbit_connection_sup.erl @@ -7,7 +7,7 @@ -module(rabbit_connection_sup). -%% Supervisor for a (network) AMQP 0-9-1 client connection. +%% Supervisor for a (network) AMQP client connection. %% %% Supervises %% @@ -21,7 +21,7 @@ -export([start_link/3, reader/1, - start_connection_helper_sup/2 + remove_connection_helper_sup/2 ]). -export([init/1]). @@ -35,12 +35,48 @@ start_link(Ref, _Transport, _Opts) -> {ok, SupPid} = supervisor:start_link(?MODULE, []), + %% We need to get channels in the hierarchy here so they get shut + %% down after the reader, so the reader gets a chance to terminate + %% them cleanly. But for 1.0 readers we can't start the real + %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) - + %% so we add another supervisor into the hierarchy. + %% + %% This supervisor also acts as an intermediary for heartbeaters and + %% the queue collector process, since these must not be siblings of the + %% reader due to the potential for deadlock if they are added/restarted + %% whilst the supervision tree is shutting down. + ChildSpec = #{restart => transient, + significant => true, + shutdown => infinity, + type => supervisor}, + {ok, HelperSup091} = + supervisor:start_child( + SupPid, + ChildSpec#{ + id => helper_sup_amqp_091, + start => {rabbit_connection_helper_sup, start_link, + [#{strategy => one_for_one, + intensity => 10, + period => 10, + auto_shutdown => any_significant}]}} + ), + {ok, HelperSup10} = + supervisor:start_child( + SupPid, + ChildSpec#{ + id => helper_sup_amqp_10, + start => {rabbit_connection_helper_sup, start_link, + [#{strategy => one_for_all, + intensity => 0, + period => 1, + auto_shutdown => any_significant}]}} + ), {ok, ReaderPid} = supervisor:start_child( SupPid, #{ id => reader, - start => {rabbit_reader, start_link, [Ref]}, + start => {rabbit_reader, start_link, [{HelperSup091, HelperSup10}, Ref]}, restart => transient, significant => true, shutdown => ?WORKER_WAIT, @@ -51,23 +87,13 @@ start_link(Ref, _Transport, _Opts) -> {ok, SupPid, ReaderPid}. -spec reader(pid()) -> pid(). - reader(Pid) -> hd(rabbit_misc:find_child(Pid, reader)). --spec start_connection_helper_sup(pid(), supervisor:sup_flags()) -> - supervisor:startchild_ret(). -start_connection_helper_sup(ConnectionSupPid, ConnectionHelperSupFlags) -> - supervisor:start_child( - ConnectionSupPid, - #{ - id => helper_sup, - start => {rabbit_connection_helper_sup, start_link, [ConnectionHelperSupFlags]}, - restart => transient, - significant => true, - shutdown => infinity, - type => supervisor - }). +-spec remove_connection_helper_sup(pid(), helper_sup_amqp_091 | helper_sup_amqp_10) -> ok. +remove_connection_helper_sup(ConnectionSupPid, ConnectionHelperId) -> + ok = supervisor:terminate_child(ConnectionSupPid, ConnectionHelperId), + ok = supervisor:delete_child(ConnectionSupPid, ConnectionHelperId). %%-------------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 4f8c9a61b6cd..bb5268450d7e 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -43,12 +43,12 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). --export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2, +-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/2, mainloop/4, recvloop/4]). +-export([init/3, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). @@ -78,7 +78,9 @@ %% pre_init | securing | running | blocking | blocked | closing | closed | {become, F} connection_state, %% see comment in rabbit_connection_sup:start_link/0 - helper_sup, + helper_sup :: {HelperSupAmqp091 :: pid(), + HelperSupAmqp10 :: pid()} % pre version negotiation + | pid(), % post version negotiation %% takes care of cleaning up exclusive queues, %% see rabbit_queue_collector queue_collector, @@ -145,10 +147,10 @@ %%-------------------------------------------------------------------------- --spec start_link(ranch:ref()) -> +-spec start_link({pid(), pid()}, ranch:ref()) -> rabbit_types:ok(pid()). -start_link(Ref) -> - Pid = proc_lib:spawn_link(?MODULE, init, [self(), Ref]), +start_link(HelperSups, Ref) -> + Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSups, Ref]), {ok, Pid}. -spec shutdown(pid(), string()) -> 'ok'. @@ -156,14 +158,14 @@ start_link(Ref) -> shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). --spec init(pid(), ranch:ref()) -> +-spec init(pid(), {pid(), pid()}, ranch:ref()) -> no_return(). -init(Parent, Ref) -> +init(Parent, HelperSups, Ref) -> ?LG_PROCESS_TYPE(reader), {ok, Sock} = rabbit_networking:handshake(Ref, application:get_env(rabbit, proxy_protocol, false)), Deb = sys:debug_options([]), - start_connection(Parent, Ref, Deb, Sock). + start_connection(Parent, HelperSups, Ref, Deb, Sock). -spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any(). @@ -290,10 +292,10 @@ socket_op(Sock, Fun) -> exit(normal) end. --spec start_connection(pid(), ranch:ref(), any(), rabbit_net:socket()) -> +-spec start_connection(pid(), {pid(), pid()}, ranch:ref(), any(), rabbit_net:socket()) -> no_return(). -start_connection(Parent, RanchRef, Deb, Sock) -> +start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> process_flag(trap_exit, true), RealSocket = rabbit_net:unwrap_socket(Sock), Name = case rabbit_net:connection_string(Sock, inbound) of @@ -336,7 +338,7 @@ start_connection(Parent, RanchRef, Deb, Sock) -> pending_recv = false, connection_state = pre_init, queue_collector = undefined, %% started on tune-ok - helper_sup = none, + helper_sup = HelperSups, heartbeater = none, channel_sup_sup_pid = none, channel_count = 0, @@ -1104,13 +1106,9 @@ start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, #v1{parent = Parent, sock = Sock, + helper_sup = {HelperSup091, _HelperSup10}, connection = Connection} = State0) -> - ConnectionHelperSupFlags = #{strategy => one_for_one, - intensity => 10, - period => 10, - auto_shutdown => any_significant}, - {ok, ConnectionHelperSupPid} = rabbit_connection_sup:start_connection_helper_sup( - Parent, ConnectionHelperSupFlags), + ok = rabbit_connection_sup:remove_connection_helper_sup(Parent, helper_sup_amqp_10), rabbit_networking:register_connection(self()), Start = #'connection.start'{ version_major = ProtocolMajor, @@ -1123,7 +1121,7 @@ start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, timeout_sec = ?NORMAL_TIMEOUT, protocol = Protocol}, connection_state = starting, - helper_sup = ConnectionHelperSupPid}, + helper_sup = HelperSup091}, switch_callback(State, frame_header, 7). -spec refuse_connection(rabbit_net:socket(), any()) -> no_return(). @@ -1647,6 +1645,7 @@ become_10(Id, State = #v1{sock = Sock}) -> pack_for_1_0(Buf, BufLen, #v1{sock = Sock, recv_len = RecvLen, pending_recv = PendingRecv, + helper_sup = {_HelperSup091, HelperSup10}, proxy_socket = ProxySocket, connection = #connection{ name = Name, @@ -1655,7 +1654,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock, port = Port, peer_port = PeerPort, connected_at = ConnectedAt}}) -> - {Sock, RecvLen, PendingRecv, Buf, BufLen, ProxySocket, + {Sock, RecvLen, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket, Name, Host, PeerHost, Port, PeerPort, ConnectedAt}. respond_and_close(State, Channel, Protocol, Reason, LogErr) -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 54a0d72c70d2..884f54033a9f 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -244,7 +244,11 @@ reliable_send_receive(QType, Outcome, Config) -> %% link will be in "mixed" mode by default Body = <<"body-1">>, Msg1 = amqp10_msg:new(DTag1, Body, false), - ok = amqp10_client:send_msg(Sender, Msg1), + + %% Use the 2 byte AMQP boolean encoding, see AMQP ยง1.6.2 + True = {boolean, true}, + Msg2 = amqp10_msg:set_headers(#{durable => True}, Msg1), + ok = amqp10_client:send_msg(Sender, Msg2), ok = wait_for_settlement(DTag1), ok = amqp10_client:detach_link(Sender), @@ -258,6 +262,7 @@ reliable_send_receive(QType, Outcome, Config) -> Session2, <<"test-receiver">>, Address, unsettled), {ok, Msg} = amqp10_client:get_msg(Receiver), ?assertEqual([Body], amqp10_msg:body(Msg)), + ?assertEqual(true, amqp10_msg:header(durable, Msg)), ok = amqp10_client:settle_msg(Receiver, Msg, Outcome), flush("post accept"),