From 59f5ef1f05d02beb0e673223588bf1b3063aae81 Mon Sep 17 00:00:00 2001
From: Masahiro Nakagawa
Date: Fri, 19 Jun 2020 05:52:03 +0900
Subject: [PATCH 1/2] in_http: Use 400 for invalid request

Signed-off-by: Masahiro Nakagawa
---
 lib/fluent/plugin/in_http.rb | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb
index bed031005c..12e4a593b2 100644
--- a/lib/fluent/plugin/in_http.rb
+++ b/lib/fluent/plugin/in_http.rb
@@ -189,10 +189,10 @@ def on_request(path_info, params)
         return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
       end
 
-      # TODO server error
-      begin
-        # Support batched requests
-        if record.is_a?(Array)
+      mes = nil
+      # Support batched requests
+      if record.is_a?(Array)
+        begin
           mes = Fluent::MultiEventStream.new
           record.each do |single_record|
             if @add_http_headers
@@ -219,6 +219,18 @@ def on_request(path_info, params)
 
             mes.add(single_time, single_record)
           end
+        rescue => e
+          if @dump_error_log
+            p e.backtrace
+            log.error "failed to process batch request", error: e
+          end
+          return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
+        end
+      end
+
+      # TODO server error
+      begin
+        if mes
           router.emit_stream(tag, mes)
         else
           router.emit(tag, time, record)

From cd279eefc1e8f8e49d8b1ba69b4235d017bcd186 Mon Sep 17 00:00:00 2001
From: Masahiro Nakagawa
Date: Fri, 19 Jun 2020 12:16:45 +0900
Subject: [PATCH 2/2] in_http: Add time parser support for default json/msgpack.
 ref #3036

Signed-off-by: Masahiro Nakagawa
---
 lib/fluent/plugin/in_http.rb | 60 ++++++++++++++++++++++++++++++------
 test/plugin/test_in_http.rb  | 57 ++++++++++++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 10 deletions(-)

diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb
index 12e4a593b2..7916825f3e 100644
--- a/lib/fluent/plugin/in_http.rb
+++ b/lib/fluent/plugin/in_http.rb
@@ -27,10 +27,28 @@
 module Fluent::Plugin
   class InHttpParser < Parser
     Fluent::Plugin.register_parser('in_http', self)
+
+    config_set_default :time_key, 'time'
+
+    def configure(conf)
+      super
+
+      # if no time parser related parameters, use in_http's time convert rule
+      @time_parser = if conf.has_key?('time_type') || conf.has_key?('time_format')
+                       time_parser_create
+                     else
+                       nil
+                     end
+    end
+
     def parse(text)
       # this plugin is dummy implementation not to raise error
       yield nil, nil
     end
+
+    def get_time_parser
+      @time_parser
+    end
   end
 
   class HttpInput < Input
@@ -74,17 +92,20 @@ def configure(conf)
 
       super
 
+      @parser = nil
       m = if @parser_configs.first['@type'] == 'in_http'
             @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
+            @parser_msgpack.time_key = nil
             @parser_msgpack.estimate_current_event = false
             @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
+            @parser_json.time_key = nil
             @parser_json.estimate_current_event = false
+
+            default_parser = parser_create(usage: '')
             @format_name = 'default'
-            @parser_time_key = if parser_config = conf.elements('parse').first
-                                 parser_config['time_key'] || 'time'
-                               else
-                                 'time'
-                               end
+            @parser_time_key = default_parser.time_key
+            @default_time_parser = default_parser.get_time_parser
+            @default_keep_time_key = default_parser.keep_time_key
             method(:parse_params_default)
           else
             @parser = parser_create
@@ -180,7 +201,23 @@ def on_request(path_info, params)
                  param_time = param_time.to_f
                  param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
                else
