diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index 9add4830cdad..4432ebd1939e 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -27,9 +27,9 @@ init(Msg = #mqtt_msg{qos = Qos, when is_integer(Qos) -> Anns0 = case Qos > 0 of true -> - #{?ANN_DURABLE => true}; + #{}; false -> - #{} + #{?ANN_DURABLE => false} end, Anns1 = case Props of #{'Message-Expiry-Interval' := Seconds} -> diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index ddf2f2a2e919..8b8dcbc6275b 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -37,7 +37,8 @@ groups() -> mqtt_amqpl_alt, mqtt_amqp, mqtt_amqp_alt, - amqp_mqtt + amqp_mqtt, + is_persistent ]} ]. @@ -501,6 +502,19 @@ amqp_mqtt(_Config) -> }, Mqtt), ok. +is_persistent(_Config) -> + Msg0 = #mqtt_msg{qos = 0, + topic = <<"my/topic">>, + payload = <<>>}, + Mc0 = mc:init(mc_mqtt, Msg0, #{}), + ?assertNot(mc:is_persistent(Mc0)), + + Msg1 = #mqtt_msg{qos = 1, + topic = <<"my/topic">>, + payload = <<>>}, + Mc1 = mc:init(mc_mqtt, Msg1, #{}), + ?assert(mc:is_persistent(Mc1)). + mqtt_msg() -> #mqtt_msg{qos = 0, topic = <<"my/topic">>, diff --git a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl index 8d289e456929..f1c6f57fcdbf 100644 --- a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl @@ -23,7 +23,8 @@ -import(rabbit_ct_broker_helpers, [rpc/4]). -import(rabbit_ct_helpers, - [eventually/3]). + [eventually/1, + eventually/3]). all() -> [{group, tests}]. @@ -88,7 +89,8 @@ end_per_testcase(Testcase, Config) -> mqtt_amqpl_mqtt(Config) -> Q = ClientId = atom_to_binary(?FUNCTION_NAME), Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true}), #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q, exchange = <<"amq.topic">>, routing_key = <<"my.topic">>}), @@ -147,6 +149,14 @@ mqtt_amqpl_mqtt(Config) -> after 1000 -> ct:fail("did not receive reply") end, + %% Another message MQTT 5.0 to AMQP 0.9.1, this time with QoS 0 + ok = emqtt:publish(C, <<"my/topic">>, RequestPayload, [{qos, 0}]), + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, #amqp_msg{payload = RequestPayload, + props = #'P_basic'{delivery_mode = 1}}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q}))), + ok = emqtt:disconnect(C). mqtt_amqp_mqtt(Config) ->