Skip to content

Commit

Permalink
Merge pull request #2532 from ganmacs/refactor-available
Browse files Browse the repository at this point in the history
Refactor available
  • Loading branch information
repeatedly authored Jul 31, 2019
2 parents 7640531 + ebc6b5d commit 5662a39
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
20 changes: 10 additions & 10 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def initialize(sender, server, failure:, connection_manager:)

attr_reader :name, :host, :port, :weight, :standby, :state
attr_reader :sockaddr # used by on_udp_heatbeat_response_recv
attr_reader :failure, :available # for test
attr_reader :failure # for test

def validate_host_resolution!
resolved_host
Expand Down Expand Up @@ -590,7 +590,7 @@ def verify_connection
end

def establish_connection(sock, ri)
while available? && ri.state != :established
while ri.state != :established
begin
# TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
# We need rewrite around here using new socket/server plugin helper.
Expand Down Expand Up @@ -630,10 +630,6 @@ def establish_connection(sock, ri)
end

def send_data_actual(sock, tag, chunk)
unless available?
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end

option = { 'size' => chunk.size, 'compressed' => @compress }
option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response

Expand All @@ -659,6 +655,10 @@ def send_data(tag, chunk)
connect(nil, require_ack: @sender.require_ack_response) do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)

if ri.state != :established
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
end

send_data_actual(sock, tag, chunk)
Expand Down Expand Up @@ -743,7 +743,7 @@ def resolve_dns!

def tick
now = Time.now.to_f
if !@available
unless available?
if @failure.hard_timeout?(now)
@failure.clear
end
Expand All @@ -752,7 +752,7 @@ def tick

if @failure.hard_timeout?(now)
@log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, hard_timeout: true
@available = false
disable!
@resolved_host = nil # expire cached host
@failure.clear
return true
Expand All @@ -762,7 +762,7 @@ def tick
phi = @failure.phi(now)
if phi > @sender.phi_threshold
@log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, phi: phi, phi_threshold: @sender.phi_threshold
@available = false
disable!
@resolved_host = nil # expire cached host
@failure.clear
return true
Expand All @@ -774,7 +774,7 @@ def tick
def heartbeat(detect=true)
now = Time.now.to_f
@failure.add(now)
if detect && !@available && @failure.sample_size > @sender.recover_sample_size
if detect && !available? && @failure.sample_size > @sender.recover_sample_size
@available = true
@log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port
true
Expand Down
10 changes: 5 additions & 5 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,14 @@ def read_ack_from_sock(sock, unpacker)
node = d.instance.nodes.first
stub(node.failure).phi { raise 'Should not be called' }
node.tick
assert_equal node.available, true
assert_true node.available?
end

test 'phi_failure_detector enabled' do
@d = d = create_driver(CONFIG + %[phi_failure_detector true \n phi_threshold 0])
node = d.instance.nodes.first
node.tick
assert_equal node.available, false
assert_false node.available?
end

test 'require_ack_response is disabled in default' do
Expand Down Expand Up @@ -555,7 +555,7 @@ def read_ack_from_sock(sock, unpacker)
{"a" => 2}
]
target_input_driver.end_if{ d.instance.rollback_count > 0 }
target_input_driver.end_if{ !node.available }
target_input_driver.end_if{ !node.available? }
target_input_driver.run(expect_records: 2, timeout: 25) do
d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do
delayed_commit_timeout_value = d.instance.delayed_commit_timeout
Expand Down Expand Up @@ -600,7 +600,7 @@ def read_ack_from_sock(sock, unpacker)
{"a" => 2}
]
target_input_driver.end_if{ d.instance.rollback_count > 0 }
target_input_driver.end_if{ !node.available }
target_input_driver.end_if{ !node.available? }
target_input_driver.run(expect_records: 2, timeout: 25) do
d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do
delayed_commit_timeout_value = d.instance.delayed_commit_timeout
Expand Down Expand Up @@ -840,7 +840,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG

stub(node.failure).phi { raise 'Should not be called' }
node.tick
assert_equal node.available, true
assert_true node.available?
end

test 'heartbeat_type_udp' do
Expand Down

0 comments on commit 5662a39

Please sign in to comment.