From b2abfcfa5b34e9119f2a7d7f5429a0edfeaeb456 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Sat, 23 Mar 2024 15:39:11 +0100 Subject: [PATCH] Use 1.0 instead of 0.9.1 client for 1.0 tests --- deps/rabbit/BUILD.bazel | 2 +- deps/rabbit/src/rabbit_amqp_management.erl | 46 ++--- deps/rabbit/test/amqp_client_SUITE.erl | 227 +++++++++------------ 3 files changed, 114 insertions(+), 161 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 96720b81bb4d..e7291c046f77 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1252,7 +1252,7 @@ rabbitmq_integration_suite( ], shard_count = 3, runtime_deps = [ - "//deps/amqp10_client:erlang_app", + "//deps/rabbitmq_amqp_client:erlang_app", ], ) diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 55fb5448f721..910c7f858f83 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -343,15 +343,8 @@ decode_queue({map, KVList}) -> ({{utf8, <<"auto_delete">>}, V}, Acc) when is_boolean(V) -> Acc#{auto_delete => V}; - ({{utf8, <<"arguments">>}, {map, List}}, Acc) -> - Args = lists:map(fun({{utf8, Key = <<"x-", _/binary>>}, {utf8, Val}}) -> - {Key, longstr, Val}; - (Arg) -> - throw(<<"400">>, - "unsupported queue argument ~tp", - [Arg]) - end, List), - Acc#{arguments => Args}; + ({{utf8, <<"arguments">>}, Args}, Acc) -> + Acc#{arguments => args_amqp_to_amqpl(Args)}; (Prop, _Acc) -> throw(<<"400">>, "bad queue property ~tp", [Prop]) end, #{}, KVList), @@ -442,15 +435,8 @@ decode_exchange({map, KVList}) -> ({{utf8, <<"internal">>}, V}, Acc) when is_boolean(V) -> Acc#{internal => V}; - ({{utf8, <<"arguments">>}, {map, List}}, Acc) -> - Args = lists:map(fun({{utf8, Key = <<"x-", _/binary>>}, {utf8, Val}}) -> - {Key, longstr, Val}; - (Arg) -> - throw(<<"400">>, - "unsupported exchange argument ~tp", - [Arg]) - end, List), - Acc#{arguments => Args}; + ({{utf8, <<"arguments">>}, Args}, Acc) -> + Acc#{arguments => args_amqp_to_amqpl(Args)}; (Prop, _Acc) -> throw(<<"400">>, "bad exchange property ~tp", [Prop]) end, #{}, KVList), @@ -471,17 +457,8 @@ decode_binding({map, KVList}) -> Acc#{destination_exchange => V}; ({{utf8, <<"binding_key">>}, {utf8, V}}, Acc) -> Acc#{binding_key => V}; - ({{utf8, <<"arguments">>}, {map, List}}, Acc) -> - Args = lists:map(fun({{T, Key}, TypeVal}) - when T =:= utf8 orelse - T =:= symbol -> - mc_amqpl:to_091(Key, TypeVal); - (Arg) -> - throw(<<"400">>, - "unsupported binding argument ~tp", - [Arg]) - end, List), - Acc#{arguments => Args}; + ({{utf8, <<"arguments">>}, Args}, Acc) -> + Acc#{arguments => args_amqp_to_amqpl(Args)}; (Field, _Acc) -> throw(<<"400">>, "bad binding field ~tp", [Field]) end, #{}, KVList). @@ -511,6 +488,17 @@ encode_bindings(Bindings) -> end, Bindings), {list, Bs}. +args_amqp_to_amqpl({map, KVList}) -> + lists:map(fun({{T, Key}, TypeVal}) + when T =:= utf8 orelse + T =:= symbol -> + mc_amqpl:to_091(Key, TypeVal); + (Arg) -> + throw(<<"400">>, + "unsupported argument ~tp", + [Arg]) + end, KVList). + args_amqpl_to_amqp(Args) -> {map, [{{utf8, K}, mc_amqpl:from_091(T, V)} || {K, T, V} <- Args]}. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 7490eca6d6db..0d1deaf78566 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -220,19 +220,16 @@ reliable_send_receive(QType, Outcome, Config) -> end, ct:pal("~s testing ~s", [?FUNCTION_NAME, OutcomeBin]), - QName = <>, - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = [{<<"x-queue-type">>, longstr, QType}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), - - %% reliable send and consume OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"pair">>), + QName = <>, + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + + %% reliable send and consume Address = <<"/amq/queue/", QName/binary>>, {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address), @@ -267,24 +264,17 @@ reliable_send_receive(QType, Outcome, Config) -> flush("post accept"), ok = amqp10_client:detach_link(Receiver), + ok = delete_queue(Session2, QName), ok = end_session_sync(Session2), - ok = amqp10_client:close_connection(Connection2), - ok = delete_queue(Config, QName). + ok = amqp10_client:close_connection(Connection2). %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue. sender_settle_mode_unsettled(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), - - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), Address = <<"/amq/queue/", QName/binary>>, {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address, unsettled), @@ -305,24 +295,20 @@ sender_settle_mode_unsettled(Config) -> end || DTag <- DTags], ok = amqp10_client:detach_link(Sender), + ?assertMatch({ok, #{message_count := NumMsgs}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - ?assertEqual(#'queue.delete_ok'{message_count = NumMsgs}, - amqp_channel:call(Ch, #'queue.delete'{queue = QName})), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = amqp10_client:close_connection(Connection). sender_settle_mode_unsettled_fanout(Config) -> + {Connection, Session, LinkPair} = init(Config), QNames = [<<"q1">>, <<"q2">>, <<"q3">>], - Ch = rabbit_ct_client_helpers:open_channel(Config), [begin - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, - exchange = <<"amq.fanout">>}) + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.fanout">>, <<>>, #{}) end || QName <- QNames], - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), Address = <<"/exchange/amq.fanout">>, {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address, unsettled), ok = wait_for_credit(Sender), @@ -342,28 +328,22 @@ sender_settle_mode_unsettled_fanout(Config) -> end || DTag <- DTags], ok = amqp10_client:detach_link(Sender), + [?assertMatch({ok, #{message_count := NumMsgs}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)) + || QName <- QNames], + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - [?assertEqual(#'queue.delete_ok'{message_count = NumMsgs}, - amqp_channel:call(Ch, #'queue.delete'{queue = QName})) || - QName <- QNames], - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = amqp10_client:close_connection(Connection). %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue where %% every 3rd message is settled by the sender. sender_settle_mode_mixed(Config) -> + {Connection, Session, LinkPair} = init(Config), QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), Address = <<"/amq/queue/", QName/binary>>, {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address, mixed), @@ -390,27 +370,20 @@ sender_settle_mode_mixed(Config) -> end || DTag <- DTags], ok = amqp10_client:detach_link(Sender), + ?assertMatch({ok, #{message_count := NumMsgs}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - ?assertEqual(#'queue.delete_ok'{message_count = NumMsgs}, - amqp_channel:call(Ch, #'queue.delete'{queue = QName})), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = amqp10_client:close_connection(Connection). quorum_queue_rejects(Config) -> + {Connection, Session, LinkPair} = init(Config), QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-length">>, long, 1}, - {<<"x-overflow">>, longstr, <<"reject-publish">>} - ]}), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-max-length">> => {ulong, 1}, + <<"x-overflow">> => {utf8, <<"reject-publish">>}}}, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), Address = <<"/amq/queue/", QName/binary>>, {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address, mixed), @@ -443,12 +416,11 @@ quorum_queue_rejects(Config) -> end || DTag <- DTags ++ [<<"tag d">>]], ok = amqp10_client:detach_link(Sender), + ?assertMatch({ok, #{message_count := 2}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = amqp10_client:end_session(Session), - ok = amqp10_client:close_connection(Connection), - - ?assertEqual(#'queue.delete_ok'{message_count = 2}, - amqp_channel:call(Ch, #'queue.delete'{queue = QName})), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = amqp10_client:close_connection(Connection). receiver_settle_mode_first(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -535,9 +507,9 @@ receiver_settle_mode_first(Config) -> assert_messages(QName, 0, 0, Config), ok = amqp10_client:detach_link(Receiver), + ok = delete_queue(Session, QName), ok = amqp10_client:end_session(Session), - ok = amqp10_client:close_connection(Connection), - ok = delete_queue(Config, QName). + ok = amqp10_client:close_connection(Connection). publishing_to_non_existing_queue_should_settle_with_released(Config) -> OpnConf = connection_config(Config), @@ -592,16 +564,9 @@ roundtrip_with_drain_stream(Config) -> roundtrip_with_drain(Config, QueueType, QName) when is_binary(QueueType) -> Address = <<"/amq/queue/", QName/binary>>, - Ch = rabbit_ct_client_helpers:open_channel(Config), - Args = [{<<"x-queue-type">>, longstr, QueueType}], - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = Args}), - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}}, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address), wait_for_credit(Sender), @@ -640,24 +605,19 @@ roundtrip_with_drain(Config, QueueType, QName) flush("final"), ok = amqp10_client:detach_link(Sender), - ok = amqp10_client:close_connection(Connection), - ok = delete_queue(Config, QName). + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). %% Send a message with a body containing a single AMQP 1.0 value section %% to a stream and consume via AMQP 0.9.1. amqp_stream_amqpl(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Connection, Session, LinkPair} = init(Config), QName = atom_to_binary(?FUNCTION_NAME), - - amqp_channel:call(Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}, + {ok, #{type := <<"stream">>}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), Address = <<"/amq/queue/", QName/binary>>, - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address), wait_for_credit(Sender), @@ -665,8 +625,8 @@ amqp_stream_amqpl(Config) -> ok = amqp10_client:send_msg(Sender, OutMsg), flush("final"), ok = amqp10_client:detach_link(Sender), - ok = amqp10_client:close_connection(Connection), + Ch = rabbit_ct_client_helpers:open_channel(Config), #'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{global = false, prefetch_count = 1}), CTag = <<"my-tag">>, @@ -685,8 +645,11 @@ amqp_stream_amqpl(Config) -> after 5000 -> ct:fail(basic_deliver_timeout) end, - #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + + ok = rabbit_ct_client_helpers:close_channel(Ch), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). message_headers_conversion(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -705,7 +668,7 @@ message_headers_conversion(Config) -> amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), ok = rabbit_ct_client_helpers:close_channel(Ch), - ok = delete_queue(Config, QName), + ok = delete_queue(Session, QName), ok = amqp10_client:close_connection(Connection). amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> @@ -835,10 +798,10 @@ multiple_sessions(Config) -> %% Clean up. [ok = amqp10_client:detach_link(Link) || Link <- [Receiver1, Receiver2, Sender1, Sender2]], + [ok = delete_queue(Session1, Q) || Q <- Qs], ok = end_session_sync(Session1), ok = end_session_sync(Session2), - ok = amqp10_client:close_connection(Connection), - [ok = delete_queue(Config, Q) || Q <- Qs]. + ok = amqp10_client:close_connection(Connection). server_closes_link_classic_queue(Config) -> server_closes_link(<<"classic">>, Config). @@ -896,7 +859,7 @@ server_closes_link(QType, Config) -> %% Server closes the link endpoint due to some AMQP 1.0 external condition: %% In this test, the external condition is that an AMQP 0.9.1 client deletes the queue. - delete_queue(Config, QName), + ok = delete_queue(Session, QName), %% We expect that the server closes the link endpoints, %% i.e. the server sends us DETACH frames. @@ -1011,7 +974,7 @@ link_target_queue_deleted(QType, Config) -> %% Now, the server AMQP session contains a delivery that did not get confirmed by the target queue. %% If we now delete that target queue, RabbitMQ must not reply to us with ACCEPTED. %% Instead, we expect RabbitMQ to reply with RELEASED since no queue ever received our 2nd message. - delete_queue(Config, QName), + ok = delete_queue(Session, QName), ok = wait_for_settlement(DTag2, released), %% After the 2nd message got released, we additionally expect RabbitMQ to close the link given @@ -1580,19 +1543,12 @@ single_active_consumer_quorum_queue(Config) -> single_active_consumer(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = [{<<"x-single-active-consumer">>, bool, true}, - {<<"x-queue-type">>, longstr, QType}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}, + <<"x-single-active-consumer">> => true}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), %% Attach 1 sender and 2 receivers to the queue. - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), Address = <<"/amq/queue/", QName/binary>>, {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address), @@ -1660,9 +1616,10 @@ single_active_consumer(QType, Config) -> end, ok = amqp10_client:detach_link(Receiver2), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - delete_queue(Config, QName). + ok = amqp10_client:close_connection(Connection). %% "A session endpoint can choose to unmap its output handle for a link. In this case, the endpoint MUST %% send a detach frame to inform the remote peer that the handle is no longer attached to the link endpoint. @@ -2047,9 +2004,9 @@ receive_transfer_flow_order(Config) -> after 5000 -> ct:fail("timeout receiving credit_exhausted") end, + ok = delete_queue(Session, QName), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - ok = delete_queue(Config, QName). + ok = amqp10_client:close_connection(Connection). last_queue_confirms(Config) -> ClassicQ = <<"my classic queue">>, @@ -2254,9 +2211,9 @@ target_classic_queue_down(Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver2), + ok = delete_queue(Session, QName), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - delete_queue(Config, QName). + ok = amqp10_client:close_connection(Connection). async_notify_settled_classic_queue(Config) -> %% TODO Bump old version in mixed version tests to 3.13.x, @@ -2437,10 +2394,10 @@ link_flow_control(Config) -> end, [ok = amqp10_client:detach_link(Link) || Link <- [ReceiverCQ, ReceiverQQ, SenderCQ, SenderQQ]], + ok = delete_queue(Session, QQ), + ok = delete_queue(Session, CQ), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - delete_queue(Config, QQ), - delete_queue(Config, CQ). + ok = amqp10_client:close_connection(Connection). classic_queue_on_old_node(Config) -> %% TODO Bump old version in mixed version tests to 3.13.x, @@ -2858,7 +2815,7 @@ stream_filtering(Config) -> ?assertEqual(WaveCount * 2, length(AppleUnfilteredFilteredMessages)), ok = amqp10_client:detach_link(AppleUnfilteredReceiver), - delete_queue(Config, Stream), + ok = delete_queue(Session, Stream), ok = amqp10_client:close_connection(Connection). available_messages_classic_queue(Config) -> @@ -2966,9 +2923,9 @@ incoming_message_interceptors(Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), + ok = delete_queue(Session, QName), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection), - delete_queue(Config, QName), true = rpc(Config, persistent_term, erase, [Key]). trace(Config) -> @@ -3050,10 +3007,10 @@ trace(Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), + [delete_queue(SessionSender, Q0) || Q0 <- Qs], ok = end_session_sync(SessionSender), ok = end_session_sync(SessionReceiver), - ok = amqp10_client:close_connection(Connection), - [delete_queue(Config, Q0) || Q0 <- Qs]. + ok = amqp10_client:close_connection(Connection). %% https://www.rabbitmq.com/validated-user-id.html user_id(Config) -> @@ -3123,8 +3080,8 @@ message_ttl(Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), - ok = amqp10_client:close_connection(Connection), - ok = delete_queue(Config, QName). + ok = delete_queue(Session, QName), + ok = amqp10_client:close_connection(Connection). %% For backward compatibility, deployment tools should be able to %% enable and disable the deprecated no-op AMQP 1.0 plugin. @@ -3342,13 +3299,20 @@ classic_priority_queue(Config) -> ok = amqp10_client:detach_link(Receiver1), ok = amqp10_client:detach_link(Receiver2), ok = amqp10_client:detach_link(Sender), + ok = delete_queue(Session, QName), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), - ok = delete_queue(Config, QName). + ok = amqp10_client:close_connection(Connection). %% internal %% +init(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {Connection, Session, LinkPair}. + receive_all_messages(Receiver, Accept) -> receive_all_messages0(Receiver, Accept, []). @@ -3465,10 +3429,11 @@ wait_for_accepts(N) -> ct:fail({missing_accepted, N}) end. -delete_queue(Config, QName) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). +delete_queue(Session, QName) -> + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( + Session, <<"delete queue">>), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair). amqp091_get_msg_headers(Channel, QName) -> {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{ headers= Headers}}}