Skip to content

Commit

Permalink
Add slow_flush_count and flush_time_count
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Jun 18, 2019
1 parent 6736701 commit 63c006e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
6 changes: 6 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def initialize
@emit_records = 0
@write_count = 0
@rollback_count = 0
@flush_time_count = 0
@slow_flush_count = 0

# How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
if implement?(:synchronous)
Expand Down Expand Up @@ -1202,7 +1204,9 @@ def backup_chunk(chunk, using_secondary, delayed_commit)

def check_slow_flush(start)
elapsed_time = Fluent::Clock.now - start
@counters_monitor.synchronize { @flush_time_count += elapsed_time }
if elapsed_time > @slow_flush_log_threshold
@counters_monitor.synchronize { @slow_flush_count += 1 }
log.warn "buffer flush took longer time than slow_flush_log_threshold:",
elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
end
Expand Down Expand Up @@ -1467,6 +1471,8 @@ def statistics
'emit_count' => @emit_count,
'write_count' => @write_count,
'rollback_count' => @rollback_count,
'slow_flush_count' => @slow_flush_count,
'flush_time_count' => @flush_time_count,
}

if @buffer && @buffer.respond_to?(:statistics)
Expand Down
16 changes: 16 additions & 0 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ def test_configure
"emit_records" => 0,
"write_count" => 0,
"rollback_count" => 0,
"slow_flush_count" => 0,
"flush_time_count" => 0,
}
output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
error_label_info = {
Expand All @@ -142,6 +144,8 @@ def test_configure
"emit_records" => 0,
"write_count" => 0,
"rollback_count" => 0,
"slow_flush_count" => 0,
"flush_time_count" => 0,
}
error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config
opts = {with_config: with_config}
Expand Down Expand Up @@ -203,6 +207,8 @@ def test_configure
"emit_records" => 0,
"write_count" => 0,
"rollback_count" => 0,
"slow_flush_count" => 0,
"flush_time_count" => 0,
}
expect_test_out_record = {
"plugin_id" => "test_out",
Expand All @@ -214,6 +220,8 @@ def test_configure
"emit_records" => 0,
"write_count" => 0,
"rollback_count" => 0,
"slow_flush_count" => 0,
"flush_time_count" => 0,
}
assert_equal(expect_relabel_record, d.events[1][2])
assert_equal(expect_test_out_record, d.events[3][2])
Expand Down Expand Up @@ -333,6 +341,8 @@ def get(uri, header = {})
"emit_records" => 0,
"write_count" => 0,
"rollback_count" => 0,
"slow_flush_count" => 0,
"flush_time_count" => 0,
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
Expand Down Expand Up @@ -393,6 +403,8 @@ def get(uri, header = {})
"emit_records" => 0,
"write_count" => 0,
"rollback_count" => 0,
"slow_flush_count" => 0,
"flush_time_count" => 0,
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
Expand Down Expand Up @@ -437,6 +449,8 @@ def get(uri, header = {})
"emit_records" => 0,
"write_count" => 0,
"rollback_count" => 0,
"slow_flush_count" => 0,
"flush_time_count" => 0,
}
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body)
test_in_response = response["plugins"][0]
Expand Down Expand Up @@ -565,6 +579,8 @@ def write(chunk)
"emit_records" => 1,
"write_count" => 2,
"rollback_count" => 0,
'slow_flush_count' => 0,
'flush_time_count' => 0,
}
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
# flush few times to check steps
Expand Down

0 comments on commit 63c006e

Please sign in to comment.