diff --git a/lib/fluent/plugin/filter_stdout.rb b/lib/fluent/plugin/filter_stdout.rb index 4fc14d4c43..0949cc2c5e 100644 --- a/lib/fluent/plugin/filter_stdout.rb +++ b/lib/fluent/plugin/filter_stdout.rb @@ -14,31 +14,38 @@ # limitations under the License. # -require 'fluent/filter' -require 'fluent/plugin' +require 'fluent/plugin/filter' -module Fluent +module Fluent::Plugin class StdoutFilter < Filter - Plugin.register_filter('stdout', self) + Fluent::Plugin.register_filter('stdout', self) + + helpers :formatter, :compat_parameters, :inject + + DEFAULT_FORMAT_TYPE = 'stdout' + + config_section :format do + config_set_default :@type, DEFAULT_FORMAT_TYPE + end # for tests attr_reader :formatter - desc 'The format of the output.' - config_param :format, :string, default: 'stdout' - # config_param :output_type, :string, :default => 'json' (StdoutFormatter defines this) - def configure(conf) + compat_parameters_convert(conf, :inject, :formatter) super + end - @formatter = Plugin.new_formatter(@format) - @formatter.configure(conf) + def start + @formatter = formatter_create(conf: @config.elements('format').first, default_type: DEFAULT_FORMAT_TYPE) + super end def filter_stream(tag, es) es.each { |time, record| begin - log.write @formatter.format(tag, time, record) + r = inject_values_to_record(tag, time, record) + log.write @formatter.format(tag, time, r) rescue => e router.emit_error_event(tag, time, record, e) end diff --git a/lib/fluent/test/helpers.rb b/lib/fluent/test/helpers.rb index 32bd048796..acc113bdc3 100644 --- a/lib/fluent/test/helpers.rb +++ b/lib/fluent/test/helpers.rb @@ -65,6 +65,15 @@ def msgpack(type) raise ArgumentError, "unknown msgpack object type '#{type}'" end end + + def capture_log(driver) + tmp = driver.instance.log.out + driver.instance.log.out = StringIO.new + yield + return driver.instance.log.out.string + ensure + driver.instance.log.out = tmp + end end end end diff --git a/test/plugin/test_filter_stdout.rb b/test/plugin/test_filter_stdout.rb index 4c07a412fd..55c7623db1 100644 --- a/test/plugin/test_filter_stdout.rb +++ b/test/plugin/test_filter_stdout.rb @@ -1,14 +1,16 @@ require_relative '../helper' +require 'fluent/test/driver/filter' require 'fluent/plugin/filter_stdout' require 'timecop' require 'flexmock/test_unit' class StdoutFilterTest < Test::Unit::TestCase - include Fluent include FlexMock::TestCase def setup Fluent::Test.setup + @old_tz = ENV["TZ"] + ENV["TZ"] = "UTC" Timecop.freeze end @@ -16,99 +18,194 @@ def teardown super # FlexMock::TestCase requires this # http://flexmock.rubyforge.org/FlexMock/TestCase.html Timecop.return + ENV["TZ"] = @old_tz end - CONFIG = %[ - ] + CONFIG = config_element('ROOT') def create_driver(conf = CONFIG) - Test::FilterTestDriver.new(StdoutFilter, 'filter.test').configure(conf) + Fluent::Test::Driver::Filter.new(Fluent::Plugin::StdoutFilter).configure(conf) end - def emit(d, msg, time) + def filter(d, time, record) d.run { - d.emit(msg, time) - }.filtered_as_array[0][2] + d.feed("filter.test", time, record) + } + d.filtered_records end def test_through_record d = create_driver - time = Time.now - filtered = emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) - assert_equal({'test' => 'test'}, filtered) + filtered = filter(d, event_time, {'test' => 'test'}) + assert_equal([{'test' => 'test'}], filtered) end - def test_configure_default - d = create_driver - assert_equal 'json', d.instance.formatter.output_type - end - - def test_configure_output_type - d = create_driver(CONFIG + "\noutput_type json") - assert_equal 'json', d.instance.formatter.output_type - - d = create_driver(CONFIG + "\noutput_type hash") - assert_equal 'hash', d.instance.formatter.output_type - - d = create_driver(CONFIG + "\noutput_type ltsv") - assert_equal 'ltsv', d.instance.formatter.output_type - - assert_raise(Fluent::ConfigError) do - d = create_driver(CONFIG + "\noutput_type foo") + sub_test_case "flat style parameters" do + sub_test_case "configure" do + def test_configure_default + d = create_driver + d.run {} + assert_equal 'json', d.instance.formatter.output_type + end + + data(json: "json", + hash: "hash", + ltsv: "ltsv") + def test_output_type(data) + d = create_driver(CONFIG + config_element("", "", { "output_type" => data })) + d.run {} + assert_equal data, d.instance.formatter.output_type + end + + def test_invalid_output_type + assert_raise(Fluent::ConfigError) do + d = create_driver(CONFIG + config_element("", "", { "output_type" => "foo" })) + d.run {} + end + end end - end - - def test_output_type_json - d = create_driver(CONFIG + "\noutput_type json") - time = Time.now - out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) } - assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out - - # NOTE: Float::NAN is not jsonable - d = create_driver(CONFIG + "\noutput_type json") - flexmock(d.instance.router).should_receive(:emit_error_event) - emit(d, {'test' => Float::NAN}, time) - end - def test_output_type_hash - d = create_driver(CONFIG + "\noutput_type hash") - time = Time.now - out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) } - assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out + def test_output_type_json + d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" })) + etime = event_time + time = Time.at(etime.sec) + out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } + assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out + + # NOTE: Float::NAN is not jsonable + d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" })) + flexmock(d.instance.router).should_receive(:emit_error_event) + filter(d, etime, {'test' => Float::NAN}) + end - # NOTE: Float::NAN is not jsonable, but hash string can output it. - d = create_driver(CONFIG + "\noutput_type hash") - out = capture_log(d) { emit(d, {'test' => Float::NAN}, Fluent::EventTime.from_time(time)) } - assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out - end + def test_output_type_hash + d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" })) + etime = event_time + time = Time.at(etime.sec) + out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } + assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out + + # NOTE: Float::NAN is not jsonable, but hash string can output it. + d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" })) + out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) } + assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out + end - # Use include_time_key to output the message's time - def test_include_time_key - d = create_driver(CONFIG + "\noutput_type json\ninclude_time_key true\nutc") - time = Time.now - message_time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") - out = capture_log(d) { emit(d, {'test' => 'test'}, message_time) } - assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out - end + # Use include_time_key to output the message's time + def test_include_time_key + config = config_element("", "", { + "output_type" => "json", + "include_time_key" => true, + "localtime" => false + }) + d = create_driver(config) + etime = event_time + time = Time.at(etime.sec) + message_time = event_time("2011-01-02 13:14:15 UTC") + out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) } + assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out + end - # out_stdout formatter itself can also be replaced - def test_format_json - d = create_driver(CONFIG + "\nformat json") - time = Time.now - out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) } - assert_equal "{\"test\":\"test\"}\n", out + # out_stdout formatter itself can also be replaced + def test_format_json + d = create_driver(CONFIG + config_element("", "", { "format" => "json" })) + out = capture_log(d) { filter(d, event_time, {'test' => 'test'}) } + assert_equal "{\"test\":\"test\"}\n", out + end end - private + sub_test_case "with sub section" do + sub_test_case "configure" do + def test_default + conf = config_element + conf.elements << config_element("format", "", { "@type" => "stdout"}) + d = create_driver(conf) + d.run {} + assert_equal("json", d.instance.formatter.output_type) + end + + data(json: "json", + hash: "hash", + ltsv: "ltsv") + def test_output_type(data) + conf = config_element + conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => data }) + d = create_driver(conf) + d.run {} + assert_equal(data, d.instance.formatter.output_type) + end + + def test_invalid_output_type + conf = config_element + conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "foo" }) + assert_raise(Fluent::ConfigError) do + d = create_driver(conf) + d.run {} + end + end + end - # Capture the log output of the block given - def capture_log(d, &block) - tmp = d.instance.log.out - d.instance.log.out = StringIO.new - yield - return d.instance.log.out.string - ensure - d.instance.log.out = tmp + sub_test_case "output_type" do + def test_json + conf = config_element + conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" }) + d = create_driver(conf) + etime = event_time + time = Time.at(etime.sec) + out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } + assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out + end + + def test_json_nan + # NOTE: Float::NAN is not jsonable + conf = config_element + conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" }) + d = create_driver(conf) + etime = event_time + flexmock(d.instance.router).should_receive(:emit_error_event) + filter(d, etime, {'test' => Float::NAN}) + end + + def test_hash + conf = config_element + conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" }) + d = create_driver(conf) + etime = event_time + time = Time.at(etime.sec) + out = capture_log(d) { filter(d, etime, {'test' => 'test'}) } + assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out + end + + def test_hash_nan + # NOTE: Float::NAN is not jsonable, but hash string can output it. + conf = config_element + conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" }) + d = create_driver(conf) + etime = event_time + time = Time.at(etime.sec) + out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) } + assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out + end + + # Use include_time_key to output the message's time + def test_include_time_key + conf = config_element + conf.elements << config_element("format", "", { + "@type" => "stdout", + "output_type" => "json" + }) + conf.elements << config_element("inject", "", { + "time_key" => "time", + "time_type" => "string", + "localtime" => false + }) + d = create_driver(conf) + etime = event_time + time = Time.at(etime.sec) + message_time = event_time("2011-01-02 13:14:15 UTC") + out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) } + assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out + end + end end end -