From 5cf0afb99bcc0ee9b7b44b5e49235ece4bc10041 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 22 Oct 2024 09:51:34 +0200 Subject: [PATCH 1/2] Convert array from AMQP 1.0 to AMQP 0.9.1 Fix the following crash when an AMQP 0.9.1 client consumes an AMQP 1.0 encoded message that contains an array value in message annotations: ``` crasher: initial call: rabbit_channel:init/1 pid: <0.685.0> registered_name: [] exception exit: {function_clause, [{mc_amqpl,to_091, [<<"x-array">>, {array,utf8,[{utf8,<<"e1">>},{utf8,<<"e2">>}]}], [{file,"mc_amqpl.erl"},{line,737}]}, {mc_amqpl,'-convert_from/3-fun-3-',1, [{file,"mc_amqpl.erl"},{line,168}]}, {lists,filtermap_1,2, [{file,"lists.erl"},{line,2279}]}, {mc_amqpl,convert_from,3, [{file,"mc_amqpl.erl"},{line,158}]}, {mc,convert,3,[{file,"mc.erl"},{line,332}]}, {rabbit_channel,handle_deliver0,4, [{file,"rabbit_channel.erl"},{line,2619}]}, {lists,foldl_1,3,[{file,"lists.erl"},{line,2151}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}]} ``` (cherry picked from commit 814d44dd82969edc0d548d6a540f1d5517a5c2ff) # Conflicts: # deps/amqp10_client/src/amqp10_msg.erl --- deps/amqp10_client/src/amqp10_msg.erl | 13 +++++++++++-- deps/rabbit/src/mc_amqpl.erl | 9 +++++++-- deps/rabbit/test/amqp_client_SUITE.erl | 14 ++++++++++++++ deps/rabbit/test/mc_unit_SUITE.erl | 4 +++- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index fa046cc60657..556dbef9664f 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -402,8 +402,8 @@ set_delivery_annotations( Anns1 = #'v1_0.delivery_annotations'{content = maps:to_list(Anns)}, Msg#amqp10_msg{delivery_annotations = Anns1}. --spec set_message_annotations(#{binary() => binary() | integer() | string()}, - amqp10_msg()) -> amqp10_msg(). +-spec set_message_annotations(#{binary() => binary() | number() | string() | tuple()}, + amqp10_msg()) -> amqp10_msg(). set_message_annotations(Props, #amqp10_msg{message_annotations = undefined} = Msg) -> @@ -433,7 +433,16 @@ wrap_ap_value(V) when is_integer(V) -> case V < 0 of true -> {int, V}; false -> {uint, V} +<<<<<<< HEAD end. +======= + end; +wrap_ap_value(V) when is_number(V) -> + %% AMQP double and Erlang float are both 64-bit. + {double, V}; +wrap_ap_value(TaggedValue) when is_tuple(TaggedValue) -> + TaggedValue. +>>>>>>> 814d44dd82 (Convert array from AMQP 1.0 to AMQP 0.9.1) %% LOCAL header_value(durable, undefined) -> false; diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 8de27294723a..723e60cd3f79 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -754,9 +754,14 @@ to_091(Key, false) -> {Key, bool, false}; to_091(Key, undefined) -> {Key, void, undefined}; to_091(Key, null) -> {Key, void, undefined}; to_091(Key, {list, L}) -> - {Key, array, [to_091(V) || V <- L]}; + to_091_array(Key, L); to_091(Key, {map, M}) -> - {Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}. + {Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}; +to_091(Key, {array, _T, L}) -> + to_091_array(Key, L). + +to_091_array(Key, L) -> + {Key, array, [to_091(V) || V <- L]}. to_091({utf8, V}) -> {longstr, V}; to_091({symbol, V}) -> {longstr, V}; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index a21c372fc25f..aae2dfbe9ca5 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1328,6 +1328,13 @@ amqp_amqpl(QType, Config) -> message_format = {uint, 0}}, Body1, Footer])), + %% Send with an array value in message annotations. + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_message_annotations( + #{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>}, + {utf8, <<"e2">>}]}}, + amqp10_msg:new(<<>>, Body1, true))), ok = amqp10_client:detach_link(Sender), flush(detached), @@ -1407,6 +1414,13 @@ amqp_amqpl(QType, Config) -> ?assertEqual([Body1, Footer], amqp10_framing:decode_bin(Payload10)) after 5000 -> ct:fail({missing_deliver, ?LINE}) end, + receive {_, #amqp_msg{payload = Payload11, + props = #'P_basic'{headers = Headers11}}} -> + ?assertEqual([Body1], amqp10_framing:decode_bin(Payload11)), + ?assertEqual({array, [{longstr, <<"e1">>}, {longstr, <<"e2">>}]}, + rabbit_misc:table_lookup(Headers11, <<"x-array">>)) + after 5000 -> ct:fail({missing_deliver, ?LINE}) + end, ok = rabbit_ct_client_helpers:close_channel(Ch), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 529ffe072c28..acc9ea69adfe 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -532,7 +532,8 @@ amqp_amqpl(_Config) -> MAC = [ {{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}}, thead2('x-list', list, [utf8(<<"l">>)]), - thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]) + thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]), + {{symbol, <<"x-array">>}, {array, utf8, [{utf8, <<"a">>}]}} ], M = #'v1_0.message_annotations'{content = MAC}, P = #'v1_0.properties'{content_type = {symbol, <<"ctype">>}, @@ -598,6 +599,7 @@ amqp_amqpl(_Config) -> ?assertMatch({_, longstr, <<"apple">>}, header(<<"x-stream-filter">>, HL)), ?assertMatch({_ ,array, [{longstr,<<"l">>}]}, header(<<"x-list">>, HL)), ?assertMatch({_, table, [{<<"k">>,longstr,<<"v">>}]}, header(<<"x-map">>, HL)), + ?assertMatch({_, array, [{longstr, <<"a">>}]}, header(<<"x-array">>, HL)), ?assertMatch({_, long, 5}, header(<<"long">>, HL)), ?assertMatch({_, long, 5}, header(<<"ulong">>, HL)), From 3894aa51302300c80a9d62b7e6ffb05af0d3d418 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 22 Oct 2024 12:30:19 +0200 Subject: [PATCH 2/2] Fix conflict --- deps/amqp10_client/src/amqp10_msg.erl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index 556dbef9664f..673617acc6a0 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -433,16 +433,12 @@ wrap_ap_value(V) when is_integer(V) -> case V < 0 of true -> {int, V}; false -> {uint, V} -<<<<<<< HEAD - end. -======= end; wrap_ap_value(V) when is_number(V) -> %% AMQP double and Erlang float are both 64-bit. {double, V}; wrap_ap_value(TaggedValue) when is_tuple(TaggedValue) -> TaggedValue. ->>>>>>> 814d44dd82 (Convert array from AMQP 1.0 to AMQP 0.9.1) %% LOCAL header_value(durable, undefined) -> false;