From be68fbd27277ca4e2ac9cf924bf95b247b9e8408 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 7 Oct 2016 21:19:01 +0900 Subject: [PATCH] working... --- lib/fluent/config/configure_proxy.rb | 9 ------ lib/fluent/config/section.rb | 7 +++++ lib/fluent/plugin/formatter_stdout.rb | 5 +-- lib/fluent/plugin/in_syslog.rb | 1 + lib/fluent/plugin/out_stdout.rb | 12 +++---- lib/fluent/plugin_helper/formatter.rb | 21 +++++++------ lib/fluent/plugin_helper/parser.rb | 5 ++- test/plugin/test_filter_stdout.rb | 45 +++++++++++---------------- test/plugin/test_out_stdout.rb | 20 ++++++++---- 9 files changed, 64 insertions(+), 61 deletions(-) diff --git a/lib/fluent/config/configure_proxy.rb b/lib/fluent/config/configure_proxy.rb index f3b2fc42ee..6322260632 100644 --- a/lib/fluent/config/configure_proxy.rb +++ b/lib/fluent/config/configure_proxy.rb @@ -339,15 +339,6 @@ def config_section(name, **kwargs, &block) sub_proxy = ConfigureProxy.new(name, type_lookup: @type_lookup, **kwargs) sub_proxy.instance_exec(&block) - if sub_proxy.init? - if sub_proxy.argument && !sub_proxy.defaults.has_key?(sub_proxy.argument.first) - raise ArgumentError, "#{name}: init is specified, but default value of argument is missing" - end - if sub_proxy.params.keys.any?{|param_name| !sub_proxy.defaults.has_key?(param_name)} - raise ArgumentError, "#{name}: init is specified, but there're parameters without default values" - end - end - @params.delete(name) @sections[name] = sub_proxy diff --git a/lib/fluent/config/section.rb b/lib/fluent/config/section.rb index df983d599d..2d2e785c0a 100644 --- a/lib/fluent/config/section.rb +++ b/lib/fluent/config/section.rb @@ -166,6 +166,13 @@ def self.generate(proxy, conf, logger, plugin_class, stack = []) varname = subproxy.variable_name elements = (conf.respond_to?(:elements) ? conf.elements : []).select{ |e| e.name == subproxy.name.to_s || e.name == subproxy.alias.to_s } if elements.empty? && subproxy.init? + if subproxy.argument && !subproxy.defaults.has_key?(subproxy.argument.first) + raise ArgumentError, "#{name}: init is specified, but default value of argument is missing" + end + missing_keys = subproxy.params.keys.select{|param_name| !subproxy.defaults.has_key?(param_name)} + if !missing_keys.empty? + raise ArgumentError, "#{name}: init is specified, but there're parameters without default values:#{missing_keys.join(',')}" + end elements << Fluent::Config::Element.new(subproxy.name.to_s, '', {}, []) end diff --git a/lib/fluent/plugin/formatter_stdout.rb b/lib/fluent/plugin/formatter_stdout.rb index 272a4e22a0..cb29469605 100644 --- a/lib/fluent/plugin/formatter_stdout.rb +++ b/lib/fluent/plugin/formatter_stdout.rb @@ -21,6 +21,8 @@ module Plugin class StdoutFormatter < Formatter Plugin.register_formatter('stdout', self) + TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%9N %z' + config_param :output_type, :string, default: 'json' def configure(conf) @@ -36,8 +38,7 @@ def start end def format(tag, time, record) - header = "#{Time.now.localtime} #{tag}: " - "#{header}#{@sub_formatter.format(tag, time, record)}" + "#{Time.at(time).localtime.strftime(TIME_FORMAT)} #{tag}: #{@sub_formatter.format(tag, time, record).chomp}\n" end def stop diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 5e2b098e7c..602f0f4aed 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -101,6 +101,7 @@ def configure(conf) @use_default = false + # TODO: use config_section :parse and shrink this `if` if conf.has_key?('format') @parser = parser_create(usage: 'syslog_input', type: conf['format'], conf: conf) else diff --git a/lib/fluent/plugin/out_stdout.rb b/lib/fluent/plugin/out_stdout.rb index a4115aac67..c48bf7cfab 100644 --- a/lib/fluent/plugin/out_stdout.rb +++ b/lib/fluent/plugin/out_stdout.rb @@ -22,6 +22,7 @@ class StdoutOutput < Output helpers :inject, :formatter, :compat_parameters + DEFAULT_LINE_FORMAT_TYPE = 'stdout' DEFAULT_FORMAT_TYPE = 'json' TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%9N %z' @@ -32,7 +33,8 @@ class StdoutOutput < Output end config_section :format do - config_set_default :@type, DEFAULT_FORMAT_TYPE + config_set_default :@type, DEFAULT_LINE_FORMAT_TYPE + config_set_default :output_type, DEFAULT_FORMAT_TYPE end def prefer_buffered_processing @@ -44,6 +46,7 @@ def prefer_delayed_commit end attr_accessor :delayed + attr_accessor :formatter def initialize super @@ -51,14 +54,11 @@ def initialize end def configure(conf) - if conf['output_type'] && !conf['format'] - conf['format'] = conf['output_type'] - end compat_parameters_convert(conf, :inject, :formatter) super - @formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE) + @formatter = formatter_create end def process(tag, es) @@ -70,7 +70,7 @@ def process(tag, es) def format(tag, time, record) record = inject_values_to_record(tag, time, record) - "#{Time.at(time).localtime.strftime(TIME_FORMAT)} #{tag}: #{@formatter.format(tag, time, record).chomp}\n" + @formatter.format(tag, time, record).chomp + "\n" end def write(chunk) diff --git a/lib/fluent/plugin_helper/formatter.rb b/lib/fluent/plugin_helper/formatter.rb index 1133ad42a5..881d8dfd49 100644 --- a/lib/fluent/plugin_helper/formatter.rb +++ b/lib/fluent/plugin_helper/formatter.rb @@ -61,9 +61,9 @@ def formatter_create(usage: '', type: nil, conf: nil, default_type: nil) module FormatterParams include Fluent::Configurable # minimum section definition to instantiate formatter plugin instances - config_section :format, required: false, multi: true, param_name: :formatter_configs do + config_section :format, required: false, multi: true, init: true, param_name: :formatter_configs do config_argument :usage, :string, default: '' - config_param :@type, :string + config_param :@type, :string # config_set_default required for :@type end end @@ -82,15 +82,16 @@ def initialize def configure(conf) super - if @formatter_configs - @formatter_configs.each do |section| - if @_formatters[section.usage] - raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}" - end - formatter = Plugin.new_formatter(section[:@type], parent: self) - formatter.configure(section.corresponding_config_element) - @_formatters[section.usage] = formatter + @formatter_configs.each do |section| + if section[:@type].nil? + raise "BUG: plugin uses formatter plugin helper must call 'config_set_default :@type' in 'config_section :format'" end + if @_formatters[section.usage] + raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}" + end + formatter = Plugin.new_formatter(section[:@type], parent: self) + formatter.configure(section.corresponding_config_element) + @_formatters[section.usage] = formatter end end diff --git a/lib/fluent/plugin_helper/parser.rb b/lib/fluent/plugin_helper/parser.rb index d10ac4b065..417459baa5 100644 --- a/lib/fluent/plugin_helper/parser.rb +++ b/lib/fluent/plugin_helper/parser.rb @@ -63,7 +63,7 @@ module ParserParams # minimum section definition to instantiate parser plugin instances config_section :parse, required: false, multi: true, param_name: :parser_configs do config_argument :usage, :string, default: '' - config_param :@type, :string + config_param :@type, :string # config_set_default required for :@type end end @@ -83,6 +83,9 @@ def configure(conf) super @parser_configs.each do |section| + if section[:@type].nil? + raise "BUG: plugin uses parser plugin helper must call 'config_set_default :@type' in 'config_section :parse'" + end if @_parsers[section.usage] raise Fluent::ConfigError, "duplicated parsers configured: #{section.usage}" end diff --git a/test/plugin/test_filter_stdout.rb b/test/plugin/test_filter_stdout.rb index 55c7623db1..af427f05fa 100644 --- a/test/plugin/test_filter_stdout.rb +++ b/test/plugin/test_filter_stdout.rb @@ -67,10 +67,9 @@ def test_invalid_output_type def test_output_type_json d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" })) - etime = event_time - time = Time.at(etime.sec) + etime = event_time("2016-10-07 21:09:31.012345678 UTC") out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } - assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\"}\n", out # NOTE: Float::NAN is not jsonable d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" })) @@ -80,15 +79,14 @@ def test_output_type_json def test_output_type_hash d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" })) - etime = event_time - time = Time.at(etime.sec) + etime = event_time("2016-10-07 21:09:31.012345678 UTC") out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } - assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>\"test\"}\n", out # NOTE: Float::NAN is not jsonable, but hash string can output it. d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" })) out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) } - assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>NaN}\n", out end # Use include_time_key to output the message's time @@ -99,11 +97,9 @@ def test_include_time_key "localtime" => false }) d = create_driver(config) - etime = event_time - time = Time.at(etime.sec) - message_time = event_time("2011-01-02 13:14:15 UTC") - out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) } - assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out + etime = event_time("2016-10-07 21:09:31.012345678 UTC") + out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\",\"time\":\"2016-10-07T21:09:31Z\"}\n", out end # out_stdout formatter itself can also be replaced @@ -150,10 +146,9 @@ def test_json conf = config_element conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" }) d = create_driver(conf) - etime = event_time - time = Time.at(etime.sec) + etime = event_time("2016-10-07 21:09:31.012345678 UTC") out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } - assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\"}\n", out end def test_json_nan @@ -161,7 +156,7 @@ def test_json_nan conf = config_element conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" }) d = create_driver(conf) - etime = event_time + etime = event_time("2016-10-07 21:09:31.012345678 UTC") flexmock(d.instance.router).should_receive(:emit_error_event) filter(d, etime, {'test' => Float::NAN}) end @@ -170,10 +165,9 @@ def test_hash conf = config_element conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" }) d = create_driver(conf) - etime = event_time - time = Time.at(etime.sec) + etime = event_time("2016-10-07 21:09:31.012345678 UTC") out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } - assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>\"test\"}\n", out end def test_hash_nan @@ -181,10 +175,9 @@ def test_hash_nan conf = config_element conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" }) d = create_driver(conf) - etime = event_time - time = Time.at(etime.sec) + etime = event_time("2016-10-07 21:09:31.012345678 UTC") out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) } - assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>NaN}\n", out end # Use include_time_key to output the message's time @@ -200,11 +193,9 @@ def test_include_time_key "localtime" => false }) d = create_driver(conf) - etime = event_time - time = Time.at(etime.sec) - message_time = event_time("2011-01-02 13:14:15 UTC") - out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) } - assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out + etime = event_time("2016-10-07 21:09:31.012345678 UTC") + out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } + assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\",\"time\":\"2016-10-07T21:09:31Z\"}\n", out end end end diff --git a/test/plugin/test_out_stdout.rb b/test/plugin/test_out_stdout.rb index c8e99ed4ef..909244d16d 100644 --- a/test/plugin/test_out_stdout.rb +++ b/test/plugin/test_out_stdout.rb @@ -18,15 +18,19 @@ def create_driver(conf = CONFIG) sub_test_case 'non-buffered' do test 'configure' do d = create_driver - assert_equal [], d.instance.formatter_configs + assert_equal 1, d.instance.formatter_configs.size # init: true + assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter + assert_equal 'json', d.instance.formatter.output_type end test 'configure output_type' do d = create_driver(CONFIG + "\noutput_type json") - assert_equal 'json', d.instance.formatter_configs.first[:@type] + assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter + assert_equal 'json', d.instance.formatter.output_type d = create_driver(CONFIG + "\noutput_type hash") - assert_equal 'hash', d.instance.formatter_configs.first[:@type] + assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter + assert_equal 'hash', d.instance.formatter.output_type assert_raise(Fluent::ConfigError) do d = create_driver(CONFIG + "\noutput_type foo") @@ -83,7 +87,9 @@ def create_driver(conf = CONFIG) sub_test_case 'buffered' do test 'configure' do d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")])) - assert_equal [], d.instance.formatter_configs + assert_equal 1, d.instance.formatter_configs.size + assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter + assert_equal 'json', d.instance.formatter.output_type assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size assert d.instance.buffer_config.flush_at_shutdown assert_equal ['tag'], d.instance.buffer_config.chunk_keys @@ -94,10 +100,12 @@ def create_driver(conf = CONFIG) test 'configure with output_type' do d = create_driver(config_element("ROOT", "", {"output_type" => "json"}, [config_element("buffer")])) - assert_equal 'json', d.instance.formatter_configs.first[:@type] + assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter + assert_equal 'json', d.instance.formatter.output_type d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")])) - assert_equal 'hash', d.instance.formatter_configs.first[:@type] + assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter + assert_equal 'hash', d.instance.formatter.output_type assert_raise(Fluent::ConfigError) do create_driver(config_element("ROOT", "", {"output_type" => "foo"}, [config_element("buffer")]))