Skip to content

Commit

Permalink
Adopt new rabbit_backing_queue:discard implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Cafasso <[email protected]>
(cherry picked from commit facddb3)
  • Loading branch information
noxdafox authored and michaelklishin committed Feb 19, 2025
1 parent d6a19bb commit 4dfa447
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 19 deletions.
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ discard(#delivery{confirm = Confirm,
true -> confirm_messages([MsgId], MTC, QName);
false -> MTC
end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
BQS1 = BQ:discard(Msg, SenderPid, BQS),
{BQS1, MTC1}.

run_message_queue(ActiveConsumersChanged, State) ->
Expand Down Expand Up @@ -828,7 +828,7 @@ send_reject_publish(#delivery{confirm = true,
amqqueue:get_name(Q), MsgSeqNo),

MTC1 = maps:remove(MsgId, MTC),
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
BQS1 = BQ:discard(Msg, SenderPid, BQS),
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
send_reject_publish(#delivery{confirm = false}, State) ->
State.
Expand Down
22 changes: 6 additions & 16 deletions deps/rabbit/src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,12 @@ publish_delivered(Msg, MsgProps, ChPid,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)).

%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
%% to the appropriate sub-BQ. But we don't so we are stuck.
%%
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
%% (if in use) so we don't break that either, just some hypothetical
%% alternate BQ implementation.
discard(_MsgId, _ChPid, State = #state{}) ->
State;
%% We should have something a bit like this here:
%% pick1(fun (_P, BQSN) ->
%% BQ:discard(MsgId, ChPid, BQSN)
%% end, Msg, State);
discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(MsgId, ChPid, BQS)).
discard(Msg, ChPid, State = #state{bq = BQ}) ->
pick1(fun (_P, BQSN) ->
BQ:discard(Msg, ChPid, BQSN)
end, Msg, State);
discard(Msg, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(Msg, ChPid, BQS)).

drain_confirmed(State = #state{bq = BQ}) ->
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ publish_delivered(Msg, MsgProps, ChPid, State) ->
State),
{SeqId, a(maybe_update_rates(State1))}.

discard(_MsgId, _ChPid, State) -> State.
discard(_Msg, _ChPid, State) -> State.

drain_confirmed(State = #vqstate { confirmed = C }) ->
case sets:is_empty(C) of
Expand Down

0 comments on commit 4dfa447

Please sign in to comment.