Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Ensure to discard TailWatcher with missing target when follow_inodes #4239

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,24 @@ def close_watcher_handles

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(tail_watcher, pe, new_inode)
# TODO we should use another callback for this.
# To supress impact to existing logics, limit the case to `@follow_inodes`.
# We may not need `@follow_inodes` condition.
if @follow_inodes && new_inode.nil?
# nil inode means the file disappeared, so we only need to stop it.
@tails.delete(tail_watcher.path)
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# Because of this problem, log duplication can occur during `rotate_wait`.
# Need to set `rotate_wait 0` for a workaround.
# Duplication will occur if `refresh_watcher` is called during the `rotate_wait`.
# In that case, `refresh_watcher` will add the new TailWatcher to tail the same target,
# and it causes the log duplication.
# (Other `detach_watcher_after_rotate_wait` may have the same problem.
# We need the mechanism not to add duplicated TailWathcer with detaching TailWatcher.)
detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode)
return
end

path = tail_watcher.path

log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")
Expand Down Expand Up @@ -890,10 +908,14 @@ def on_rotate(stat)

if watcher_needs_update
if @follow_inodes
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(self, @pe, stat.ino) if stat
# If stat is nil (file not present), NEED to stop and discard this watcher.
# When the file is disappeared but is resurrected soon, then `#refresh_watcher`
# can't recognize this TailWatcher needs to be stopped.
# This can happens when the file is rotated.
# If a notify comes before the new file for the path is created during rotation,
# then it appears as if the file was resurrected once it disappeared.
# Don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(self, @pe, stat&.ino)
else
# Permit to handle if stat is nil (file not present).
# If a file is mv-ed and a new file is created during
Expand Down
117 changes: 117 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2899,5 +2899,122 @@ def test_updateTW_after_refreshTW
},
)
end

def test_path_resurrection
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# Because of this problem, log duplication can occur during `rotate_wait`.
# Need to set `rotate_wait 0` for a workaround.
"rotate_wait" => "0s",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 5, timeout: 10) do
# Rotate
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
# TailWatcher(path: "tail.txt", inode: inode_0) detects `tail.txt` disappeared.
# Call `update_watcher` to stop and discard self.
# If not discarding, then it will be a orphan and cause leak and log duplication.
#
# This reproduces the case where the notify to TailWatcher comes before the new file for the path
# is created during rotation.
# (stat_watcher notifies faster than a new file is created)
# Overall, this is a rotation operation, but from the TailWatcher, it appears as if the file
# was resurrected once it disappeared.
sleep 2 # On Windows and macOS, StatWatcher doesn't work, so need enough interval for TimeTrigger.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# Add new TailWatchers
# tail.txt: TailWatcher(path: "tail.txt", inode: inode_1)
# tail.txt: TailWatcher(path: "tail.txt1", inode: inode_0)
# NOTE: If not discarding the first TailWatcher on notify, this makes it a orphan because
# this overwrites the `@tails[tail.txt]` by adding TailWatcher(path: "tail.txt", inode: inode_1)
d.instance.refresh_watchers

# This does nothing.
# NOTE: If not discarding the first TailWatcher on notify, this add
# tail.txt1: TailWatcher(path: "tail.txt1", inode: inode_0)
# because the previous refresh_watcher overwrites `@tails[tail.txt]` and the inode_0 is lost.
# This would cause log duplication.
d.instance.refresh_watchers

# Append to the old file
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt1", "ab") {|f| f.puts "file1 log3"}

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt1").ino
inode_1 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt").ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file1 log3", "file2 log1", "file2 log2"],
tail_watcher_set: Set[
{
path: "#{@tmp_dir}/tail.txt",
inode: inode_0,
io_handler_opened_status: false,
},
{
path: "#{@tmp_dir}/tail.txt",
inode: inode_1,
io_handler_opened_status: false,
},
{
path: "#{@tmp_dir}/tail.txt1",
inode: inode_0,
io_handler_opened_status: false,
},
],
position_entries: [
["#{@tmp_dir}/tail.txt", "0000000000000021", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_set: Set.new(tail_watchers.collect { |tw|
{
path: tw.path,
inode: tw.ino,
io_handler_opened_status: tw.instance_variable_get(:@io_handler)&.opened? || false,
}
}),
position_entries: position_entries,
},
)
end
end
end