Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Close all connections command #163

Merged
merged 7 commits into from
Jan 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions lib/rabbitmq/cli/ctl/commands/close_all_connections_command.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.


defmodule RabbitMQ.CLI.Ctl.Commands.CloseAllConnectionsCommand do
@behaviour RabbitMQ.CLI.CommandBehaviour
@flags []

def merge_defaults(args, opts) do
{args, Map.merge(%{global: false, vhost: "/", per_connection_delay: 0, limit: 0}, opts)}
end

def validate(args, _) when length(args) > 1, do: {:validation_failure, :too_many_args}
def validate(args, _) when length(args) < 1, do: {:validation_failure, :not_enough_args}
def validate([_], _), do: :ok

def run([explanation], %{node: node_name, vhost: vhost, global: global_opt,
per_connection_delay: delay, limit: limit}) do
conns = case global_opt do
false ->
per_vhost = :rabbit_misc.rpc_call(node_name, :rabbit_connection_tracking, :list, [vhost])
apply_limit(per_vhost, limit)
true ->
:rabbit_misc.rpc_call(node_name, :rabbit_connection_tracking,
:list_on_node, [node_name])
end
case conns do
{:badrpc, _} = err ->
err
_ ->
:rabbit_misc.rpc_call(node_name, :rabbit_connection_tracking_handler,
:close_connections, [conns, explanation, delay])
{:ok, "Closed #{length(conns)} connections"}
end
end

defp apply_limit(conns, 0) do
conns
end
defp apply_limit(conns, number) do
:lists.sublist(conns, number)
end

def output({:stream, stream}, _opts) do
{:stream, Stream.filter(stream, fn(x) -> x != :ok end)}
end
use RabbitMQ.CLI.DefaultOutput

def switches(), do: [global: :boolean, per_connection_delay: :integer, limit: :integer]

def usage, do: "close_all_connections [-p <vhost> --limit <limit>] [-n <node> --global] [--per-connection-delay <delay>] <explanation>"

def banner([explanation], %{node: node_name, global: true}) do
"Closing all connections to node #{node_name} (across all vhosts), reason: #{explanation}..."
end
def banner([explanation], %{vhost: vhost, limit: 0}) do
"Closing all connections in vhost #{vhost}, reason: #{explanation}..."
end
def banner([explanation], %{vhost: vhost, limit: limit}) do
"Closing #{limit} connections in vhost #{vhost}, reason: #{explanation}..."
end
end
141 changes: 141 additions & 0 deletions test/close_all_connections_command_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.


defmodule CloseAllConnectionsCommandTest do
use ExUnit.Case, async: false
import TestHelper

alias RabbitMQ.CLI.Ctl.RpcStream, as: RpcStream

@helpers RabbitMQ.CLI.Core.Helpers

@command RabbitMQ.CLI.Ctl.Commands.CloseAllConnectionsCommand

@vhost "/"

setup_all do
RabbitMQ.CLI.Core.Distribution.start()
:net_kernel.connect_node(get_rabbit_hostname)
close_all_connections(get_rabbit_hostname)

on_exit([], fn ->
close_all_connections(get_rabbit_hostname)
:erlang.disconnect_node(get_rabbit_hostname)

end)

:ok
end

test "validate: with an invalid number of arguments returns an arg count error", context do
assert @command.validate(["random", "explanation"], context[:opts]) == {:validation_failure, :too_many_args}
assert @command.validate([], context[:opts]) == {:validation_failure, :not_enough_args}
end

test "validate: with the correct number of arguments returns ok", context do
assert @command.validate(["explanation"], context[:opts]) == :ok
end

test "run: a close connections request in an existing vhost with all defaults closes all connections", context do
close_all_connections(get_rabbit_hostname)
with_connection(@vhost, fn(_) ->
node = @helpers.parse_node(context[:node])
nodes = @helpers.nodes_in_cluster(node)
[[vhost: @vhost]] = fetch_connections_vhosts(node, nodes)
opts = %{node: node, vhost: @vhost, global: false, per_connection_delay: 0, limit: 0}
assert {:ok, "Closed 1 connections"} == @command.run(["test"], opts)
assert fetch_connections_vhosts(node, nodes) == []
end)
end

