Skip to content

Commit

Permalink
Merge pull request #1058 from okkez/migrate-v0.14-api-filter_stdout
Browse files Browse the repository at this point in the history
Migrate filter_stdout plugin to v0.14 API
  • Loading branch information
tagomoris authored Aug 1, 2016
2 parents 26eebdb + 97816ad commit ab92993
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 85 deletions.
29 changes: 18 additions & 11 deletions lib/fluent/plugin/filter_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,38 @@
# limitations under the License.
#

require 'fluent/filter'
require 'fluent/plugin'
require 'fluent/plugin/filter'

module Fluent
module Fluent::Plugin
class StdoutFilter < Filter
Plugin.register_filter('stdout', self)
Fluent::Plugin.register_filter('stdout', self)

helpers :formatter, :compat_parameters, :inject

DEFAULT_FORMAT_TYPE = 'stdout'

config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end

# for tests
attr_reader :formatter

desc 'The format of the output.'
config_param :format, :string, default: 'stdout'
# config_param :output_type, :string, :default => 'json' (StdoutFormatter defines this)

def configure(conf)
compat_parameters_convert(conf, :inject, :formatter)
super
end

@formatter = Plugin.new_formatter(@format)
@formatter.configure(conf)
def start
@formatter = formatter_create(conf: @config.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
super
end

def filter_stream(tag, es)
es.each { |time, record|
begin
log.write @formatter.format(tag, time, record)
r = inject_values_to_record(tag, time, record)
log.write @formatter.format(tag, time, r)
rescue => e
router.emit_error_event(tag, time, record, e)
end
Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/test/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ def msgpack(type)
raise ArgumentError, "unknown msgpack object type '#{type}'"
end
end

def capture_log(driver)
tmp = driver.instance.log.out
driver.instance.log.out = StringIO.new
yield
return driver.instance.log.out.string
ensure
driver.instance.log.out = tmp
end
end
end
end
245 changes: 171 additions & 74 deletions test/plugin/test_filter_stdout.rb
Original file line number Diff line number Diff line change
@@ -1,114 +1,211 @@
require_relative '../helper'
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_stdout'
require 'timecop'
require 'flexmock/test_unit'

class StdoutFilterTest < Test::Unit::TestCase
include Fluent
include FlexMock::TestCase

def setup
Fluent::Test.setup
@old_tz = ENV["TZ"]
ENV["TZ"] = "UTC"
Timecop.freeze
end

def teardown
super # FlexMock::TestCase requires this
# http://flexmock.rubyforge.org/FlexMock/TestCase.html
Timecop.return
ENV["TZ"] = @old_tz
end

CONFIG = %[
]
CONFIG = config_element('ROOT')

def create_driver(conf = CONFIG)
Test::FilterTestDriver.new(StdoutFilter, 'filter.test').configure(conf)
Fluent::Test::Driver::Filter.new(Fluent::Plugin::StdoutFilter).configure(conf)
end

def emit(d, msg, time)
def filter(d, time, record)
d.run {
d.emit(msg, time)
}.filtered_as_array[0][2]
d.feed("filter.test", time, record)
}
d.filtered_records
end

def test_through_record
d = create_driver
time = Time.now
filtered = emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time))
assert_equal({'test' => 'test'}, filtered)
filtered = filter(d, event_time, {'test' => 'test'})
assert_equal([{'test' => 'test'}], filtered)
end

def test_configure_default
d = create_driver
assert_equal 'json', d.instance.formatter.output_type
end

def test_configure_output_type
d = create_driver(CONFIG + "\noutput_type json")
assert_equal 'json', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type hash")
assert_equal 'hash', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type ltsv")
assert_equal 'ltsv', d.instance.formatter.output_type

assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + "\noutput_type foo")
sub_test_case "flat style parameters" do
sub_test_case "configure" do
def test_configure_default
d = create_driver
d.run {}
assert_equal 'json', d.instance.formatter.output_type
end

data(json: "json",
hash: "hash",
ltsv: "ltsv")
def test_output_type(data)
d = create_driver(CONFIG + config_element("", "", { "output_type" => data }))
d.run {}
assert_equal data, d.instance.formatter.output_type
end

def test_invalid_output_type
assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + config_element("", "", { "output_type" => "foo" }))
d.run {}
end
end
end
end

def test_output_type_json
d = create_driver(CONFIG + "\noutput_type json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + "\noutput_type json")
flexmock(d.instance.router).should_receive(:emit_error_event)
emit(d, {'test' => Float::NAN}, time)
end

def test_output_type_hash
d = create_driver(CONFIG + "\noutput_type hash")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
def test_output_type_json
d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" }))
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" }))
flexmock(d.instance.router).should_receive(:emit_error_event)
filter(d, etime, {'test' => Float::NAN})
end

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + "\noutput_type hash")
out = capture_log(d) { emit(d, {'test' => Float::NAN}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end
def test_output_type_hash
d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" }))
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" }))
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
def test_include_time_key
d = create_driver(CONFIG + "\noutput_type json\ninclude_time_key true\nutc")
time = Time.now
message_time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
out = capture_log(d) { emit(d, {'test' => 'test'}, message_time) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end
# Use include_time_key to output the message's time
def test_include_time_key
config = config_element("", "", {
"output_type" => "json",
"include_time_key" => true,
"localtime" => false
})
d = create_driver(config)
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end

# out_stdout formatter itself can also be replaced
def test_format_json
d = create_driver(CONFIG + "\nformat json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "{\"test\":\"test\"}\n", out
# out_stdout formatter itself can also be replaced
def test_format_json
d = create_driver(CONFIG + config_element("", "", { "format" => "json" }))
out = capture_log(d) { filter(d, event_time, {'test' => 'test'}) }
assert_equal "{\"test\":\"test\"}\n", out
end
end

private
sub_test_case "with <format> sub section" do
sub_test_case "configure" do
def test_default
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout"})
d = create_driver(conf)
d.run {}
assert_equal("json", d.instance.formatter.output_type)
end

data(json: "json",
hash: "hash",
ltsv: "ltsv")
def test_output_type(data)
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => data })
d = create_driver(conf)
d.run {}
assert_equal(data, d.instance.formatter.output_type)
end

def test_invalid_output_type
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "foo" })
assert_raise(Fluent::ConfigError) do
d = create_driver(conf)
d.run {}
end
end
end

# Capture the log output of the block given
def capture_log(d, &block)
tmp = d.instance.log.out
d.instance.log.out = StringIO.new
yield
return d.instance.log.out.string
ensure
d.instance.log.out = tmp
sub_test_case "output_type" do
def test_json
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out
end

def test_json_nan
# NOTE: Float::NAN is not jsonable
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" })
d = create_driver(conf)
etime = event_time
flexmock(d.instance.router).should_receive(:emit_error_event)
filter(d, etime, {'test' => Float::NAN})
end

def test_hash
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
end

def test_hash_nan
# NOTE: Float::NAN is not jsonable, but hash string can output it.
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
def test_include_time_key
conf = config_element
conf.elements << config_element("format", "", {
"@type" => "stdout",
"output_type" => "json"
})
conf.elements << config_element("inject", "", {
"time_key" => "time",
"time_type" => "string",
"localtime" => false
})
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end
end
end
end

0 comments on commit ab92993

Please sign in to comment.