Skip to content

Commit

Permalink
Merge pull request #11956 from rabbitmq/mergify/bp/v4.0.x/pr-11947
Browse files Browse the repository at this point in the history
STOMP: add support for consumer priorities (backport #11947)
  • Loading branch information
michaelklishin authored Aug 8, 2024
2 parents cf719bc + 2f51518 commit 1715e00
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
1 change: 1 addition & 0 deletions deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
-define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered").
-define(HEADER_PRIORITY, "priority").
-define(HEADER_X_PRIORITY, "x-priority").
-define(HEADER_RECEIPT, "receipt").
-define(HEADER_REDELIVERED, "redelivered").
-define(HEADER_REPLY_TO, "reply-to").
Expand Down
11 changes: 10 additions & 1 deletion deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,8 @@ do_subscribe(Destination, DestHdr, Frame,
subscribe_arguments(Frame) ->
subscribe_arguments([?HEADER_X_STREAM_OFFSET,
?HEADER_X_STREAM_FILTER,
?HEADER_X_STREAM_MATCH_UNFILTERED], Frame, []).
?HEADER_X_STREAM_MATCH_UNFILTERED,
?HEADER_X_PRIORITY], Frame, []).

subscribe_arguments([], _Frame , Acc) ->
Acc;
Expand Down Expand Up @@ -749,6 +750,14 @@ subscribe_argument(?HEADER_X_STREAM_MATCH_UNFILTERED, Frame, Acc) ->
[{list_to_binary(?HEADER_X_STREAM_MATCH_UNFILTERED), bool, MU}] ++ Acc;
not_found ->
Acc
end;
subscribe_argument(?HEADER_X_PRIORITY, Frame, Acc) ->
Priority = rabbit_stomp_frame:integer_header(Frame, ?HEADER_X_PRIORITY),
case Priority of
{ok, P} ->
[{list_to_binary(?HEADER_X_PRIORITY), byte, P}] ++ Acc;
not_found ->
Acc
end.

check_subscription_access(Destination = {topic, _Topic},
Expand Down
41 changes: 41 additions & 0 deletions deps/rabbitmq_stomp/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
-include("rabbit_stomp_headers.hrl").

-define(QUEUE, <<"TestQueue">>).
-define(QUEUE_QQ, <<"TestQueueQQ">>).
-define(DESTINATION, "/amq/queue/TestQueue").
-define(DESTINATION_QQ, "/amq/queue/TestQueueQQ").

all() ->
[{group, version_to_group_name(V)} || V <- ?SUPPORTED_VERSIONS].
Expand All @@ -28,6 +30,7 @@ groups() ->
publish_unauthorized_error,
subscribe_error,
subscribe,
subscribe_with_x_priority,
unsubscribe_ack,
subscribe_ack,
send,
Expand Down Expand Up @@ -161,6 +164,44 @@ subscribe(Config) ->
{ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"),
ok.

subscribe_with_x_priority(Config) ->
Version = ?config(version, Config),
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
Channel = ?config(amqp_channel, Config),
ClientA = ?config(stomp_client, Config),
#'queue.declare_ok'{} =
amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE_QQ,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-single-active-consumer">>, bool, true}
]}),

%% subscribe and wait for receipt
rabbit_stomp_client:send(
ClientA, "SUBSCRIBE", [{"destination", ?DESTINATION_QQ}, {"receipt", "foo"}]),
{ok, _ClientA1, _, _} = stomp_receive(ClientA, "RECEIPT"),

%% subscribe with a higher priority and wait for receipt
{ok, ClientB} = rabbit_stomp_client:connect(Version, StompPort),
rabbit_stomp_client:send(
ClientB, "SUBSCRIBE", [{"destination", ?DESTINATION_QQ},
{"receipt", "foo"},
{"x-priority", 10}
]),
{ok, ClientB1, _, _} = stomp_receive(ClientB, "RECEIPT"),

%% send from amqp
Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE_QQ},

amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{},
payload = <<"hello">>}),

%% ClientB should receive the message since it has a higher priority
{ok, _ClientB2, _, [<<"hello">>]} = stomp_receive(ClientB1, "MESSAGE"),
#'queue.delete_ok'{} =
amqp_channel:call(Channel, #'queue.delete'{queue = ?QUEUE_QQ}),
ok.

unsubscribe_ack(Config) ->
Channel = ?config(amqp_channel, Config),
Client = ?config(stomp_client, Config),
Expand Down

0 comments on commit 1715e00

Please sign in to comment.