From 9d12e599400007895d17cdd5649bbbd11d81660b Mon Sep 17 00:00:00 2001 From: Kentaro Hayashi Date: Mon, 11 Oct 2021 15:16:53 +0900 Subject: [PATCH] fluent-cat: support to send event time in specified timestamp It is aimed to help to send specific timestamp event easily. Signed-off-by: Kentaro Hayashi --- lib/fluent/command/cat.rb | 16 +++++++++++++--- test/command/test_cat.rb | 29 +++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/lib/fluent/command/cat.rb b/lib/fluent/command/cat.rb index 79b23bc6da..fd5ef4c074 100644 --- a/lib/fluent/command/cat.rb +++ b/lib/fluent/command/cat.rb @@ -35,6 +35,7 @@ message_key = 'message' time_as_integer = false retry_limit = 5 +event_time = nil op.on('-p', '--port PORT', "fluent tcp port (default: #{port})", Integer) {|i| port = i @@ -80,6 +81,10 @@ retry_limit = n } +op.on('--event-time TIME_STRING', "Specify the time expression string (default: nil)") {|v| + event_time = v +} + singleton_class.module_eval do define_method(:usage) do |msg| puts op.to_s @@ -134,7 +139,7 @@ def run end end - def initialize(tag, connector, time_as_integer: false, retry_limit: 5) + def initialize(tag, connector, time_as_integer: false, retry_limit: 5, event_time: nil) @tag = tag @connector = connector @socket = false @@ -148,6 +153,7 @@ def initialize(tag, connector, time_as_integer: false, retry_limit: 5) @retry_wait = 1 @retry_limit = retry_limit @time_as_integer = time_as_integer + @event_time = event_time super() end @@ -166,7 +172,11 @@ def write(record) end end - time = Fluent::EventTime.now + time = if @event_time + Fluent::EventTime.parse(@event_time) + else + Fluent::EventTime.now + end time = time.to_i if @time_as_integer entry = if secondary_record?(record) # Even though secondary contains Fluent::EventTime in record, @@ -309,7 +319,7 @@ def abort_message(time, record) } end -w = Writer.new(tag, connector, time_as_integer: time_as_integer, retry_limit: retry_limit) +w = Writer.new(tag, connector, time_as_integer: time_as_integer, retry_limit: retry_limit, event_time: event_time) w.start case format diff --git a/test/command/test_cat.rb b/test/command/test_cat.rb index d0a2f60d0d..51c1a20bbb 100644 --- a/test/command/test_cat.rb +++ b/test/command/test_cat.rb @@ -96,4 +96,33 @@ def test_cat_secondary_file [d.events.size, event.first, event.last]) end end + + sub_test_case "send specific event time" do + def test_without_event_time + event_time = Fluent::EventTime.now + d = create_driver + d.run(expect_records: 1) do + Open3.pipeline_w("#{ServerEngine.ruby_bin_path} #{FLUENT_CAT_COMMAND} --port #{@port} tag") do |stdin| + stdin.puts('{"key":"value"}') + stdin.close + end + end + event = d.events.first + assert_in_delta(event_time.to_f, event[1].to_f, 3.0) # expect command to be finished in 3 seconds + assert_equal([1, "tag", true, @record], + [d.events.size, event.first, event_time.to_f < event[1].to_f, event.last]) + end + + def test_with_event_time + event_time = "2021-01-02 13:14:15.0+00:00" + d = create_driver + d.run(expect_records: 1) do + Open3.pipeline_w("#{ServerEngine.ruby_bin_path} #{FLUENT_CAT_COMMAND} --port #{@port} --event-time '#{event_time}' tag") do |stdin| + stdin.puts('{"key":"value"}') + stdin.close + end + end + assert_equal([["tag", Fluent::EventTime.parse(event_time), @record]], d.events) + end + end end