Skip to content

Commit

Permalink
Merge pull request #7638 from rabbitmq/stream-take-credit-even-for-in…
Browse files Browse the repository at this point in the history
…active-subscription

Take credits for inactive stream subscription
  • Loading branch information
acogoluegnes authored Mar 16, 2023
2 parents c23fd71 + e9f2fa4 commit 3f74df7
Showing 1 changed file with 31 additions and 13 deletions.
44 changes: 31 additions & 13 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2008,11 +2008,14 @@ handle_frame_post_auth(Transport,
#stream_connection_state{consumers = Consumers} = State,
{credit, SubscriptionId, Credit}) ->
case Consumers of
#{SubscriptionId := #consumer{log = undefined}} ->
#{SubscriptionId := #consumer{log = undefined} = Consumer} ->
%% the consumer is not active, it's likely to be credit leftovers
%% from a formerly active consumer, just logging and send an error
%% from a formerly active consumer. Taking the credits,
%% logging and sending an error
rabbit_log:debug("Giving credit to an inactive consumer: ~tp",
[SubscriptionId]),
#consumer{credit = AvailableCredit} = Consumer,
Consumer1 = Consumer#consumer{credit = AvailableCredit + Credit},

Code = ?RESPONSE_CODE_PRECONDITION_FAILED,
Frame =
Expand All @@ -2022,7 +2025,9 @@ handle_frame_post_auth(Transport,
rabbit_global_counters:increase_protocol_counter(stream,
?PRECONDITION_FAILED,
1),
{Connection, State};
{Connection,
State#stream_connection_state{consumers =
Consumers#{SubscriptionId => Consumer1}}};
#{SubscriptionId := Consumer} ->
#consumer{credit = AvailableCredit, last_listener_offset = LLO} =
Consumer,
Expand Down Expand Up @@ -2519,9 +2524,10 @@ handle_frame_post_auth(Transport,
ROS
end,

rabbit_log:debug("Initializing reader for active consumer, offset "
rabbit_log:debug("Initializing reader for active consumer "
"(subscription ~tp, stream ~tp), offset "
"spec is ~tp",
[OffsetSpec]),
[SubscriptionId, Stream, OffsetSpec]),
QueueResource =
#resource{name = Stream,
kind = queue,
Expand All @@ -2535,6 +2541,19 @@ handle_frame_post_auth(Transport,
Properties,
OffsetSpec),
Consumer1 = Consumer#consumer{log = Segment},
#consumer{credit = Crdt,
send_limit = SndLmt,
configuration = #consumer_configuration{counters = ConsumerCounters}} = Consumer1,

rabbit_log:debug("Dispatching to subscription ~tp (stream ~tp), "
"credit(s) ~tp, send limit ~tp",
[SubscriptionId,
Stream,
Crdt,
SndLmt]),

ConsumedMessagesBefore = messages_consumed(ConsumerCounters),

Consumer2 =
case send_chunks(DeliverVersion,
Transport,
Expand All @@ -2554,17 +2573,16 @@ handle_frame_post_auth(Transport,
{ok, Csmr} ->
Csmr
end,
#consumer{configuration =
#consumer_configuration{counters =
ConsumerCounters},
log = Log2} =
Consumer2,
#consumer{log = Log2} = Consumer2,
ConsumerOffset = osiris_log:next_offset(Log2),

rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
ConsumedMessagesAfter = messages_consumed(ConsumerCounters),
rabbit_log:debug("Subscription ~tp (stream ~tp) is now at offset ~tp with ~tp "
"message(s) distributed after subscription",
[SubscriptionId, ConsumerOffset,
messages_consumed(ConsumerCounters)]),
[SubscriptionId,
Stream,
ConsumerOffset,
ConsumedMessagesAfter - ConsumedMessagesBefore]),

Consumers#{SubscriptionId => Consumer2};
#{SubscriptionId :=
Expand Down

0 comments on commit 3f74df7

Please sign in to comment.