From df59a52b703ac1b8c69f6e938299ae915b2146f5 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 7 Oct 2024 17:12:26 +0200 Subject: [PATCH] Support AMQP filter expressions (#12415) * Support AMQP filter expressions ## What? This PR implements the following property filter expressions for AMQP clients consuming from streams as defined in [AMQP Filter Expressions Version 1.0 Working Draft 09](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227): * properties filters [section 4.2.4] * application-properties filters [section 4.2.5] String prefix and suffix matching is also supported. This PR also fixes a bug where RabbitMQ would accept wrong filters. Specifically, prior to this PR the values of the filter-set's map were allowed to be symbols. However, "every value MUST be either null or of a described type which provides the archetype filter." ## Why? This feature adds the ability to RabbitMQ to have multiple concurrent clients each consuming only a subset of messages while maintaining message order. This feature also reduces network traffic between RabbitMQ and clients by only dispatching those messages that the clients are actually interested in. Note that AMQP filter expressions are more fine grained than the [bloom filter based stream filtering](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) because * they do not suffer false positives * the unit of filtering is per-message instead of per-chunk * matching can be performed on **multiple** values in the properties and application-properties sections * prefix and suffix matching on the actual values is supported. Both, AMQP filter expressions and bloom filters can be used together. ## How? If a filter isn't valid, RabbitMQ ignores the filter. RabbitMQ only replies with filters it actually supports and validated successfully to comply with: "The receiving endpoint sets its desired filter, the sending endpoint [RabbitMQ] sets the filter actually in place (including any filters defaulted at the node)." * Delete streams test case The test suite constructed a wrong filter-set. Specifically the value of the filter-set didn't use a described type as mandated by the spec. Using https://azure.github.io/amqpnetlite/api/Amqp.Types.DescribedValue.html throws errors that the descriptor can't be encoded. Given that this code path is already tests via the amqp_filtex_SUITE, this F# test gets therefore deleted. * Re-introduce the AMQP filter-set bug Since clients might rely on the wrong filter-set value type, we support the bug behind a deprecated feature flag and gradually remove support this bug. * Revert "Delete streams test case" This reverts commit c95cfeaef74160894050ae51a563bf839384d2d7. --- .../src/amqp10_client_session.erl | 8 +- deps/amqp10_client/src/amqp10_msg.erl | 5 +- deps/amqp10_common/app.bzl | 2 +- deps/amqp10_common/include/amqp10_filtex.hrl | 15 + deps/rabbit/BUILD.bazel | 16 + deps/rabbit/Makefile | 2 +- deps/rabbit/app.bzl | 20 + deps/rabbit/ct.test.spec | 1 + deps/rabbit/src/mc.erl | 2 +- deps/rabbit/src/mc_amqp.erl | 42 +- deps/rabbit/src/rabbit_amqp_filtex.erl | 196 ++++++ deps/rabbit/src/rabbit_amqp_reader.erl | 6 +- deps/rabbit/src/rabbit_amqp_session.erl | 188 +++--- deps/rabbit/src/rabbit_amqp_util.erl | 11 +- deps/rabbit/src/rabbit_queue_type.erl | 8 + deps/rabbit/src/rabbit_quorum_queue.erl | 2 +- deps/rabbit/src/rabbit_stream_queue.erl | 62 +- deps/rabbit/test/amqp_address_SUITE.erl | 22 +- deps/rabbit/test/amqp_auth_SUITE.erl | 31 +- deps/rabbit/test/amqp_client_SUITE.erl | 137 +--- deps/rabbit/test/amqp_filtex_SUITE.erl | 591 ++++++++++++++++++ deps/rabbit/test/amqp_utils.erl | 139 ++++ .../test/protocol_interop_SUITE.erl | 71 ++- moduleindex.yaml | 1 + 24 files changed, 1275 insertions(+), 303 deletions(-) create mode 100644 deps/amqp10_common/include/amqp10_filtex.hrl create mode 100644 deps/rabbit/src/rabbit_amqp_filtex.erl create mode 100644 deps/rabbit/test/amqp_filtex_SUITE.erl create mode 100644 deps/rabbit/test/amqp_utils.erl diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index c1e5eb46214f..981e291a3853 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -737,15 +737,13 @@ translate_terminus_durability(configuration) -> 1; translate_terminus_durability(unsettled_state) -> 2. translate_filters(Filters) - when is_map(Filters) andalso - map_size(Filters) == 0 -> + when map_size(Filters) =:= 0 -> undefined; -translate_filters(Filters) - when is_map(Filters) -> +translate_filters(Filters) -> {map, maps:fold( fun - (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> + (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> %% special case conversion Key = sym(K), [{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc]; diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index fa046cc60657..0f60c9bb8c28 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -433,7 +433,10 @@ wrap_ap_value(V) when is_integer(V) -> case V < 0 of true -> {int, V}; false -> {uint, V} - end. + end; +wrap_ap_value(V) when is_number(V) -> + %% AMQP double and Erlang float are both 64-bit. + {double, V}. %% LOCAL header_value(durable, undefined) -> false; diff --git a/deps/amqp10_common/app.bzl b/deps/amqp10_common/app.bzl index a233c945cebe..5e41032a8eb3 100644 --- a/deps/amqp10_common/app.bzl +++ b/deps/amqp10_common/app.bzl @@ -72,7 +72,7 @@ def all_srcs(name = "all_srcs"): ) filegroup( name = "public_hdrs", - srcs = ["include/amqp10_framing.hrl", "include/amqp10_types.hrl"], + srcs = ["include/amqp10_filtex.hrl", "include/amqp10_framing.hrl", "include/amqp10_types.hrl"], ) filegroup( name = "private_hdrs", diff --git a/deps/amqp10_common/include/amqp10_filtex.hrl b/deps/amqp10_common/include/amqp10_filtex.hrl new file mode 100644 index 000000000000..a1743ea9669c --- /dev/null +++ b/deps/amqp10_common/include/amqp10_filtex.hrl @@ -0,0 +1,15 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + + +%% AMQP Filter Expressions Version 1.0 Working Draft 09 +%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 + +-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>). +-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173). + +-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>). +-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174). diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 9bebe9be3ed5..d9910dc90e14 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1207,6 +1207,7 @@ rabbitmq_integration_suite( name = "amqp_client_SUITE", size = "large", additional_beam = [ + ":test_amqp_utils_beam", ":test_event_recorder_beam", ], shard_count = 3, @@ -1215,6 +1216,16 @@ rabbitmq_integration_suite( ], ) +rabbitmq_integration_suite( + name = "amqp_filtex_SUITE", + additional_beam = [ + ":test_amqp_utils_beam", + ], + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], +) + rabbitmq_integration_suite( name = "amqp_proxy_protocol_SUITE", size = "medium", @@ -1235,6 +1246,7 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "amqp_auth_SUITE", additional_beam = [ + ":test_amqp_utils_beam", ":test_event_recorder_beam", ], shard_count = 2, @@ -1246,6 +1258,9 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "amqp_address_SUITE", shard_count = 2, + additional_beam = [ + ":test_amqp_utils_beam", + ], runtime_deps = [ "//deps/rabbitmq_amqp_client:erlang_app", ], @@ -1358,6 +1373,7 @@ eunit( ":test_clustering_utils_beam", ":test_event_recorder_beam", ":test_rabbit_ct_hook_beam", + ":test_amqp_utils_beam", ], target = ":test_erlang_app", test_env = { diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 24110ce28db3..aad618f4211e 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -258,7 +258,7 @@ define ct_master.erl endef PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking -PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management +PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 4832861d9782..cf5a2d1769b7 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -45,6 +45,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_filtex.erl", "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", @@ -302,6 +303,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_filtex.erl", "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", @@ -578,6 +580,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_filtex.erl", "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", @@ -2195,3 +2198,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app"], ) + erlang_bytecode( + name = "amqp_filtex_SUITE_beam_files", + testonly = True, + srcs = ["test/amqp_filtex_SUITE.erl"], + outs = ["test/amqp_filtex_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp10_common:erlang_app"], + ) + erlang_bytecode( + name = "test_amqp_utils_beam", + testonly = True, + srcs = ["test/amqp_utils.erl"], + outs = ["test/amqp_utils.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + ) diff --git a/deps/rabbit/ct.test.spec b/deps/rabbit/ct.test.spec index e1027d06105f..60d65d2d5637 100644 --- a/deps/rabbit/ct.test.spec +++ b/deps/rabbit/ct.test.spec @@ -16,6 +16,7 @@ , amqp_auth_SUITE , amqp_client_SUITE , amqp_credit_api_v2_SUITE +, amqp_filtex_SUITE , amqp_proxy_protocol_SUITE , amqp_system_SUITE , amqpl_consumer_ack_SUITE diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 465c7054f089..9c23ac13daf8 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -301,7 +301,7 @@ message_id(BasicMsg) -> mc_compat:message_id(BasicMsg). -spec property(atom(), state()) -> - {utf8, binary()} | undefined. + tagged_value(). property(Property, #?MODULE{protocol = Proto, data = Data}) -> Proto:property(Property, Data); diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index be63597c3f96..ed6c4b4145d6 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -21,7 +21,7 @@ -define(MESSAGE_ANNOTATIONS_GUESS_SIZE, 100). --define(SIMPLE_VALUE(V), +-define(IS_SIMPLE_VALUE(V), is_binary(V) orelse is_number(V) orelse is_boolean(V)). @@ -145,16 +145,32 @@ property(Prop, #v1{bare_and_footer = Bin, Props = amqp10_framing:decode(PropsDescribed), property0(Prop, Props). -property0(correlation_id, #'v1_0.properties'{correlation_id = Corr}) -> - Corr; -property0(message_id, #'v1_0.properties'{message_id = MsgId}) -> - MsgId; -property0(user_id, #'v1_0.properties'{user_id = UserId}) -> - UserId; -property0(subject, #'v1_0.properties'{subject = Subject}) -> - Subject; -property0(to, #'v1_0.properties'{to = To}) -> - To; +property0(message_id, #'v1_0.properties'{message_id = Val}) -> + Val; +property0(user_id, #'v1_0.properties'{user_id = Val}) -> + Val; +property0(to, #'v1_0.properties'{to = Val}) -> + Val; +property0(subject, #'v1_0.properties'{subject = Val}) -> + Val; +property0(reply_to, #'v1_0.properties'{reply_to = Val}) -> + Val; +property0(correlation_id, #'v1_0.properties'{correlation_id = Val}) -> + Val; +property0(content_type, #'v1_0.properties'{content_type = Val}) -> + Val; +property0(content_encoding, #'v1_0.properties'{content_encoding = Val}) -> + Val; +property0(absolute_expiry_time, #'v1_0.properties'{absolute_expiry_time = Val}) -> + Val; +property0(creation_time, #'v1_0.properties'{creation_time = Val}) -> + Val; +property0(group_id, #'v1_0.properties'{group_id = Val}) -> + Val; +property0(group_sequence, #'v1_0.properties'{group_sequence = Val}) -> + Val; +property0(reply_to_group_id, #'v1_0.properties'{reply_to_group_id = Val}) -> + Val; property0(_Prop, #'v1_0.properties'{}) -> undefined. @@ -454,7 +470,7 @@ message_annotations_as_simple_map(#v1{message_annotations = Content}) -> message_annotations_as_simple_map0(Content) -> %% the section record format really is terrible lists:filtermap(fun({{symbol, K}, {_T, V}}) - when ?SIMPLE_VALUE(V) -> + when ?IS_SIMPLE_VALUE(V) -> {true, {K, V}}; (_) -> false @@ -480,7 +496,7 @@ application_properties_as_simple_map( application_properties_as_simple_map0(Content, L) -> %% the section record format really is terrible lists:foldl(fun({{utf8, K}, {_T, V}}, Acc) - when ?SIMPLE_VALUE(V) -> + when ?IS_SIMPLE_VALUE(V) -> [{K, V} | Acc]; ({{utf8, K}, V}, Acc) when V =:= undefined orelse is_boolean(V) -> diff --git a/deps/rabbit/src/rabbit_amqp_filtex.erl b/deps/rabbit/src/rabbit_amqp_filtex.erl new file mode 100644 index 000000000000..bcdd289e4723 --- /dev/null +++ b/deps/rabbit/src/rabbit_amqp_filtex.erl @@ -0,0 +1,196 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +%% AMQP Filter Expressions Version 1.0 Working Draft 09 +%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 +-module(rabbit_amqp_filtex). + +-include_lib("amqp10_common/include/amqp10_filtex.hrl"). + +-export([validate/1, + filter/2]). + +-type simple_type() :: number() | binary() | atom(). +-type affix() :: {suffix, non_neg_integer(), binary()} | + {prefix, non_neg_integer(), binary()}. +-type filter_expression_value() :: simple_type() | affix(). +-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} | + {application_properties, [{binary(), filter_expression_value()}]}. +-type filter_expressions() :: [filter_expression()]. +-export_type([filter_expressions/0]). + +-spec validate(tuple()) -> + {ok, filter_expression()} | error. +validate({described, Descriptor, {map, KVList}}) -> + try validate0(Descriptor, KVList) + catch throw:{?MODULE, _, _} -> + error + end; +validate(_) -> + error. + +-spec filter(filter_expressions(), mc:state()) -> + boolean(). +filter(Filters, Mc) -> + %% "A message will pass through a filter-set if and only if + %% it passes through each of the named filters." [3.5.8] + lists:all(fun(Filter) -> + filter0(Filter, Mc) + end, Filters). + +%%%%%%%%%%%%%%%% +%%% Internal %%% +%%%%%%%%%%%%%%%% + +filter0({properties, KVList}, Mc) -> + %% "The filter evaluates to true if all properties enclosed in the filter expression + %% match the respective properties in the message." + %% [filtex-v1.0-wd09 4.2.4] + lists:all(fun({FieldName, RefVal}) -> + TaggedVal = mc:property(FieldName, Mc), + Val = unwrap(TaggedVal), + match_simple_type(RefVal, Val) + end, KVList); +filter0({application_properties, KVList}, Mc) -> + AppProps = mc:routing_headers(Mc, []), + %% "The filter evaluates to true if all properties enclosed in the filter expression + %% match the respective entries in the application-properties section in the message." + %% [filtex-v1.0-wd09 4.2.5] + lists:all(fun({Key, RefVal}) -> + case AppProps of + #{Key := Val} -> + match_simple_type(RefVal, Val); + _ -> + false + end + end, KVList). + +%% [filtex-v1.0-wd09 4.1.1] +%% "A reference field value in a property filter expression matches +%% its corresponding message metadata field value if: +%% [...] +match_simple_type(null, _Val) -> + %% * The reference field value is NULL + true; +match_simple_type({suffix, SuffixSize, Suffix}, Val) -> + %% * Suffix. The message metadata field matches the expression if the ordinal values of the + %% characters of the suffix expression equal the ordinal values of the same number of + %% characters trailing the message metadata field value. + case is_binary(Val) of + true -> + case Val of + <<_:(size(Val) - SuffixSize)/binary, Suffix:SuffixSize/binary>> -> + true; + _ -> + false + end; + false -> + false + end; +match_simple_type({prefix, PrefixSize, Prefix}, Val) -> + %% * Prefix. The message metadata field matches the expression if the ordinal values of the + %% characters of the prefix expression equal the ordinal values of the same number of + %% characters leading the message metadata field value. + case Val of + <> -> + true; + _ -> + false + end; +match_simple_type(RefVal, Val) -> + %% * the reference field value is of a floating-point or integer number type + %% and the message metadata field is of a different floating-point or integer number type, + %% the reference value and the metadata field value are within the value range of both types, + %% and the values are equal when treated as a floating-point" + RefVal == Val. + +validate0(Descriptor, KVList) when + (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER}) andalso + KVList =/= [] -> + validate_props(KVList, []); +validate0(Descriptor, KVList0) when + (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso + KVList0 =/= [] -> + KVList = lists:map(fun({{utf8, Key}, {utf8, String}}) -> + {Key, parse_string_modifier_prefix(String)}; + ({{utf8, Key}, TaggedVal}) -> + {Key, unwrap(TaggedVal)} + end, KVList0), + {ok, {application_properties, KVList}}; +validate0(_, _) -> + error. + +validate_props([], Acc) -> + {ok, {properties, lists:reverse(Acc)}}; +validate_props([{{symbol, <<"message-id">>}, TaggedVal} | Rest], Acc) -> + case parse_message_id(TaggedVal) of + {ok, Val} -> + validate_props(Rest, [{message_id, Val} | Acc]); + error -> + error + end; +validate_props([{{symbol, <<"user-id">>}, {binary, Val}} | Rest], Acc) -> + validate_props(Rest, [{user_id, Val} | Acc]); +validate_props([{{symbol, <<"to">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{to, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"subject">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{subject, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"reply-to">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{reply_to, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"correlation-id">>}, TaggedVal} | Rest], Acc) -> + case parse_message_id(TaggedVal) of + {ok, Val} -> + validate_props(Rest, [{correlation_id, Val} | Acc]); + error -> + error + end; +validate_props([{{symbol, <<"content-type">>}, {symbol, Val}} | Rest], Acc) -> + validate_props(Rest, [{content_type, Val} | Acc]); +validate_props([{{symbol, <<"content-encoding">>}, {symbol, Val}} | Rest], Acc) -> + validate_props(Rest, [{content_encoding, Val} | Acc]); +validate_props([{{symbol, <<"absolute-expiry-time">>}, {timestamp, Val}} | Rest], Acc) -> + validate_props(Rest, [{absolute_expiry_time, Val} | Acc]); +validate_props([{{symbol, <<"creation-time">>}, {timestamp, Val}} | Rest], Acc) -> + validate_props(Rest, [{creation_time, Val} | Acc]); +validate_props([{{symbol, <<"group-id">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{group_id, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"group-sequence">>}, {uint, Val}} | Rest], Acc) -> + validate_props(Rest, [{group_sequence, Val} | Acc]); +validate_props([{{symbol, <<"reply-to-group-id">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{reply_to_group_id, parse_string_modifier_prefix(Val)} | Acc]); +validate_props(_, _) -> + error. + +parse_message_id({ulong, Val}) -> + {ok, Val}; +parse_message_id({uuid, Val}) -> + {ok, Val}; +parse_message_id({binary, Val}) -> + {ok, Val}; +parse_message_id({utf8, Val}) -> + {ok, parse_string_modifier_prefix(Val)}; +parse_message_id(_) -> + error. + +%% [filtex-v1.0-wd09 4.1.1] +parse_string_modifier_prefix(<<"$s:", Suffix/binary>>) -> + {suffix, size(Suffix), Suffix}; +parse_string_modifier_prefix(<<"$p:", Prefix/binary>>) -> + {prefix, size(Prefix), Prefix}; +parse_string_modifier_prefix(<<"$$", _/binary>> = String) -> + %% "Escape prefix for case-sensitive matching of a string starting with ‘&’" + string:slice(String, 1); +parse_string_modifier_prefix(<<"$", _/binary>> = String) -> + throw({?MODULE, invalid_reference_field_value, String}); +parse_string_modifier_prefix(String) -> + String. + +unwrap({_Tag, V}) -> + V; +unwrap(V) -> + V. diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 0ad228a4e653..bcfa6a1dcc8c 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -518,16 +518,16 @@ handle_connection_frame( ok = rabbit_event:notify(connection_created, Infos), ok = rabbit_amqp1_0:register_connection(self()), Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306 - {symbol, <<"LINK_PAIR_V1_0">>}, + <<"LINK_PAIR_V1_0">>, %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay - {symbol, <<"ANONYMOUS-RELAY">>}], + <<"ANONYMOUS-RELAY">>], Open = #'v1_0.open'{ channel_max = {ushort, EffectiveChannelMax}, max_frame_size = {uint, IncomingMaxFrameSize}, %% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold" [2.4.5] idle_time_out = {uint, ReceiveTimeoutMillis div 2}, container_id = {utf8, rabbit_nodes:cluster_name()}, - offered_capabilities = {array, symbol, Caps}, + offered_capabilities = rabbit_amqp_util:capabilities(Caps), properties = server_properties()}, ok = send_on_channel0(Sock, Open), State; diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 31d5348b56b5..3be9ea2b00fc 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -30,6 +30,12 @@ }} }). +-rabbit_deprecated_feature( + {amqp_filter_set_bug, + #{deprecation_phase => permitted_by_default, + doc_url => "https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-filter-set" + }}). + %% This is the link credit that we grant to sending clients. %% We are free to choose whatever we want, sending clients must obey. %% Default soft limits / credits in deps/rabbit/Makefile are: @@ -1284,12 +1290,13 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, end; handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, - name = LinkName, - handle = Handle = ?UINT(HandleInt), - source = Source, - snd_settle_mode = SndSettleMode, - rcv_settle_mode = RcvSettleMode, - max_message_size = MaybeMaxMessageSize} = Attach, + name = LinkName, + handle = Handle = ?UINT(HandleInt), + source = Source = #'v1_0.source'{filter = DesiredFilter}, + snd_settle_mode = SndSettleMode, + rcv_settle_mode = RcvSettleMode, + max_message_size = MaybeMaxMessageSize, + properties = Properties}, State0 = #state{queue_states = QStates0, outgoing_links = OutgoingLinks0, permission_cache = PermCache0, @@ -1359,6 +1366,10 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, credit_api_v1, credit_api_v1} end, + ConsumerArgs0 = parse_attach_properties(Properties), + {EffectiveFilter, ConsumerFilter, ConsumerArgs1} = + parse_filter(DesiredFilter), + ConsumerArgs = ConsumerArgs0 ++ ConsumerArgs1, Spec = #{no_ack => SndSettled, channel_pid => self(), limiter_pid => none, @@ -1366,11 +1377,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, mode => Mode, consumer_tag => handle_to_ctag(HandleInt), exclusive_consume => false, - args => consumer_arguments(Attach), + args => ConsumerArgs, + filter => ConsumerFilter, ok_msg => undefined, acting_user => Username}, case rabbit_queue_type:consume(Q, Spec, QStates0) of {ok, QStates} -> + OfferedCaps0 = rabbit_queue_type:amqp_capabilities(QType), + OfferedCaps = rabbit_amqp_util:capabilities(OfferedCaps0), A = #'v1_0.attach'{ name = LinkName, handle = Handle, @@ -1382,10 +1396,13 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% will be requeued. That's why the we only support RELEASED as the default outcome. source = Source#'v1_0.source'{ default_outcome = #'v1_0.released'{}, - outcomes = outcomes(Source)}, + outcomes = outcomes(Source), + %% "the sending endpoint sets the filter actually in place" [3.5.3] + filter = EffectiveFilter}, role = ?AMQP_ROLE_SENDER, %% Echo back that we will respect the client's requested max-message-size. - max_message_size = MaybeMaxMessageSize}, + max_message_size = MaybeMaxMessageSize, + offered_capabilities = OfferedCaps}, MaxMessageSize = max_message_size(MaybeMaxMessageSize), Link = #outgoing_link{ queue_name = queue_resource(Vhost, QNameBin), @@ -2705,11 +2722,10 @@ parse_target_v2_string(String) -> end. parse_target_v2_string0(<<"/exchanges/", Rest/binary>>) -> - Key = cp_slash, - Pattern = try persistent_term:get(Key) + Pattern = try persistent_term:get(cp_slash) catch error:badarg -> Cp = binary:compile_pattern(<<"/">>), - ok = persistent_term:put(Key, Cp), + ok = persistent_term:put(cp_slash, Cp), Cp end, case binary:split(Rest, Pattern, [global]) of @@ -2980,87 +2996,89 @@ encode_frames(T, Msg, MaxPayloadSize, Transfers) -> lists:reverse([[T, Msg] | Transfers]) end. -consumer_arguments(#'v1_0.attach'{ - source = #'v1_0.source'{filter = Filter}, - properties = Properties}) -> - properties_to_consumer_args(Properties) ++ - filter_to_consumer_args(Filter). - -properties_to_consumer_args({map, KVList}) -> +parse_attach_properties(undefined) -> + []; +parse_attach_properties({map, KVList}) -> Key = {symbol, <<"rabbitmq:priority">>}, case proplists:lookup(Key, KVList) of {Key, Val = {int, _Prio}} -> [mc_amqpl:to_091(<<"x-priority">>, Val)]; _ -> [] - end; -properties_to_consumer_args(_) -> - []. - -filter_to_consumer_args({map, KVList}) -> - filter_to_consumer_args( - [<<"rabbitmq:stream-offset-spec">>, - <<"rabbitmq:stream-filter">>, - <<"rabbitmq:stream-match-unfiltered">>], - KVList, - []); -filter_to_consumer_args(_) -> - []. + end. -filter_to_consumer_args([], _KVList, Acc) -> - Acc; -filter_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) -> - Key = {symbol, H}, - Arg = case keyfind_unpack_described(Key, KVList) of - {_, {timestamp, Ts}} -> - [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps - {_, {utf8, Spec}} -> - [{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc - {_, {_, Offset}} when is_integer(Offset) -> - [{<<"x-stream-offset">>, long, Offset}]; %% integer offset - _ -> - [] - end, - filter_to_consumer_args(T, KVList, Arg ++ Acc); -filter_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) -> - Key = {symbol, H}, - Arg = case keyfind_unpack_described(Key, KVList) of - {_, {list, Filters0}} when is_list(Filters0) -> - Filters = lists:foldl(fun({utf8, Filter}, L) -> - [{longstr, Filter} | L]; - (_, L) -> - L - end, [], Filters0), - [{<<"x-stream-filter">>, array, Filters}]; - {_, {utf8, Filter}} -> - [{<<"x-stream-filter">>, longstr, Filter}]; - _ -> - [] - end, - filter_to_consumer_args(T, KVList, Arg ++ Acc); -filter_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) -> - Key = {symbol, H}, - Arg = case keyfind_unpack_described(Key, KVList) of - {_, MU} when is_boolean(MU) -> - [{<<"x-stream-match-unfiltered">>, bool, MU}]; - _ -> - [] - end, - filter_to_consumer_args(T, KVList, Arg ++ Acc); -filter_to_consumer_args([_ | T], KVList, Acc) -> - filter_to_consumer_args(T, KVList, Acc). - -keyfind_unpack_described(Key, KvList) -> - %% filterset values _should_ be described values - %% they aren't always however for historical reasons so we need this bit of - %% code to return a plain value for the given filter key - case lists:keyfind(Key, 1, KvList) of - {Key, {described, Key, Value}} -> - {Key, Value}; - {Key, _} = Kv -> - Kv; +parse_filter(undefined) -> + {undefined, [], []}; +parse_filter({map, DesiredKVList}) -> + {EffectiveKVList, ConsusumerFilter, ConsumerArgs} = + lists:foldr(fun parse_filters/2, {[], [], []}, DesiredKVList), + {{map, EffectiveKVList}, ConsusumerFilter, ConsumerArgs}. + +parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-offset-spec">>}, Value}}, + Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) -> + case Value of + {timestamp, Ts} -> + %% 0.9.1 uses second based timestamps + Arg = {<<"x-stream-offset">>, timestamp, Ts div 1000}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + {utf8, Spec} -> + %% next, last, first and "10m" etc + Arg = {<<"x-stream-offset">>, longstr, Spec}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + {_Type, Offset} + when is_integer(Offset) andalso Offset >= 0 -> + Arg = {<<"x-stream-offset">>, long, Offset}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + _ -> + Acc + end; +parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-filter">>}, Value}}, + Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) -> + case Value of + {list, Filters0} -> + Filters = lists:filtermap(fun({utf8, Filter0}) -> + {true, {longstr, Filter0}}; + (_) -> + false + end, Filters0), + Arg = {<<"x-stream-filter">>, array, Filters}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + + {utf8, Filter0} -> + Arg = {<<"x-stream-filter">>, longstr, Filter0}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + _ -> + Acc + end; +parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-match-unfiltered">>}, Match}}, + {EffectiveFilters, ConsumerFilter, ConsumerArgs}) + when is_boolean(Match) -> + Arg = {<<"x-stream-match-unfiltered">>, bool, Match}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; +parse_filters({Symbol = {symbol, <<"rabbitmq:stream-", _/binary>>}, Value}, Acc) + when element(1, Value) =/= described -> + case rabbit_deprecated_features:is_permitted(amqp_filter_set_bug) of + true -> + parse_filters({Symbol, {described, Symbol, Value}}, Acc); false -> - false + Acc + end; +parse_filters(Filter = {{symbol, _Key}, Value}, + Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) -> + case rabbit_amqp_filtex:validate(Value) of + {ok, FilterExpression = {FilterType, _}} -> + case proplists:is_defined(FilterType, ConsumerFilter) of + true -> + %% For now, let's prohibit multiple top level filters of the same type + %% (properties or application-properties). There should be no use case. + %% In future, we can allow multiple times the same top level grouping + %% filter expression type (all/any/not). + Acc; + false -> + {[Filter | EffectiveFilters], [FilterExpression | ConsumerFilter], ConsumerArgs} + end; + error -> + Acc end. validate_attach(#'v1_0.attach'{target = #'v1_0.coordinator'{}}) -> diff --git a/deps/rabbit/src/rabbit_amqp_util.erl b/deps/rabbit/src/rabbit_amqp_util.erl index 3257cef93704..e1ef95d77fad 100644 --- a/deps/rabbit/src/rabbit_amqp_util.erl +++ b/deps/rabbit/src/rabbit_amqp_util.erl @@ -8,7 +8,8 @@ -module(rabbit_amqp_util). -include("rabbit_amqp.hrl"). --export([protocol_error/3]). +-export([protocol_error/3, + capabilities/1]). -spec protocol_error(term(), io:format(), [term()]) -> no_return(). @@ -17,3 +18,11 @@ protocol_error(Condition, Msg, Args) -> Reason = #'v1_0.error'{condition = Condition, description = {utf8, Description}}, exit(Reason). + +-spec capabilities([binary()]) -> + undefined | {array, symbol, [{symbol, binary()}]}. +capabilities([]) -> + undefined; +capabilities(Capabilities) -> + Caps = [{symbol, C} || C <- Capabilities], + {array, symbol, Caps}. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 938588da6662..219fccdf2f27 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -54,6 +54,7 @@ fold_state/3, is_policy_applicable/2, is_server_named_allowed/1, + amqp_capabilities/1, arguments/1, arguments/2, notify_decorators/1, @@ -125,6 +126,7 @@ consumer_tag := rabbit_types:ctag(), exclusive_consume => boolean(), args => rabbit_framing:amqp_table(), + filter => rabbit_amqp_filtex:filter_expressions(), ok_msg := term(), acting_user := rabbit_types:username()}. -type cancel_reason() :: cancel | remove. @@ -476,6 +478,12 @@ is_server_named_allowed(Type) -> Capabilities = Type:capabilities(), maps:get(server_named, Capabilities, false). +-spec amqp_capabilities(queue_type()) -> + [binary()]. +amqp_capabilities(Type) -> + Capabilities = Type:capabilities(), + maps:get(?FUNCTION_NAME, Capabilities, []). + -spec arguments(arguments()) -> [binary()]. arguments(ArgumentType) -> Args0 = lists:map(fun(T) -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index f936891d0560..18cc10f55ef8 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -925,7 +925,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> exclusive_consume := ExclusiveConsume, args := Args, ok_msg := OkMsg, - acting_user := ActingUser} = Spec, + acting_user := ActingUser} = Spec, %% TODO: validate consumer arguments %% currently quorum queues do not support any arguments QName = amqqueue:get_name(Q), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a7aa3a5a18cc..a011dc09a650 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -78,13 +78,14 @@ ack :: boolean(), start_offset = 0 :: non_neg_integer(), listening_offset = 0 :: non_neg_integer(), - last_consumed_offset = 0 :: non_neg_integer(), + last_consumed_offset :: non_neg_integer(), log :: undefined | osiris_log:state(), chunk_iterator :: undefined | osiris_log:chunk_iterator(), %% These messages were already read ahead from the Osiris log, %% were part of an uncompressed sub batch, and are buffered in %% reversed order until the consumer has more credits to consume them. buffer_msgs_rev = [] :: [rabbit_amqqueue:qmsg()], + filter :: rabbit_amqp_filtex:filter_expressions(), reader_options :: map()}). -record(stream_client, {stream_id :: string(), @@ -333,7 +334,8 @@ consume(Q, Spec, #stream_client{} = QState0) %% begins sending maybe_send_reply(ChPid, OkMsg), _ = rabbit_stream_coordinator:register_local_member_listener(Q), - begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, filter_spec(Args)) + Filter = maps:get(filter, Spec, []), + begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, Filter, filter_spec(Args)) end; {undefined, _} -> {protocol_error, precondition_failed, @@ -424,7 +426,7 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) -> begin_stream(#stream_client{name = QName, readers = Readers0, local_pid = LocalPid} = State, - Tag, Offset, Mode, AckRequired, Options) + Tag, Offset, Mode, AckRequired, Filter, Options) when is_pid(LocalPid) -> CounterSpec = {{?MODULE, QName, Tag, self()}, []}, {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options), @@ -451,6 +453,7 @@ begin_stream(#stream_client{name = QName, listening_offset = NextOffset, last_consumed_offset = StartOffset, log = Seg0, + filter = Filter, reader_options = Options}, {ok, State#stream_client{readers = Readers0#{Tag => Str0}}}. @@ -1158,7 +1161,8 @@ stream_entries(QName, Name, LocalPid, #stream{chunk_iterator = Iter0, delivery_count = DC, credit = Credit, - start_offset = StartOffset} = Str0, Acc0) -> + start_offset = StartOffset, + filter = Filter} = Str0, Acc0) -> case osiris_log:iterator_next(Iter0) of end_of_chunk -> case chunk_iterator(Str0, LocalPid) of @@ -1172,7 +1176,7 @@ stream_entries(QName, Name, LocalPid, {batch, _NumRecords, 0, _Len, BatchedEntries} -> {MsgsRev, NumMsgs} = parse_uncompressed_subbatch( BatchedEntries, Offset, StartOffset, - QName, Name, LocalPid, {[], 0}), + QName, Name, LocalPid, Filter, {[], 0}), case Credit >= NumMsgs of true -> {Str0#stream{chunk_iterator = Iter, @@ -1199,12 +1203,19 @@ stream_entries(QName, Name, LocalPid, _SimpleEntry -> case Offset >= StartOffset of true -> - Msg = entry_to_msg(Entry, Offset, QName, Name, LocalPid), - {Str0#stream{chunk_iterator = Iter, - delivery_count = delivery_count_add(DC, 1), - credit = Credit - 1, - last_consumed_offset = Offset}, - [Msg | Acc0]}; + case entry_to_msg(Entry, Offset, QName, + Name, LocalPid, Filter) of + none -> + {Str0#stream{chunk_iterator = Iter, + last_consumed_offset = Offset}, + Acc0}; + Msg -> + {Str0#stream{chunk_iterator = Iter, + delivery_count = delivery_count_add(DC, 1), + credit = Credit - 1, + last_consumed_offset = Offset}, + [Msg | Acc0]} + end; false -> {Str0#stream{chunk_iterator = Iter}, Acc0} end @@ -1236,25 +1247,30 @@ chunk_iterator(#stream{credit = Credit, end. %% Deliver each record of an uncompressed sub batch individually. -parse_uncompressed_subbatch(<<>>, _Offset, _StartOffset, _QName, _Name, _LocalPid, Acc) -> +parse_uncompressed_subbatch( + <<>>, _Offset, _StartOffset, _QName, _Name, _LocalPid, _Filter, Acc) -> Acc; parse_uncompressed_subbatch( <<0:1, %% simple entry Len:31/unsigned, Entry:Len/binary, Rem/binary>>, - Offset, StartOffset, QName, Name, LocalPid, Acc0 = {AccList, AccCount}) -> + Offset, StartOffset, QName, Name, LocalPid, Filter, Acc0 = {AccList, AccCount}) -> Acc = case Offset >= StartOffset of true -> - Msg = entry_to_msg(Entry, Offset, QName, Name, LocalPid), - {[Msg | AccList], AccCount + 1}; + case entry_to_msg(Entry, Offset, QName, Name, LocalPid, Filter) of + none -> + Acc0; + Msg -> + {[Msg | AccList], AccCount + 1} + end; false -> Acc0 end, - parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName, Name, LocalPid, Acc). + parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName, + Name, LocalPid, Filter, Acc). -entry_to_msg(Entry, Offset, #resource{kind = queue, - name = QName}, Name, LocalPid) -> +entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) -> Mc0 = mc:init(mc_amqp, Entry, #{}), %% If exchange or routing_keys annotation isn't present the entry most likely came %% from the rabbitmq-stream plugin so we'll choose defaults that simulate use @@ -1268,7 +1284,12 @@ entry_to_msg(Entry, Offset, #resource{kind = queue, _ -> Mc1 end, Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2), - {Name, LocalPid, Offset, false, Mc}. + case rabbit_amqp_filtex:filter(Filter, Mc) of + true -> + {Name, LocalPid, Offset, false, Mc}; + false -> + none + end. capabilities() -> #{unsupported_policies => [%% Classic policies @@ -1288,6 +1309,9 @@ capabilities() -> consumer_arguments => [<<"x-stream-offset">>, <<"x-stream-filter">>, <<"x-stream-match-unfiltered">>], + %% AMQP property filter expressions + %% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 + amqp_capabilities => [<<"AMQP_FILTEX_PROP_V1_0">>], server_named => false}. notify_decorators(Q) when ?is_amqqueue(Q) -> diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index 910e1068eeed..f5a0f74b8932 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -18,6 +18,9 @@ [rpc/4]). -import(rabbit_ct_helpers, [eventually/1]). +-import(amqp_utils, + [flush/1, + wait_for_credit/1]). all() -> [ @@ -651,17 +654,6 @@ connection_config(Config) -> container_id => <<"my container">>, sasl => {plain, <<"guest">>, <<"guest">>}}. -% before we can send messages we have to wait for credit from the server -wait_for_credit(Sender) -> - receive - {amqp10_event, {link, Sender, credited}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush(?FUNCTION_NAME), - ct:fail(?FUNCTION_NAME) - end. - wait_for_settled(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> @@ -671,11 +663,3 @@ wait_for_settled(State, Tag) -> flush(Reason), ct:fail(Reason) end. - -flush(Prefix) -> - receive Msg -> - ct:pal("~tp flushed: ~p~n", [Prefix, Msg]), - flush(Prefix) - after 1 -> - ok - end. diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 920f779172d4..6bd905a9242f 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -21,6 +21,10 @@ -import(event_recorder, [assert_event_type/2, assert_event_prop/2]). +-import(amqp_utils, + [flush/1, + wait_for_credit/1, + close_connection_sync/1]). all() -> [ @@ -1077,34 +1081,7 @@ amqp_error(Condition, Description) condition = Condition, description = {utf8, Description}}. -% before we can send messages we have to wait for credit from the server -wait_for_credit(Sender) -> - receive - {amqp10_event, {link, Sender, credited}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_credit timed out"), - ct:fail(credited_timeout) - end. - -flush(Prefix) -> - receive Msg -> - ct:pal("~ts flushed: ~p~n", [Prefix, Msg]), - flush(Prefix) - after 1 -> - ok - end. - delete_all_queues(Config) -> Qs = rpc(Config, rabbit_amqqueue, list, []), [{ok, _QLen} = rpc(Config, rabbit_amqqueue, delete, [Q, false, false, <<"fake-user">>]) || Q <- Qs]. - -close_connection_sync(Connection) - when is_pid(Connection) -> - ok = amqp10_client:close_connection(Connection), - receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok - after 5000 -> flush(missing_closed), - ct:fail("missing CLOSE from server") - end. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index acc4dd004cd8..e8c64690a012 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -27,6 +27,17 @@ -import(event_recorder, [assert_event_type/2, assert_event_prop/2]). +-import(amqp_utils, + [init/1, init/2, + connection_config/1, connection_config/2, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, send_messages/4, + detach_link_sync/1, + end_session_sync/1, + wait_for_session_end/1, + close_connection_sync/1]). all() -> [ @@ -100,7 +111,7 @@ groups() -> max_message_size_client_to_server, max_message_size_server_to_client, global_counters, - stream_filtering, + stream_bloom_filter, available_messages_classic_queue, available_messages_quorum_queue, available_messages_stream, @@ -3255,7 +3266,7 @@ target_queue_deleted(Config) -> after 5000 -> ct:fail({missing_accepted, DTag1}) end, - N0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + N0 = get_node_config(Config, 0, nodename), RaName = ra_name(QuorumQ), ServerId0 = {RaName, N0}, {ok, Members, _Leader} = ra:members(ServerId0), @@ -3937,7 +3948,7 @@ global_counters(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). -stream_filtering(Config) -> +stream_bloom_filter(Config) -> Stream = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(Stream), Ch = rabbit_ct_client_helpers:open_channel(Config), @@ -4476,7 +4487,7 @@ handshake_timeout(Config) -> Par = ?FUNCTION_NAME, {ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]), ok = rpc(Config, application, set_env, [App, Par, 200]), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Port = get_node_config(Config, 0, tcp_port_amqp), {ok, Socket} = gen_tcp:connect("localhost", Port, [{active, false}]), ?assertEqual({error, closed}, gen_tcp:recv(Socket, 0, 400)), ok = rpc(Config, application, set_env, [App, Par, DefaultVal]). @@ -5762,16 +5773,6 @@ link_max_per_session(Config) -> %% internal %% -init(Config) -> - init(0, Config). - -init(Node, Config) -> - OpnConf = connection_config(Node, Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), - {Connection, Session, LinkPair}. - receive_all_messages(Receiver, Accept) -> receive_all_messages0(Receiver, Accept, []). @@ -5786,26 +5787,6 @@ receive_all_messages0(Receiver, Accept, Acc) -> lists:reverse(Acc) end. -connection_config(Config) -> - connection_config(0, Config). - -connection_config(Node, Config) -> - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), - #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, <<"guest">>, <<"guest">>}}. - -flush(Prefix) -> - receive - Msg -> - ct:pal("~p flushed: ~p~n", [Prefix, Msg]), - flush(Prefix) - after 1 -> - ok - end. - open_and_close_connection(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -5814,58 +5795,6 @@ open_and_close_connection(Config) -> end, ok = close_connection_sync(Connection). -% before we can send messages we have to wait for credit from the server -wait_for_credit(Sender) -> - receive - {amqp10_event, {link, Sender, credited}} -> - ok - after 5000 -> - flush("wait_for_credit timed out"), - ct:fail(credited_timeout) - end. - -detach_link_sync(Link) -> - ok = amqp10_client:detach_link(Link), - ok = wait_for_link_detach(Link). - -wait_for_link_detach(Link) -> - receive - {amqp10_event, {link, Link, {detached, normal}}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_link_detach timed out"), - ct:fail({link_detach_timeout, Link}) - end. - -end_session_sync(Session) -> - ok = amqp10_client:end_session(Session), - ok = wait_for_session_end(Session). - -wait_for_session_end(Session) -> - receive - {amqp10_event, {session, Session, {ended, _}}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_session_end timed out"), - ct:fail({session_end_timeout, Session}) - end. - -close_connection_sync(Connection) -> - ok = amqp10_client:close_connection(Connection), - ok = wait_for_connection_close(Connection). - -wait_for_connection_close(Connection) -> - receive - {amqp10_event, {connection, Connection, {closed, normal}}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_connection_close timed out"), - ct:fail({connection_close_timeout, Connection}) - end. - wait_for_accepted(Tag) -> wait_for_settlement(Tag, accepted). @@ -5878,16 +5807,6 @@ wait_for_settlement(Tag, State) -> ct:fail({settled_timeout, Tag}) end. -wait_for_accepts(0) -> - ok; -wait_for_accepts(N) -> - receive - {amqp10_disposition,{accepted,_}} -> - wait_for_accepts(N - 1) - after 5000 -> - ct:fail({missing_accepted, N}) - end. - delete_queue(Session, QName) -> {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( Session, <<"delete queue">>), @@ -5938,32 +5857,6 @@ count_received_messages0(Receiver, Count) -> Count end. -send_messages(Sender, Left, Settled) -> - send_messages(Sender, Left, Settled, <<>>). - -send_messages(_, 0, _, _) -> - ok; -send_messages(Sender, Left, Settled, BodySuffix) -> - Bin = integer_to_binary(Left), - Body = <>, - Msg = amqp10_msg:new(Bin, Body, Settled), - case amqp10_client:send_msg(Sender, Msg) of - ok -> - send_messages(Sender, Left - 1, Settled, BodySuffix); - {error, insufficient_credit} -> - ok = wait_for_credit(Sender), - %% The credited event we just processed could have been received some time ago, - %% i.e. we might have 0 credits right now. This happens in the following scenario: - %% 1. We (test case proc) send a message successfully, the client session proc decrements remaining link credit from 1 to 0. - %% 2. The server grants our client session proc new credits. - %% 3. The client session proc sends us (test case proc) a credited event. - %% 4. We didn't even notice that we ran out of credits temporarily. We send the next message, it succeeds, - %% but do not process the credited event in our mailbox. - %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} - %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). - send_messages(Sender, Left, Settled, BodySuffix) - end. - assert_link_credit_runs_out(_Sender, 0) -> ct:fail(sufficient_link_credit); assert_link_credit_runs_out(Sender, Left) -> diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl new file mode 100644 index 000000000000..51469821a83b --- /dev/null +++ b/deps/rabbit/test/amqp_filtex_SUITE.erl @@ -0,0 +1,591 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% Test suite for +%% AMQP Filter Expressions Version 1.0 Working Draft 09 +-module(amqp_filtex_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_filtex.hrl"). + +-compile([nowarn_export_all, + export_all]). + +-import(rabbit_ct_broker_helpers, + [rpc/4]). +-import(rabbit_ct_helpers, + [eventually/1]). +-import(amqp_utils, + [init/1, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, + detach_link_sync/1, + end_session_sync/1, + wait_for_session_end/1, + close_connection_sync/1]). + +all() -> + [ + {group, cluster_size_1} + ]. + +groups() -> + [ + {cluster_size_1, [shuffle], + [ + properties_section, + application_properties_section, + multiple_sections, + filter_few_messages_from_many, + string_modifier + ]} + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:merge_app_env( + Config, {rabbit, [{quorum_tick_interval, 1000}, + {stream_tick_interval, 1000} + ]}). + +end_per_suite(Config) -> + Config. + +init_per_group(_Group, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + %% Assert that every testcase cleaned up. + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), + %% Wait for sessions to terminate before starting the next test case. + eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +properties_section(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Now = erlang:system_time(millisecond), + To = rabbitmq_amqp_address:exchange(<<"some exchange">>, <<"routing key">>), + ReplyTo = rabbitmq_amqp_address:queue(<<"some queue">>), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{message_id => {ulong, 999}, + user_id => <<"guest">>, + to => To, + subject => <<"🐇"/utf8>>, + reply_to => ReplyTo, + correlation_id => <<"corr-123">>, + content_type => <<"text/plain">>, + content_encoding => <<"some encoding">>, + absolute_expiry_time => Now + 100_000, + creation_time => Now, + group_id => <<"my group ID">>, + group_sequence => 16#ff_ff_ff_ff, + reply_to_group_id => <<"other group ID">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:new(<<"t2">>, <<"m2">>)), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter1 = [ + {{symbol, <<"message-id">>}, {ulong, 999}}, + {{symbol, <<"user-id">>}, {binary, <<"guest">>}}, + {{symbol, <<"subject">>}, {utf8, <<"🐇"/utf8>>}}, + {{symbol, <<"to">>}, {utf8, To}}, + {{symbol, <<"reply-to">>}, {utf8, ReplyTo}}, + {{symbol, <<"correlation-id">>}, {utf8, <<"corr-123">>}}, + {{symbol, <<"content-type">>}, {symbol, <<"text/plain">>}}, + {{symbol, <<"content-encoding">>}, {symbol, <<"some encoding">>}}, + {{symbol, <<"absolute-expiry-time">>}, {timestamp, Now + 100_000}}, + {{symbol, <<"creation-time">>}, {timestamp, Now}}, + {{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}, + {{symbol, <<"group-sequence">>}, {uint, 16#ff_ff_ff_ff}}, + {{symbol, <<"reply-to-group-id">>}, {utf8, <<"other group ID">>}} + ], + Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + PropsFilter2 = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}], + Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M1), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M2)), + ok = detach_link_sync(Receiver2), + + %% Filter is in place, but no message matches. + PropsFilter3 = [{{symbol, <<"group-id">>}, {utf8, <<"no match">>}}], + Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + ok = amqp10_client:flow_link_credit(Receiver3, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver3), + + %% Wrong type should fail validation in the server. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + PropsFilter4 = [{{symbol, <<"group-id">>}, {uint, 3}}], + Filter4 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + unsettled, configuration, Filter4), + {ok, R4M1} = amqp10_client:get_msg(Receiver4), + {ok, R4M2} = amqp10_client:get_msg(Receiver4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + ok = amqp10_client:accept_msg(Receiver4, R4M1), + ok = amqp10_client:accept_msg(Receiver4, R4M2), + ok = amqp10_client:accept_msg(Receiver4, R4M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R4M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R4M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ok = detach_link_sync(Receiver4), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +application_properties_section(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k1">> => -2, + <<"k2">> => 10, + <<"k3">> => false, + <<"k4">> => true, + <<"k5">> => <<"hey">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k2">> => 10.1}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:new(<<"t3">>, <<"m3">>)), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k2">> => 10.0}, + amqp10_msg:new(<<"t4">>, <<"m4">>))), + + ok = wait_for_accepts(4), + ok = detach_link_sync(Sender), + flush(sent), + + AppPropsFilter0 = [{{utf8, <<"k5">>}, {symbol, <<"no match">>}}], + Filter0 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter0}}, + {ok, Receiver0} = amqp10_client:attach_receiver_link( + Session, <<"receiver 0">>, Address, + unsettled, configuration, Filter0), + ok = amqp10_client:flow_link_credit(Receiver0, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver0), + + AppPropsFilter1 = [ + {{utf8, <<"k1">>}, {int, -2}}, + {{utf8, <<"k5">>}, {symbol, <<"hey">>}}, + {{utf8, <<"k4">>}, {boolean, true}}, + {{utf8, <<"k3">>}, false} + ], + Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + %% Due to simple type matching [filtex-v1.0-wd09 §4.1.1] + %% we expect integer 10 to also match number 10.0. + AppPropsFilter2 = [{{utf8, <<"k2">>}, {uint, 10}}], + Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter2}}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M1), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R2M2)), + ok = detach_link_sync(Receiver2), + + %% A reference field value of NULL should always match. [filtex-v1.0-wd09 §4.1.1] + AppPropsFilter3 = [{{utf8, <<"k2">>}, null}], + Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter3}}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + {ok, R3M1} = amqp10_client:get_msg(Receiver3), + {ok, R3M2} = amqp10_client:get_msg(Receiver3), + {ok, R3M3} = amqp10_client:get_msg(Receiver3), + ok = amqp10_client:accept_msg(Receiver3, R3M1), + ok = amqp10_client:accept_msg(Receiver3, R3M2), + ok = amqp10_client:accept_msg(Receiver3, R3M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R3M2)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R3M3)), + ok = detach_link_sync(Receiver3), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Test filter expressions matching multiple message sections. +multiple_sections(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"The Subject">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"The Key">> => -123}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"The Subject">>}, + amqp10_msg:set_application_properties( + #{<<"The Key">> => -123}, + amqp10_msg:new(<<"t3">>, <<"m3">>)))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter = [{{symbol, <<"subject">>}, {utf8, <<"The Subject">>}}], + Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + unsettled, configuration, Filter1), + {ok, R1M1} = amqp10_client:get_msg(Receiver1), + {ok, R1M3} = amqp10_client:get_msg(Receiver1), + ok = amqp10_client:accept_msg(Receiver1, R1M1), + ok = amqp10_client:accept_msg(Receiver1, R1M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)), + ok = detach_link_sync(Receiver1), + + AppPropsFilter = [{{utf8, <<"The Key">>}, {byte, -123}}], + Filter2 = #{?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + {ok, R2M3} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ok = amqp10_client:accept_msg(Receiver2, R2M3), + ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)), + ok = detach_link_sync(Receiver2), + + Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + {ok, R3M3} = amqp10_client:get_msg(Receiver3), + ok = amqp10_client:accept_msg(Receiver3, R3M3), + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)), + ok = detach_link_sync(Receiver3), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Filter a small subset from many messages. +%% We test here that flow control still works correctly. +filter_few_messages_from_many(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t1">>, <<"first msg">>))), + ok = send_messages(Sender, 1000, false), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t2">>, <<"last msg">>))), + ok = wait_for_accepts(1002), + ok = detach_link_sync(Sender), + flush(sent), + + %% Our filter should cause us to receive only the first and + %% last message out of the 1002 messages in the stream. + PropsFilter = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}], + Filter = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}}, + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + unsettled, configuration, Filter), + + ok = amqp10_client:flow_link_credit(Receiver, 2, never), + receive {amqp10_msg, Receiver, M1} -> + ?assertEqual([<<"first msg">>], amqp10_msg:body(M1)), + ok = amqp10_client:accept_msg(Receiver, M1) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M2} -> + ?assertEqual([<<"last msg">>], amqp10_msg:body(M2)), + ok = amqp10_client:accept_msg(Receiver, M2) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +string_modifier(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{to => <<"abc 1">>, + reply_to => <<"abc 2">>, + subject => <<"abc 3">>, + group_id => <<"abc 4">>, + reply_to_group_id => <<"abc 5">>, + message_id => {utf8, <<"abc 6">>}, + correlation_id => <<"abc 7">>, + group_sequence => 16#ff_ff_ff_ff}, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc 8">>, + <<"k2">> => <<"abc 9">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>)))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc">>}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"$Hello">>, + reply_to_group_id => <<"xyz 5">>}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter1 = [ + {{symbol, <<"to">>}, {utf8, <<"$p:abc ">>}}, + {{symbol, <<"reply-to">>}, {utf8, <<"$p:abc">>}}, + {{symbol, <<"subject">>}, {utf8, <<"$p:ab">>}}, + {{symbol, <<"group-id">>}, {utf8, <<"$p:a">>}}, + {{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s:5">>}}, + {{symbol, <<"correlation-id">>}, {utf8, <<"$s:abc 7">>}}, + {{symbol, <<"message-id">>}, {utf8, <<"$p:abc 6">>}} + ], + AppPropsFilter1 = [ + {{utf8, <<"k1">>}, {utf8, <<"$s: 8">>}}, + {{utf8, <<"k2">>}, {utf8, <<"$p:abc ">>}} + ], + Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + %% Same filters as before except for subject which shouldn't match anymore. + PropsFilter2 = lists:keyreplace( + {symbol, <<"subject">>}, 1, PropsFilter1, + {{symbol, <<"subject">>}, {utf8, <<"$s:xxxxxxxxxxxxxx">>}}), + Filter2 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + settled, configuration, Filter2), + ok = amqp10_client:flow_link_credit(Receiver2, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver2), + + PropsFilter3 = [{{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s: 5">>}}], + Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + settled, configuration, Filter3), + ok = amqp10_client:flow_link_credit(Receiver3, 10, never), + receive {amqp10_msg, Receiver3, R3M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver3, R3M3} -> + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver3), + + %% '$$" is the escape prefix for case-sensitive matching of a string starting with ‘&’ + PropsFilter4 = [{{symbol, <<"subject">>}, {utf8, <<"$$Hello">>}}], + Filter4 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + settled, configuration, Filter4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ok = detach_link_sync(Receiver4), + + %% Starting the reference field value with $ is invalid without using a valid modifier + %% prefix is invalid. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + PropsFilter5 = [{{symbol, <<"subject">>}, {utf8, <<"$Hello">>}}], + Filter5 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter5}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver5} = amqp10_client:attach_receiver_link( + Session, <<"receiver 5">>, Address, + settled, configuration, Filter5), + {ok, R5M1} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)), + {ok, R5M2} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)), + {ok, R5M3} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)), + ok = detach_link_sync(Receiver5), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +assert_no_msg_received(Line) -> + receive {amqp10_msg, _, _} = Msg -> + ct:fail({received_unexpected_msg, Line, Msg}) + after 10 -> + ok + end. diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl new file mode 100644 index 000000000000..f1816a07c228 --- /dev/null +++ b/deps/rabbit/test/amqp_utils.erl @@ -0,0 +1,139 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqp_utils). + +-export([init/1, init/2, + connection_config/1, connection_config/2, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, send_messages/4, + detach_link_sync/1, + end_session_sync/1, + wait_for_session_end/1, + close_connection_sync/1]). + +init(Config) -> + init(0, Config). + +init(Node, Config) -> + OpnConf = connection_config(Node, Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {Connection, Session, LinkPair}. + +connection_config(Config) -> + connection_config(0, Config). + +connection_config(Node, Config) -> + Host = proplists:get_value(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => {plain, <<"guest">>, <<"guest">>}}. + +flush(Prefix) -> + receive + Msg -> + ct:pal("~p flushed: ~p~n", [Prefix, Msg]), + flush(Prefix) + after 1 -> + ok + end. + +% Before we can send messages we have to wait for credit from the server. +wait_for_credit(Sender) -> + receive + {amqp10_event, {link, Sender, credited}} -> + ok + after 5000 -> + flush("wait_for_credit timed out"), + ct:fail(credited_timeout) + end. + +wait_for_accepts(0) -> + ok; +wait_for_accepts(N) -> + receive + {amqp10_disposition, {accepted, _}} -> + wait_for_accepts(N - 1) + after 5000 -> + ct:fail({missing_accepted, N}) + end. + +send_messages(Sender, Left, Settled) -> + send_messages(Sender, Left, Settled, <<>>). + +send_messages(_, 0, _, _) -> + ok; +send_messages(Sender, Left, Settled, BodySuffix) -> + Bin = integer_to_binary(Left), + Body = <>, + Msg = amqp10_msg:new(Bin, Body, Settled), + case amqp10_client:send_msg(Sender, Msg) of + ok -> + send_messages(Sender, Left - 1, Settled, BodySuffix); + {error, insufficient_credit} -> + ok = wait_for_credit(Sender), + %% The credited event we just processed could have been received some time ago, + %% i.e. we might have 0 credits right now. This happens in the following scenario: + %% 1. We (test case proc) send a message successfully, the client session proc decrements remaining link credit from 1 to 0. + %% 2. The server grants our client session proc new credits. + %% 3. The client session proc sends us (test case proc) a credited event. + %% 4. We didn't even notice that we ran out of credits temporarily. We send the next message, it succeeds, + %% but do not process the credited event in our mailbox. + %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} + %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). + send_messages(Sender, Left, Settled, BodySuffix) + end. + +detach_link_sync(Link) -> + ok = amqp10_client:detach_link(Link), + ok = wait_for_link_detach(Link). + +wait_for_link_detach(Link) -> + receive + {amqp10_event, {link, Link, {detached, normal}}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_link_detach timed out"), + ct:fail({link_detach_timeout, Link}) + end. + +end_session_sync(Session) + when is_pid(Session) -> + ok = amqp10_client:end_session(Session), + ok = wait_for_session_end(Session). + +wait_for_session_end(Session) -> + receive + {amqp10_event, {session, Session, {ended, _}}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_session_end timed out"), + ct:fail({session_end_timeout, Session}) + end. + +close_connection_sync(Connection) + when is_pid(Connection) -> + ok = amqp10_client:close_connection(Connection), + ok = wait_for_connection_close(Connection). + +wait_for_connection_close(Connection) -> + receive + {amqp10_event, {connection, Connection, {closed, normal}}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_connection_close timed out"), + ct:fail({connection_close_timeout, Connection}) + end. diff --git a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl index 872424f53224..3b0b79143617 100644 --- a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl @@ -14,6 +14,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("amqp10_common/include/amqp10_filtex.hrl"). all() -> [{group, tests}]. @@ -24,7 +25,8 @@ groups() -> amqpl, amqp_credit_multiple_grants, amqp_credit_single_grant, - amqp_attach_sub_batch + amqp_attach_sub_batch, + amqp_filter_expression ] }]. @@ -270,6 +272,51 @@ amqp_attach_sub_batch(Config) -> ok = amqp10_client:detach_link(Receiver), ok = amqp10_client:close_connection(Connection). +%% Test that AMQP filter expressions work when messages +%% are published via the stream protocol and consumed via AMQP. +amqp_filter_expression(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + publish_via_stream_protocol(Stream, Config), + + %% Consume from the stream via AMQP 1.0. + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + Address = <<"/queue/", Stream/binary>>, + + AppPropsFilter = [{{utf8, <<"my key">>}, + {utf8, <<"my value">>}}], + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"test-receiver">>, Address, settled, configuration, + #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter} + }), + + ok = amqp10_client:flow_link_credit(Receiver, 100, never), + receive {amqp10_msg, Receiver, M2} -> + ?assertEqual([<<"m2">>], amqp10_msg:body(M2)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M4} -> + ?assertEqual([<<"m4">>], amqp10_msg:body(M4)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M5} -> + ?assertEqual([<<"m5">>], amqp10_msg:body(M5)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M6} -> + ?assertEqual([<<"m6">>], amqp10_msg:body(M6)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, _, _} = Msg -> + ct:fail({received_unexpected_msg, Msg}) + after 10 -> ok + end, + + ok = amqp10_client:detach_link(Receiver), + ok = amqp10_client:close_connection(Connection). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- @@ -310,7 +357,9 @@ publish_via_stream_protocol(Stream, Config) -> {{response, 1, {declare_publisher, _}}, C7} = receive_stream_commands(S, C6), M1 = simple_entry(1, <<"m1">>), - M2 = simple_entry(2, <<"m2">>), + M2 = simple_entry(2, <<"m2">>, #'v1_0.application_properties'{ + content = [{{utf8, <<"my key">>}, + {utf8, <<"my value">>}}]}), M3 = simple_entry(3, <<"m3">>), Messages1 = [M1, M2, M3], PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, length(Messages1), Messages1}), @@ -342,11 +391,25 @@ simple_entry(Sequence, Body) DataSectSize = byte_size(DataSect), <>. -%% Here, each AMQP 1.0 encoded message contains a single data section. +%% Streams contain AMQP 1.0 encoded messages. +%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section. +simple_entry(Sequence, Body, AppProps) + when is_binary(Body) -> + AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)), + DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})), + Sects = <>, + SectSize = byte_size(Sects), + <>. + +%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section. %% All data sections are delivered uncompressed in 1 batch. sub_batch_entry_uncompressed(Sequence, Bodies) -> Batch = lists:foldl(fun(Body, Acc) -> - Sect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})), + AppProps = #'v1_0.application_properties'{ + content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]}, + Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)), + Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})), + Sect = <>, <> end, <<>>, Bodies), Size = byte_size(Batch), diff --git a/moduleindex.yaml b/moduleindex.yaml index ebadcd41d644..1ce6bae902c0 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -543,6 +543,7 @@ rabbit: - rabbit_access_control - rabbit_alarm - rabbit_amqp1_0 +- rabbit_amqp_filtex - rabbit_amqp_management - rabbit_amqp_reader - rabbit_amqp_session