Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quorum queues v4 #10637

Merged
merged 45 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6c8c62b
Quorum queues v4 scaffolding.
kjnilsson Feb 28, 2024
3cd3a0c
QQ: add v4 ff and new more compact enqueue command.
kjnilsson Mar 11, 2024
09e271a
More test refactoring and new API fixes
kjnilsson Mar 19, 2024
922459d
First pass SAC consumer priority implementation.
kjnilsson Apr 2, 2024
daade0e
QQ: add new consumer cancel option: 'remove'
kjnilsson May 1, 2024
44e7d09
QQ: capture checked out time for each consumer message.
kjnilsson May 7, 2024
1662d14
QQ: Refactor to use the new ra_machine:handle_aux/5 API
kjnilsson May 8, 2024
021d93b
QQ hi/lo priority queue
kjnilsson May 15, 2024
6a1cf01
QQ: Avoid using mc:size/1 inside rabbit_fifo
kjnilsson Jun 6, 2024
e0b6e4d
QQ bug fix: Maintain order when returning multiple
ansd Jun 25, 2024
ae0560e
Simplify
ansd Jun 26, 2024
a5c7533
Add rabbit_quorum_queue:file_handle* functions back.
kjnilsson Jun 27, 2024
b876548
dialyzer fix
kjnilsson Jun 27, 2024
0e3a20f
dynamic_qq_SUITE: avoid mixed versions failure.
kjnilsson Jun 28, 2024
da09e8d
QQ: track number of requeues for message.
kjnilsson Jul 2, 2024
39a5f3b
Use QQ consumer removal when AMQP client detaches
ansd Jul 3, 2024
b0dde40
Use AMQP address v2 in fsharp-tests
ansd Jul 4, 2024
e608c88
QQ: track number of requeues for message.
kjnilsson Jul 2, 2024
5cf1916
rabbit_fifo: Use Ra checkpoints
the-mikedavis Jan 29, 2024
cb1deea
quorum queues: Use a custom interval for checkpoints
the-mikedavis Jan 29, 2024
24482f5
rabbit_fifo_SUITE: List actual effects in ?ASSERT_EFF failure
the-mikedavis May 1, 2024
bf13322
QQ: Checkpoints modifications
kjnilsson Jul 9, 2024
42c501a
fixes
kjnilsson Jul 11, 2024
e0fadac
QQ: emit release cursors on tick for followers and leaders
kjnilsson Jul 12, 2024
71eec01
Support draining a QQ SAC waiting consumer
ansd Jul 11, 2024
5e30ad2
Extract applying #credit{} cmd into 2 functions
ansd Jul 12, 2024
954e37b
Fix default priority level
ansd Jul 12, 2024
8242b82
bazel run gazelle
ansd Jul 12, 2024
bc0d914
Avoid deprecated time unit
ansd Jul 12, 2024
4213c33
Fix aux_test
ansd Jul 12, 2024
903fe74
Delete dead code
ansd Jul 15, 2024
abf902e
Fix rabbit_fifo_q:get_lowest_index/1
ansd Jul 15, 2024
24d0d9d
Delete unused normalize functions
ansd Jul 15, 2024
d3828fe
Generate less garbage
ansd Jul 15, 2024
8ee4d74
Add integration test for QQ SAC with consumer priority
ansd Jul 16, 2024
26c6332
Improve readability
ansd Jul 16, 2024
f3de471
Change modified outcome behaviour
ansd Jul 17, 2024
3361036
Introduce single feature flag rabbitmq_4.0.0
ansd Jul 17, 2024
990c10d
QQ: expose priority metrics in UI
kjnilsson Jul 18, 2024
5a9d2e6
Enable skipped test after rebasing onto main
ansd Jul 31, 2024
83a78f1
QQ: add new command "modify" to better handle AMQP modified outcomes.
kjnilsson Jul 23, 2024
374980c
Type tweaks and naming
kjnilsson Aug 7, 2024
be05f9a
Add test for modified outcome with classic queue
ansd Aug 7, 2024
72d855d
Add test routing on message-annotations in modified outcome
ansd Aug 7, 2024
84f262a
Skip tests in mixed version tests
ansd Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ header(first_acquirer = K,
header(delivery_count = K,
#amqp10_msg{header = #'v1_0.header'{delivery_count = D}}) ->
header_value(K, D);
header(K, #amqp10_msg{header = undefined}) -> header_value(K, undefined).
header(K, #amqp10_msg{header = undefined}) ->
header_value(K, undefined).

-spec delivery_annotations(amqp10_msg()) -> #{annotations_key() => any()}.
delivery_annotations(#amqp10_msg{delivery_annotations = undefined}) ->
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,9 @@ rabbitmq_suite(
rabbitmq_suite(
name = "rabbit_fifo_int_SUITE",
size = "medium",
additional_beam = [
":test_test_util_beam",
],
deps = [
"//deps/rabbit_common:erlang_app",
"@aten//:erlang_app",
Expand All @@ -722,6 +725,7 @@ rabbitmq_suite(
],
deps = [
"//deps/rabbit_common:erlang_app",
"@meck//:erlang_app",
"@proper//:erlang_app",
"@ra//:erlang_app",
],
Expand All @@ -735,6 +739,15 @@ rabbitmq_suite(
],
)

rabbitmq_suite(
name = "rabbit_fifo_q_SUITE",
size = "small",
deps = [
"//deps/rabbit_common:erlang_app",
"@proper//:erlang_app",
],
)

rabbitmq_integration_suite(
name = "rabbit_fifo_dlx_integration_SUITE",
size = "medium",
Expand Down
20 changes: 19 additions & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -399,8 +401,10 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -541,6 +545,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_dlx.hrl",
"src/rabbit_fifo_v0.hrl",
"src/rabbit_fifo_v1.hrl",
"src/rabbit_fifo_v3.hrl",
"src/rabbit_stream_coordinator.hrl",
"src/rabbit_stream_sac_coordinator.hrl",
],
Expand Down Expand Up @@ -672,8 +677,10 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -1288,7 +1295,8 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
testonly = True,
srcs = ["test/rabbit_fifo_SUITE.erl"],
outs = ["test/rabbit_fifo_SUITE.beam"],
hdrs = ["src/rabbit_fifo.hrl"],
hdrs = ["src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
Expand Down Expand Up @@ -2142,3 +2150,13 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "rabbit_fifo_q_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_fifo_q_SUITE.erl"],
outs = ["test/rabbit_fifo_q_SUITE.beam"],
hdrs = ["src/rabbit_fifo.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["@proper//:erlang_app"],
)
4 changes: 2 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ record_death(Reason, SourceQueue,
routing_keys = RKeys,
count = 1,
anns = DeathAnns},
ReasonBin = atom_to_binary(Reason),
Anns = case Anns0 of
#{deaths := Deaths0} ->
Deaths = case Deaths0 of
Expand All @@ -406,7 +407,7 @@ record_death(Reason, SourceQueue,
[{Key, NewDeath} | Deaths0]
end
end,
Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
Anns0#{<<"x-last-death-reason">> := ReasonBin,
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange,
deaths := Deaths};
Expand All @@ -419,7 +420,6 @@ record_death(Reason, SourceQueue,
_ ->
[{Key, NewDeath}]
end,
ReasonBin = atom_to_binary(Reason),
Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
Expand Down
40 changes: 17 additions & 23 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,7 @@ get_property(priority, Msg) ->
-spec protocol_state(state(), mc:annotations()) -> iolist().
protocol_state(Msg0 = #msg_body_decoded{header = Header0,
message_annotations = MA0}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{durable = true,
first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
Header = update_header_from_anns(Header0, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Msg = Msg0#msg_body_decoded{header = Header,
message_annotations = MA},
Expand All @@ -238,14 +231,7 @@ protocol_state(Msg0 = #msg_body_decoded{header = Header0,
protocol_state(#msg_body_encoded{header = Header0,
message_annotations = MA0,
bare_and_footer = BareAndFooter}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{durable = true,
first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
Header = update_header_from_anns(Header0, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter];
Expand All @@ -269,10 +255,9 @@ protocol_state(#v1{message_annotations = MA0,
_ ->
undefined
end,
Header = #'v1_0.header'{durable = Durable,
priority = Priority,
ttl = Ttl,
first_acquirer = first_acquirer(Anns)},
Header = update_header_from_anns(#'v1_0.header'{durable = Durable,
priority = Priority,
ttl = Ttl}, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter].
Expand Down Expand Up @@ -573,13 +558,22 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg)
binary_part_bare_and_footer(Payload, Start) ->
binary_part(Payload, Start, byte_size(Payload) - Start).

-spec first_acquirer(mc:annotations()) -> boolean().
first_acquirer(Anns) ->
update_header_from_anns(undefined, Anns) ->
update_header_from_anns(#'v1_0.header'{durable = true}, Anns);
update_header_from_anns(Header, Anns) ->
DeliveryCount = case Anns of
#{delivery_count := C} -> C;
_ -> 0
end,
Redelivered = case Anns of
#{redelivered := R} -> R;
_ -> false
end,
not Redelivered.
FirstAcq = not Redelivered andalso
DeliveryCount =:= 0 andalso
not is_map_key(deaths, Anns),
Header#'v1_0.header'{first_acquirer = FirstAcq,
delivery_count = {uint, DeliveryCount}}.

encode_deaths(Deaths) ->
lists:map(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ convert_from(mc_amqp, Sections, Env) ->
{Headers2, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),

Headers = case Env of
#{message_containers_store_amqp_v1 := false} ->
#{'rabbitmq_4.0.0' := false} ->
Headers3 = case AProp of
undefined ->
Headers2;
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ get_annotation(?ANN_ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
get_annotation(?ANN_EXCHANGE, #basic_message{exchange_name = Ex}) ->
Ex#resource.name;
get_annotation(id, #basic_message{id = Id}) ->
Id.
Id;
get_annotation(_Key, #basic_message{}) ->
undefined.

set_annotation(id, Value, #basic_message{} = Msg) ->
Msg#basic_message{id = Value};
Expand Down
64 changes: 40 additions & 24 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(),

%% When feature flag credit_api_v2 becomes required,
%% When feature flag rabbitmq_4.0.0 becomes required,
%% the following 2 fields should be deleted.
credit_api_version :: 1 | 2,
%% When credit API v1 is used, our session process holds the delivery-count
Expand Down Expand Up @@ -225,7 +225,7 @@
frames :: [transfer_frame_body(), ...],
queue_ack_required :: boolean(),
%% Queue that sent us this message.
%% When feature flag credit_api_v2 becomes required, this field should be deleted.
%% When feature flag rabbitmq_4.0.0 becomes required, this field should be deleted.
queue_pid :: pid() | credit_api_v2,
delivery_id :: delivery_number(),
outgoing_unsettled :: #outgoing_unsettled{}
Expand Down Expand Up @@ -1068,17 +1068,17 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
QType = amqqueue:get_type(Q),
%% Whether credit API v1 or v2 is used is decided only here at link attachment time.
%% This decision applies to the whole life time of the link.
%% This means even when feature flag credit_api_v2 will be enabled later, this consumer will
%% This means even when feature flag rabbitmq_4.0.0 will be enabled later, this consumer will
%% continue to use credit API v1. This is the safest and easiest solution avoiding
%% transferring link flow control state (the delivery-count) at runtime from this session
%% process to the queue process.
%% Eventually, after feature flag credit_api_v2 gets enabled and a subsequent rolling upgrade,
%% Eventually, after feature flag rabbitmq_4.0.0 gets enabled and a subsequent rolling upgrade,
%% all consumers will use credit API v2.
%% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link
%% flow control state. Hence, credit API mixed version isn't an issue for streams.
{CreditApiVsn, Mode, DeliveryCount, ClientFlowCtl,
QueueFlowCtl, CreditReqInFlight, StashedCreditReq} =
case rabbit_feature_flags:is_enabled(credit_api_v2) orelse
case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') orelse
QType =:= rabbit_stream_queue of
true ->
{2,
Expand Down Expand Up @@ -1861,20 +1861,30 @@ settle_op_from_outcome(#'v1_0.rejected'{}) ->
discard;
settle_op_from_outcome(#'v1_0.released'{}) ->
requeue;
%% Keep the same Modified behaviour as in RabbitMQ 3.x
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true,
undeliverable_here = UndelHere})
when UndelHere =/= true ->
requeue;
settle_op_from_outcome(#'v1_0.modified'{}) ->
%% If delivery_failed is not true, we can't increment its delivery_count.
%% So, we will have to reject without requeue.
%%
%% If undeliverable_here is true, this is not quite correct because
%% undeliverable_here refers to the link, and not the message in general.
%% However, we cannot filter messages from being assigned to individual consumers.
%% That's why we will have to reject it without requeue.
discard;

%% Not all queue types support the modified outcome fields correctly.
%% However, we still allow the client to settle with the modified outcome
%% because some client libraries such as Apache QPid make use of it:
%% https://github.com/apache/qpid-jms/blob/90eb60f59cb59b7b9ad8363ee8a843d6903b8e77/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java#L464
%% In such cases, it's better when RabbitMQ does not end the session.
%% See https://github.com/rabbitmq/rabbitmq-server/issues/6121
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
undeliverable_here = UndelHere,
message_annotations = Anns0
}) ->
Anns = case Anns0 of
#'v1_0.message_annotations'{content = C} ->
C;
_ ->
[]
end,
{modify,
default(DelFailed, false),
default(UndelHere, false),
%% TODO: this must exist elsewhere
lists:foldl(fun ({{symbol, K}, V}, Acc) ->
Acc#{K => unwrap(V)}
end, #{}, Anns)};
settle_op_from_outcome(Outcome) ->
protocol_error(
?V_1_0_AMQP_ERROR_INVALID_FIELD,
Expand Down Expand Up @@ -1981,7 +1991,7 @@ handle_queue_actions(Actions, State) ->
S0 = #state{outgoing_links = OutgoingLinks0,
outgoing_pending = Pending}) ->
%% credit API v1
%% Delete this branch when feature flag credit_api_v2 becomes required.
%% Delete this branch when feature flag rabbitmq_4.0.0 becomes required.
Handle = ctag_to_handle(Ctag),
Link = #outgoing_link{delivery_count = Count0} = maps:get(Handle, OutgoingLinks0),
{Count, Credit, S} = case Drain of
Expand Down Expand Up @@ -2788,7 +2798,7 @@ delivery_count_rcv(undefined) ->
%% credits to a queue has to synchronously wait for a credit reply from the queue:
%% https://github.com/rabbitmq/rabbitmq-server/blob/b9566f4d02f7ceddd2f267a92d46affd30fb16c8/deps/rabbitmq_codegen/credit_extension.json#L43
%% This blocks our entire AMQP 1.0 session process. Since the credit reply from the
%% queue did not contain the consumr tag prior to feature flag credit_api_v2, we
%% queue did not contain the consumr tag prior to feature flag rabbitmq_4.0.0, we
%% must behave here the same way as non-native AMQP 1.0: We wait until the queue
%% sends us a credit reply sucht that we can correlate that reply with our consumer tag.
process_credit_reply_sync(
Expand Down Expand Up @@ -2853,7 +2863,7 @@ process_credit_reply_sync_quorum_queue(Ctag, QName, Credit, State0) ->
no_return().
credit_reply_timeout(QType, QName) ->
Fmt = "Timed out waiting for credit reply from ~s ~s. "
"Hint: Enable feature flag credit_api_v2",
"Hint: Enable feature flag rabbitmq_4.0.0",
Args = [QType, rabbit_misc:rs(QName)],
rabbit_log:error(Fmt, Args),
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).
Expand Down Expand Up @@ -3441,12 +3451,13 @@ cap_credit(DesiredCredit) ->
min(DesiredCredit, MaxCredit).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
Feature = 'rabbitmq_4.0.0',
IsEnabled = rabbit_feature_flags:is_enabled(Feature),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
McEnv = #{Feature => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
Expand Down Expand Up @@ -3497,3 +3508,8 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

unwrap({_Tag, V}) ->
V;
unwrap(V) ->
V.
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ send_command_sync(Writer, ChannelNum, Performative) ->
Request = {send_command, ChannelNum, Performative},
gen_server:call(Writer, Request, ?CALL_TIMEOUT).

%% Delete this function when feature flag credit_api_v2 becomes required.
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
-spec send_command_and_notify(pid(),
pid(),
rabbit_types:channel_number(),
Expand Down Expand Up @@ -111,7 +111,7 @@ handle_cast({send_command, SessionPid, ChannelNum, Performative, Payload}, State
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
State = credit_flow_ack(SessionPid, State1),
no_reply(State);
%% Delete below function clause when feature flag credit_api_v2 becomes required.
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
handle_cast({send_command_and_notify, QueuePid, SessionPid, ChannelNum, Performative, Payload}, State0) ->
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
State = credit_flow_ack(SessionPid, State1),
Expand All @@ -131,7 +131,7 @@ handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason},
credit_flow:peer_down(SessionPid),
State = State0#state{monitored_sessions = maps:remove(SessionPid, Sessions)},
no_reply(State);
%% Delete below function clause when feature flag credit_api_v2 becomes required.
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QueuePid),
no_reply(State).
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ handle_cast({credit, SessionPid, CTag, Credit, Drain},
backing_queue = BQ,
backing_queue_state = BQS0} = State) ->
%% Credit API v1.
%% Delete this function clause when feature flag credit_api_v2 becomes required.
%% Delete this function clause when feature flag rabbitmq_4.0.0 becomes required.
%% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries.
rabbit_classic_queue:send_credit_reply_credit_api_v1(
SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)),
Expand Down
Loading
Loading