Skip to content

Commit

Permalink
Emit cancellation event only when stream consumer is cancelled
Browse files Browse the repository at this point in the history
Not when the channel or the connection is closed.

References #13085, #9356

(cherry picked from commit 69d0382)

# Conflicts:
#	deps/rabbitmq_stream/src/rabbit_stream_reader.erl
  • Loading branch information
acogoluegnes authored and mergify[bot] committed Jan 17, 2025
1 parent 0fb3634 commit cac0839
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -973,11 +973,7 @@ close(#stream_client{readers = Readers,
name = QName}) ->
maps:foreach(fun (CTag, #stream{log = Log}) ->
close_log(Log),
rabbit_core_metrics:consumer_deleted(self(), CTag, QName),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, CTag},
{channel, self()},
{queue, QName}])
rabbit_core_metrics:consumer_deleted(self(), CTag, QName)
end, Readers).

update(Q, State)
Expand Down
14 changes: 9 additions & 5 deletions deps/rabbitmq_stream/src/rabbit_stream_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-export([init/0]).
-export([consumer_created/9,
consumer_updated/9,
consumer_cancelled/3]).
consumer_cancelled/4]).
-export([publisher_created/4,
publisher_updated/7,
publisher_deleted/3]).
Expand Down Expand Up @@ -104,16 +104,20 @@ consumer_updated(Connection,

ok.

consumer_cancelled(Connection, StreamResource, SubscriptionId) ->
consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
ets:delete(?TABLE_CONSUMER,
{StreamResource, Connection, SubscriptionId}),
rabbit_global_counters:consumer_deleted(stream),
rabbit_core_metrics:consumer_deleted(Connection,
consumer_tag(SubscriptionId),
StreamResource),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, consumer_tag(SubscriptionId)},
{channel, self()}, {queue, StreamResource}]),
case Notify of
true ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, consumer_tag(SubscriptionId)},
{channel, self()}, {queue, StreamResource}]);
_ -> ok
end,
ok.

publisher_created(Connection,
Expand Down
23 changes: 16 additions & 7 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
<<<<<<< HEAD
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
=======
%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
>>>>>>> 69d0382dd (Emit cancellation event only when stream consumer is cancelled)
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

Expand Down Expand Up @@ -2249,7 +2253,7 @@ handle_frame_post_auth(Transport,
{Connection, State};
true ->
{Connection1, State1} =
remove_subscription(SubscriptionId, Connection, State),
remove_subscription(SubscriptionId, Connection, State, true),
response_ok(Transport, Connection, unsubscribe, CorrelationId),
{Connection1, State1}
end;
Expand Down Expand Up @@ -3081,7 +3085,7 @@ evaluate_state_after_secret_update(Transport,
_ ->
{C1, S1} =
lists:foldl(fun(SubId, {Conn, St}) ->
remove_subscription(SubId, Conn, St)
remove_subscription(SubId, Conn, St, false)
end, {C0, S0}, Subs),
{Acc#{Str => ok}, C1, S1}
end
Expand Down Expand Up @@ -3216,7 +3220,8 @@ notify_connection_closed(#statem_data{connection =
ConnectionState}) ->
rabbit_core_metrics:connection_closed(self()),
[rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(S, Connection), SubId)
stream_r(S, Connection),
SubId, false)
|| #consumer{configuration =
#consumer_configuration{stream = S,
subscription_id = SubId}}
Expand Down Expand Up @@ -3304,7 +3309,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream,
C0),
SubId),
SubId,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
single_active_consumer(Consumer),
Expand All @@ -3314,7 +3320,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream,
C0),
SubId),
SubId,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
single_active_consumer(Consumer),
Expand Down Expand Up @@ -3431,7 +3438,8 @@ remove_subscription(SubscriptionId,
stream_subscriptions =
StreamSubscriptions} =
Connection,
#stream_connection_state{consumers = Consumers} = State) ->
#stream_connection_state{consumers = Consumers} = State,
Notify) ->
#{SubscriptionId := Consumer} = Consumers,
#consumer{log = Log,
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
Expand All @@ -3457,7 +3465,8 @@ remove_subscription(SubscriptionId,
Connection2 = maybe_clean_connection_from_stream(MemberPid, Stream, Connection1),
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream, Connection2),
SubscriptionId),
SubscriptionId,
Notify),

Requests1 = maybe_unregister_consumer(
VirtualHost, Consumer,
Expand Down

0 comments on commit cac0839

Please sign in to comment.