Skip to content

Commit

Permalink
working...
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Oct 7, 2016
1 parent 0a9d99a commit be68fbd
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 61 deletions.
9 changes: 0 additions & 9 deletions lib/fluent/config/configure_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,6 @@ def config_section(name, **kwargs, &block)
sub_proxy = ConfigureProxy.new(name, type_lookup: @type_lookup, **kwargs)
sub_proxy.instance_exec(&block)

if sub_proxy.init?
if sub_proxy.argument && !sub_proxy.defaults.has_key?(sub_proxy.argument.first)
raise ArgumentError, "#{name}: init is specified, but default value of argument is missing"
end
if sub_proxy.params.keys.any?{|param_name| !sub_proxy.defaults.has_key?(param_name)}
raise ArgumentError, "#{name}: init is specified, but there're parameters without default values"
end
end

@params.delete(name)
@sections[name] = sub_proxy

Expand Down
7 changes: 7 additions & 0 deletions lib/fluent/config/section.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ def self.generate(proxy, conf, logger, plugin_class, stack = [])
varname = subproxy.variable_name
elements = (conf.respond_to?(:elements) ? conf.elements : []).select{ |e| e.name == subproxy.name.to_s || e.name == subproxy.alias.to_s }
if elements.empty? && subproxy.init?
if subproxy.argument && !subproxy.defaults.has_key?(subproxy.argument.first)
raise ArgumentError, "#{name}: init is specified, but default value of argument is missing"
end
missing_keys = subproxy.params.keys.select{|param_name| !subproxy.defaults.has_key?(param_name)}
if !missing_keys.empty?
raise ArgumentError, "#{name}: init is specified, but there're parameters without default values:#{missing_keys.join(',')}"
end
elements << Fluent::Config::Element.new(subproxy.name.to_s, '', {}, [])
end

Expand Down
5 changes: 3 additions & 2 deletions lib/fluent/plugin/formatter_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module Plugin
class StdoutFormatter < Formatter
Plugin.register_formatter('stdout', self)

TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%9N %z'

config_param :output_type, :string, default: 'json'

def configure(conf)
Expand All @@ -36,8 +38,7 @@ def start
end

def format(tag, time, record)
header = "#{Time.now.localtime} #{tag}: "
"#{header}#{@sub_formatter.format(tag, time, record)}"
"#{Time.at(time).localtime.strftime(TIME_FORMAT)} #{tag}: #{@sub_formatter.format(tag, time, record).chomp}\n"
end

def stop
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def configure(conf)

@use_default = false

# TODO: use config_section :parse and shrink this `if`
if conf.has_key?('format')
@parser = parser_create(usage: 'syslog_input', type: conf['format'], conf: conf)
else
Expand Down
12 changes: 6 additions & 6 deletions lib/fluent/plugin/out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class StdoutOutput < Output

helpers :inject, :formatter, :compat_parameters

DEFAULT_LINE_FORMAT_TYPE = 'stdout'
DEFAULT_FORMAT_TYPE = 'json'
TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%9N %z'

Expand All @@ -32,7 +33,8 @@ class StdoutOutput < Output
end

config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
config_set_default :@type, DEFAULT_LINE_FORMAT_TYPE
config_set_default :output_type, DEFAULT_FORMAT_TYPE
end

def prefer_buffered_processing
Expand All @@ -44,21 +46,19 @@ def prefer_delayed_commit
end

attr_accessor :delayed
attr_accessor :formatter

def initialize
super
@delayed = false
end

def configure(conf)
if conf['output_type'] && !conf['format']
conf['format'] = conf['output_type']
end
compat_parameters_convert(conf, :inject, :formatter)

super

@formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
@formatter = formatter_create
end

def process(tag, es)
Expand All @@ -70,7 +70,7 @@ def process(tag, es)

def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
"#{Time.at(time).localtime.strftime(TIME_FORMAT)} #{tag}: #{@formatter.format(tag, time, record).chomp}\n"
@formatter.format(tag, time, record).chomp + "\n"
end

def write(chunk)
Expand Down
21 changes: 11 additions & 10 deletions lib/fluent/plugin_helper/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ def formatter_create(usage: '', type: nil, conf: nil, default_type: nil)
module FormatterParams
include Fluent::Configurable
# minimum section definition to instantiate formatter plugin instances
config_section :format, required: false, multi: true, param_name: :formatter_configs do
config_section :format, required: false, multi: true, init: true, param_name: :formatter_configs do
config_argument :usage, :string, default: ''
config_param :@type, :string
config_param :@type, :string # config_set_default required for :@type
end
end

Expand All @@ -82,15 +82,16 @@ def initialize
def configure(conf)
super

