diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl index 4338af7e0091..a0283dea2044 100644 --- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl +++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl @@ -30,6 +30,7 @@ -define(HEADER_X_STREAM_FILTER, "x-stream-filter"). -define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered"). -define(HEADER_PRIORITY, "priority"). +-define(HEADER_X_PRIORITY, "x-priority"). -define(HEADER_RECEIPT, "receipt"). -define(HEADER_REDELIVERED, "redelivered"). -define(HEADER_REPLY_TO, "reply-to"). diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 7eab1bdcc6f8..50a1b68fabf8 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -718,7 +718,8 @@ do_subscribe(Destination, DestHdr, Frame, subscribe_arguments(Frame) -> subscribe_arguments([?HEADER_X_STREAM_OFFSET, ?HEADER_X_STREAM_FILTER, - ?HEADER_X_STREAM_MATCH_UNFILTERED], Frame, []). + ?HEADER_X_STREAM_MATCH_UNFILTERED, + ?HEADER_X_PRIORITY], Frame, []). subscribe_arguments([], _Frame , Acc) -> Acc; @@ -749,6 +750,14 @@ subscribe_argument(?HEADER_X_STREAM_MATCH_UNFILTERED, Frame, Acc) -> [{list_to_binary(?HEADER_X_STREAM_MATCH_UNFILTERED), bool, MU}] ++ Acc; not_found -> Acc + end; +subscribe_argument(?HEADER_X_PRIORITY, Frame, Acc) -> + Priority = rabbit_stomp_frame:integer_header(Frame, ?HEADER_X_PRIORITY), + case Priority of + {ok, P} -> + [{list_to_binary(?HEADER_X_PRIORITY), byte, P}] ++ Acc; + not_found -> + Acc end. check_subscription_access(Destination = {topic, _Topic}, diff --git a/deps/rabbitmq_stomp/test/system_SUITE.erl b/deps/rabbitmq_stomp/test/system_SUITE.erl index caf6de6ddc93..c583f2102d1b 100644 --- a/deps/rabbitmq_stomp/test/system_SUITE.erl +++ b/deps/rabbitmq_stomp/test/system_SUITE.erl @@ -17,7 +17,9 @@ -include("rabbit_stomp_headers.hrl"). -define(QUEUE, <<"TestQueue">>). +-define(QUEUE_QQ, <<"TestQueueQQ">>). -define(DESTINATION, "/amq/queue/TestQueue"). +-define(DESTINATION_QQ, "/amq/queue/TestQueueQQ"). all() -> [{group, version_to_group_name(V)} || V <- ?SUPPORTED_VERSIONS]. @@ -28,6 +30,7 @@ groups() -> publish_unauthorized_error, subscribe_error, subscribe, + subscribe_with_x_priority, unsubscribe_ack, subscribe_ack, send, @@ -161,6 +164,44 @@ subscribe(Config) -> {ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"), ok. +subscribe_with_x_priority(Config) -> + Version = ?config(version, Config), + StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + Channel = ?config(amqp_channel, Config), + ClientA = ?config(stomp_client, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE_QQ, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-single-active-consumer">>, bool, true} + ]}), + + %% subscribe and wait for receipt + rabbit_stomp_client:send( + ClientA, "SUBSCRIBE", [{"destination", ?DESTINATION_QQ}, {"receipt", "foo"}]), + {ok, _ClientA1, _, _} = stomp_receive(ClientA, "RECEIPT"), + + %% subscribe with a higher priority and wait for receipt + {ok, ClientB} = rabbit_stomp_client:connect(Version, StompPort), + rabbit_stomp_client:send( + ClientB, "SUBSCRIBE", [{"destination", ?DESTINATION_QQ}, + {"receipt", "foo"}, + {"x-priority", 10} + ]), + {ok, ClientB1, _, _} = stomp_receive(ClientB, "RECEIPT"), + + %% send from amqp + Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE_QQ}, + + amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{}, + payload = <<"hello">>}), + + %% ClientB should receive the message since it has a higher priority + {ok, _ClientB2, _, [<<"hello">>]} = stomp_receive(ClientB1, "MESSAGE"), + #'queue.delete_ok'{} = + amqp_channel:call(Channel, #'queue.delete'{queue = ?QUEUE_QQ}), + ok. + unsubscribe_ack(Config) -> Channel = ?config(amqp_channel, Config), Client = ?config(stomp_client, Config),