Skip to content

Commit

Permalink
Bump outgoing delivery-count in handle_deliver
Browse files Browse the repository at this point in the history
What?
For credit API v1, increase the outgoing delivery-count as soon as the
message is scheduled for delivery, that is before the message is queued
in the session's outgoing_pending queue.

Why?
1. More correct for credit API v1 in case a FLOW is received
   for an outgoing link topping up credit while an outgoing transfer on
   the same link is queued in outgoing_pending. For the server's credit
   calculation to be correct, it doesn't matter whether the outgoing
   in-flight message travels through the network, is queued in TCP
   buffers, processed by the writer, or just queued in the session's
   outgoing_pending queue.
2. Higher performance as no map update is performed for credit API v2
   in send_pending()
3. Simplifies code
  • Loading branch information
ansd committed Feb 20, 2024
1 parent 2704c69 commit f3f5a8e
Showing 1 changed file with 27 additions and 27 deletions.
54 changes: 27 additions & 27 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1127,9 +1127,8 @@ send_pending(#state{remote_incoming_window = Space,
{{value, #pending_transfer{
frames = Frames,
queue_pid = QPid,
outgoing_unsettled = #outgoing_unsettled{
consumer_tag = Ctag,
queue_name = QName}} = Pending}, Buf1}
outgoing_unsettled = #outgoing_unsettled{queue_name = QName}
} = Pending}, Buf1}
when Space > 0 ->
SendFun = case rabbit_queue_type:module(QName, State0#state.queue_states) of
{ok, rabbit_classic_queue} ->
Expand All @@ -1143,28 +1142,20 @@ send_pending(#state{remote_incoming_window = Space,
WriterPid, Ch, Transfer, Sections)
end
end,
%% rabbit_basic:maybe_gc_large_msg(Content, GCThreshold)
{NumTransfersSent, Buf, State1} =
case send_frames(SendFun, Frames, Space) of
{all, SpaceLeft} ->
State1 = #state{outgoing_links = OutgoingLinks0} = session_flow_control_sent_transfers(
Space - SpaceLeft, State0),
HandleInt = ctag_to_handle(Ctag),
OutgoingLinks = maps:update_with(
HandleInt,
fun(#outgoing_link{delivery_count = {credit_api_v1, C}} = Link) ->
Link#outgoing_link{delivery_count = {credit_api_v1, add(C, 1)}};
(#outgoing_link{delivery_count = credit_api_v2} = Link) ->
Link
end,
OutgoingLinks0),
State2 = State1#state{outgoing_links = OutgoingLinks},
State = record_outgoing_unsettled(Pending, State2),
send_pending(State#state{outgoing_pending = Buf1});
{Space - SpaceLeft,
Buf1,
record_outgoing_unsettled(Pending, State0)};
{some, Rest} ->
State = session_flow_control_sent_transfers(Space, State0),
Buf = queue:in_r(Pending#pending_transfer{frames = Rest}, Buf1),
send_pending(State#state{outgoing_pending = Buf})
end;
{Space,
queue:in_r(Pending#pending_transfer{frames = Rest}, Buf1),
State0}
end,
State2 = session_flow_control_sent_transfers(NumTransfersSent, State1),
State = State2#state{outgoing_pending = Buf},
send_pending(State);
{{value, #pending_transfer{}}, _}
when Space =:= 0 ->
State0
Expand Down Expand Up @@ -1415,17 +1406,18 @@ handle_deliver(ConsumerTag, AckRequired,
Msg = {QName, QPid, MsgId, Redelivered, Mc0},
State = #state{outgoing_pending = Pending,
outgoing_delivery_id = DeliveryId,
outgoing_links = OutgoingLinks,
outgoing_links = OutgoingLinks0,
cfg = #cfg{outgoing_max_frame_size = MaxFrameSize,
conn_name = ConnName,
channel_num = ChannelNum,
user = #user{username = Username},
trace_state = Trace}}) ->
Handle = ctag_to_handle(ConsumerTag),
case OutgoingLinks of
case OutgoingLinks0 of
#{Handle := #outgoing_link{queue_type = QType,
send_settled = SendSettled,
max_message_size = MaxMessageSize}} ->
max_message_size = MaxMessageSize,
delivery_count = DelCount} = Link0} ->
Dtag = delivery_tag(MsgId, SendSettled),
Transfer = #'v1_0.transfer'{
handle = ?UINT(Handle),
Expand All @@ -1451,6 +1443,13 @@ handle_deliver(ConsumerTag, AckRequired,
end,
messages_delivered(Redelivered, QType),
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, Trace),
OutgoingLinks = case DelCount of
credit_api_v2 ->
OutgoingLinks0;
{credit_api_v1, C} ->
Link = Link0#outgoing_link{delivery_count = {credit_api_v1, add(C, 1)}},
maps:update(Handle, Link, OutgoingLinks0)
end,
Del = #outgoing_unsettled{
msg_id = MsgId,
consumer_tag = ConsumerTag,
Expand All @@ -1465,8 +1464,9 @@ handle_deliver(ConsumerTag, AckRequired,
queue_pid = QPid,
delivery_id = DeliveryId,
outgoing_unsettled = Del},
State#state{outgoing_delivery_id = add(DeliveryId, 1),
outgoing_pending = queue:in(PendingTransfer, Pending)};
State#state{outgoing_pending = queue:in(PendingTransfer, Pending),
outgoing_delivery_id = add(DeliveryId, 1),
outgoing_links = OutgoingLinks};
_ ->
%% TODO handle missing link -- why does the queue think it's there?
rabbit_log:warning(
Expand Down

0 comments on commit f3f5a8e

Please sign in to comment.