diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index e8cb94dada..198f8be1e0 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -73,6 +73,8 @@ def initialize config_param :from_encoding, :string, default: nil desc 'Add the log path being tailed to records. Specify the field name to be used.' config_param :path_key, :string, default: nil + desc 'Open and close the file on every update instead of leaving it open until it gets rotated.' + config_param :open_on_every_update, :bool, default: false attr_reader :paths @@ -213,7 +215,7 @@ def refresh_watchers def setup_watcher(path, pe) line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil - tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines)) + tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) tw.attach do |watcher| watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer event_loop_attach(watcher.stat_trigger) @@ -297,13 +299,6 @@ def detach_watcher_after_rotate_wait(tw) def flush_buffer(tw) if lb = tw.line_buffer lb.chomp! - if @encoding - if @from_encoding - lb.encode!(@encoding, @from_encoding) - else - lb.force_encoding(@encoding) - end - end @parser.parse(lb) { |time, record| if time && record tag = if @tag_prefix || @tag_suffix @@ -345,13 +340,6 @@ def receive_lines(lines, tail_watcher) def convert_line_to_event(line, es, tail_watcher) begin line.chomp! # remove \n - if @encoding - if @from_encoding - line.encode!(@encoding, @from_encoding) - else - line.force_encoding(@encoding) - end - end @parser.parse(line) { |time, record| if time && record record[@path_key] ||= tail_watcher.path unless @path_key.nil? @@ -410,7 +398,7 @@ def parse_multilines(lines, tail_watcher) end class TailWatcher - def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines) + def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) @path = path @rotate_wait = rotate_wait @pe = pe || MemoryPositionEntry.new @@ -420,17 +408,22 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, r @receive_lines = receive_lines @update_watcher = update_watcher - @stat_trigger = StatWatcher.new(path, log, &method(:on_notify)) + @stat_trigger = StatWatcher.new(self, &method(:on_notify)) @timer_trigger = nil - @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate)) + @rotate_handler = RotateHandler.new(self, &method(:on_rotate)) @io_handler = nil @log = log @line_buffer_timer_flusher = line_buffer_timer_flusher + @from_encoding = from_encoding + @encoding = encoding + @open_on_every_update = open_on_every_update end attr_reader :path + attr_reader :log, :pe, :read_lines_limit, :open_on_every_update + attr_reader :from_encoding, :encoding attr_reader :stat_trigger, :enable_watch_timer attr_accessor :timer_trigger attr_accessor :line_buffer, :line_buffer_timer_flusher @@ -458,21 +451,27 @@ def detach def close if @io_handler @io_handler.close + @io_handler = nil end end def on_notify - @rotate_handler.on_notify if @rotate_handler + begin + stat = Fluent::FileWrapper.stat(@path) + rescue Errno::ENOENT + # moved or deleted + stat = nil + end + + @rotate_handler.on_notify(stat) if @rotate_handler @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher - return unless @io_handler - @io_handler.on_notify + @io_handler.on_notify if @io_handler end - def on_rotate(io) - if @io_handler == nil - if io + def on_rotate(stat) + if @io_handler.nil? + if stat # first time - stat = io.stat fsize = stat.size inode = stat.ino @@ -483,13 +482,11 @@ def on_rotate(io) # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case, seek to the saved position - pos = @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. - pos = 0 - @pe.update(inode, pos) + @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. @@ -498,36 +495,37 @@ def on_rotate(io) pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end - io.seek(pos) - - @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) + @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) else @io_handler = NullIOHandler.new end else - log_msg = "detected rotation of #{@path}" - log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io # wait rotate_time if previous file is exist - @log.info log_msg + watcher_needs_update = false - if io - stat = io.stat + if stat inode = stat.ino if inode == @pe.read_inode # truncated - @pe.update_pos(stat.size) - io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) + @pe.update_pos(0) @io_handler.close - @io_handler = io_handler - elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher - @pe.update(inode, io.pos) - io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) - @io_handler = io_handler + elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher + @pe.update(inode, 0) else # file is rotated and new file found - @update_watcher.call(@path, swap_state(@pe)) + watcher_needs_update = true end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil + watcher_needs_update = true + end + + log_msg = "detected rotation of #{@path}" + log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists + @log.info log_msg + + if watcher_needs_update @update_watcher.call(@path, swap_state(@pe)) + else + @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) end end end @@ -537,86 +535,154 @@ def swap_state(pe) mpe = MemoryPositionEntry.new mpe.update(pe.read_inode, pe.read_pos) @pe = mpe - @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer. - pe # This pe will be updated in on_rotate after TailWatcher is initialized end class StatWatcher < Coolio::StatWatcher - def initialize(path, log, &callback) + def initialize(watcher, &callback) + @watcher = watcher @callback = callback - @log = log - super(path) + super(watcher.path) end def on_change(prev, cur) @callback.call rescue # TODO log? - @log.error $!.to_s - @log.error_backtrace + @watcher.log.error $!.to_s + @watcher.log.error_backtrace + end + end + + + class FIFO + def initialize(from_encoding, encoding) + @from_encoding = from_encoding + @encoding = encoding + @buffer = ''.force_encoding(from_encoding) + @eol = "\n".encode(from_encoding).freeze + end + + attr_reader :from_encoding, :encoding, :buffer + + def <<(chunk) + # Although "chunk" is most likely transient besides String#force_encoding itself + # won't affect the actual content of it, it is also probable that "chunk" is + # a reused buffer and changing its encoding causes some problems on the caller side. + # + # Actually, the caller here is specific and "chunk" comes from IO#partial with + # the second argument, which the function always returns as a return value. + # + # Feeding a string that has its encoding attribute set to any double-byte or + # quad-byte encoding to IO#readpartial as the second arguments results in an + # assertion failure on Ruby < 2.4.0 for unknown reasons. + orig_encoding = chunk.encoding + chunk.force_encoding(from_encoding) + @buffer << chunk + # Thus the encoding needs to be reverted back here + chunk.force_encoding(orig_encoding) + end + + def convert(s) + if @from_encoding == @encoding + s + else + s.encode(@encoding, @from_encoding) + end + end + + def next_line + idx = @buffer.index(@eol) + convert(@buffer.slice!(0, idx + 1)) unless idx.nil? + end + + def bytesize + @buffer.bytesize end end class IOHandler - def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines) - @log = log - @log.info "following tail of #{io.path}" if first - @io = io - @pe = pe - @read_lines_limit = read_lines_limit + def initialize(watcher, &receive_lines) + @watcher = watcher @receive_lines = receive_lines - @buffer = ''.force_encoding('ASCII-8BIT') + @fifo = FIFO.new(@watcher.from_encoding || Encoding::ASCII_8BIT, @watcher.encoding || Encoding::ASCII_8BIT) @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] + @io = nil + @watcher.log.info "following tail of #{@watcher.path}" end - attr_reader :io - attr_accessor :pe - def on_notify - begin - read_more = false - - if @lines.empty? - begin - while true - if @buffer.empty? - @io.readpartial(2048, @buffer) - else - @buffer << @io.readpartial(2048, @iobuf) - end - while idx = @buffer.index("\n".freeze) - @lines << @buffer.slice!(0, idx + 1) - end - if @lines.size >= @read_lines_limit - # not to use too much memory in case the file is very large - read_more = true - break + with_io do |io| + begin + read_more = false + + if !io.nil? && @lines.empty? + begin + while true + @fifo << io.readpartial(2048, @iobuf) + while (line = @fifo.next_line) + @lines << line + end + if @lines.size >= @watcher.read_lines_limit + # not to use too much memory in case the file is very large + read_more = true + break + end end + rescue EOFError end - rescue EOFError end - end - unless @lines.empty? - if @receive_lines.call(@lines) - @pe.update_pos(@io.pos - @buffer.bytesize) - @lines.clear - else - read_more = false + unless @lines.empty? + if @receive_lines.call(@lines) + @watcher.pe.update_pos(io.pos - @fifo.bytesize) + @lines.clear + else + read_more = false + end end - end - end while read_more - - rescue - @log.error $!.to_s - @log.error_backtrace - close + end while read_more + end end def close - @io.close unless @io.closed? + if @io && !@io.closed? + @io.close + @io = nil + end + end + + def opened? + !!@io + end + + def open + io = Fluent::FileWrapper.open(@watcher.path) + io.seek(@watcher.pe.read_pos + @fifo.bytesize) + io + rescue Errno::ENOENT + nil + end + + def with_io + begin + if @watcher.open_on_every_update + io = open + begin + yield io + ensure + io.close unless io.nil? + end + else + @io ||= open + yield @io + end + rescue + @watcher.log.error $!.to_s + @watcher.log.error_backtrace + close + end end end @@ -632,44 +698,40 @@ def on_notify def close end + + def opened? + false + end end class RotateHandler - def initialize(path, log, &on_rotate) - @path = path + def initialize(watcher, &on_rotate) + @watcher = watcher @inode = nil @fsize = -1 # first @on_rotate = on_rotate - @log = log end - def on_notify - begin - stat = Fluent::FileWrapper.stat(@path) - inode = stat.ino - fsize = stat.size - rescue Errno::ENOENT - # moved or deleted + def on_notify(stat) + if stat.nil? inode = nil fsize = 0 + else + inode = stat.ino + fsize = stat.size end begin if @inode != inode || fsize < @fsize - # rotated or truncated - begin - io = Fluent::FileWrapper.open(@path) - rescue Errno::ENOENT - end - @on_rotate.call(io) + @on_rotate.call(stat) end @inode = inode @fsize = fsize end rescue - @log.error $!.to_s - @log.error_backtrace + @watcher.log.error $!.to_s + @watcher.log.error_backtrace end end @@ -767,16 +829,19 @@ class FilePositionEntry def initialize(file, seek) @file = file @seek = seek + @pos = nil end def update(ino, pos) @file.pos = @seek @file.write "%016x\t%016x" % [pos, ino] + @pos = pos end def update_pos(pos) @file.pos = @seek @file.write "%016x" % pos + @pos = pos end def read_inode @@ -786,9 +851,11 @@ def read_inode end def read_pos - @file.pos = @seek - raw = @file.read(16) - raw ? raw.to_i(16) : 0 + @pos ||= begin + @file.pos = @seek + raw = @file.read(16) + raw ? raw.to_i(16) : 0 + end end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 131f4ffda1..804acf24da 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -37,6 +37,7 @@ def teardown COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" }) CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true }) CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) + CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true }) SINGLE_LINE_CONFIG = config_element("", "", { "format" => "/(?.*)/" }) PARSE_SINGLE_LINE_CONFIG = config_element("", "", {}, [config_element("parse", "", { "@type" => "/(?.*)/" })]) MULTILINE_CONFIG = config_element( @@ -315,6 +316,20 @@ def test_rotate_file_with_read_from_head(data) assert_equal({"message" => "test6"}, events[5][2]) end + data(flat: CONFIG_OPEN_ON_EVERY_UPDATE + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, + parse: CONFIG_OPEN_ON_EVERY_UPDATE + CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) + def test_rotate_file_with_open_on_every_update(data) + config = data + events = sub_test_rotate_file(config, expect_records: 6) + assert_equal(6, events.length) + assert_equal({"message" => "test1"}, events[0][2]) + assert_equal({"message" => "test2"}, events[1][2]) + assert_equal({"message" => "test3"}, events[2][2]) + assert_equal({"message" => "test4"}, events[3][2]) + assert_equal({"message" => "test5"}, events[4][2]) + assert_equal({"message" => "test6"}, events[5][2]) + end + data(flat: SINGLE_LINE_CONFIG, parse: PARSE_SINGLE_LINE_CONFIG) def test_rotate_file_with_write_old(data) @@ -487,6 +502,30 @@ def test_from_encoding assert_equal(Encoding::UTF_8, events[0][2]['message'].encoding) end + def test_from_encoding_utf16 + conf = config_element( + "", "", { + "format" => "/(?.*)/", + "read_from_head" => "true", + "from_encoding" => "utf-16le", + "encoding" => "utf-8" + }) + d = create_driver(conf) + utf16_message = "\u306F\u308D\u30FC\u308F\u30FC\u308B\u3069\n".encode(Encoding::UTF_16LE) + utf8_message = utf16_message.encode(Encoding::UTF_8).strip + + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "w:utf-16le") { |f| + f.write utf16_message + } + end + + events = d.events + assert_equal(utf8_message, events[0][2]['message']) + assert_equal(Encoding::UTF_8, events[0][2]['message'].encoding) + end + + sub_test_case "multiline" do data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG) @@ -769,7 +808,7 @@ def test_z_refresh_watchers Timecop.freeze(2010, 1, 2, 3, 4, 5) do flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| EX_PATHS.each do |path| - watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any).once.and_return do + watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any, any, any, any).once.and_return do flexmock('TailWatcher') { |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times @@ -787,7 +826,7 @@ def test_z_refresh_watchers Timecop.freeze(2010, 1, 2, 3, 4, 6) do flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| - watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any).once.and_return do + watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any, any, any, any).once.and_return do flexmock('TailWatcher') do |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times