Skip to content

Commit

Permalink
Support exchange federation with MQTT 5.0 subscribers
Browse files Browse the repository at this point in the history
 ## What?
This commit fixes #13040.

Prior to this commit, exchange federation crashed if the MQTT topic exchange
(`amq.topic` by default) got federated and MQTT 5.0 clients subscribed on the
downstream. That's because the federation plugin sends bindings from downstream
to upstream via AMQP 0.9.1. However, binding arguments containing Erlang record
`mqtt_subscription_opts` (henceforth binding args v1) cannot be encoded in AMQP 0.9.1.

 ## Why?
Federating the MQTT topic exchange could be useful for warm standby use cases.

 ## How?
This commit makes binding arguments a valid AMQP 0.9.1 table (henceforth
binding args v2).

Binding args v2 can only be used if all nodes support it. Hence binding
args v2 comes with feature flag `rabbitmq_4.1.0`. Note that the AMQP
over WebSocket
[PR](#13071) already
introduces this same feature flag. Although the feature flag subsystem
supports plugins to define their own feature flags, and the MQTT plugin
defined its own feature flags in the past, reusing feature flag
`rabbitmq_4.1.0` is simpler.

This commit also avoids database migrations for both Mnesia and Khepri
if feature flag `rabbitmq_4.1.0` gets enabled. Instead, it's simpler to
migrate binding args v1 to binding args v2 at MQTT connection establishment
time if the feature flag is enabled. (If the feature flag is disabled at
connection etablishment time, but gets enabled during the connection
lifetime, the connection keeps using bindings args v1.)

This commit adds two new suites:
1. `federation_SUITE` which tests that federating the MQTT topic
   exchange works, and
2. `feature_flag_SUITE` which tests the binding args migration from v1 to v2.
  • Loading branch information
ansd committed Jan 22, 2025
1 parent a51d8a5 commit 3a65695
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 31 deletions.
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,10 @@
stability => stable,
depends_on => [message_containers]
}}).

-rabbit_feature_flag(
{'rabbitmq_4.1.0',
#{desc => "Allows rolling upgrades to 4.1.x",
stability => stable,
depends_on => ['rabbitmq_4.0.0']
}}).
4 changes: 2 additions & 2 deletions deps/rabbitmq_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export BUILD_WITHOUT_QUIC

LOCAL_DEPS = ssl
DEPS = ranch rabbit amqp10_common
TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream
TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream rabbitmq_federation

PLT_APPS += rabbitmqctl elixir

Expand Down Expand Up @@ -94,7 +94,7 @@ define ct_master.erl
halt(0)
endef

PARALLEL_CT_SET_1_A = auth retainer
PARALLEL_CT_SET_1_A = auth retainer federation feature_flag
PARALLEL_CT_SET_1_B = cluster command config config_schema mc_mqtt packet_prop \
processor protocol_interop proxy_protocol rabbit_mqtt_confirms reader util
PARALLEL_CT_SET_1_C = java v5
Expand Down
132 changes: 103 additions & 29 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,13 @@
send_fun :: send_fun(),
%% Maximum MQTT packet size in bytes for packets sent from server to client.
max_packet_size_outbound :: max_packet_size(),
topic_alias_maximum_outbound :: non_neg_integer()
}).
topic_alias_maximum_outbound :: non_neg_integer(),
%% https://github.com/rabbitmq/rabbitmq-server/issues/13040
%% The database stores the MQTT subscription options in the binding arguments for:
%% * v1 as Erlang record #mqtt_subscription_opts{}
%% * v2 as AMQP 0.9.1 table
binding_args_v2 :: boolean()
}).

