diff --git a/src/rabbit.erl b/src/rabbit.erl index 2fa4cdee71ef..59ede3c802de 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -178,6 +178,11 @@ {mfa, {rabbit_direct, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({connection_tracker, + [{description, "helps track node-local connections"}, + {mfa, {rabbit_connection_tracker, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index 21d72ef91e98..562a13983b51 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -28,6 +28,7 @@ -export([register_connection/1, unregister_connection/1, list/0, list/1, list_on_node/1, tracked_connection_from_connection_created/1, + tracked_connection_from_connection_state/1, is_over_connection_limit/1, count_connections_in/1, on_node_down/1]). @@ -101,6 +102,9 @@ on_node_down(Node) -> "Keeping ~s connections: the node is already back~n", [Node]) end. +-spec on_node_up(node()) -> ok. +on_node_up(Node) -> + end. -spec is_over_connection_limit(rabbit_types:vhost()) -> boolean(). @@ -198,3 +202,21 @@ tracked_connection_from_connection_created(EventDetails) -> pid = proplists:get_value(pid, EventDetails), peer_host = proplists:get_value(peer_host, EventDetails), peer_port = proplists:get_value(peer_port, EventDetails)}. + +tracked_connection_from_connection_state(#connection{ + vhost = VHost, + connected_at = Ts, + peer_host = PeerHost, + peer_port = PeerPort, + user = Username, + name = Name + }) -> + tracked_connection_from_connection_created( + [{name, Name}, + {node, node()}, + {vhost, VHost}, + {user, Username}, + {connected_at, Ts}, + {pid, self()}, + {peer_port, PeerPort}, + {peer_host, PeerHost}]). diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index e460235f6fa6..70195d5fb8b2 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -57,6 +57,12 @@ handle_event(#event{type = connection_created, props = Details}, State) -> rabbit_connection_tracking:tracked_connection_from_connection_created(Details) ), {ok, State}; +%% see rabbit_reader +handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState) + ), + {ok, State}; handle_event(#event{type = connection_closed, props = Details}, State) -> %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, %% {pid,<0.1774.0>}, diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 90667d514636..3409c8ab71a4 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -761,7 +761,8 @@ ensure_keepalive_timer(State) -> handle_live_rabbit(Node) -> ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node). + ok = rabbit_mnesia:on_node_up(Node), + ok = rabbit_connection_tracking:on_node_up(Node),. maybe_autoheal(State = #state{partitions = []}) -> State; diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl index 2e5d1c8570ee..149e9542611d 100644 --- a/test/per_vhost_connection_limit_partitions_SUITE.erl +++ b/test/per_vhost_connection_limit_partitions_SUITE.erl @@ -28,12 +28,12 @@ all() -> [ - {group, partition_handling} + {group, net_ticktime_1} ]. groups() -> [ - {partition_handling, [], [ + {net_ticktime_1, [], [ cluster_full_partition_test ]} ]. @@ -54,23 +54,25 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(partition_handling, Config) -> +init_per_group(net_ticktime_1 = GroupName, Config) -> Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]), - init_per_multinode_group(partition_handling, Config1, 3). + init_per_multinode_group(GroupName, Config1, 3). init_per_multinode_group(_GroupName, Config, NodeCount) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodes_count, NodeCount}, - {rmq_nodename_suffix, Suffix} + {rmq_nodename_suffix, Suffix}, + {rmq_nodes_clustered, false} ]), rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_broker_helpers:setup_steps() ++ [ + fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1 + ]). end_per_group(_Group, Config) -> rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> @@ -92,25 +94,31 @@ cluster_full_partition_test(Config) -> ?assertEqual(0, count_connections_in(Config, VHost)), [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - %% 3 connections, 1 per node - Conn1 = open_unmanaged_connection(Config, 0), - Conn2 = open_unmanaged_connection(Config, 1), - Conn3 = open_unmanaged_connection(Config, 2), - ?assertEqual(3, count_connections_in(Config, VHost)), + %% 6 connections, 2 per node + Conn1 = open_unmanaged_connection(Config, A), + Conn2 = open_unmanaged_connection(Config, A), + Conn3 = open_unmanaged_connection(Config, B), + Conn4 = open_unmanaged_connection(Config, B), + Conn5 = open_unmanaged_connection(Config, C), + Conn6 = open_unmanaged_connection(Config, C), + ?assertEqual(6, count_connections_in(Config, VHost)), %% B drops off the network, non-reachable by either A or C rabbit_ct_broker_helpers:block_traffic_between(A, B), rabbit_ct_broker_helpers:block_traffic_between(B, C), timer:sleep(?DELAY), - ?assertEqual(2, count_connections_in(Config, VHost)), + %% A and C are still connected, so 4 connections are tracked + ?assertEqual(4, count_connections_in(Config, VHost)), rabbit_ct_broker_helpers:allow_traffic_between(A, B), rabbit_ct_broker_helpers:allow_traffic_between(B, C), - ?assertEqual(3, count_connections_in(Config, VHost)), + timer:sleep(?DELAY), + + ?assertEqual(6, count_connections_in(Config, VHost)), lists:foreach(fun (Conn) -> - (catch amqp_connection:close(Conn)) + (catch rabbit_ct_client_helpers:close_connection(Conn)) end, [Conn1, Conn2, Conn3]), passed.