diff --git a/lib/fluent/plugin/out_copy.rb b/lib/fluent/plugin/out_copy.rb index bb4ac5d97c..3d03cae41b 100644 --- a/lib/fluent/plugin/out_copy.rb +++ b/lib/fluent/plugin/out_copy.rb @@ -27,20 +27,28 @@ class CopyOutput < MultiOutput desc 'Pass different record to each `store` plugin by specified method' config_param :copy_mode, :enum, list: [:no_copy, :shallow, :deep, :marshal], default: :no_copy - attr_reader :ignore_errors + attr_reader :ignore_errors, :ignore_if_prev_successes def initialize super @ignore_errors = [] + @ignore_if_prev_successes = [] end def configure(conf) super @copy_proc = gen_copy_proc - @stores.each { |store| - @ignore_errors << (store.arg == 'ignore_error') + @stores.each_with_index { |store, i| + if i == 0 && store.arg.include?('ignore_if_prev_success') + raise Fluent::ConfigError, "ignore_if_prev_success must specify 2nd or later directives" + end + @ignore_errors << (store.arg.include?('ignore_error')) + @ignore_if_prev_successes << (store.arg.include?('ignore_if_prev_success')) } + if @ignore_errors.uniq.size == 1 && @ignore_errors.include?(true) && @ignore_if_prev_successes.include?(false) + log.warn "ignore_errors are specified in all , but ignore_if_prev_success is not specified. Is this intended?" + end end def multi_workers_ready? @@ -55,10 +63,15 @@ def process(tag, es) } es = m end - + success = Array.new(outputs.size) outputs.each_with_index do |output, i| begin - output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es) + if i > 0 && success[i - 1] && @ignore_if_prev_successes[i] + log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i + else + output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es) + success[i] = true + end rescue => e if @ignore_errors[i] log.error "ignore emit error in #{output.plugin_id}", error: e diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb index 0c209a29f1..4283f27908 100644 --- a/test/plugin/test_out_copy.rb +++ b/test/plugin/test_out_copy.rb @@ -2,8 +2,11 @@ require 'fluent/test/driver/multi_output' require 'fluent/plugin/out_copy' require 'fluent/event' +require 'flexmock/test_unit' class CopyOutputTest < Test::Unit::TestCase + include FlexMock::TestCase + class << self def startup $LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts')) @@ -54,6 +57,48 @@ def test_configure assert_equal :no_copy, d.instance.copy_mode end + ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG = %[ + + @type test + name c0 + + + @type test + name c1 + + + @type test + name c2 + + ] + def test_configure_with_errorneus_ignore_if_prev_success + assert_raise(Fluent::ConfigError) do + create_driver(ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG) + end + end + + ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG = %[ + @log_level info + + @type test + name c0 + + + @type test + name c1 + + + @type test + name c2 + + ] + def test_configure_all_ignore_errors_without_ignore_if_prev_success + d = create_driver(ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG) + expected = /ignore_errors are specified in all , but ignore_if_prev_success is not specified./ + matches = d.logs.grep(expected) + assert_equal(1, matches.length, "Logs do not contain '#{expected}' '#{d.logs}'") + end + def test_configure_with_deep_copy_and_use_shallow_copy_mode d = create_driver(%[ deep_copy true @@ -217,5 +262,47 @@ def test_ignore_error end end end + + IGNORE_IF_PREV_SUCCESS_CONFIG = %[ + + @type test + name c0 + + + @type test + name c1 + + + @type test + name c2 + + ] + + def test_ignore_if_prev_success + d = create_driver(IGNORE_IF_PREV_SUCCESS_CONFIG) + + # override to raise an error + d.instance.outputs[0].define_singleton_method(:process) do |tag, es| + raise ArgumentError, 'Failed' + end + + # check ingore_if_prev_success functionality: + # 1. output 2 is succeeded. + # 2. output 3 is not called. + flexstub(d.instance.outputs[1]) do |output| + output.should_receive(:process).once + end + flexstub(d.instance.outputs[2]) do |output| + output.should_receive(:process).never + end + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + assert_nothing_raised do + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + end + end + end