diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index c1e5eb46214f..981e291a3853 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -737,15 +737,13 @@ translate_terminus_durability(configuration) -> 1; translate_terminus_durability(unsettled_state) -> 2. translate_filters(Filters) - when is_map(Filters) andalso - map_size(Filters) == 0 -> + when map_size(Filters) =:= 0 -> undefined; -translate_filters(Filters) - when is_map(Filters) -> +translate_filters(Filters) -> {map, maps:fold( fun - (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> + (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> %% special case conversion Key = sym(K), [{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc]; diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index fa046cc60657..0f60c9bb8c28 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -433,7 +433,10 @@ wrap_ap_value(V) when is_integer(V) -> case V < 0 of true -> {int, V}; false -> {uint, V} - end. + end; +wrap_ap_value(V) when is_number(V) -> + %% AMQP double and Erlang float are both 64-bit. + {double, V}. %% LOCAL header_value(durable, undefined) -> false; diff --git a/deps/amqp10_common/app.bzl b/deps/amqp10_common/app.bzl index a233c945cebe..5e41032a8eb3 100644 --- a/deps/amqp10_common/app.bzl +++ b/deps/amqp10_common/app.bzl @@ -72,7 +72,7 @@ def all_srcs(name = "all_srcs"): ) filegroup( name = "public_hdrs", - srcs = ["include/amqp10_framing.hrl", "include/amqp10_types.hrl"], + srcs = ["include/amqp10_filtex.hrl", "include/amqp10_framing.hrl", "include/amqp10_types.hrl"], ) filegroup( name = "private_hdrs", diff --git a/deps/amqp10_common/include/amqp10_filtex.hrl b/deps/amqp10_common/include/amqp10_filtex.hrl new file mode 100644 index 000000000000..a1743ea9669c --- /dev/null +++ b/deps/amqp10_common/include/amqp10_filtex.hrl @@ -0,0 +1,15 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + + +%% AMQP Filter Expressions Version 1.0 Working Draft 09 +%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 + +-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>). +-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173). + +-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>). +-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174). diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 9bebe9be3ed5..d9910dc90e14 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1207,6 +1207,7 @@ rabbitmq_integration_suite( name = "amqp_client_SUITE", size = "large", additional_beam = [ + ":test_amqp_utils_beam", ":test_event_recorder_beam", ], shard_count = 3, @@ -1215,6 +1216,16 @@ rabbitmq_integration_suite( ], ) +rabbitmq_integration_suite( + name = "amqp_filtex_SUITE", + additional_beam = [ + ":test_amqp_utils_beam", + ], + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], +) + rabbitmq_integration_suite( name = "amqp_proxy_protocol_SUITE", size = "medium", @@ -1235,6 +1246,7 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "amqp_auth_SUITE", additional_beam = [ + ":test_amqp_utils_beam", ":test_event_recorder_beam", ], shard_count = 2, @@ -1246,6 +1258,9 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "amqp_address_SUITE", shard_count = 2, + additional_beam = [ + ":test_amqp_utils_beam", + ], runtime_deps = [ "//deps/rabbitmq_amqp_client:erlang_app", ], @@ -1358,6 +1373,7 @@ eunit( ":test_clustering_utils_beam", ":test_event_recorder_beam", ":test_rabbit_ct_hook_beam", + ":test_amqp_utils_beam", ], target = ":test_erlang_app", test_env = { diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 24110ce28db3..aad618f4211e 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -258,7 +258,7 @@ define ct_master.erl endef PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking -PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management +PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 4832861d9782..cf5a2d1769b7 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -45,6 +45,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_filtex.erl", "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", @@ -302,6 +303,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_filtex.erl", "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", @@ -578,6 +580,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_filtex.erl", "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", @@ -2195,3 +2198,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app"], ) + erlang_bytecode( + name = "amqp_filtex_SUITE_beam_files", + testonly = True, + srcs = ["test/amqp_filtex_SUITE.erl"], + outs = ["test/amqp_filtex_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp10_common:erlang_app"], + ) + erlang_bytecode( + name = "test_amqp_utils_beam", + testonly = True, + srcs = ["test/amqp_utils.erl"], + outs = ["test/amqp_utils.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + ) diff --git a/deps/rabbit/ct.test.spec b/deps/rabbit/ct.test.spec index e1027d06105f..60d65d2d5637 100644 --- a/deps/rabbit/ct.test.spec +++ b/deps/rabbit/ct.test.spec @@ -16,6 +16,7 @@ , amqp_auth_SUITE , amqp_client_SUITE , amqp_credit_api_v2_SUITE +, amqp_filtex_SUITE , amqp_proxy_protocol_SUITE , amqp_system_SUITE , amqpl_consumer_ack_SUITE diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 465c7054f089..9c23ac13daf8 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -301,7 +301,7 @@ message_id(BasicMsg) -> mc_compat:message_id(BasicMsg). -spec property(atom(), state()) -> - {utf8, binary()} | undefined. + tagged_value(). property(Property, #?MODULE{protocol = Proto, data = Data}) -> Proto:property(Property, Data); diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index be63597c3f96..ed6c4b4145d6 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -21,7 +21,7 @@ -define(MESSAGE_ANNOTATIONS_GUESS_SIZE, 100). --define(SIMPLE_VALUE(V), +-define(IS_SIMPLE_VALUE(V), is_binary(V) orelse is_number(V) orelse is_boolean(V)). @@ -145,16 +145,32 @@ property(Prop, #v1{bare_and_footer = Bin, Props = amqp10_framing:decode(PropsDescribed), property0(Prop, Props). -property0(correlation_id, #'v1_0.properties'{correlation_id = Corr}) -> - Corr; -property0(message_id, #'v1_0.properties'{message_id = MsgId}) -> - MsgId; -property0(user_id, #'v1_0.properties'{user_id = UserId}) -> - UserId; -property0(subject, #'v1_0.properties'{subject = Subject}) -> - Subject; -property0(to, #'v1_0.properties'{to = To}) -> - To; +property0(message_id, #'v1_0.properties'{message_id = Val}) -> + Val; +property0(user_id, #'v1_0.properties'{user_id = Val}) -> + Val; +property0(to, #'v1_0.properties'{to = Val}) -> + Val; +property0(subject, #'v1_0.properties'{subject = Val}) -> + Val; +property0(reply_to, #'v1_0.properties'{reply_to = Val}) -> + Val; +property0(correlation_id, #'v1_0.properties'{correlation_id = Val}) -> + Val; +property0(content_type, #'v1_0.properties'{content_type = Val}) -> + Val; +property0(content_encoding, #'v1_0.properties'{content_encoding = Val}) -> + Val; +property0(absolute_expiry_time, #'v1_0.properties'{absolute_expiry_time = Val}) -> + Val; +property0(creation_time, #'v1_0.properties'{creation_time = Val}) -> + Val; +property0(group_id, #'v1_0.properties'{group_id = Val}) -> + Val; +property0(group_sequence, #'v1_0.properties'{group_sequence = Val}) -> + Val; +property0(reply_to_group_id, #'v1_0.properties'{reply_to_group_id = Val}) -> + Val; property0(_Prop, #'v1_0.properties'{}) -> undefined. @@ -454,7 +470,7 @@ message_annotations_as_simple_map(#v1{message_annotations = Content}) -> message_annotations_as_simple_map0(Content) -> %% the section record format really is terrible lists:filtermap(fun({{symbol, K}, {_T, V}}) - when ?SIMPLE_VALUE(V) -> + when ?IS_SIMPLE_VALUE(V) -> {true, {K, V}}; (_) -> false @@ -480,7 +496,7 @@ application_properties_as_simple_map( application_properties_as_simple_map0(Content, L) -> %% the section record format really is terrible lists:foldl(fun({{utf8, K}, {_T, V}}, Acc) - when ?SIMPLE_VALUE(V) -> + when ?IS_SIMPLE_VALUE(V) -> [{K, V} | Acc]; ({{utf8, K}, V}, Acc) when V =:= undefined orelse is_boolean(V) -> diff --git a/deps/rabbit/src/rabbit_amqp_filtex.erl b/deps/rabbit/src/rabbit_amqp_filtex.erl new file mode 100644 index 000000000000..bcdd289e4723 --- /dev/null +++ b/deps/rabbit/src/rabbit_amqp_filtex.erl @@ -0,0 +1,196 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +%% AMQP Filter Expressions Version 1.0 Working Draft 09 +%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 +-module(rabbit_amqp_filtex). + +-include_lib("amqp10_common/include/amqp10_filtex.hrl"). + +-export([validate/1, + filter/2]). + +-type simple_type() :: number() | binary() | atom(). +-type affix() :: {suffix, non_neg_integer(), binary()} | + {prefix, non_neg_integer(), binary()}. +-type filter_expression_value() :: simple_type() | affix(). +-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} | + {application_properties, [{binary(), filter_expression_value()}]}. +-type filter_expressions() :: [filter_expression()]. +-export_type([filter_expressions/0]). + +-spec validate(tuple()) -> + {ok, filter_expression()} | error. +validate({described, Descriptor, {map, KVList}}) -> + try validate0(Descriptor, KVList) + catch throw:{?MODULE, _, _} -> + error + end; +validate(_) -> + error. + +-spec filter(filter_expressions(), mc:state()) -> + boolean(). +filter(Filters, Mc) -> + %% "A message will pass through a filter-set if and only if + %% it passes through each of the named filters." [3.5.8] + lists:all(fun(Filter) -> + filter0(Filter, Mc) + end, Filters). + +%%%%%%%%%%%%%%%% +%%% Internal %%% +%%%%%%%%%%%%%%%% + +filter0({properties, KVList}, Mc) -> + %% "The filter evaluates to true if all properties enclosed in the filter expression + %% match the respective properties in the message." + %% [filtex-v1.0-wd09 4.2.4] + lists:all(fun({FieldName, RefVal}) -> + TaggedVal = mc:property(FieldName, Mc), + Val = unwrap(TaggedVal), + match_simple_type(RefVal, Val) + end, KVList); +filter0({application_properties, KVList}, Mc) -> + AppProps = mc:routing_headers(Mc, []), + %% "The filter evaluates to true if all properties enclosed in the filter expression + %% match the respective entries in the application-properties section in the message." + %% [filtex-v1.0-wd09 4.2.5] + lists:all(fun({Key, RefVal}) -> + case AppProps of + #{Key := Val} -> + match_simple_type(RefVal, Val); + _ -> + false + end + end, KVList). + +%% [filtex-v1.0-wd09 4.1.1] +%% "A reference field value in a property filter expression matches +%% its corresponding message metadata field value if: +%% [...] +match_simple_type(null, _Val) -> + %% * The reference field value is NULL + true; +match_simple_type({suffix, SuffixSize, Suffix}, Val) -> + %% * Suffix. The message metadata field matches the expression if the ordinal values of the + %% characters of the suffix expression equal the ordinal values of the same number of + %% characters trailing the message metadata field value. + case is_binary(Val) of + true -> + case Val of + <<_:(size(Val) - SuffixSize)/binary, Suffix:SuffixSize/binary>> -> + true; + _ -> + false + end; + false -> + false + end; +match_simple_type({prefix, PrefixSize, Prefix}, Val) -> + %% * Prefix. The message metadata field matches the expression if the ordinal values of the + %% characters of the prefix expression equal the ordinal values of the same number of + %% characters leading the message metadata field value. + case Val of + <> -> + true; + _ -> + false + end; +match_simple_type(RefVal, Val) -> + %% * the reference field value is of a floating-point or integer number type + %% and the message metadata field is of a different floating-point or integer number type, + %% the reference value and the metadata field value are within the value range of both types, + %% and the values are equal when treated as a floating-point" + RefVal == Val. + +validate0(Descriptor, KVList) when + (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_PROPERTIES_FILTER}) andalso + KVList =/= [] -> + validate_props(KVList, []); +validate0(Descriptor, KVList0) when + (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse + Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso + KVList0 =/= [] -> + KVList = lists:map(fun({{utf8, Key}, {utf8, String}}) -> + {Key, parse_string_modifier_prefix(String)}; + ({{utf8, Key}, TaggedVal}) -> + {Key, unwrap(TaggedVal)} + end, KVList0), + {ok, {application_properties, KVList}}; +validate0(_, _) -> + error. + +validate_props([], Acc) -> + {ok, {properties, lists:reverse(Acc)}}; +validate_props([{{symbol, <<"message-id">>}, TaggedVal} | Rest], Acc) -> + case parse_message_id(TaggedVal) of + {ok, Val} -> + validate_props(Rest, [{message_id, Val} | Acc]); + error -> + error + end; +validate_props([{{symbol, <<"user-id">>}, {binary, Val}} | Rest], Acc) -> + validate_props(Rest, [{user_id, Val} | Acc]); +validate_props([{{symbol, <<"to">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{to, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"subject">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{subject, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"reply-to">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{reply_to, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"correlation-id">>}, TaggedVal} | Rest], Acc) -> + case parse_message_id(TaggedVal) of + {ok, Val} -> + validate_props(Rest, [{correlation_id, Val} | Acc]); + error -> + error + end; +validate_props([{{symbol, <<"content-type">>}, {symbol, Val}} | Rest], Acc) -> + validate_props(Rest, [{content_type, Val} | Acc]); +validate_props([{{symbol, <<"content-encoding">>}, {symbol, Val}} | Rest], Acc) -> + validate_props(Rest, [{content_encoding, Val} | Acc]); +validate_props([{{symbol, <<"absolute-expiry-time">>}, {timestamp, Val}} | Rest], Acc) -> + validate_props(Rest, [{absolute_expiry_time, Val} | Acc]); +validate_props([{{symbol, <<"creation-time">>}, {timestamp, Val}} | Rest], Acc) -> + validate_props(Rest, [{creation_time, Val} | Acc]); +validate_props([{{symbol, <<"group-id">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{group_id, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"group-sequence">>}, {uint, Val}} | Rest], Acc) -> + validate_props(Rest, [{group_sequence, Val} | Acc]); +validate_props([{{symbol, <<"reply-to-group-id">>}, {utf8, Val}} | Rest], Acc) -> + validate_props(Rest, [{reply_to_group_id, parse_string_modifier_prefix(Val)} | Acc]); +validate_props(_, _) -> + error. + +parse_message_id({ulong, Val}) -> + {ok, Val}; +parse_message_id({uuid, Val}) -> + {ok, Val}; +parse_message_id({binary, Val}) -> + {ok, Val}; +parse_message_id({utf8, Val}) -> + {ok, parse_string_modifier_prefix(Val)}; +parse_message_id(_) -> + error. + +%% [filtex-v1.0-wd09 4.1.1] +parse_string_modifier_prefix(<<"$s:", Suffix/binary>>) -> + {suffix, size(Suffix), Suffix}; +parse_string_modifier_prefix(<<"$p:", Prefix/binary>>) -> + {prefix, size(Prefix), Prefix}; +parse_string_modifier_prefix(<<"$$", _/binary>> = String) -> + %% "Escape prefix for case-sensitive matching of a string starting with ‘&’" + string:slice(String, 1); +parse_string_modifier_prefix(<<"$", _/binary>> = String) -> + throw({?MODULE, invalid_reference_field_value, String}); +parse_string_modifier_prefix(String) -> + String. + +unwrap({_Tag, V}) -> + V; +unwrap(V) -> + V. diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 0ad228a4e653..bcfa6a1dcc8c 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -518,16 +518,16 @@ handle_connection_frame( ok = rabbit_event:notify(connection_created, Infos), ok = rabbit_amqp1_0:register_connection(self()), Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306 - {symbol, <<"LINK_PAIR_V1_0">>}, + <<"LINK_PAIR_V1_0">>, %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay - {symbol, <<"ANONYMOUS-RELAY">>}], + <<"ANONYMOUS-RELAY">>], Open = #'v1_0.open'{ channel_max = {ushort, EffectiveChannelMax}, max_frame_size = {uint, IncomingMaxFrameSize}, %% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold" [2.4.5] idle_time_out = {uint, ReceiveTimeoutMillis div 2}, container_id = {utf8, rabbit_nodes:cluster_name()}, - offered_capabilities = {array, symbol, Caps}, + offered_capabilities = rabbit_amqp_util:capabilities(Caps), properties = server_properties()}, ok = send_on_channel0(Sock, Open), State; diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 31d5348b56b5..3be9ea2b00fc 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -30,6 +30,12 @@ }} }). +-rabbit_deprecated_feature( + {amqp_filter_set_bug, + #{deprecation_phase => permitted_by_default, + doc_url => "https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-filter-set" + }}). + %% This is the link credit that we grant to sending clients. %% We are free to choose whatever we want, sending clients must obey. %% Default soft limits / credits in deps/rabbit/Makefile are: @@ -1284,12 +1290,13 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, end; handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, - name = LinkName, - handle = Handle = ?UINT(HandleInt), - source = Source, - snd_settle_mode = SndSettleMode, - rcv_settle_mode = RcvSettleMode, - max_message_size = MaybeMaxMessageSize} = Attach, + name = LinkName, + handle = Handle = ?UINT(HandleInt), + source = Source = #'v1_0.source'{filter = DesiredFilter}, + snd_settle_mode = SndSettleMode, + rcv_settle_mode = RcvSettleMode, + max_message_size = MaybeMaxMessageSize, + properties = Properties}, State0 = #state{queue_states = QStates0, outgoing_links = OutgoingLinks0, permission_cache = PermCache0, @@ -1359,6 +1366,10 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, credit_api_v1, credit_api_v1} end, + ConsumerArgs0 = parse_attach_properties(Properties), + {EffectiveFilter, ConsumerFilter, ConsumerArgs1} = + parse_filter(DesiredFilter), + ConsumerArgs = ConsumerArgs0 ++ ConsumerArgs1, Spec = #{no_ack => SndSettled, channel_pid => self(), limiter_pid => none, @@ -1366,11 +1377,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, mode => Mode, consumer_tag => handle_to_ctag(HandleInt), exclusive_consume => false, - args => consumer_arguments(Attach), + args => ConsumerArgs, + filter => ConsumerFilter, ok_msg => undefined, acting_user => Username}, case rabbit_queue_type:consume(Q, Spec, QStates0) of {ok, QStates} -> + OfferedCaps0 = rabbit_queue_type:amqp_capabilities(QType), + OfferedCaps = rabbit_amqp_util:capabilities(OfferedCaps0), A = #'v1_0.attach'{ name = LinkName, handle = Handle, @@ -1382,10 +1396,13 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% will be requeued. That's why the we only support RELEASED as the default outcome. source = Source#'v1_0.source'{ default_outcome = #'v1_0.released'{}, - outcomes = outcomes(Source)}, + outcomes = outcomes(Source), + %% "the sending endpoint sets the filter actually in place" [3.5.3] + filter = EffectiveFilter}, role = ?AMQP_ROLE_SENDER, %% Echo back that we will respect the client's requested max-message-size. - max_message_size = MaybeMaxMessageSize}, + max_message_size = MaybeMaxMessageSize, + offered_capabilities = OfferedCaps}, MaxMessageSize = max_message_size(MaybeMaxMessageSize), Link = #outgoing_link{ queue_name = queue_resource(Vhost, QNameBin), @@ -2705,11 +2722,10 @@ parse_target_v2_string(String) -> end. parse_target_v2_string0(<<"/exchanges/", Rest/binary>>) -> - Key = cp_slash, - Pattern = try persistent_term:get(Key) + Pattern = try persistent_term:get(cp_slash) catch error:badarg -> Cp = binary:compile_pattern(<<"/">>), - ok = persistent_term:put(Key, Cp), + ok = persistent_term:put(cp_slash, Cp), Cp end, case binary:split(Rest, Pattern, [global]) of @@ -2980,87 +2996,89 @@ encode_frames(T, Msg, MaxPayloadSize, Transfers) -> lists:reverse([[T, Msg] | Transfers]) end. -consumer_arguments(#'v1_0.attach'{ - source = #'v1_0.source'{filter = Filter}, - properties = Properties}) -> - properties_to_consumer_args(Properties) ++ - filter_to_consumer_args(Filter). - -properties_to_consumer_args({map, KVList}) -> +parse_attach_properties(undefined) -> + []; +parse_attach_properties({map, KVList}) -> Key = {symbol, <<"rabbitmq:priority">>}, case proplists:lookup(Key, KVList) of {Key, Val = {int, _Prio}} -> [mc_amqpl:to_091(<<"x-priority">>, Val)]; _ -> [] - end; -properties_to_consumer_args(_) -> - []. - -filter_to_consumer_args({map, KVList}) -> - filter_to_consumer_args( - [<<"rabbitmq:stream-offset-spec">>, - <<"rabbitmq:stream-filter">>, - <<"rabbitmq:stream-match-unfiltered">>], - KVList, - []); -filter_to_consumer_args(_) -> - []. + end. -filter_to_consumer_args([], _KVList, Acc) -> - Acc; -filter_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) -> - Key = {symbol, H}, - Arg = case keyfind_unpack_described(Key, KVList) of - {_, {timestamp, Ts}} -> - [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps - {_, {utf8, Spec}} -> - [{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc - {_, {_, Offset}} when is_integer(Offset) -> - [{<<"x-stream-offset">>, long, Offset}]; %% integer offset - _ -> - [] - end, - filter_to_consumer_args(T, KVList, Arg ++ Acc); -filter_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) -> - Key = {symbol, H}, - Arg = case keyfind_unpack_described(Key, KVList) of - {_, {list, Filters0}} when is_list(Filters0) -> - Filters = lists:foldl(fun({utf8, Filter}, L) -> - [{longstr, Filter} | L]; - (_, L) -> - L - end, [], Filters0), - [{<<"x-stream-filter">>, array, Filters}]; - {_, {utf8, Filter}} -> - [{<<"x-stream-filter">>, longstr, Filter}]; - _ -> - [] - end, - filter_to_consumer_args(T, KVList, Arg ++ Acc); -filter_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) -> - Key = {symbol, H}, - Arg = case keyfind_unpack_described(Key, KVList) of - {_, MU} when is_boolean(MU) -> - [{<<"x-stream-match-unfiltered">>, bool, MU}]; - _ -> - [] - end, - filter_to_consumer_args(T, KVList, Arg ++ Acc); -filter_to_consumer_args([_ | T], KVList, Acc) -> - filter_to_consumer_args(T, KVList, Acc). - -keyfind_unpack_described(Key, KvList) -> - %% filterset values _should_ be described values - %% they aren't always however for historical reasons so we need this bit of - %% code to return a plain value for the given filter key - case lists:keyfind(Key, 1, KvList) of - {Key, {described, Key, Value}} -> - {Key, Value}; - {Key, _} = Kv -> - Kv; +parse_filter(undefined) -> + {undefined, [], []}; +parse_filter({map, DesiredKVList}) -> + {EffectiveKVList, ConsusumerFilter, ConsumerArgs} = + lists:foldr(fun parse_filters/2, {[], [], []}, DesiredKVList), + {{map, EffectiveKVList}, ConsusumerFilter, ConsumerArgs}. + +parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-offset-spec">>}, Value}}, + Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) -> + case Value of + {timestamp, Ts} -> + %% 0.9.1 uses second based timestamps + Arg = {<<"x-stream-offset">>, timestamp, Ts div 1000}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + {utf8, Spec} -> + %% next, last, first and "10m" etc + Arg = {<<"x-stream-offset">>, longstr, Spec}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + {_Type, Offset} + when is_integer(Offset) andalso Offset >= 0 -> + Arg = {<<"x-stream-offset">>, long, Offset}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + _ -> + Acc + end; +parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-filter">>}, Value}}, + Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) -> + case Value of + {list, Filters0} -> + Filters = lists:filtermap(fun({utf8, Filter0}) -> + {true, {longstr, Filter0}}; + (_) -> + false + end, Filters0), + Arg = {<<"x-stream-filter">>, array, Filters}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + + {utf8, Filter0} -> + Arg = {<<"x-stream-filter">>, longstr, Filter0}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; + _ -> + Acc + end; +parse_filters(Filter = {{symbol, _Key}, {described, {symbol, <<"rabbitmq:stream-match-unfiltered">>}, Match}}, + {EffectiveFilters, ConsumerFilter, ConsumerArgs}) + when is_boolean(Match) -> + Arg = {<<"x-stream-match-unfiltered">>, bool, Match}, + {[Filter | EffectiveFilters], ConsumerFilter, [Arg | ConsumerArgs]}; +parse_filters({Symbol = {symbol, <<"rabbitmq:stream-", _/binary>>}, Value}, Acc) + when element(1, Value) =/= described -> + case rabbit_deprecated_features:is_permitted(amqp_filter_set_bug) of + true -> + parse_filters({Symbol, {described, Symbol, Value}}, Acc); false -> - false + Acc + end; +parse_filters(Filter = {{symbol, _Key}, Value}, + Acc = {EffectiveFilters, ConsumerFilter, ConsumerArgs}) -> + case rabbit_amqp_filtex:validate(Value) of + {ok, FilterExpression = {FilterType, _}} -> + case proplists:is_defined(FilterType, ConsumerFilter) of + true -> + %% For now, let's prohibit multiple top level filters of the same type + %% (properties or application-properties). There should be no use case. + %% In future, we can allow multiple times the same top level grouping + %% filter expression type (all/any/not). + Acc; + false -> + {[Filter | EffectiveFilters], [FilterExpression | ConsumerFilter], ConsumerArgs} + end; + error -> + Acc end. validate_attach(#'v1_0.attach'{target = #'v1_0.coordinator'{}}) -> diff --git a/deps/rabbit/src/rabbit_amqp_util.erl b/deps/rabbit/src/rabbit_amqp_util.erl index 3257cef93704..e1ef95d77fad 100644 --- a/deps/rabbit/src/rabbit_amqp_util.erl +++ b/deps/rabbit/src/rabbit_amqp_util.erl @@ -8,7 +8,8 @@ -module(rabbit_amqp_util). -include("rabbit_amqp.hrl"). --export([protocol_error/3]). +-export([protocol_error/3, + capabilities/1]). -spec protocol_error(term(), io:format(), [term()]) -> no_return(). @@ -17,3 +18,11 @@ protocol_error(Condition, Msg, Args) -> Reason = #'v1_0.error'{condition = Condition, description = {utf8, Description}}, exit(Reason). + +-spec capabilities([binary()]) -> + undefined | {array, symbol, [{symbol, binary()}]}. +capabilities([]) -> + undefined; +capabilities(Capabilities) -> + Caps = [{symbol, C} || C <- Capabilities], + {array, symbol, Caps}. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 938588da6662..219fccdf2f27 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -54,6 +54,7 @@ fold_state/3, is_policy_applicable/2, is_server_named_allowed/1, + amqp_capabilities/1, arguments/1, arguments/2, notify_decorators/1, @@ -125,6 +126,7 @@ consumer_tag := rabbit_types:ctag(), exclusive_consume => boolean(), args => rabbit_framing:amqp_table(), + filter => rabbit_amqp_filtex:filter_expressions(), ok_msg := term(), acting_user := rabbit_types:username()}. -type cancel_reason() :: cancel | remove. @@ -476,6 +478,12 @@ is_server_named_allowed(Type) -> Capabilities = Type:capabilities(), maps:get(server_named, Capabilities, false). +-spec amqp_capabilities(queue_type()) -> + [binary()]. +amqp_capabilities(Type) -> + Capabilities = Type:capabilities(), + maps:get(?FUNCTION_NAME, Capabilities, []). + -spec arguments(arguments()) -> [binary()]. arguments(ArgumentType) -> Args0 = lists:map(fun(T) -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index f936891d0560..18cc10f55ef8 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -925,7 +925,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> exclusive_consume := ExclusiveConsume, args := Args, ok_msg := OkMsg, - acting_user := ActingUser} = Spec, + acting_user := ActingUser} = Spec, %% TODO: validate consumer arguments %% currently quorum queues do not support any arguments QName = amqqueue:get_name(Q), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a7aa3a5a18cc..a011dc09a650 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -78,13 +78,14 @@ ack :: boolean(), start_offset = 0 :: non_neg_integer(), listening_offset = 0 :: non_neg_integer(), - last_consumed_offset = 0 :: non_neg_integer(), + last_consumed_offset :: non_neg_integer(), log :: undefined | osiris_log:state(), chunk_iterator :: undefined | osiris_log:chunk_iterator(), %% These messages were already read ahead from the Osiris log, %% were part of an uncompressed sub batch, and are buffered in %% reversed order until the consumer has more credits to consume them. buffer_msgs_rev = [] :: [rabbit_amqqueue:qmsg()], + filter :: rabbit_amqp_filtex:filter_expressions(), reader_options :: map()}). -record(stream_client, {stream_id :: string(), @@ -333,7 +334,8 @@ consume(Q, Spec, #stream_client{} = QState0) %% begins sending maybe_send_reply(ChPid, OkMsg), _ = rabbit_stream_coordinator:register_local_member_listener(Q), - begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, filter_spec(Args)) + Filter = maps:get(filter, Spec, []), + begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, Filter, filter_spec(Args)) end; {undefined, _} -> {protocol_error, precondition_failed, @@ -424,7 +426,7 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) -> begin_stream(#stream_client{name = QName, readers = Readers0, local_pid = LocalPid} = State, - Tag, Offset, Mode, AckRequired, Options) + Tag, Offset, Mode, AckRequired, Filter, Options) when is_pid(LocalPid) -> CounterSpec = {{?MODULE, QName, Tag, self()}, []}, {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options), @@ -451,6 +453,7 @@ begin_stream(#stream_client{name = QName, listening_offset = NextOffset, last_consumed_offset = StartOffset, log = Seg0, + filter = Filter, reader_options = Options}, {ok, State#stream_client{readers = Readers0#{Tag => Str0}}}. @@ -1158,7 +1161,8 @@ stream_entries(QName, Name, LocalPid, #stream{chunk_iterator = Iter0, delivery_count = DC, credit = Credit, - start_offset = StartOffset} = Str0, Acc0) -> + start_offset = StartOffset, + filter = Filter} = Str0, Acc0) -> case osiris_log:iterator_next(Iter0) of end_of_chunk -> case chunk_iterator(Str0, LocalPid) of @@ -1172,7 +1176,7 @@ stream_entries(QName, Name, LocalPid, {batch, _NumRecords, 0, _Len, BatchedEntries} -> {MsgsRev, NumMsgs} = parse_uncompressed_subbatch( BatchedEntries, Offset, StartOffset, - QName, Name, LocalPid, {[], 0}), + QName, Name, LocalPid, Filter, {[], 0}), case Credit >= NumMsgs of true -> {Str0#stream{chunk_iterator = Iter, @@ -1199,12 +1203,19 @@ stream_entries(QName, Name, LocalPid, _SimpleEntry -> case Offset >= StartOffset of true -> - Msg = entry_to_msg(Entry, Offset, QName, Name, LocalPid), - {Str0#stream{chunk_iterator = Iter, - delivery_count = delivery_count_add(DC, 1), - credit = Credit - 1, - last_consumed_offset = Offset}, - [Msg | Acc0]}; + case entry_to_msg(Entry, Offset, QName, + Name, LocalPid, Filter) of + none -> + {Str0#stream{chunk_iterator = Iter, + last_consumed_offset = Offset}, + Acc0}; + Msg -> + {Str0#stream{chunk_iterator = Iter, + delivery_count = delivery_count_add(DC, 1), + credit = Credit - 1, + last_consumed_offset = Offset}, + [Msg | Acc0]} + end; false -> {Str0#stream{chunk_iterator = Iter}, Acc0} end @@ -1236,25 +1247,30 @@ chunk_iterator(#stream{credit = Credit, end. %% Deliver each record of an uncompressed sub batch individually. -parse_uncompressed_subbatch(<<>>, _Offset, _StartOffset, _QName, _Name, _LocalPid, Acc) -> +parse_uncompressed_subbatch( + <<>>, _Offset, _StartOffset, _QName, _Name, _LocalPid, _Filter, Acc) -> Acc; parse_uncompressed_subbatch( <<0:1, %% simple entry Len:31/unsigned, Entry:Len/binary, Rem/binary>>, - Offset, StartOffset, QName, Name, LocalPid, Acc0 = {AccList, AccCount}) -> + Offset, StartOffset, QName, Name, LocalPid, Filter, Acc0 = {AccList, AccCount}) -> Acc = case Offset >= StartOffset of true -> - Msg = entry_to_msg(Entry, Offset, QName, Name, LocalPid), - {[Msg | AccList], AccCount + 1}; + case entry_to_msg(Entry, Offset, QName, Name, LocalPid, Filter) of + none -> + Acc0; + Msg -> + {[Msg | AccList], AccCount + 1} + end; false -> Acc0 end, - parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName, Name, LocalPid, Acc). + parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName, + Name, LocalPid, Filter, Acc). -entry_to_msg(Entry, Offset, #resource{kind = queue, - name = QName}, Name, LocalPid) -> +entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) -> Mc0 = mc:init(mc_amqp, Entry, #{}), %% If exchange or routing_keys annotation isn't present the entry most likely came %% from the rabbitmq-stream plugin so we'll choose defaults that simulate use @@ -1268,7 +1284,12 @@ entry_to_msg(Entry, Offset, #resource{kind = queue, _ -> Mc1 end, Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2), - {Name, LocalPid, Offset, false, Mc}. + case rabbit_amqp_filtex:filter(Filter, Mc) of + true -> + {Name, LocalPid, Offset, false, Mc}; + false -> + none + end. capabilities() -> #{unsupported_policies => [%% Classic policies @@ -1288,6 +1309,9 @@ capabilities() -> consumer_arguments => [<<"x-stream-offset">>, <<"x-stream-filter">>, <<"x-stream-match-unfiltered">>], + %% AMQP property filter expressions + %% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 + amqp_capabilities => [<<"AMQP_FILTEX_PROP_V1_0">>], server_named => false}. notify_decorators(Q) when ?is_amqqueue(Q) -> diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index 910e1068eeed..f5a0f74b8932 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -18,6 +18,9 @@ [rpc/4]). -import(rabbit_ct_helpers, [eventually/1]). +-import(amqp_utils, + [flush/1, + wait_for_credit/1]). all() -> [ @@ -651,17 +654,6 @@ connection_config(Config) -> container_id => <<"my container">>, sasl => {plain, <<"guest">>, <<"guest">>}}. -% before we can send messages we have to wait for credit from the server -wait_for_credit(Sender) -> - receive - {amqp10_event, {link, Sender, credited}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush(?FUNCTION_NAME), - ct:fail(?FUNCTION_NAME) - end. - wait_for_settled(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> @@ -671,11 +663,3 @@ wait_for_settled(State, Tag) -> flush(Reason), ct:fail(Reason) end. - -flush(Prefix) -> - receive Msg -> - ct:pal("~tp flushed: ~p~n", [Prefix, Msg]), - flush(Prefix) - after 1 -> - ok - end. diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 920f779172d4..6bd905a9242f 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -21,6 +21,10 @@ -import(event_recorder, [assert_event_type/2, assert_event_prop/2]). +-import(amqp_utils, + [flush/1, + wait_for_credit/1, + close_connection_sync/1]). all() -> [ @@ -1077,34 +1081,7 @@ amqp_error(Condition, Description) condition = Condition, description = {utf8, Description}}. -% before we can send messages we have to wait for credit from the server -wait_for_credit(Sender) -> - receive - {amqp10_event, {link, Sender, credited}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_credit timed out"), - ct:fail(credited_timeout) - end. - -flush(Prefix) -> - receive Msg -> - ct:pal("~ts flushed: ~p~n", [Prefix, Msg]), - flush(Prefix) - after 1 -> - ok - end. - delete_all_queues(Config) -> Qs = rpc(Config, rabbit_amqqueue, list, []), [{ok, _QLen} = rpc(Config, rabbit_amqqueue, delete, [Q, false, false, <<"fake-user">>]) || Q <- Qs]. - -close_connection_sync(Connection) - when is_pid(Connection) -> - ok = amqp10_client:close_connection(Connection), - receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok - after 5000 -> flush(missing_closed), - ct:fail("missing CLOSE from server") - end. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index acc4dd004cd8..e8c64690a012 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -27,6 +27,17 @@ -import(event_recorder, [assert_event_type/2, assert_event_prop/2]). +-import(amqp_utils, + [init/1, init/2, + connection_config/1, connection_config/2, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, send_messages/4, + detach_link_sync/1, + end_session_sync/1, + wait_for_session_end/1, + close_connection_sync/1]). all() -> [ @@ -100,7 +111,7 @@ groups() -> max_message_size_client_to_server, max_message_size_server_to_client, global_counters, - stream_filtering, + stream_bloom_filter, available_messages_classic_queue, available_messages_quorum_queue, available_messages_stream, @@ -3255,7 +3266,7 @@ target_queue_deleted(Config) -> after 5000 -> ct:fail({missing_accepted, DTag1}) end, - N0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + N0 = get_node_config(Config, 0, nodename), RaName = ra_name(QuorumQ), ServerId0 = {RaName, N0}, {ok, Members, _Leader} = ra:members(ServerId0), @@ -3937,7 +3948,7 @@ global_counters(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). -stream_filtering(Config) -> +stream_bloom_filter(Config) -> Stream = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(Stream), Ch = rabbit_ct_client_helpers:open_channel(Config), @@ -4476,7 +4487,7 @@ handshake_timeout(Config) -> Par = ?FUNCTION_NAME, {ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]), ok = rpc(Config, application, set_env, [App, Par, 200]), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Port = get_node_config(Config, 0, tcp_port_amqp), {ok, Socket} = gen_tcp:connect("localhost", Port, [{active, false}]), ?assertEqual({error, closed}, gen_tcp:recv(Socket, 0, 400)), ok = rpc(Config, application, set_env, [App, Par, DefaultVal]). @@ -5762,16 +5773,6 @@ link_max_per_session(Config) -> %% internal %% -init(Config) -> - init(0, Config). - -init(Node, Config) -> - OpnConf = connection_config(Node, Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), - {Connection, Session, LinkPair}. - receive_all_messages(Receiver, Accept) -> receive_all_messages0(Receiver, Accept, []). @@ -5786,26 +5787,6 @@ receive_all_messages0(Receiver, Accept, Acc) -> lists:reverse(Acc) end. -connection_config(Config) -> - connection_config(0, Config). - -connection_config(Node, Config) -> - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), - #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, <<"guest">>, <<"guest">>}}. - -flush(Prefix) -> - receive - Msg -> - ct:pal("~p flushed: ~p~n", [Prefix, Msg]), - flush(Prefix) - after 1 -> - ok - end. - open_and_close_connection(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -5814,58 +5795,6 @@ open_and_close_connection(Config) -> end, ok = close_connection_sync(Connection). -% before we can send messages we have to wait for credit from the server -wait_for_credit(Sender) -> - receive - {amqp10_event, {link, Sender, credited}} -> - ok - after 5000 -> - flush("wait_for_credit timed out"), - ct:fail(credited_timeout) - end. - -detach_link_sync(Link) -> - ok = amqp10_client:detach_link(Link), - ok = wait_for_link_detach(Link). - -wait_for_link_detach(Link) -> - receive - {amqp10_event, {link, Link, {detached, normal}}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_link_detach timed out"), - ct:fail({link_detach_timeout, Link}) - end. - -end_session_sync(Session) -> - ok = amqp10_client:end_session(Session), - ok = wait_for_session_end(Session). - -wait_for_session_end(Session) -> - receive - {amqp10_event, {session, Session, {ended, _}}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_session_end timed out"), - ct:fail({session_end_timeout, Session}) - end. - -close_connection_sync(Connection) -> - ok = amqp10_client:close_connection(Connection), - ok = wait_for_connection_close(Connection). - -wait_for_connection_close(Connection) -> - receive - {amqp10_event, {connection, Connection, {closed, normal}}} -> - flush(?FUNCTION_NAME), - ok - after 5000 -> - flush("wait_for_connection_close timed out"), - ct:fail({connection_close_timeout, Connection}) - end. - wait_for_accepted(Tag) -> wait_for_settlement(Tag, accepted). @@ -5878,16 +5807,6 @@ wait_for_settlement(Tag, State) -> ct:fail({settled_timeout, Tag}) end. -wait_for_accepts(0) -> - ok; -wait_for_accepts(N) -> - receive - {amqp10_disposition,{accepted,_}} -> - wait_for_accepts(N - 1) - after 5000 -> - ct:fail({missing_accepted, N}) - end. - delete_queue(Session, QName) -> {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( Session, <<"delete queue">>), @@ -5938,32 +5857,6 @@ count_received_messages0(Receiver, Count) -> Count end. -send_messages(Sender, Left, Settled) -> - send_messages(Sender, Left, Settled, <<>>). - -send_messages(_, 0, _, _) -> - ok; -send_messages(Sender, Left, Settled, BodySuffix) -> - Bin = integer_to_binary(Left), - Body = <>, - Msg = amqp10_msg:new(Bin, Body, Settled), - case amqp10_client:send_msg(Sender, Msg) of - ok -> - send_messages(Sender, Left - 1, Settled, BodySuffix); - {error, insufficient_credit} -> - ok = wait_for_credit(Sender), - %% The credited event we just processed could have been received some time ago, - %% i.e. we might have 0 credits right now. This happens in the following scenario: - %% 1. We (test case proc) send a message successfully, the client session proc decrements remaining link credit from 1 to 0. - %% 2. The server grants our client session proc new credits. - %% 3. The client session proc sends us (test case proc) a credited event. - %% 4. We didn't even notice that we ran out of credits temporarily. We send the next message, it succeeds, - %% but do not process the credited event in our mailbox. - %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} - %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). - send_messages(Sender, Left, Settled, BodySuffix) - end. - assert_link_credit_runs_out(_Sender, 0) -> ct:fail(sufficient_link_credit); assert_link_credit_runs_out(Sender, Left) -> diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl new file mode 100644 index 000000000000..51469821a83b --- /dev/null +++ b/deps/rabbit/test/amqp_filtex_SUITE.erl @@ -0,0 +1,591 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% Test suite for +%% AMQP Filter Expressions Version 1.0 Working Draft 09 +-module(amqp_filtex_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_filtex.hrl"). + +-compile([nowarn_export_all, + export_all]). + +-import(rabbit_ct_broker_helpers, + [rpc/4]). +-import(rabbit_ct_helpers, + [eventually/1]). +-import(amqp_utils, + [init/1, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, + detach_link_sync/1, + end_session_sync/1, + wait_for_session_end/1, + close_connection_sync/1]). + +all() -> + [ + {group, cluster_size_1} + ]. + +groups() -> + [ + {cluster_size_1, [shuffle], + [ + properties_section, + application_properties_section, + multiple_sections, + filter_few_messages_from_many, + string_modifier + ]} + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:merge_app_env( + Config, {rabbit, [{quorum_tick_interval, 1000}, + {stream_tick_interval, 1000} + ]}). + +end_per_suite(Config) -> + Config. + +init_per_group(_Group, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + %% Assert that every testcase cleaned up. + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), + %% Wait for sessions to terminate before starting the next test case. + eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +properties_section(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Now = erlang:system_time(millisecond), + To = rabbitmq_amqp_address:exchange(<<"some exchange">>, <<"routing key">>), + ReplyTo = rabbitmq_amqp_address:queue(<<"some queue">>), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{message_id => {ulong, 999}, + user_id => <<"guest">>, + to => To, + subject => <<"🐇"/utf8>>, + reply_to => ReplyTo, + correlation_id => <<"corr-123">>, + content_type => <<"text/plain">>, + content_encoding => <<"some encoding">>, + absolute_expiry_time => Now + 100_000, + creation_time => Now, + group_id => <<"my group ID">>, + group_sequence => 16#ff_ff_ff_ff, + reply_to_group_id => <<"other group ID">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:new(<<"t2">>, <<"m2">>)), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter1 = [ + {{symbol, <<"message-id">>}, {ulong, 999}}, + {{symbol, <<"user-id">>}, {binary, <<"guest">>}}, + {{symbol, <<"subject">>}, {utf8, <<"🐇"/utf8>>}}, + {{symbol, <<"to">>}, {utf8, To}}, + {{symbol, <<"reply-to">>}, {utf8, ReplyTo}}, + {{symbol, <<"correlation-id">>}, {utf8, <<"corr-123">>}}, + {{symbol, <<"content-type">>}, {symbol, <<"text/plain">>}}, + {{symbol, <<"content-encoding">>}, {symbol, <<"some encoding">>}}, + {{symbol, <<"absolute-expiry-time">>}, {timestamp, Now + 100_000}}, + {{symbol, <<"creation-time">>}, {timestamp, Now}}, + {{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}, + {{symbol, <<"group-sequence">>}, {uint, 16#ff_ff_ff_ff}}, + {{symbol, <<"reply-to-group-id">>}, {utf8, <<"other group ID">>}} + ], + Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + PropsFilter2 = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}], + Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M1), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M2)), + ok = detach_link_sync(Receiver2), + + %% Filter is in place, but no message matches. + PropsFilter3 = [{{symbol, <<"group-id">>}, {utf8, <<"no match">>}}], + Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + ok = amqp10_client:flow_link_credit(Receiver3, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver3), + + %% Wrong type should fail validation in the server. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + PropsFilter4 = [{{symbol, <<"group-id">>}, {uint, 3}}], + Filter4 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + unsettled, configuration, Filter4), + {ok, R4M1} = amqp10_client:get_msg(Receiver4), + {ok, R4M2} = amqp10_client:get_msg(Receiver4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + ok = amqp10_client:accept_msg(Receiver4, R4M1), + ok = amqp10_client:accept_msg(Receiver4, R4M2), + ok = amqp10_client:accept_msg(Receiver4, R4M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R4M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R4M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ok = detach_link_sync(Receiver4), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +application_properties_section(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k1">> => -2, + <<"k2">> => 10, + <<"k3">> => false, + <<"k4">> => true, + <<"k5">> => <<"hey">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k2">> => 10.1}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:new(<<"t3">>, <<"m3">>)), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k2">> => 10.0}, + amqp10_msg:new(<<"t4">>, <<"m4">>))), + + ok = wait_for_accepts(4), + ok = detach_link_sync(Sender), + flush(sent), + + AppPropsFilter0 = [{{utf8, <<"k5">>}, {symbol, <<"no match">>}}], + Filter0 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter0}}, + {ok, Receiver0} = amqp10_client:attach_receiver_link( + Session, <<"receiver 0">>, Address, + unsettled, configuration, Filter0), + ok = amqp10_client:flow_link_credit(Receiver0, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver0), + + AppPropsFilter1 = [ + {{utf8, <<"k1">>}, {int, -2}}, + {{utf8, <<"k5">>}, {symbol, <<"hey">>}}, + {{utf8, <<"k4">>}, {boolean, true}}, + {{utf8, <<"k3">>}, false} + ], + Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + %% Due to simple type matching [filtex-v1.0-wd09 §4.1.1] + %% we expect integer 10 to also match number 10.0. + AppPropsFilter2 = [{{utf8, <<"k2">>}, {uint, 10}}], + Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter2}}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M1), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R2M2)), + ok = detach_link_sync(Receiver2), + + %% A reference field value of NULL should always match. [filtex-v1.0-wd09 §4.1.1] + AppPropsFilter3 = [{{utf8, <<"k2">>}, null}], + Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter3}}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + {ok, R3M1} = amqp10_client:get_msg(Receiver3), + {ok, R3M2} = amqp10_client:get_msg(Receiver3), + {ok, R3M3} = amqp10_client:get_msg(Receiver3), + ok = amqp10_client:accept_msg(Receiver3, R3M1), + ok = amqp10_client:accept_msg(Receiver3, R3M2), + ok = amqp10_client:accept_msg(Receiver3, R3M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R3M2)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R3M3)), + ok = detach_link_sync(Receiver3), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Test filter expressions matching multiple message sections. +multiple_sections(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"The Subject">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"The Key">> => -123}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"The Subject">>}, + amqp10_msg:set_application_properties( + #{<<"The Key">> => -123}, + amqp10_msg:new(<<"t3">>, <<"m3">>)))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter = [{{symbol, <<"subject">>}, {utf8, <<"The Subject">>}}], + Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + unsettled, configuration, Filter1), + {ok, R1M1} = amqp10_client:get_msg(Receiver1), + {ok, R1M3} = amqp10_client:get_msg(Receiver1), + ok = amqp10_client:accept_msg(Receiver1, R1M1), + ok = amqp10_client:accept_msg(Receiver1, R1M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)), + ok = detach_link_sync(Receiver1), + + AppPropsFilter = [{{utf8, <<"The Key">>}, {byte, -123}}], + Filter2 = #{?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + {ok, R2M3} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ok = amqp10_client:accept_msg(Receiver2, R2M3), + ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)), + ok = detach_link_sync(Receiver2), + + Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + {ok, R3M3} = amqp10_client:get_msg(Receiver3), + ok = amqp10_client:accept_msg(Receiver3, R3M3), + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)), + ok = detach_link_sync(Receiver3), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Filter a small subset from many messages. +%% We test here that flow control still works correctly. +filter_few_messages_from_many(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t1">>, <<"first msg">>))), + ok = send_messages(Sender, 1000, false), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t2">>, <<"last msg">>))), + ok = wait_for_accepts(1002), + ok = detach_link_sync(Sender), + flush(sent), + + %% Our filter should cause us to receive only the first and + %% last message out of the 1002 messages in the stream. + PropsFilter = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}], + Filter = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}}, + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + unsettled, configuration, Filter), + + ok = amqp10_client:flow_link_credit(Receiver, 2, never), + receive {amqp10_msg, Receiver, M1} -> + ?assertEqual([<<"first msg">>], amqp10_msg:body(M1)), + ok = amqp10_client:accept_msg(Receiver, M1) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M2} -> + ?assertEqual([<<"last msg">>], amqp10_msg:body(M2)), + ok = amqp10_client:accept_msg(Receiver, M2) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +string_modifier(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{to => <<"abc 1">>, + reply_to => <<"abc 2">>, + subject => <<"abc 3">>, + group_id => <<"abc 4">>, + reply_to_group_id => <<"abc 5">>, + message_id => {utf8, <<"abc 6">>}, + correlation_id => <<"abc 7">>, + group_sequence => 16#ff_ff_ff_ff}, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc 8">>, + <<"k2">> => <<"abc 9">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>)))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc">>}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"$Hello">>, + reply_to_group_id => <<"xyz 5">>}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter1 = [ + {{symbol, <<"to">>}, {utf8, <<"$p:abc ">>}}, + {{symbol, <<"reply-to">>}, {utf8, <<"$p:abc">>}}, + {{symbol, <<"subject">>}, {utf8, <<"$p:ab">>}}, + {{symbol, <<"group-id">>}, {utf8, <<"$p:a">>}}, + {{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s:5">>}}, + {{symbol, <<"correlation-id">>}, {utf8, <<"$s:abc 7">>}}, + {{symbol, <<"message-id">>}, {utf8, <<"$p:abc 6">>}} + ], + AppPropsFilter1 = [ + {{utf8, <<"k1">>}, {utf8, <<"$s: 8">>}}, + {{utf8, <<"k2">>}, {utf8, <<"$p:abc ">>}} + ], + Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + %% Same filters as before except for subject which shouldn't match anymore. + PropsFilter2 = lists:keyreplace( + {symbol, <<"subject">>}, 1, PropsFilter1, + {{symbol, <<"subject">>}, {utf8, <<"$s:xxxxxxxxxxxxxx">>}}), + Filter2 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + settled, configuration, Filter2), + ok = amqp10_client:flow_link_credit(Receiver2, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver2), + + PropsFilter3 = [{{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s: 5">>}}], + Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + settled, configuration, Filter3), + ok = amqp10_client:flow_link_credit(Receiver3, 10, never), + receive {amqp10_msg, Receiver3, R3M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver3, R3M3} -> + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver3), + + %% '$$" is the escape prefix for case-sensitive matching of a string starting with ‘&’ + PropsFilter4 = [{{symbol, <<"subject">>}, {utf8, <<"$$Hello">>}}], + Filter4 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + settled, configuration, Filter4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ok = detach_link_sync(Receiver4), + + %% Starting the reference field value with $ is invalid without using a valid modifier + %% prefix is invalid. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + PropsFilter5 = [{{symbol, <<"subject">>}, {utf8, <<"$Hello">>}}], + Filter5 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter5}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver5} = amqp10_client:attach_receiver_link( + Session, <<"receiver 5">>, Address, + settled, configuration, Filter5), + {ok, R5M1} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)), + {ok, R5M2} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)), + {ok, R5M3} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)), + ok = detach_link_sync(Receiver5), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +assert_no_msg_received(Line) -> + receive {amqp10_msg, _, _} = Msg -> + ct:fail({received_unexpected_msg, Line, Msg}) + after 10 -> + ok + end. diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl new file mode 100644 index 000000000000..f1816a07c228 --- /dev/null +++ b/deps/rabbit/test/amqp_utils.erl @@ -0,0 +1,139 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqp_utils). + +-export([init/1, init/2, + connection_config/1, connection_config/2, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, send_messages/4, + detach_link_sync/1, + end_session_sync/1, + wait_for_session_end/1, + close_connection_sync/1]). + +init(Config) -> + init(0, Config). + +init(Node, Config) -> + OpnConf = connection_config(Node, Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {Connection, Session, LinkPair}. + +connection_config(Config) -> + connection_config(0, Config). + +connection_config(Node, Config) -> + Host = proplists:get_value(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => {plain, <<"guest">>, <<"guest">>}}. + +flush(Prefix) -> + receive + Msg -> + ct:pal("~p flushed: ~p~n", [Prefix, Msg]), + flush(Prefix) + after 1 -> + ok + end. + +% Before we can send messages we have to wait for credit from the server. +wait_for_credit(Sender) -> + receive + {amqp10_event, {link, Sender, credited}} -> + ok + after 5000 -> + flush("wait_for_credit timed out"), + ct:fail(credited_timeout) + end. + +wait_for_accepts(0) -> + ok; +wait_for_accepts(N) -> + receive + {amqp10_disposition, {accepted, _}} -> + wait_for_accepts(N - 1) + after 5000 -> + ct:fail({missing_accepted, N}) + end. + +send_messages(Sender, Left, Settled) -> + send_messages(Sender, Left, Settled, <<>>). + +send_messages(_, 0, _, _) -> + ok; +send_messages(Sender, Left, Settled, BodySuffix) -> + Bin = integer_to_binary(Left), + Body = <>, + Msg = amqp10_msg:new(Bin, Body, Settled), + case amqp10_client:send_msg(Sender, Msg) of + ok -> + send_messages(Sender, Left - 1, Settled, BodySuffix); + {error, insufficient_credit} -> + ok = wait_for_credit(Sender), + %% The credited event we just processed could have been received some time ago, + %% i.e. we might have 0 credits right now. This happens in the following scenario: + %% 1. We (test case proc) send a message successfully, the client session proc decrements remaining link credit from 1 to 0. + %% 2. The server grants our client session proc new credits. + %% 3. The client session proc sends us (test case proc) a credited event. + %% 4. We didn't even notice that we ran out of credits temporarily. We send the next message, it succeeds, + %% but do not process the credited event in our mailbox. + %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} + %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). + send_messages(Sender, Left, Settled, BodySuffix) + end. + +detach_link_sync(Link) -> + ok = amqp10_client:detach_link(Link), + ok = wait_for_link_detach(Link). + +wait_for_link_detach(Link) -> + receive + {amqp10_event, {link, Link, {detached, normal}}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_link_detach timed out"), + ct:fail({link_detach_timeout, Link}) + end. + +end_session_sync(Session) + when is_pid(Session) -> + ok = amqp10_client:end_session(Session), + ok = wait_for_session_end(Session). + +wait_for_session_end(Session) -> + receive + {amqp10_event, {session, Session, {ended, _}}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_session_end timed out"), + ct:fail({session_end_timeout, Session}) + end. + +close_connection_sync(Connection) + when is_pid(Connection) -> + ok = amqp10_client:close_connection(Connection), + ok = wait_for_connection_close(Connection). + +wait_for_connection_close(Connection) -> + receive + {amqp10_event, {connection, Connection, {closed, normal}}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_connection_close timed out"), + ct:fail({connection_close_timeout, Connection}) + end. diff --git a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl index 872424f53224..3b0b79143617 100644 --- a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl @@ -14,6 +14,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("amqp10_common/include/amqp10_filtex.hrl"). all() -> [{group, tests}]. @@ -24,7 +25,8 @@ groups() -> amqpl, amqp_credit_multiple_grants, amqp_credit_single_grant, - amqp_attach_sub_batch + amqp_attach_sub_batch, + amqp_filter_expression ] }]. @@ -270,6 +272,51 @@ amqp_attach_sub_batch(Config) -> ok = amqp10_client:detach_link(Receiver), ok = amqp10_client:close_connection(Connection). +%% Test that AMQP filter expressions work when messages +%% are published via the stream protocol and consumed via AMQP. +amqp_filter_expression(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + publish_via_stream_protocol(Stream, Config), + + %% Consume from the stream via AMQP 1.0. + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + Address = <<"/queue/", Stream/binary>>, + + AppPropsFilter = [{{utf8, <<"my key">>}, + {utf8, <<"my value">>}}], + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"test-receiver">>, Address, settled, configuration, + #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter} + }), + + ok = amqp10_client:flow_link_credit(Receiver, 100, never), + receive {amqp10_msg, Receiver, M2} -> + ?assertEqual([<<"m2">>], amqp10_msg:body(M2)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M4} -> + ?assertEqual([<<"m4">>], amqp10_msg:body(M4)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M5} -> + ?assertEqual([<<"m5">>], amqp10_msg:body(M5)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M6} -> + ?assertEqual([<<"m6">>], amqp10_msg:body(M6)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, _, _} = Msg -> + ct:fail({received_unexpected_msg, Msg}) + after 10 -> ok + end, + + ok = amqp10_client:detach_link(Receiver), + ok = amqp10_client:close_connection(Connection). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- @@ -310,7 +357,9 @@ publish_via_stream_protocol(Stream, Config) -> {{response, 1, {declare_publisher, _}}, C7} = receive_stream_commands(S, C6), M1 = simple_entry(1, <<"m1">>), - M2 = simple_entry(2, <<"m2">>), + M2 = simple_entry(2, <<"m2">>, #'v1_0.application_properties'{ + content = [{{utf8, <<"my key">>}, + {utf8, <<"my value">>}}]}), M3 = simple_entry(3, <<"m3">>), Messages1 = [M1, M2, M3], PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, length(Messages1), Messages1}), @@ -342,11 +391,25 @@ simple_entry(Sequence, Body) DataSectSize = byte_size(DataSect), <>. -%% Here, each AMQP 1.0 encoded message contains a single data section. +%% Streams contain AMQP 1.0 encoded messages. +%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section. +simple_entry(Sequence, Body, AppProps) + when is_binary(Body) -> + AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)), + DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})), + Sects = <>, + SectSize = byte_size(Sects), + <>. + +%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section. %% All data sections are delivered uncompressed in 1 batch. sub_batch_entry_uncompressed(Sequence, Bodies) -> Batch = lists:foldl(fun(Body, Acc) -> - Sect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})), + AppProps = #'v1_0.application_properties'{ + content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]}, + Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)), + Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})), + Sect = <>, <> end, <<>>, Bodies), Size = byte_size(Batch), diff --git a/moduleindex.yaml b/moduleindex.yaml index ebadcd41d644..1ce6bae902c0 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -543,6 +543,7 @@ rabbit: - rabbit_access_control - rabbit_alarm - rabbit_amqp1_0 +- rabbit_amqp_filtex - rabbit_amqp_management - rabbit_amqp_reader - rabbit_amqp_session