diff --git a/MODULE.bazel b/MODULE.bazel index e3a8e36ce70c..ef2196b07514 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -247,7 +247,7 @@ erlang_package.hex_package( erlang_package.git_package( repository = "rabbitmq/osiris", - tag = "v1.4.3", + tag = "v1.5.0", ) erlang_package.hex_package( diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index ea6704b6e463..71a746580d8e 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -148,7 +148,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop PLT_APPS += mnesia dep_syslog = git https://github.com/schlagert/syslog 4.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.4.3 +dep_osiris = git https://github.com/rabbitmq/osiris v1.5.0 dep_systemd = hex 0.6.1 dep_seshat = hex 0.4.0 diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 1dd9ff1ee3e0..02ee2de9efd7 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -365,13 +365,11 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0, {State#stream_client{readers = Readers}, [{send_credit_reply, length(Msgs)}, {deliver, CTag, true, Msgs}] ++ Actions}. -deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> +deliver(QSs, #delivery{message = Msg, confirm = Confirm} = Delivery) -> lists:foldl( - fun({_Q, stateless}, {Qs, Actions}) -> - %% TODO what do we do with stateless? - %% QRef = amqqueue:get_pid(Q), - %% ok = rabbit_fifo_client:untracked_enqueue( - %% [QRef], Delivery#delivery.message), + fun({Q, stateless}, {Qs, Actions}) -> + LeaderPid = amqqueue:get_pid(Q), + ok = osiris:write(LeaderPid, msg_to_iodata(Msg)), {Qs, Actions}; ({Q, S0}, {Qs, Actions}) -> {S, As} = deliver(Confirm, Delivery, S0), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 342cb61d5fd0..7591262091b7 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -124,7 +124,8 @@ all_tests() -> update_retention_policy, queue_info, tracking_status, - restart_stream + restart_stream, + dead_letter_target ]. %% ------------------------------------------------------------------- @@ -2329,6 +2330,45 @@ purge(Config) -> amqp_channel:call(Ch, #'queue.purge'{queue = Q})), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +dead_letter_target(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + SourceQ = <>, + ?assertEqual({'queue.declare_ok', SourceQ, 0, 0}, + declare(Ch, SourceQ, [{<<"x-queue-type">>, longstr, <<"classic">>}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q} + ])), + + publish_confirm(Ch, SourceQ, [<<"msg">>]), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 1, false), + CTag = <<"ctag">>, + amqp_channel:subscribe(Ch1, + #'basic.consume'{queue = SourceQ, + no_ack = false, + consumer_tag = CTag}, + self()), + receive + #'basic.consume_ok'{consumer_tag = CTag} -> + ok + after 5000 -> + exit(basic_consume_ok_timeout) + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag, + requeue =false, + multiple = false}), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) + after 5000 -> + exit(timeout) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). %%---------------------------------------------------------------------------- delete_queues(Qs) when is_list(Qs) ->