From 94528e365f694888c3aa968ecaaca60982c3ec4b Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 30 Jul 2019 18:03:30 +0900 Subject: [PATCH 1/4] It should check if established is success Becuase the method available? does not represent established connect is success or not and load_balancer doesn't chose disabled node. Signed-off-by: Yuta Iwama --- lib/fluent/plugin/out_forward.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index c820b07907..0b9af33fc6 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -630,10 +630,6 @@ def establish_connection(sock, ri) end def send_data_actual(sock, tag, chunk) - unless available? - raise ConnectionClosedError, "failed to establish connection with node #{@name}" - end - option = { 'size' => chunk.size, 'compressed' => @compress } option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response @@ -659,6 +655,10 @@ def send_data(tag, chunk) connect(nil, require_ack: @sender.require_ack_response) do |sock, ri| if ri.state != :established establish_connection(sock, ri) + + if ri.state != :established + raise ConnectionClosedError, "failed to establish connection with node #{@name}" + end end send_data_actual(sock, tag, chunk) From b6b4f124532c457681739a11897ad1cf823ea2af Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 30 Jul 2019 18:09:07 +0900 Subject: [PATCH 2/4] checking available? is no need Because the only way to disable node in this loop is calling disable! in this method. At the same time, calling disable calls break as well. Signed-off-by: Yuta Iwama --- lib/fluent/plugin/out_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 0b9af33fc6..8e98148a6f 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -590,7 +590,7 @@ def verify_connection end def establish_connection(sock, ri) - while available? && ri.state != :established + while ri.state != :established begin # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly. # We need rewrite around here using new socket/server plugin helper. From cd169b0e220890c5897bbde9b76ae78c4c456cb4 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 30 Jul 2019 18:12:45 +0900 Subject: [PATCH 3/4] Use instance method Signed-off-by: Yuta Iwama --- lib/fluent/plugin/out_forward.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 8e98148a6f..6df01f5b91 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -743,7 +743,7 @@ def resolve_dns! def tick now = Time.now.to_f - if !@available + unless available? if @failure.hard_timeout?(now) @failure.clear end @@ -752,7 +752,7 @@ def tick if @failure.hard_timeout?(now) @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, hard_timeout: true - @available = false + disable! @resolved_host = nil # expire cached host @failure.clear return true @@ -762,7 +762,7 @@ def tick phi = @failure.phi(now) if phi > @sender.phi_threshold @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, phi: phi, phi_threshold: @sender.phi_threshold - @available = false + disable! @resolved_host = nil # expire cached host @failure.clear return true @@ -774,7 +774,7 @@ def tick def heartbeat(detect=true) now = Time.now.to_f @failure.add(now) - if detect && !@available && @failure.sample_size > @sender.recover_sample_size + if detect && !available? && @failure.sample_size > @sender.recover_sample_size @available = true @log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port true From ebc6b5d07c1e6b9023634814661331d30c04cf80 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 30 Jul 2019 18:14:01 +0900 Subject: [PATCH 4/4] use available? and delete method for test Signed-off-by: Yuta Iwama --- lib/fluent/plugin/out_forward.rb | 2 +- test/plugin/test_out_forward.rb | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 6df01f5b91..d90982f1f2 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -560,7 +560,7 @@ def initialize(sender, server, failure:, connection_manager:) attr_reader :name, :host, :port, :weight, :standby, :state attr_reader :sockaddr # used by on_udp_heatbeat_response_recv - attr_reader :failure, :available # for test + attr_reader :failure # for test def validate_host_resolution! resolved_host diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 2af2ae30f6..5881957d58 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -235,14 +235,14 @@ def read_ack_from_sock(sock, unpacker) node = d.instance.nodes.first stub(node.failure).phi { raise 'Should not be called' } node.tick - assert_equal node.available, true + assert_true node.available? end test 'phi_failure_detector enabled' do @d = d = create_driver(CONFIG + %[phi_failure_detector true \n phi_threshold 0]) node = d.instance.nodes.first node.tick - assert_equal node.available, false + assert_false node.available? end test 'require_ack_response is disabled in default' do @@ -555,7 +555,7 @@ def read_ack_from_sock(sock, unpacker) {"a" => 2} ] target_input_driver.end_if{ d.instance.rollback_count > 0 } - target_input_driver.end_if{ !node.available } + target_input_driver.end_if{ !node.available? } target_input_driver.run(expect_records: 2, timeout: 25) do d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do delayed_commit_timeout_value = d.instance.delayed_commit_timeout @@ -600,7 +600,7 @@ def read_ack_from_sock(sock, unpacker) {"a" => 2} ] target_input_driver.end_if{ d.instance.rollback_count > 0 } - target_input_driver.end_if{ !node.available } + target_input_driver.end_if{ !node.available? } target_input_driver.run(expect_records: 2, timeout: 25) do d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do delayed_commit_timeout_value = d.instance.delayed_commit_timeout @@ -840,7 +840,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG stub(node.failure).phi { raise 'Should not be called' } node.tick - assert_equal node.available, true + assert_true node.available? end test 'heartbeat_type_udp' do