if @formatter_configs
@formatter_configs.each do |section|
if @_formatters[section.usage]
raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}"
end
formatter = Plugin.new_formatter(section[:@type], parent: self)
formatter.configure(section.corresponding_config_element)
@_formatters[section.usage] = formatter
@formatter_configs.each do |section|
if section[:@type].nil?
raise "BUG: plugin uses formatter plugin helper must call 'config_set_default :@type' in 'config_section :format'"
end
if @_formatters[section.usage]
raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}"
end
formatter = Plugin.new_formatter(section[:@type], parent: self)
formatter.configure(section.corresponding_config_element)
@_formatters[section.usage] = formatter
end
end

Expand Down
5 changes: 4 additions & 1 deletion lib/fluent/plugin_helper/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ module ParserParams
# minimum section definition to instantiate parser plugin instances
config_section :parse, required: false, multi: true, param_name: :parser_configs do
config_argument :usage, :string, default: ''
config_param :@type, :string
config_param :@type, :string # config_set_default required for :@type
end
end

Expand All @@ -83,6 +83,9 @@ def configure(conf)
super

@parser_configs.each do |section|
if section[:@type].nil?
raise "BUG: plugin uses parser plugin helper must call 'config_set_default :@type' in 'config_section :parse'"
end
if @_parsers[section.usage]
raise Fluent::ConfigError, "duplicated parsers configured: #{section.usage}"
end
Expand Down
45 changes: 18 additions & 27 deletions test/plugin/test_filter_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ def test_invalid_output_type

def test_output_type_json
d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" }))
etime = event_time
time = Time.at(etime.sec)
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\"}\n", out

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" }))
Expand All @@ -80,15 +79,14 @@ def test_output_type_json

def test_output_type_hash
d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" }))
etime = event_time
time = Time.at(etime.sec)
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>\"test\"}\n", out

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" }))
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
Expand All @@ -99,11 +97,9 @@ def test_include_time_key
"localtime" => false
})
d = create_driver(config)
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\",\"time\":\"2016-10-07T21:09:31Z\"}\n", out
end

# out_stdout formatter itself can also be replaced
Expand Down Expand Up @@ -150,18 +146,17 @@ def test_json
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\"}\n", out
end

def test_json_nan
# NOTE: Float::NAN is not jsonable
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" })
d = create_driver(conf)
etime = event_time
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
flexmock(d.instance.router).should_receive(:emit_error_event)
filter(d, etime, {'test' => Float::NAN})
end
Expand All @@ -170,21 +165,19 @@ def test_hash
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>\"test\"}\n", out
end

def test_hash_nan
# NOTE: Float::NAN is not jsonable, but hash string can output it.
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
Expand All @@ -200,11 +193,9 @@ def test_include_time_key
"localtime" => false
})
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
etime = event_time("2016-10-07 21:09:31.012345678 UTC")
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "2016-10-07 21:09:31.012345678 +0000 filter.test: {\"test\":\"test\",\"time\":\"2016-10-07T21:09:31Z\"}\n", out
end
end
end
Expand Down
20 changes: 14 additions & 6 deletions test/plugin/test_out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ def create_driver(conf = CONFIG)
sub_test_case 'non-buffered' do
test 'configure' do
d = create_driver
assert_equal [], d.instance.formatter_configs
assert_equal 1, d.instance.formatter_configs.size # init: true
assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter
assert_equal 'json', d.instance.formatter.output_type
end

test 'configure output_type' do
d = create_driver(CONFIG + "\noutput_type json")
assert_equal 'json', d.instance.formatter_configs.first[:@type]
assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter
assert_equal 'json', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type hash")
assert_equal 'hash', d.instance.formatter_configs.first[:@type]
assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter
assert_equal 'hash', d.instance.formatter.output_type

assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + "\noutput_type foo")
Expand Down Expand Up @@ -83,7 +87,9 @@ def create_driver(conf = CONFIG)
sub_test_case 'buffered' do
test 'configure' do
d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")]))
assert_equal [], d.instance.formatter_configs
assert_equal 1, d.instance.formatter_configs.size
assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter
assert_equal 'json', d.instance.formatter.output_type
assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size
assert d.instance.buffer_config.flush_at_shutdown
assert_equal ['tag'], d.instance.buffer_config.chunk_keys
Expand All @@ -94,10 +100,12 @@ def create_driver(conf = CONFIG)

test 'configure with output_type' do
d = create_driver(config_element("ROOT", "", {"output_type" => "json"}, [config_element("buffer")]))
assert_equal 'json', d.instance.formatter_configs.first[:@type]
assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter
assert_equal 'json', d.instance.formatter.output_type

d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")]))
assert_equal 'hash', d.instance.formatter_configs.first[:@type]
assert_kind_of Fluent::Plugin::StdoutFormatter, d.instance.formatter
assert_equal 'hash', d.instance.formatter.output_type

assert_raise(Fluent::ConfigError) do
create_driver(config_element("ROOT", "", {"output_type" => "foo"}, [config_element("buffer")]))
Expand Down

0 comments on commit be68fbd

Please sign in to comment.