test "run: close a limited number of connections in an existing vhost closes a subset of connections", context do
close_all_connections(get_rabbit_hostname)
with_connections([@vhost, @vhost, @vhost], fn(_) ->
node = @helpers.parse_node(context[:node])
nodes = @helpers.nodes_in_cluster(node)
[[vhost: @vhost], [vhost: @vhost], [vhost: @vhost]] = fetch_connections_vhosts(node, nodes)
opts = %{node: node, vhost: @vhost, global: false, per_connection_delay: 0, limit: 2}
assert {:ok, "Closed 2 connections"} == @command.run(["test"], opts)
assert fetch_connections_vhosts(node, nodes) == [[vhost: @vhost]]
end)
end

test "run: a close connections request for a non-existing vhost does nothing", context do
with_connection(@vhost, fn(_) ->
node = @helpers.parse_node(context[:node])
nodes = @helpers.nodes_in_cluster(node)
[[vhost: @vhost]] = fetch_connections_vhosts(node, nodes)
opts = %{node: node, vhost: "burrow", global: false, per_connection_delay: 0, limit: 0}
assert {:ok, "Closed 0 connections"} == @command.run(["test"], opts)
assert fetch_connections_vhosts(node, nodes) == [[vhost: @vhost]]
close_all_connections(node)
end)
end

test "run: a close connections request to an existing node with --global (all vhosts)", context do
close_all_connections(get_rabbit_hostname)
with_connection(@vhost, fn(_) ->
node = @helpers.parse_node(context[:node])
nodes = @helpers.nodes_in_cluster(node)
[[vhost: @vhost]] = fetch_connections_vhosts(node, nodes)
opts = %{node: node, vhost: "fakeit", global: true, per_connection_delay: 0, limit: 0}
assert {:ok, "Closed 1 connections"} == @command.run(["test"], opts)
assert fetch_connections_vhosts(node, nodes) == []
end)
end

test "run: a close_all_connections request to non-existent RabbitMQ node returns nodedown" do
target = :jake@thedog
:net_kernel.connect_node(target)
opts = %{node: target, vhost: @vhost, global: true, per_connection_delay: 0, limit: 0}
assert match?({:badrpc, :nodedown}, @command.run(["test"], opts))
end

test "banner for vhost option", context do
node = @helpers.parse_node(context[:node])
opts = %{node: node, vhost: "burrow", global: false, per_connection_delay: 0, limit: 0}
s = @command.banner(["some reason"], opts)
assert s =~ ~r/Closing all connections in vhost burrow/
assert s =~ ~r/some reason/
end

test "banner for vhost option with limit", context do
node = @helpers.parse_node(context[:node])
opts = %{node: node, vhost: "burrow", global: false, per_connection_delay: 0, limit: 2}
s = @command.banner(["some reason"], opts)
assert s =~ ~r/Closing 2 connections in vhost burrow/
assert s =~ ~r/some reason/
end

test "banner for global option" do
opts = %{node: :test@localhost, vhost: "burrow", global: true, per_connection_delay: 0, limit: 0}
s = @command.banner(["some reason"], opts)
assert s =~ ~r/Closing all connections to node test@localhost/
assert s =~ ~r/some reason/
end

defp fetch_connections_vhosts(node, nodes) do
RpcStream.receive_list_items(node,
:rabbit_networking,
:emit_connection_info_all,
[nodes, [:vhost]],
:infinity,
[:vhost],
Kernel.length(nodes))
|> Enum.to_list
end

end
15 changes: 15 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,21 @@ defmodule TestHelper do
fun.(conn)
end

def with_connections(vhosts, fun) do
conns = for v <- vhosts do
{:ok, conn} = AMQP.Connection.open(virtual_host: v)
conn
end
ExUnit.Callbacks.on_exit(fn ->
try do
for c <- conns, do: :amqp_connection.close(c, 1000)
catch
:exit, _ -> :ok
end
end)
fun.(conns)
end

def message_count(vhost, queue_name) do
with_channel(vhost, fn(channel) ->
{:ok, %{message_count: mc}} = AMQP.Queue.declare(channel, queue_name)
Expand Down