-record(state,
{cfg :: #cfg{},
Expand Down Expand Up @@ -207,6 +212,9 @@ process_connect(
{TraceState, ConnName} = init_trace(VHost, ConnName0),
ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket),
Exchange = rabbit_misc:r(VHost, exchange, persistent_term:get(?PERSISTENT_TERM_EXCHANGE)),
%% To simplify logic, we decide at connection establishment time to stick
%% with either binding args v1 or v2 for the lifetime of the connection.
BindingArgsV2 = rabbit_feature_flags:is_enabled('rabbitmq_4.1.0'),
S = #state{
cfg = #cfg{socket = Socket,
proto_ver = proto_integer_to_atom(ProtoVer),
Expand All @@ -229,7 +237,8 @@ process_connect(
user_prop = maps:get('User-Property', ConnectProps, []),
will_msg = WillMsg,
max_packet_size_outbound = MaxPacketSize,
topic_alias_maximum_outbound = TopicAliasMaxOutbound},
topic_alias_maximum_outbound = TopicAliasMaxOutbound,
binding_args_v2 = BindingArgsV2},
auth_state = #auth_state{
user = User,
authz_ctx = AuthzCtx}},
Expand Down Expand Up @@ -432,7 +441,8 @@ process_request(?SUBSCRIBE,
packet_id = SubscribePktId,
subscriptions = Subscriptions},
payload = undefined},
#state{cfg = #cfg{proto_ver = ProtoVer}} = State0) ->
State0 = #state{cfg = #cfg{proto_ver = ProtoVer,
binding_args_v2 = BindingArgsV2}}) ->
?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]),
{ResultRev, RetainedRev, State1} =
lists:foldl(
Expand Down Expand Up @@ -460,7 +470,7 @@ process_request(?SUBSCRIBE,
maybe
{ok, Q} ?= ensure_queue(QoS, S0),
QName = amqqueue:get_name(Q),
BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts),
BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts, BindingArgsV2),
ok ?= add_subscription(TopicFilter, BindingArgs, QName, S0),
ok ?= maybe_delete_old_subscription(TopicFilter, Opts, S0),
Subs = maps:put(TopicFilter, Opts, S0#state.subscriptions),
Expand Down Expand Up @@ -508,10 +518,11 @@ process_request(?UNSUBSCRIBE,
{ReasonCodes, State} =
lists:foldl(
fun(TopicFilter, {L, #state{subscriptions = Subs0,
cfg = #cfg{proto_ver = ProtoVer}} = S0}) ->
cfg = #cfg{proto_ver = ProtoVer,
binding_args_v2 = BindingArgsV2}} = S0}) ->
case maps:take(TopicFilter, Subs0) of
{Opts, Subs} ->
BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts),
BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts, BindingArgsV2),
case delete_subscription(
TopicFilter, BindingArgs, Opts#mqtt_subscription_opts.qos, S0) of
ok ->
Expand Down Expand Up @@ -872,52 +883,76 @@ init_subscriptions(_SessionPresent = _SubscriptionsPresent = true,
init_subscriptions(_, State) ->
{ok, State}.

%% We suppress a warning because rabbit_misc:table_lookup/2 declares the correct spec and
%% we must handle binding args v1 where binding arguments are not a valid AMQP 0.9.1 table.
-dialyzer({no_match, init_subscriptions0/2}).

-spec init_subscriptions0(qos(), state()) ->
{ok, subscriptions()} | {error, reason_code()}.
init_subscriptions0(QoS, State0 = #state{cfg = #cfg{proto_ver = ProtoVer,
exchange = Exchange}}) ->
init_subscriptions0(QoS, State = #state{cfg = #cfg{proto_ver = ProtoVer,
exchange = Exchange,
binding_args_v2 = BindingArgsV2}}) ->
Bindings =
rabbit_binding:list_for_source_and_destination(
Exchange,
queue_name(QoS, State0),
queue_name(QoS, State),
%% Querying table rabbit_route is catastrophic for CPU usage.
%% Querying table rabbit_reverse_route is acceptable because
%% the source exchange is always the same in the MQTT plugin whereas
%% the destination queue is different for each MQTT client and
%% rabbit_reverse_route is sorted by destination queue.
_Reverse = true),
try
Subs = lists:foldl(
Subs = lists:map(
fun(#binding{key = Key,
args = Args = []},
Acc) ->
args = Args = []}) ->
Opts = #mqtt_subscription_opts{qos = QoS},
TopicFilter = amqp_to_mqtt(Key),
case ProtoVer of
?MQTT_PROTO_V5 ->
%% session upgrade
NewBindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts),
ok = recreate_subscription(TopicFilter, Args, NewBindingArgs, QoS, State0);
NewBindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts, BindingArgsV2),
ok = recreate_subscription(TopicFilter, Args, NewBindingArgs, QoS, State);
_ ->
ok
end,
maps:put(TopicFilter, Opts, Acc);
{TopicFilter, Opts};
(#binding{key = Key,
args = Args},
Acc) ->
Opts0 = #mqtt_subscription_opts{} = lists:keyfind(mqtt_subscription_opts, 1, Args),
args = Args}) ->
TopicFilter = amqp_to_mqtt(Key),
Opts = case ProtoVer of
?MQTT_PROTO_V5 ->
Opts0;
case rabbit_misc:table_lookup(Args, <<"x-mqtt-subscription-opts">>) of
{table, Table} ->
%% binding args v2
subscription_opts_from_table(Table);
undefined ->
%% binding args v1
Opts0 = #mqtt_subscription_opts{} = lists:keyfind(
mqtt_subscription_opts, 1, Args),
case BindingArgsV2 of
true ->
%% Migrate v1 to v2.
%% Note that this migration must be in place even for some versions
%% (jump upgrade) after feature flag 'rabbitmq_4.1.0' has become
%% required since enabling the feature flag doesn't migrate binding
%% args for existing connections.
NewArgs = binding_args_for_proto_ver(
ProtoVer, TopicFilter, Opts0, BindingArgsV2),
ok = recreate_subscription(TopicFilter, Args, NewArgs, QoS, State);
false ->
ok
end,
Opts0
end;
_ ->
%% session downgrade
ok = recreate_subscription(TopicFilter, Args, [], QoS, State0),
ok = recreate_subscription(TopicFilter, Args, [], QoS, State),
#mqtt_subscription_opts{qos = QoS}
end,
maps:put(TopicFilter, Opts, Acc)
end, #{}, Bindings),
{ok, Subs}
{TopicFilter, Opts}
end, Bindings),
{ok, maps:from_list(Subs)}
catch throw:{error, Reason} ->
Rc = case Reason of
access_refused -> ?RC_NOT_AUTHORIZED;
Expand Down Expand Up @@ -1482,14 +1517,52 @@ consume(Q, QoS, #state{
Err
end.

binding_args_for_proto_ver(?MQTT_PROTO_V3, _, _) ->
binding_args_for_proto_ver(?MQTT_PROTO_V3, _, _, _) ->
[];
binding_args_for_proto_ver(?MQTT_PROTO_V4, _, _) ->
binding_args_for_proto_ver(?MQTT_PROTO_V4, _, _, _) ->
[];
binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter, SubOpts) ->
binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter, SubOpts0, V2) ->
SubOpts = case V2 of
true ->
Table = subscription_opts_to_table(SubOpts0),
{<<"x-mqtt-subscription-opts">>, table, Table};
false ->
SubOpts0
end,
BindingKey = mqtt_to_amqp(TopicFilter),
[SubOpts, {<<"x-binding-key">>, longstr, BindingKey}].

