From a774c7bebe0d91c18af1a2035697c431dca28d89 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 19 Jul 2016 04:05:20 +0300 Subject: [PATCH] Ask nodes that come back to re-register their connections Depending on the partition handling mode used there may or may not be any clients still connected. We make sure that registration and deregistration functions are idempotent and assume there may be connections on the node that has come back. Point of improvement: when a node comes back up, N-1 nodes will tell it to re-register connections. It could be fewer than N-1, ideally just 1. --- src/rabbit_connection_tracker.erl | 18 ++++++++++++------ src/rabbit_connection_tracking.erl | 18 ++++++++++++------ ...vhost_connection_limit_partitions_SUITE.erl | 12 +++++++----- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl index b9975ef10b33..0177627d8316 100644 --- a/src/rabbit_connection_tracker.erl +++ b/src/rabbit_connection_tracker.erl @@ -28,7 +28,7 @@ -behaviour(gen_server2). %% API --export([boot/0, start_link/0, reregister/0]). +-export([boot/0, start_link/0, reregister/1]). %% gen_fsm callbacks -export([init/1, @@ -52,8 +52,9 @@ boot() -> start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -reregister() -> - gen_server2:cast({local, ?SERVER}, reregister). +reregister(Node) -> + rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]), + gen_server2:cast({?SERVER, Node}, reregister). %%%=================================================================== %%% gen_server callbacks @@ -66,8 +67,9 @@ handle_call(_Req, _From, State) -> {noreply, State}. handle_cast(reregister, State) -> - rabbit_log:info("Connection tracker: asked to re-register client connections"), - case rabbit_networking:connections_local() of + Cs = rabbit_networking:connections_local(), + rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]), + case Cs of [] -> ok; Cs -> [reregister_connection(C) || C <- Cs], @@ -90,4 +92,8 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== reregister_connection(Conn) -> - Conn ! reregister. + try + Conn ! reregister + catch _:Error -> + rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error]) + end. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index dbb7192f7b2b..c65023a2a850 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -44,12 +44,18 @@ -spec register_connection(rabbit_types:tracked_connection()) -> ok. -register_connection(#tracked_connection{vhost = VHost} = Conn) -> +register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) -> rabbit_misc:execute_mnesia_transaction( fun() -> - mnesia:write(?TABLE, Conn, write), - mnesia:dirty_update_counter( - rabbit_tracked_connection_per_vhost, VHost, 1), + %% upsert + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> + mnesia:write(?TABLE, Conn, write), + mnesia:dirty_update_counter( + rabbit_tracked_connection_per_vhost, VHost, 1); + [_Row] -> + ok + end, ok end). @@ -103,8 +109,8 @@ on_node_down(Node) -> end. -spec on_node_up(node()) -> ok. -on_node_up(_Node) -> - %% TODO +on_node_up(Node) -> + rabbit_connection_tracker:reregister(Node), ok. -spec is_over_connection_limit(rabbit_types:vhost()) -> boolean(). diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl index 149e9542611d..6ef6a2549686 100644 --- a/test/per_vhost_connection_limit_partitions_SUITE.erl +++ b/test/per_vhost_connection_limit_partitions_SUITE.erl @@ -34,12 +34,12 @@ all() -> groups() -> [ {net_ticktime_1, [], [ - cluster_full_partition_test + cluster_full_partition_with_autoheal_test ]} ]. %% see partitions_SUITE --define(DELAY, 9000). +-define(DELAY, 12000). %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -87,7 +87,7 @@ end_per_testcase(Testcase, Config) -> %% Test cases. %% ------------------------------------------------------------------- -cluster_full_partition_test(Config) -> +cluster_full_partition_with_autoheal_test(Config) -> VHost = <<"/">>, rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal), @@ -115,11 +115,13 @@ cluster_full_partition_test(Config) -> rabbit_ct_broker_helpers:allow_traffic_between(B, C), timer:sleep(?DELAY), - ?assertEqual(6, count_connections_in(Config, VHost)), + %% during autoheal B's connections were dropped + ?assertEqual(4, count_connections_in(Config, VHost)), lists:foreach(fun (Conn) -> (catch rabbit_ct_client_helpers:close_connection(Conn)) - end, [Conn1, Conn2, Conn3]), + end, [Conn1, Conn2, Conn3, Conn4, + Conn5, Conn6]), passed.