Skip to content

Commit

Permalink
Merge pull request #810 from fluent/fix-in_tail-for-bqle
Browse files Browse the repository at this point in the history
Fix in_tail input messages loss when receive_lines fail. fix #699
  • Loading branch information
repeatedly committed Mar 2, 2016
2 parents c69e459 + 38105fe commit c6e88cc
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 20 deletions.
52 changes: 32 additions & 20 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def run
log.error_backtrace
end

# @return true if no error or unrecoverable error happens in emit action. false if got BufferQueueLimitError
def receive_lines(lines, tail_watcher)
es = @receive_handler.call(lines, tail_watcher)
unless es.empty?
Expand All @@ -260,10 +261,15 @@ def receive_lines(lines, tail_watcher)
end
begin
router.emit_stream(tag, es)
rescue BufferQueueLimitError
return false
rescue
# ignore errors. Engine shows logs and backtraces.
# ignore non BufferQueueLimitError errors because in_tail can't recover. Engine shows logs and backtraces.
return true
end
end

return true
end

def convert_line_to_event(line, es)
Expand Down Expand Up @@ -519,38 +525,44 @@ def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
@receive_lines = receive_lines
@buffer = ''.force_encoding('ASCII-8BIT')
@iobuf = ''.force_encoding('ASCII-8BIT')
@lines = []
end

attr_reader :io
attr_accessor :pe

def on_notify
begin
lines = []
read_more = false

begin
while true
if @buffer.empty?
@io.readpartial(2048, @buffer)
else
@buffer << @io.readpartial(2048, @iobuf)
end
while line = @buffer.slice!(/.*?\n/m)
lines << line
end
if lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
if @lines.empty?
begin
while true
if @buffer.empty?
@io.readpartial(2048, @buffer)
else
@buffer << @io.readpartial(2048, @iobuf)
end
while line = @buffer.slice!(/.*?\n/m)
@lines << line
end
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
end
end
rescue EOFError
end
rescue EOFError
end

unless lines.empty?
@receive_lines.call(lines)
@pe.update_pos(@io.pos - @buffer.bytesize)
unless @lines.empty?
if @receive_lines.call(@lines)
@pe.update_pos(@io.pos - @buffer.bytesize)
@lines.clear
else
read_more = false
end
end
end while read_more

Expand Down
42 changes: 42 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -646,4 +646,46 @@ def test_missing_file
assert_equal({"message" => "test4"}, emits[1][2])
end
end

sub_test_case 'emit error cases' do
def test_emit_error_with_buffer_queue_limit_error
emits = execute_test(::Fluent::BufferQueueLimitError, "queue size exceeds limit")
assert_equal(10, emits.length)
10.times { |i|
assert_equal({"message" => "test#{i}"}, emits[i][2])
}
end

def test_emit_error_with_non_buffer_queue_limit_error
emits = execute_test(StandardError, "non BufferQueueLimitError error")
assert_true(emits.size > 0 && emits.size != 10)
emits.size.times { |i|
assert_equal({"message" => "test#{10 - emits.size + i}"}, emits[i][2])
}
end

def execute_test(error_class, error_message)
d = create_driver(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG)
# Use define_singleton_method instead of d.emit_stream to capture local variable
d.define_singleton_method(:emit_stream) do |tag, es|
@test_num_errors ||= 0
if @test_num_errors < 5
@test_num_errors += 1
raise error_class, error_message
else
@emit_streams << [tag, es.to_a]
end
end

d.run do
10.times { |i|
File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test#{i}" }
sleep 0.5
}
sleep 1
end

d.emits
end
end
end

0 comments on commit c6e88cc

Please sign in to comment.