subscription_opts_to_table(#mqtt_subscription_opts{
qos = Qos,
no_local = NoLocal,
retain_as_published = RetainAsPublished,
retain_handling = RetainHandling,
id = Id}) ->
Table0 = [{<<"qos">>, unsignedbyte, Qos},
{<<"no-local">>, bool, NoLocal},
{<<"retain-as-published">>, bool, RetainAsPublished},
{<<"retain-handling">>, unsignedbyte, RetainHandling}],
Table = case Id of
undefined ->
Table0;
_ ->
[{<<"id">>, unsignedint, Id} | Table0]
end,
rabbit_misc:sort_field_table(Table).

subscription_opts_from_table(Table) ->
#{<<"qos">> := Qos,
<<"no-local">> := NoLocal,
<<"retain-as-published">> := RetainAsPublished,
<<"retain-handling">> := RetainHandling
} = Map = rabbit_misc:amqp_table(Table),
#mqtt_subscription_opts{
qos = Qos,
no_local = NoLocal,
retain_as_published = RetainAsPublished,
retain_handling = RetainHandling,
id = maps:get(<<"id">>, Map, undefined)}.

add_subscription(TopicFilter, BindingArgs, Qos, State)
when is_integer(Qos) ->
add_subscription(TopicFilter, BindingArgs, queue_name(Qos, State), State);
Expand All @@ -1506,12 +1579,13 @@ delete_subscription(TopicFilter, BindingArgs, Qos, State) ->
%% Subscription will be identical to that in the previous Subscription, although its
%% Subscription Options could be different." [v5 3.8.4]
maybe_delete_old_subscription(TopicFilter, Opts, State = #state{subscriptions = Subs,
cfg = #cfg{proto_ver = ProtoVer}}) ->
cfg = #cfg{proto_ver = ProtoVer,
binding_args_v2 = BindingArgsV2}}) ->
case Subs of
#{TopicFilter := OldOpts}
when OldOpts =/= Opts ->
delete_subscription(TopicFilter,
binding_args_for_proto_ver(ProtoVer, TopicFilter, OldOpts),
binding_args_for_proto_ver(ProtoVer, TopicFilter, OldOpts, BindingArgsV2),
OldOpts#mqtt_subscription_opts.qos,
State);
_ ->
Expand Down
111 changes: 111 additions & 0 deletions deps/rabbitmq_mqtt/test/feature_flag_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

