diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 5af808e997fd..b5c152b6ea3c 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -894,25 +894,22 @@ session_expiry(Config) -> ok = rpc(Config, application, set_env, [App, Par, DefaultVal]). non_clean_sess_reconnect_qos1(Config) -> - non_clean_sess_reconnect(Config, qos1). + non_clean_sess_reconnect(Config, 1). non_clean_sess_reconnect_qos0(Config) -> - non_clean_sess_reconnect(Config, qos0). + non_clean_sess_reconnect(Config, 0). non_clean_sess_reconnect(Config, SubscriptionQoS) -> Pub = connect(<<"publisher">>, Config), Topic = ClientId = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId, Config, non_clean_sess_opts()), - {ok, _, _} = emqtt:subscribe(C1, Topic, SubscriptionQoS), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), + {ok, _, [SubscriptionQoS]} = emqtt:subscribe(C1, Topic, SubscriptionQoS), + ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config), ok = emqtt:disconnect(C1), - eventually(?_assertMatch(#{consumers := 0}, - get_global_counters(Config))), + ok = await_consumer_count(0, ClientId, SubscriptionQoS, Config), - timer:sleep(20), ok = emqtt:publish(Pub, Topic, <<"msg-3-qos0">>, qos0), {ok, _} = emqtt:publish(Pub, Topic, <<"msg-4-qos1">>, qos1), @@ -920,8 +917,7 @@ non_clean_sess_reconnect(Config, SubscriptionQoS) -> %% Server should reply in CONNACK that it has session state. ?assertEqual({session_present, 1}, proplists:lookup(session_present, emqtt:info(C2))), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), + ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config), ok = emqtt:publish(Pub, Topic, <<"msg-5-qos0">>, qos0), {ok, _} = emqtt:publish(Pub, Topic, <<"msg-6-qos1">>, qos1), @@ -954,21 +950,20 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) -> ClientId = ?FUNCTION_NAME, C1 = connect(ClientId, Config, non_clean_sess_opts()), - {ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1}, {Topic0, qos0}]), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), + {ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1}, + {Topic0, qos0}]), + ok = await_consumer_count(1, ClientId, 0, Config), + ok = await_consumer_count(1, ClientId, 1, Config), ok = emqtt:disconnect(C1), - eventually(?_assertMatch(#{consumers := 0}, - get_global_counters(Config))), - + ok = await_consumer_count(0, ClientId, 0, Config), + ok = await_consumer_count(0, ClientId, 1, Config), {ok, _} = emqtt:publish(Pub, Topic0, <<"msg-0">>, qos1), {ok, _} = emqtt:publish(Pub, Topic1, <<"msg-1">>, qos1), C2 = connect(ClientId, Config, non_clean_sess_opts()), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), - + ok = await_consumer_count(1, ClientId, 0, Config), + ok = await_consumer_count(1, ClientId, 1, Config), ok = expect_publishes(C2, Topic0, [<<"msg-0">>]), ok = expect_publishes(C2, Topic1, [<<"msg-1">>]), @@ -1884,6 +1879,17 @@ await_confirms_unordered(From, Left) -> ct:fail("~b confirms are missing", [Left]) end. +await_consumer_count(ConsumerCount, ClientId, QoS, Config) -> + Ch = rabbit_ct_client_helpers:open_channel(Config), + QueueName = rabbit_mqtt_util:queue_name_bin( + rabbit_data_coercion:to_binary(ClientId), QoS), + eventually( + ?_assertMatch( + #'queue.declare_ok'{consumer_count = ConsumerCount}, + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + passive = true})), 500, 10), + ok = rabbit_ct_client_helpers:close_channel(Ch). + declare_queue(Ch, QueueName, Args) when is_pid(Ch), is_binary(QueueName), is_list(Args) -> #'queue.declare_ok'{} = amqp_channel:call(