diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb new file mode 100644 index 0000000000..cc5d785b4a --- /dev/null +++ b/lib/fluent/plugin/filter_parser.rb @@ -0,0 +1,108 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/time' +require 'fluent/config/error' +require 'fluent/plugin/filter' +require 'fluent/plugin_helper/parser' +require 'fluent/plugin_helper/compat_parameters' + +module Fluent::Plugin + # Filter plugin registered under the name 'parser'. + # It reads the value stored at key_name in each record, passes it to the + # configured parser and returns the parsed values as the new record. + # handle_parsed shapes the result: inject_key_prefix is prepended to every + # parsed key, hash_value_field nests the parsed values under a single key, + # and reserve_data merges the result into the original record. + class ParserFilter < Filter + Fluent::Plugin.register_filter('parser', self) + + helpers :parser, :compat_parameters + + config_param :key_name, :string + config_param :reserve_data, :bool, default: false + config_param :reserve_time, :bool, default: false + config_param :inject_key_prefix, :string, default: nil + config_param :replace_invalid_sequence, :bool, default: false + config_param :hash_value_field, :string, default: nil + + attr_reader :parser + + # Calls compat_parameters_convert on conf for the :parser section, lets the + # superclass process the configuration, then stores the object returned + # by parser_create in @parser. + # NOTE(review): compat_parameters_convert presumably maps legacy flat + # parser parameters (e.g. "format json") to the section form - confirm. + def configure(conf) + compat_parameters_convert(conf, :parser) + + super + + @parser = parser_create + end + + FAILED_RESULT = [nil, nil].freeze # reduce allocation cost + REPLACE_CHAR = '?'.freeze + + # Parses record[key_name] and returns a [time, record] pair. + # + # * key missing (nil): an ArgumentError is reported through + # router.emit_error_event; the result is built from an empty hash when + # reserve_data is set, otherwise FAILED_RESULT is returned. + # * parser yields values: the returned time is the original time when + # reserve_time is set, else the parsed time (the original time is used + # when the parser yields nil for it). + # * parser yields no values: a ParserError ("pattern not match ...") is + # reported, then the result follows the same reserve_data rule as above. + # * ParserError raised while parsing: reported, and FAILED_RESULT is + # returned regardless of reserve_data. + # * ArgumentError whose message starts with "invalid byte sequence in": + # re-raised unless replace_invalid_sequence is set; otherwise the value + # is scrubbed with REPLACE_CHAR and parsing is retried. + # * any other StandardError: wrapped in a ParserError ("parse failed ...") + # and reported; FAILED_RESULT is returned. + def filter_with_time(tag, time, record) + raw_value = record[@key_name] + if raw_value.nil? 
+ router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) + if @reserve_data + return time, handle_parsed(tag, record, time, {}) + else + return FAILED_RESULT + end + end + begin + @parser.parse(raw_value) do |t, values| + if values + t = if @reserve_time + time + else + t.nil? ? time : t + end + r = handle_parsed(tag, record, t, values) + return t, r + else + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'")) + if @reserve_data + t = time + r = handle_parsed(tag, record, time, {}) + return t, r + else + return FAILED_RESULT + end + end + end + rescue Fluent::Plugin::Parser::ParserError => e + router.emit_error_event(tag, time, record, e) + return FAILED_RESULT + rescue ArgumentError => e + raise unless @replace_invalid_sequence + raise unless e.message.index("invalid byte sequence in") == 0 + + raw_value = raw_value.scrub(REPLACE_CHAR) + retry + rescue => e + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}")) + return FAILED_RESULT + end + end + + private + + def handle_parsed(tag, record, t, values) + if values && @inject_key_prefix + values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }] + end + r = @hash_value_field ? {@hash_value_field => values} : values + if @reserve_data + r = r ? 
+ record.merge(r) : record + end + r + end + end +end diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index c5b2f3a3c6..879bff8f8e 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -52,12 +52,13 @@ module CompatParameters "types" => nil, "types_delimiter" => nil, "types_label_delimiter" => nil, - "null_value_pattern" => "null_value_pattern", - "null_empty_string" => "null_empty_string", "keys" => "keys", # CSVParser, TSVParser (old ValuesParser) "time_key" => "time_key", "time_format" => "time_format", "delimiter" => "delimiter", + "keep_time_key" => "keep_time_key", + "null_empty_string" => "null_empty_string", + "null_value_pattern" => "null_value_pattern", "json_parser" => "json_parser", # JSONParser "label_delimiter" => "label_delimiter", # LabeledTSVParser "format_firstline" => "format_firstline", # MultilineParser diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb new file mode 100644 index 0000000000..0e54989478 --- /dev/null +++ b/test/plugin/test_filter_parser.rb @@ -0,0 +1,665 @@ +require_relative '../helper' +require 'timecop' +require 'fluent/test/driver/filter' +require 'fluent/plugin/filter_parser' +require 'flexmock/test_unit' + +class ParserFilterTest < Test::Unit::TestCase + include FlexMock::TestCase + + def setup + Fluent::Test.setup + @tag = 'test' + @default_time = Time.parse('2010-05-04 03:02:01 UTC') + Timecop.freeze(@default_time) + end + + def teardown + super + Timecop.return + end + + def assert_equal_parsed_time(expected, actual) + if expected.is_a?(Integer) + assert_equal(expected, actual.to_i) + else + assert_equal_event_time(expected, actual) + end + end + + ParserError = Fluent::Plugin::Parser::ParserError + CONFIG = %[ + key_name message + reserve_data true + + @type regexp + expression /^(?.)(?.) (? 
+ ] + + def create_driver(conf=CONFIG) + Fluent::Test::Driver::Filter.new(Fluent::Plugin::ParserFilter).configure(conf) + end + + def test_configure + assert_raise(Fluent::ConfigError) { + create_driver('') + } + assert_raise(Fluent::ConfigError) { + create_driver %[ + key_name foo + + @type unknown_format_that_will_never_be_implemented + + ] + } + assert_nothing_raised { + create_driver %[ + key_name foo + + @type regexp + expression /(?.)/ + + ] + } + assert_nothing_raised { + create_driver %[ + key_name foo + + @type json + + ] + } + assert_nothing_raised { + create_driver %[ + key_name foo + format json + ] + } + assert_nothing_raised { + create_driver %[ + key_name foo + + @type ltsv + + ] + } + assert_nothing_raised { + create_driver %[ + key_name message + + @type regexp + expression /^col1=(?.+) col2=(?.+)$/ + + ] + } + d = create_driver %[ + key_name foo + + @type regexp + expression /(?.)/ + + ] + assert_false d.instance.reserve_data + end + + # CONFIG = %[ + # remove_prefix test + # add_prefix parsed + # key_name message + # format /^(?.)(?.) (?