Skip to content

Commit

Permalink
Merge pull request #1370 from fluent/add-slow_flush_log_threshold-par…
Browse files Browse the repository at this point in the history
…ameter

Add slow_flush_log_threshold parameter
  • Loading branch information
repeatedly authored Dec 14, 2016
2 parents d2dd151 + 1a6f41e commit acc7e3f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
17 changes: 17 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Output < Base
PROCESS_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC

config_param :time_as_integer, :bool, default: false
desc 'The threshold to show slow flush logs'
config_param :slow_flush_log_threshold, :float, default: 20.0

# `<buffer>` and `<secondary>` sections are available only when '#format' and '#write' are implemented
config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do
Expand Down Expand Up @@ -985,21 +987,28 @@ def try_flush
end

begin
chunk_write_start = Process.clock_gettime(PROCESS_CLOCK_ID)

if output.delayed_commit
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
@counters_monitor.synchronize{ @write_count += 1 }
@dequeued_chunks_mutex.synchronize do
# delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
@dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
end

output.try_write(chunk)
check_slow_flush(chunk_write_start)
else # output plugin without delayed purge
chunk_id = chunk.unique_id
dump_chunk_id = dump_unique_id_hex(chunk_id)
log.trace "adding write count", instance: self.object_id
@counters_monitor.synchronize{ @write_count += 1 }
log.trace "executing sync write", chunk: dump_chunk_id

output.write(chunk)
check_slow_flush(chunk_write_start)

log.trace "write operation done, committing", chunk: dump_chunk_id
commit_write(chunk_id, delayed: false, secondary: using_secondary)
log.trace "done to commit a chunk", chunk: dump_chunk_id
Expand All @@ -1019,6 +1028,14 @@ def try_flush
end
end

def check_slow_flush(start)
elapsed_time = Process.clock_gettime(PROCESS_CLOCK_ID) - start
if elapsed_time > @slow_flush_log_threshold
log.warn "buffer flush took longer time than slow_flush_log_threshold:",
elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
end
end

def update_retry_state(chunk_id, using_secondary, error = nil)
@retry_mutex.synchronize do
@counters_monitor.synchronize{ @num_errors += 1 }
Expand Down
44 changes: 44 additions & 0 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -799,4 +799,48 @@ def waiting(seconds)
assert_equal Fluent::Plugin::Output::FORMAT_MSGPACK_STREAM, i.generate_format_proc
end
end

sub_test_case 'slow_flush_log_threshold' do
def invoke_slow_flush_log_threshold_test(i)
i.configure(config_element('ROOT', '', {'slow_flush_log_threshold' => 0.5}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
i.force_flush

waiting(4) { Thread.pass until i.test_finished? }

yield
ensure
i.stop; i.before_shutdown; i.shutdown; i.after_shutdown; i.close; i.terminate
end

test '#write flush took longer time than slow_flush_log_threshold' do
i = create_output(:buffered)
write_called = false
i.register(:write) { |chunk| sleep 1 }
i.define_singleton_method(:test_finished?) { write_called }
i.define_singleton_method(:try_flush) { super(); write_called = true }

invoke_slow_flush_log_threshold_test(i) {
assert write_called
assert_equal 1, i.log.out.logs.select { |line| line =~ /buffer flush took longer time than slow_flush_log_threshold: elapsed_time/ }.size
}
end

test '#try_write flush took longer time than slow_flush_log_threshold' do
i = create_output(:delayed)
try_write_called = false
i.register(:try_write){ |chunk| sleep 1 }
i.define_singleton_method(:test_finished?) { try_write_called }
i.define_singleton_method(:try_flush) { super(); try_write_called = true }

invoke_slow_flush_log_threshold_test(i) {
assert try_write_called
assert_equal 1, i.log.out.logs.select { |line| line =~ /buffer flush took longer time than slow_flush_log_threshold: elapsed_time/ }.size
}
end
end
end

0 comments on commit acc7e3f

Please sign in to comment.