Skip to content

Commit

Permalink
Merge pull request #7846 from rabbitmq/stream-at-most-once-dead-lette…
Browse files Browse the repository at this point in the history
…ring

Streams: make at-most-once dead lettering work
  • Loading branch information
michaelklishin authored Apr 5, 2023
2 parents 70af1c4 + e7d7f6f commit ac89309
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 9 deletions.
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ erlang_package.hex_package(

erlang_package.git_package(
repository = "rabbitmq/osiris",
tag = "v1.4.3",
tag = "v1.5.0",
)

erlang_package.hex_package(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
PLT_APPS += mnesia

dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.4.3
dep_osiris = git https://github.com/rabbitmq/osiris v1.5.0
dep_systemd = hex 0.6.1
dep_seshat = hex 0.4.0

Expand Down
10 changes: 4 additions & 6 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,11 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
{State#stream_client{readers = Readers}, [{send_credit_reply, length(Msgs)},
{deliver, CTag, true, Msgs}] ++ Actions}.

deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
deliver(QSs, #delivery{message = Msg, confirm = Confirm} = Delivery) ->
lists:foldl(
fun({_Q, stateless}, {Qs, Actions}) ->
%% TODO what do we do with stateless?
%% QRef = amqqueue:get_pid(Q),
%% ok = rabbit_fifo_client:untracked_enqueue(
%% [QRef], Delivery#delivery.message),
fun({Q, stateless}, {Qs, Actions}) ->
LeaderPid = amqqueue:get_pid(Q),
ok = osiris:write(LeaderPid, msg_to_iodata(Msg)),
{Qs, Actions};
({Q, S0}, {Qs, Actions}) ->
{S, As} = deliver(Confirm, Delivery, S0),
Expand Down
42 changes: 41 additions & 1 deletion deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ all_tests() ->
update_retention_policy,
queue_info,
tracking_status,
restart_stream
restart_stream,
dead_letter_target
].

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -2329,6 +2330,45 @@ purge(Config) ->
amqp_channel:call(Ch, #'queue.purge'{queue = Q})),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

dead_letter_target(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),

SourceQ = <<Q/binary, "_source">>,
?assertEqual({'queue.declare_ok', SourceQ, 0, 0},
declare(Ch, SourceQ, [{<<"x-queue-type">>, longstr, <<"classic">>},
{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, Q}
])),

publish_confirm(Ch, SourceQ, [<<"msg">>]),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 1, false),
CTag = <<"ctag">>,
amqp_channel:subscribe(Ch1,
#'basic.consume'{queue = SourceQ,
no_ack = false,
consumer_tag = CTag},
self()),
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 5000 ->
exit(basic_consume_ok_timeout)
end,
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag,
requeue =false,
multiple = false}),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
after 5000 ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
%%----------------------------------------------------------------------------

delete_queues(Qs) when is_list(Qs) ->
Expand Down

0 comments on commit ac89309

Please sign in to comment.