Skip to content

Commit

Permalink
Merge pull request #2184 from bootjp/master
Browse files Browse the repository at this point in the history
Add connection check at startup when out_forward.
  • Loading branch information
repeatedly authored Nov 22, 2018
2 parents cf85ba6 + 8a7022d commit c505cf6
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
27 changes: 27 additions & 0 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class ConnectionClosedError < Error; end
desc 'Ignore DNS resolution and errors at startup time.'
config_param :ignore_network_errors_at_startup, :bool, default: false

desc 'Verify that a connection can be made with one of out_forward nodes at the time of startup.'
config_param :verify_connection_at_startup, :bool, default: false

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

Expand Down Expand Up @@ -258,6 +261,17 @@ def start
@sock_ack_waiting = []
thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end

if @verify_connection_at_startup
@nodes.each do |node|
begin
node.verify_connection
rescue StandardError => e
log.fatal "forward's connection setting error: #{e.message}"
raise Fluent::UnrecoverableError, e.message
end
end
end
end

def close
Expand Down Expand Up @@ -573,6 +587,19 @@ def standby?
@standby
end

def verify_connection
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
begin
ri = RequestInfo.new(@sender.security ? :helo : :established)
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
end
ensure
sock.close
end
end

def establish_connection(sock, ri)
while available? && ri.state != :established
begin
Expand Down
121 changes: 121 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,18 @@ def read_ack_from_sock(sock, unpacker)
assert_equal 2, d.instance.ack_response_timeout
end

test 'verify_connection_at_startup is disabled in default' do
@d = d = create_driver(CONFIG)
assert_false d.instance.verify_connection_at_startup
end

test 'verify_connection_at_startup can be enabled' do
@d = d = create_driver(CONFIG + %[
verify_connection_at_startup true
])
assert_true d.instance.verify_connection_at_startup
end

test 'send tags in str (utf-8 strings)' do
target_input_driver = create_target_input_driver

Expand Down Expand Up @@ -800,4 +812,113 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
i.configure(conf)
end
end

sub_test_case 'verify_connection_at_startup' do
test 'nodes are not available' do
@d = d = create_driver(CONFIG + %[
verify_connection_at_startup true
<buffer tag>
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
</buffer>
])
assert_raise Fluent::UnrecoverableError do
d.instance_start
end
d.instance_shutdown
end

test 'nodes_shared_key_miss_match' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
shared_key fluentd-sharedkey
</security>
]
target_input_driver = create_target_input_driver(conf: input_conf)
output_conf = %[
send_timeout 30
heartbeat_type transport
transport tls
tls_verify_hostname false
verify_connection_at_startup true
require_ack_response true
ack_response_timeout 5s
<security>
self_hostname localhost
shared_key key_miss_match
</security>
<buffer tag>
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
flush_thread_interval 31s
</buffer>
<server>
host #{TARGET_HOST}
port #{TARGET_PORT}
</server>
]
@d = d = create_driver(output_conf)

target_input_driver.run(expect_records: 1, timeout: 15) do
assert_raise Fluent::UnrecoverableError do
d.instance_start
end
d.instance_shutdown
end
end

test 'nodes_shared_key_match' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
shared_key fluentd-sharedkey
<client>
host 127.0.0.1
</client>
</security>
]
target_input_driver = create_target_input_driver(conf: input_conf)

output_conf = %[
send_timeout 51
verify_connection_at_startup true
<security>
self_hostname localhost
shared_key fluentd-sharedkey
</security>
<server>
name test
host #{TARGET_HOST}
port #{TARGET_PORT}
shared_key fluentd-sharedkey
</server>
]
@d = d = create_driver(output_conf)

time = event_time("2011-01-02 13:14:15 UTC")
records = [
{"a" => 1},
{"a" => 2}
]

target_input_driver.run(expect_records: 2, timeout: 15) do
d.run(default_tag: 'test') do
records.each do |record|
d.feed(time, record)
end
end
end

events = target_input_driver.events
assert{ events != [] }
assert_equal(['test', time, records[0]], events[0])
assert_equal(['test', time, records[1]], events[1])
end
end
end

0 comments on commit c505cf6

Please sign in to comment.