-                 record_time.nil? ? Fluent::EventTime.now : record_time
+                 if record_time.nil?
+                   if !record.is_a?(Array)
+                     if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key)
+                       if @default_time_parser
+                         @default_time_parser.parse(t)
+                       else
+                         Fluent::EventTime.from_time(Time.at(t))
+                       end
+                     else
+                       Fluent::EventTime.now
+                     end
+                   else
+                     Fluent::EventTime.now
+                   end
+                 else
+                   record_time
+                 end
                end
       rescue => e
         if @dump_error_log
@@ -206,12 +243,16 @@ def on_request(path_info, params)
               single_record['REMOTE_ADDR'] = params['REMOTE_ADDR']
             end
 
-            if defined? @parser
+            if @parser
               single_time = @parser.parse_time(single_record)
               single_time, single_record = @parser.convert_values(single_time, single_record)
             else
-              single_time = if t = single_record.delete(@parser_time_key)
-                              Fluent::EventTime.from_time(Time.at(t))
+              single_time = if t = @default_keep_time_key ? single_record[@parser_time_key] : single_record.delete(@parser_time_key)
+                              if @default_time_parser
+                                @default_time_parser.parse(t)
+                              else
+                                Fluent::EventTime.from_time(Time.at(t))
+                              end
                             else
                               time
                             end
@@ -221,7 +262,6 @@ def on_request(path_info, params)
           end
         rescue => e
           if @dump_error_log
-            p e.backtrace
             log.error "failed to process batch request", error: e
           end
           return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb
index 3847f4a3c2..c16fde805c 100644
--- a/test/plugin/test_in_http.rb
+++ b/test/plugin/test_in_http.rb
@@ -116,6 +116,36 @@ def test_json
     assert_equal_event_time time, d.events[1][1]
   end
 
+  data('json' => ['json', :to_json],
+       'msgpack' => ['msgpack', :to_msgpack])
+  def test_default_with_time_format(data)
+    param, method_name = data
+    d = create_driver(CONFIG + %[
+      <parse>
+        keep_time_key
+        time_format %iso8601
+      </parse>
+    ])
+
+    time = event_time("2020-06-10T01:14:27+00:00")
+    events = [
+      ["tag1", time, {"a" => 1, "time" => '2020-06-10T01:14:27+00:00'}],
+      ["tag2", time, {"a" => 2, "time" => '2020-06-10T01:14:27+00:00'}],
+    ]
+    res_codes = []
+
+    d.run(expect_records: 2) do
+      events.each do |tag, t, record|
+        res = post("/#{tag}", {param => record.__send__(method_name)})
+        res_codes << res.code
+      end
+    end
+    assert_equal ["200", "200"], res_codes
+    assert_equal events, d.events
+    assert_equal_event_time time, d.events[0][1]
+    assert_equal_event_time time, d.events[1][1]
+  end
+
   def test_multi_json
     d = create_driver
     time = event_time("2011-01-02 13:14:15 UTC")
@@ -163,6 +193,33 @@ def test_multi_json_with_time_field
     assert_equal_event_time time, d.events[1][1]
   end
 
+  data('json' => ['json', :to_json],
+       'msgpack' => ['msgpack', :to_msgpack])
+  def test_default_multi_with_time_format(data)
+    param, method_name = data
+    d = create_driver(CONFIG + %[
+      <parse>
+        keep_time_key
+        time_format %iso8601
+      </parse>
+    ])
+    time = event_time("2020-06-10T01:14:27+00:00")
+    events = [
+      ["tag1", time, {'a' => 1, 'time' => "2020-06-10T01:14:27+00:00"}],
+      ["tag1", time, {'a' => 2, 'time' => "2020-06-10T01:14:27+00:00"}],
+    ]
+    tag = "tag1"
+    res_codes = []
+    d.run(expect_records: 2, timeout: 5) do
+      res = post("/#{tag}", {param => events.map { |e| e[2] }.__send__(method_name)})
+      res_codes << res.code
+    end
+    assert_equal ["200"], res_codes
+    assert_equal events, d.events
+    assert_equal_event_time time, d.events[0][1]
+    assert_equal_event_time time, d.events[1][1]
+  end
+
   def test_multi_json_with_nonexistent_time_key
     d = create_driver(CONFIG + %[
       <parse>