Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce delay before flush when large timekey and small timekey_wait a… #2291

Merged
merged 1 commit into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ def enqueue_thread_run
end
if @chunk_key_time
if !value_for_interval || @buffer_config.timekey < value_for_interval
value_for_interval = @buffer_config.timekey
value_for_interval = [@buffer_config.timekey, @buffer_config.timekey_wait].min
end
end
unless value_for_interval
Expand Down
38 changes: 38 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,44 @@ def waiting(seconds)
end
end

sub_test_case 'buffered output with large timekey and small timekey_wait' do
setup do
chunk_key = 'time'
hash = {
'timekey' => 86400, # per 1 day
'timekey_wait' => 10, # 10 seconds delay for flush
'flush_thread_count' => 1,
'flush_thread_burst_interval' => 0.01,
}
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test '#configure raises config error if timekey is not specified' do
Timecop.freeze( Time.parse('2019-02-08 00:01:00 +0900') )
ary = []
@i.register(:write){|chunk| ary << chunk.read }
@i.thread_wait_until_start
events = [
[event_time('2019-02-08 00:02:00 +0900'), {"message" => "foobar"}]
]
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.enqueue_thread_wait
assert{ @i.write_count == 0 }

Timecop.freeze( Time.parse('2019-02-09 00:00:08 +0900') )
@i.enqueue_thread_wait
assert{ @i.write_count == 0 }

Timecop.freeze( Time.parse('2019-02-09 00:00:12 +0900') )
# wirte should be called in few seconds since
# running interval of enque thread is timekey_wait / 11.0.
waiting(5){ sleep 0.1 until @i.write_count == 1 }
end
end

sub_test_case 'buffered output feature with tag key' do
setup do
chunk_key = 'tag'
Expand Down