From 078a78ae00a88566e2b7068a63457e58e149e09e Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 4 Jul 2016 02:44:58 +0300 Subject: [PATCH] Towards covering node termination/unavailability in connection tracking --- src/rabbit_connection_tracking.erl | 51 +++++- src/rabbit_connection_tracking_handler.erl | 2 +- src/rabbit_node_monitor.erl | 1 + test/per_vhost_connection_limit_SUITE.erl | 191 +++++++++++++++++++-- 4 files changed, 223 insertions(+), 22 deletions(-) diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index 948a85eba890..b806f8d3e800 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -26,12 +26,15 @@ %% * rabbit_event -export([register_connection/1, unregister_connection/1, + list/0, list/1, list_on_node/1, tracked_connection_from_connection_created/1, - is_over_connection_limit/1, count_connections_in/1]). + is_over_connection_limit/1, count_connections_in/1, + on_node_down/1]). -include_lib("rabbit.hrl"). --define(TABLE, rabbit_tracked_connection). +-define(TABLE, rabbit_tracked_connection). +-define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost). -define(SERVER, ?MODULE). %% @@ -58,12 +61,49 @@ unregister_connection(ConnId = {_Node, _Name}) -> [] -> ok; [Row] -> mnesia:dirty_update_counter( - rabbit_tracked_connection_per_vhost, + ?PER_VHOST_COUNTER_TABLE, Row#tracked_connection.vhost, -1), mnesia:delete({?TABLE, ConnId}) end end). + +-spec list() -> [rabbit_types:tracked_connection()]. + +list() -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}). + + +-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. + +list(VHost) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}). + + +-spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}). + + +-spec on_node_down(node()) -> ok. + +on_node_down(Node) -> + case lists:member(Node, nodes()) of + false -> + Cs = list_on_node(Node), + rabbit_log:info( + "Node ~p is down, unregistering ~p connections to it~n", + [Node, length(Cs)]), + [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs], + ok; + true -> rabbit_log:info( + "Keep ~s connections: the node is already back~n", [Node]) + end. + + +-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean(). + is_over_connection_limit(VirtualHost) -> ConnectionCount = count_connections_in(VirtualHost), case rabbit_vhost_limit:connection_limit(VirtualHost) of @@ -77,12 +117,15 @@ is_over_connection_limit(VirtualHost) -> end end. + +-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). + count_connections_in(VirtualHost) -> try case mnesia:transaction( fun() -> case mnesia:dirty_read( - {rabbit_tracked_connection_per_vhost, + {?PER_VHOST_COUNTER_TABLE, VirtualHost}) of [] -> 0; [Val] -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index eb08464b3847..e460235f6fa6 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -41,7 +41,7 @@ [rabbit_event, ?MODULE, []]}}, {cleanup, {gen_event, delete_handler, [rabbit_event, ?MODULE, []]}}, - {requires, rabbit_event}, + {requires, [rabbit_event, rabbit_node_monitor]}, {enables, recovery}]}). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0322aacfd151..90667d514636 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -732,6 +732,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node), ok = rabbit_mnesia:on_node_down(Node), + ok = rabbit_connection_tracking:on_node_down(Node), %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index a75c4dc36e39..843a6b3e20b1 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -25,6 +25,7 @@ -import(rabbit_ct_client_helpers, [open_unmanaged_connection/2, open_unmanaged_connection/3]). + all() -> [ {group, cluster_size_1}, @@ -34,25 +35,35 @@ all() -> groups() -> [ {cluster_size_1, [], [ - most_basic_single_node_connection_tracking_test, - single_node_single_vhost_connection_tracking_test, - single_node_multiple_vhost_connection_tracking_test + most_basic_single_node_test, + single_node_single_vhost_test, + single_node_multiple_vhost_test, + single_node_list_in_vhost_test ]}, {cluster_size_2, [], [ - most_basic_cluster_connection_tracking_test, - cluster_single_vhost_connection_tracking_test, - cluster_multiple_vhost_connection_tracking_test, - cluster_node_shutdown_connection_tracking_test - ]} + most_basic_cluster_test, + cluster_single_vhost_test, + cluster_multiple_vhost_test, + cluster_node_restart_test, + cluster_node_list_on_node_test + ]}, + {partition_handling, [], [ + cluster_full_partition_test + ]} ]. +%% see partitions_SUITE +-define(DELAY, 9000). + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). @@ -67,9 +78,15 @@ init_per_group(cluster_size_1, Config) -> rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()); init_per_group(cluster_size_2, Config) -> + init_per_multinode_group(cluster_size_2, Config, 2); +init_per_group(partition_handling, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]), + init_per_multinode_group(partition_handling, Config1, 3). + +init_per_multinode_group(_GroupName, Config, NodeCount) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodes_count, 3}, + {rmq_nodes_count, NodeCount}, {rmq_nodename_suffix, Suffix} ]), rabbit_ct_helpers:run_steps(Config1, @@ -93,7 +110,7 @@ end_per_testcase(Testcase, Config) -> %% Test cases. %% ------------------------------------------------------------------- -most_basic_single_node_connection_tracking_test(Config) -> +most_basic_single_node_test(Config) -> VHost = <<"/">>, ?assertEqual(0, count_connections_in(Config, VHost)), Conn = open_unmanaged_connection(Config, 0), @@ -103,7 +120,7 @@ most_basic_single_node_connection_tracking_test(Config) -> passed. -single_node_single_vhost_connection_tracking_test(Config) -> +single_node_single_vhost_test(Config) -> VHost = <<"/">>, ?assertEqual(0, count_connections_in(Config, VHost)), @@ -135,7 +152,7 @@ single_node_single_vhost_connection_tracking_test(Config) -> passed. -single_node_multiple_vhost_connection_tracking_test(Config) -> +single_node_multiple_vhost_test(Config) -> VHost1 = <<"vhost1">>, VHost2 = <<"vhost2">>, @@ -184,7 +201,52 @@ single_node_multiple_vhost_connection_tracking_test(Config) -> passed. -most_basic_cluster_connection_tracking_test(Config) -> +single_node_list_in_vhost_test(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + rabbit_ct_broker_helpers:add_vhost(Config, VHost1), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost1), + + rabbit_ct_broker_helpers:add_vhost(Config, VHost2), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost2), + + ?assertEqual(0, length(connections_in(Config, VHost1))), + ?assertEqual(0, length(connections_in(Config, VHost2))), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1), + amqp_connection:close(Conn1), + ?assertEqual(0, length(connections_in(Config, VHost1))), + + Conn2 = open_unmanaged_connection(Config, 0, VHost2), + [#tracked_connection{vhost = VHost2}] = connections_in(Config, VHost2), + + Conn3 = open_unmanaged_connection(Config, 0, VHost1), + [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1), + + Conn4 = open_unmanaged_connection(Config, 0, VHost1), + (catch exit(Conn4, please_terminate)), + [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1), + + Conn5 = open_unmanaged_connection(Config, 0, VHost2), + Conn6 = open_unmanaged_connection(Config, 0, VHost2), + [<<"vhost1">>, <<"vhost2">>] = + lists:usort(lists:map(fun (#tracked_connection{vhost = V}) -> V end, + all_connections(Config))), + + lists:foreach(fun (C) -> + amqp_connection:close(C) + end, [Conn2, Conn3, Conn5, Conn6]), + + ?assertEqual(0, length(all_connections(Config))), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + + passed. + +most_basic_cluster_test(Config) -> VHost = <<"/">>, ?assertEqual(0, count_connections_in(Config, VHost)), Conn1 = open_unmanaged_connection(Config, 0), @@ -204,7 +266,7 @@ most_basic_cluster_connection_tracking_test(Config) -> passed. -cluster_single_vhost_connection_tracking_test(Config) -> +cluster_single_vhost_test(Config) -> VHost = <<"/">>, ?assertEqual(0, count_connections_in(Config, VHost)), @@ -236,7 +298,7 @@ cluster_single_vhost_connection_tracking_test(Config) -> passed. -cluster_multiple_vhost_connection_tracking_test(Config) -> +cluster_multiple_vhost_test(Config) -> VHost1 = <<"vhost1">>, VHost2 = <<"vhost2">>, @@ -285,7 +347,7 @@ cluster_multiple_vhost_connection_tracking_test(Config) -> passed. -cluster_node_shutdown_connection_tracking_test(Config) -> +cluster_node_restart_test(Config) -> VHost = <<"/">>, ?assertEqual(0, count_connections_in(Config, VHost)), @@ -317,6 +379,75 @@ cluster_node_shutdown_connection_tracking_test(Config) -> passed. +cluster_node_list_on_node_test(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + ?assertEqual(0, length(all_connections(Config))), + ?assertEqual(0, length(connections_on_node(Config, 0))), + + Conn1 = open_unmanaged_connection(Config, 0), + [#tracked_connection{node = A}] = connections_on_node(Config, 0), + amqp_connection:close(Conn1), + ?assertEqual(0, length(connections_on_node(Config, 0))), + + _Conn2 = open_unmanaged_connection(Config, 1), + [#tracked_connection{node = B}] = connections_on_node(Config, 1), + + Conn3 = open_unmanaged_connection(Config, 0), + ?assertEqual(1, length(connections_on_node(Config, 0))), + + Conn4 = open_unmanaged_connection(Config, 1), + ?assertEqual(2, length(connections_on_node(Config, 1))), + + (catch exit(Conn4, please_terminate)), + ?assertEqual(1, length(connections_on_node(Config, 1))), + + Conn5 = open_unmanaged_connection(Config, 0), + ?assertEqual(2, length(connections_on_node(Config, 0))), + + rabbit_ct_broker_helpers:stop_broker(Config, 1), + ?assertEqual(2, length(all_connections(Config))), + ?assertEqual(0, length(connections_on_node(Config, 0, B))), + + lists:foreach(fun (C) -> + amqp_connection:close(C) + end, [Conn3, Conn5]), + + timer:sleep(100), + ?assertEqual(0, length(all_connections(Config, 0))), + + passed. + +cluster_full_partition_test(Config) -> + VHost = <<"/">>, + rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal), + + ?assertEqual(0, count_connections_in(Config, VHost)), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% 3 connections, 1 per node + Conn1 = open_unmanaged_connection(Config, 0), + Conn2 = open_unmanaged_connection(Config, 1), + Conn3 = open_unmanaged_connection(Config, 2), + ?assertEqual(3, count_connections_in(Config, VHost)), + + %% B drops off the network, non-reachable by either A or C + rabbit_ct_broker_helpers:block_traffic_between(A, B), + rabbit_ct_broker_helpers:block_traffic_between(B, C), + timer:sleep(?DELAY), + + ?assertEqual(3, count_connections_in(Config, VHost)), + + rabbit_ct_broker_helpers:allow_traffic_between(A, B), + rabbit_ct_broker_helpers:allow_traffic_between(B, C), + ?assertEqual(3, count_connections_in(Config, VHost)), + + lists:foreach(fun (Conn) -> + (catch amqp_connection:close(Conn)) + end, [Conn1, Conn2, Conn3]), + + passed. + %% ------------------------------------------------------------------- %% Helpers @@ -328,3 +459,29 @@ count_connections_in(Config, VHost, NodeIndex) -> rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_connection_tracking, count_connections_in, [VHost]). + +connections_in(Config, VHost) -> + connections_in(Config, 0, VHost). +connections_in(Config, NodeIndex, VHost) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list, [VHost]). + +connections_on_node(Config) -> + connections_on_node(Config, 0). +connections_on_node(Config, NodeIndex) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename), + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list_on_node, [Node]). +connections_on_node(Config, NodeIndex, NodeForListing) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list_on_node, [NodeForListing]). + +all_connections(Config) -> + all_connections(Config, 0). +all_connections(Config, NodeIndex) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list, []).