From dcac5fbe47880c1d7a22b8223e8e9684f73a3fbf Mon Sep 17 00:00:00 2001 From: Alexey Schurov Date: Fri, 31 Aug 2018 11:07:03 +0300 Subject: [PATCH 1/2] in_tail: Prevent thread switching in the interval between seek and read/write operations to pos_file Signed-off-by: Alexey Schurov --- lib/fluent/plugin/in_tail.rb | 38 +++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 0baf9d8a79..35b04d0e08 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -869,8 +869,9 @@ def reset_timer class PositionFile UNWATCHED_POSITION = 0xffffffffffffffff - def initialize(file, map, last_pos) + def initialize(file, file_mutex, map, last_pos) @file = file + @file_mutex = file_mutex @map = map @last_pos = last_pos end @@ -880,19 +881,19 @@ def [](path) return m end - @file.pos = @last_pos - @file.write path - @file.write "\t" - seek = @file.pos - @file.write "0000000000000000\t0000000000000000\n" - @last_pos = @file.pos - - @map[path] = FilePositionEntry.new(@file, seek, 0, 0) + @file_mutex.synchronize { + @file.pos = @last_pos + @file.write "#{path}\t0000000000000000\t0000000000000000\n" + seek = @last_pos + path.bytesize + 1 + @last_pos = @file.pos + @map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0) + } end def self.parse(file) compact(file) + file_mutex = Mutex.new map = {} file.pos = 0 file.each_line {|line| @@ -902,9 +903,9 @@ def self.parse(file) pos = m[2].to_i(16) ino = m[3].to_i(16) seek = file.pos - line.bytesize + path.bytesize + 1 - map[path] = FilePositionEntry.new(file, seek, pos, ino) + map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino) } - new(file, map, file.pos) + new(file, file_mutex, map, file.pos) end # Clean up unwatched file entries @@ -935,23 +936,28 @@ class FilePositionEntry LN_OFFSET = 33 SIZE = 34 - def initialize(file, seek, pos, inode) + def initialize(file, file_mutex, seek, pos, inode) @file = file + @file_mutex = file_mutex @seek = seek @pos = pos @inode = inode end def update(ino, pos) - @file.pos = @seek - @file.write "%016x\t%016x" % [pos, ino] + @file_mutex.synchronize { + @file.pos = @seek + @file.write "%016x\t%016x" % [pos, ino] + } @pos = pos @inode = ino end def update_pos(pos) - @file.pos = @seek - @file.write "%016x" % pos + @file_mutex.synchronize { + @file.pos = @seek + @file.write "%016x" % pos + } @pos = pos end From cdc71091989cdbae4f4c4fd1b43d8a29c3c2e9c1 Mon Sep 17 00:00:00 2001 From: Alexey Schurov Date: Fri, 31 Aug 2018 11:23:56 +0300 Subject: [PATCH 2/2] in_tail: Add log messages about corrupted lines in pos_file Signed-off-by: Alexey Schurov --- lib/fluent/plugin/in_tail.rb | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 35b04d0e08..dbe628bea7 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -898,7 +898,10 @@ def self.parse(file) file.pos = 0 file.each_line {|line| m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) - next unless m + unless m + $log.warn "Unparsable line in pos_file: #{line}" + next + end path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) @@ -913,7 +916,10 @@ def self.compact(file) file.pos = 0 existent_entries = file.each_line.map { |line| m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) - next unless m + unless m + $log.warn "Unparsable line in pos_file: #{line}" + next + end path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16)