%% This suite should be deleted when feature flag 'rabbitmq_4.1.0' becomes required.
-module(feature_flag_SUITE).
-compile([export_all,
nowarn_export_all]).

-include_lib("eunit/include/eunit.hrl").

-import(util,
[connect/2,
connect/3,
non_clean_sess_opts/0
]).

-define(RC_SESSION_TAKEN_OVER, 16#8E).

all() ->
[migrate_binding_args].

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(
Config,
[{mqtt_version, v5},
{rmq_nodename_suffix, ?MODULE}]),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
{rabbit, [{forced_feature_flags_on_init, []}]}),
rabbit_ct_helpers:run_setup_steps(
Config2,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

migrate_binding_args(Config) ->
%% Feature flag rabbitmq_4.1.0 enables binding arguments v2.
FeatureFlag = 'rabbitmq_4.1.0',
?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, FeatureFlag)),

Sub1a = connect(<<"sub 1">>, Config, non_clean_sess_opts()),
{ok, _, [0]} = emqtt:subscribe(Sub1a, <<"x/+">>, qos0),
ok = emqtt:disconnect(Sub1a),

Sub2a = connect(<<"sub 2">>, Config,non_clean_sess_opts()),
{ok, _, [0, 1]} = emqtt:subscribe(
Sub2a,
#{'Subscription-Identifier' => 9},
[{<<"x/y">>, [{nl, false}, {rap, false}, {qos, qos0}]},
{<<"z">>, [{nl, true}, {rap, true}, {qos, qos1}]}]),

Pub = connect(<<"pub">>, Config),
{ok, _} = emqtt:publish(Pub, <<"x/y">>, <<"m1">>, [{retain, true}, {qos, 1}]),
receive {publish, #{client_pid := Sub2a,
qos := 0,
topic := <<"x/y">>,
payload := <<"m1">>,
retain := false}} -> ok
after 10_000 -> ct:fail({missing_publish, ?LINE})
end,

?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)),

%% Connecting causes binding args to be migrated from v1 to v2.
Sub1b = connect(<<"sub 1">>, Config, [{clean_start, false}]),
receive {publish, #{client_pid := Sub1b,
qos := 0,
topic := <<"x/y">>,
payload := <<"m1">>}} -> ok
after 10_000 -> ct:fail({missing_publish, ?LINE})
end,

unlink(Sub2a),
%% Connecting causes binding args to be migrated from v1 to v2.
Sub2b = connect(<<"sub 2">>, Config, [{clean_start, false}]),
receive {disconnected, ?RC_SESSION_TAKEN_OVER, #{}} -> ok
after 10_000 -> ct:fail({missing_disconnected, ?LINE})
end,

{ok, _} = emqtt:publish(Sub2b, <<"z">>, <<"m2">>, qos1),
%% We should not receive m2 since it's a local publish.
{ok, _} = emqtt:publish(Pub, <<"z">>, <<"m3">>, [{retain, true}, {qos, qos1}]),
receive {publish, Publish} ->
?assertMatch(#{client_pid := Sub2b,
qos := 1,
topic := <<"z">>,
payload := <<"m3">>,
properties := #{'Subscription-Identifier' := 9},
retain := true},
Publish)
after 10_000 -> ct:fail({missing_publish, ?LINE})
end,

ok = emqtt:disconnect(Sub1b),
ok = emqtt:disconnect(Sub2b),
ok = emqtt:disconnect(Pub).
Loading

0 comments on commit 3a65695

Please sign in to comment.