From 825e6e22e56effa77e70fb9d574b9495290412cf Mon Sep 17 00:00:00 2001 From: Masatake Iwasaki Date: Sat, 9 Feb 2019 05:34:08 +0900 Subject: [PATCH] Reduce delay before flush when large timekey and small timekey_wait are given. Signed-off-by: Masatake Iwasaki --- lib/fluent/plugin/output.rb | 2 +- test/plugin/test_output_as_buffered.rb | 38 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 7d2457e3fd..c90f36e466 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1336,7 +1336,7 @@ def enqueue_thread_run end if @chunk_key_time if !value_for_interval || @buffer_config.timekey < value_for_interval - value_for_interval = @buffer_config.timekey + value_for_interval = [@buffer_config.timekey, @buffer_config.timekey_wait].min end end unless value_for_interval diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index ca3552b4d2..667d1a8af8 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -1074,6 +1074,44 @@ def waiting(seconds) end end + sub_test_case 'buffered output with large timekey and small timekey_wait' do + setup do + chunk_key = 'time' + hash = { + 'timekey' => 86400, # per 1 day + 'timekey_wait' => 10, # 10 seconds delay for flush + 'flush_thread_count' => 1, + 'flush_thread_burst_interval' => 0.01, + } + @i = create_output(:buffered) + @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) + @i.start + @i.after_start + end + + test '#configure raises config error if timekey is not specified' do + Timecop.freeze( Time.parse('2019-02-08 00:01:00 +0900') ) + ary = [] + @i.register(:write){|chunk| ary << chunk.read } + @i.thread_wait_until_start + events = [ + [event_time('2019-02-08 00:02:00 +0900'), {"message" => "foobar"}] + ] + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.enqueue_thread_wait + assert{ @i.write_count == 0 } + + Timecop.freeze( Time.parse('2019-02-09 00:00:08 +0900') ) + @i.enqueue_thread_wait + assert{ @i.write_count == 0 } + + Timecop.freeze( Time.parse('2019-02-09 00:00:12 +0900') ) + # wirte should be called in few seconds since + # running interval of enque thread is timekey_wait / 11.0. + waiting(5){ sleep 0.1 until @i.write_count == 1 } + end + end + sub_test_case 'buffered output feature with tag key' do setup do chunk_key = 'tag'