diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 5230b0ce2d..bf936b376e 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -317,19 +317,33 @@ def refresh_watchers end def setup_watcher(path, pe) - line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil - tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) - tw.attach do |watcher| - event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger - event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger + line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil + tw = TailWatcher.new(path, pe, log, @read_from_head, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) + + if @enable_watch_timer + tt = TimerTrigger.new(1, log) { tw.on_notify } + tw.register_watcher(tt) + end + + if @enable_stat_watcher + tt = StatWatcher.new(path, log) { tw.on_notify } + tw.register_watcher(tt) + end + + tw.on_notify + + tw.watchers.each do |watcher| + event_loop_attach(watcher) end + tw rescue => e if tw - tw.detach { |watcher| - event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger - event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger - } + tw.watchers.each do |watcher| + event_loop_detach(watcher) + end + + tw.detach tw.close end raise e @@ -384,6 +398,8 @@ def close_watcher_handles # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. def update_watcher(path, pe) + log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds") + if @pf unless pe.read_inode == @pf[path].read_inode log.debug "Skip update_watcher because watcher has been already updated by other inotify event" @@ -400,10 +416,11 @@ def update_watcher(path, pe) # so adding close_io argument to avoid this problem. # At shutdown, IOHandler's io will be released automatically after detached the event loop def detach_watcher(tw, close_io = true) - tw.detach { |watcher| - event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger - event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger - } + tw.watchers.each do |watcher| + event_loop_detach(watcher) + end + tw.detach + tw.close if close_io flush_buffer(tw) if tw.unwatched && @pf @@ -420,7 +437,7 @@ def detach_watcher_after_rotate_wait(tw) end def flush_buffer(tw) - if lb = tw.line_buffer + if lb = tw.line_buffer_timer_flusher&.line_buffer lb.chomp! @parser.parse(lb) { |time, record| if time && record @@ -490,11 +507,12 @@ def parse_singleline(lines, tail_watcher) es end + # No need to check if line_buffer_timer_flusher is nil, since line_buffer_timer_flusher should exist def parse_multilines(lines, tail_watcher) - lb = tail_watcher.line_buffer + lb = tail_watcher.line_buffer_timer_flusher.line_buffer es = Fluent::MultiEventStream.new if @parser.has_firstline? - tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher + tail_watcher.line_buffer_timer_flusher.reset_timer lines.each { |line| if @parser.firstline?(line) if lb @@ -524,59 +542,75 @@ def parse_multilines(lines, tail_watcher) } end end - tail_watcher.line_buffer = lb + tail_watcher.line_buffer_timer_flusher.line_buffer = lb es end + class StatWatcher < Coolio::StatWatcher + def initialize(path, log, &callback) + @callback = callback + @log = log + super(path) + end + + def on_change(prev, cur) + @callback.call + rescue + @log.error $!.to_s + @log.error_backtrace + end + end + + class TimerTrigger < Coolio::TimerWatcher + def initialize(interval, log, &callback) + @log = log + @callback = callback + super(interval, true) + end + + def on_timer + @callback.call + rescue => e + @log.error e.to_s + @log.error_backtrace + end + end + class TailWatcher - def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) + def initialize(path, pe, log, read_from_head, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) @path = path - @rotate_wait = rotate_wait @pe = pe || MemoryPositionEntry.new @read_from_head = read_from_head - @enable_watch_timer = enable_watch_timer - @enable_stat_watcher = enable_stat_watcher @read_lines_limit = read_lines_limit @receive_lines = receive_lines @update_watcher = update_watcher - @stat_trigger = @enable_stat_watcher ? StatWatcher.new(path, log, &method(:on_notify)) : nil - @timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil - @rotate_handler = RotateHandler.new(log, &method(:on_rotate)) @io_handler = nil @log = log - @line_buffer = nil @line_buffer_timer_flusher = line_buffer_timer_flusher @from_encoding = from_encoding @encoding = encoding @open_on_every_update = open_on_every_update + @watchers = [] end attr_reader :path - attr_reader :log, :pe, :read_lines_limit, :open_on_every_update - attr_reader :from_encoding, :encoding - attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher - attr_accessor :timer_trigger - attr_accessor :line_buffer, :line_buffer_timer_flusher + attr_reader :pe + attr_reader :line_buffer_timer_flusher attr_accessor :unwatched # This is used for removing position entry from PositionFile + attr_reader :watchers def tag @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') end - def wrap_receive_lines(lines) - @receive_lines.call(lines, self) - end - - def attach - on_notify - yield self + def register_watcher(watcher) + @watchers << watcher end def detach - yield self @io_handler.on_notify if @io_handler end @@ -630,7 +664,7 @@ def on_rotate(stat) pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end - @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) + @io_handler = io_handler else @io_handler = NullIOHandler.new end @@ -655,18 +689,21 @@ def on_rotate(stat) watcher_needs_update = true end - log_msg = "detected rotation of #{@path}" - log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists - @log.info log_msg - if watcher_needs_update @update_watcher.call(@path, swap_state(@pe)) else - @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) + @log.info "detected rotation of #{@path}" + @io_handler = io_handler end end end + def io_handler + IOHandler.new(self, path: @path, log: @log, read_lines_limit: @read_lines_limit, open_on_every_update: @open_on_every_update, from_encoding: @from_encoding, encoding: @encoding) do |lines| + @receive_lines.call(lines, self) + end + end + def swap_state(pe) # Use MemoryPositionEntry for rotated file temporary mpe = MemoryPositionEntry.new @@ -675,37 +712,6 @@ def swap_state(pe) pe # This pe will be updated in on_rotate after TailWatcher is initialized end - class TimerTrigger < Coolio::TimerWatcher - def initialize(interval, log, &callback) - @callback = callback - @log = log - super(interval, true) - end - - def on_timer - @callback.call - rescue => e - @log.error e.to_s - @log.error_backtrace - end - end - - class StatWatcher < Coolio::StatWatcher - def initialize(path, log, &callback) - @callback = callback - @log = log - super(path) - end - - def on_change(prev, cur) - @callback.call - rescue - # TODO log? - @log.error $!.to_s - @log.error_backtrace - end - end - class FIFO def initialize(from_encoding, encoding) @from_encoding = from_encoding @@ -765,15 +771,20 @@ def bytesize end class IOHandler - def initialize(watcher, &receive_lines) + def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) @watcher = watcher + @path = path + @read_lines_limit = read_lines_limit @receive_lines = receive_lines - @fifo = FIFO.new(@watcher.from_encoding || Encoding::ASCII_8BIT, @watcher.encoding || Encoding::ASCII_8BIT) + @open_on_every_update = open_on_every_update + @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] @io = nil @notify_mutex = Mutex.new - @watcher.log.info "following tail of #{@watcher.path}" + @log = log + + @log.info "following tail of #{@path}" end def on_notify @@ -790,7 +801,7 @@ def handle_notify while true @fifo << io.readpartial(8192, @iobuf) @fifo.read_lines(@lines) - if @lines.size >= @watcher.read_lines_limit + if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true break @@ -824,18 +835,18 @@ def opened? end def open - io = Fluent::FileWrapper.open(@watcher.path) + io = Fluent::FileWrapper.open(@path) io.seek(@watcher.pe.read_pos + @fifo.bytesize) io rescue RangeError io.close if io - raise WatcherSetupError, "seek error with #{@watcher.path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" + raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" rescue Errno::ENOENT nil end def with_io - if @watcher.open_on_every_update + if @open_on_every_update io = open begin yield io @@ -850,8 +861,8 @@ def with_io close raise e rescue - @watcher.log.error $!.to_s - @watcher.log.error_backtrace + @log.error $!.to_s + @log.error_backtrace close end end @@ -903,24 +914,31 @@ def on_notify(stat) end class LineBufferTimerFlusher + attr_accessor :line_buffer + def initialize(log, flush_interval, &flush_method) @log = log @flush_interval = flush_interval @flush_method = flush_method @start = nil + @line_buffer = nil end def on_notify(tw) - if @start && @flush_interval - if Time.now - @start >= @flush_interval - @flush_method.call(tw) - tw.line_buffer = nil - @start = nil - end + unless @start && @flush_method + return + end + + if Time.now - @start >= @flush_interval + @flush_method.call(tw) + @line_buffer = nil + @start = nil end end def reset_timer + return unless @flush_interval + @start = Time.now end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 2a9e3b5883..6aacf0d068 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1093,35 +1093,18 @@ def test_z_refresh_watchers end Timecop.freeze(2010, 1, 2, 3, 4, 5) do - flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| - EX_PATHS.each do |path| - watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do - flexmock('TailWatcher') { |watcher| - watcher.should_receive(:attach).once - watcher.should_receive(:unwatched=).zero_or_more_times - watcher.should_receive(:line_buffer).zero_or_more_times - } - end - end - plugin.refresh_watchers + EX_PATHS.each do |path| + mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(path, anything, anything, true, 1000, anything, anything, anything, anything, false).once end - end - plugin.instance_eval do - @tails['test/plugin/data/2010/01/20100102-030405.log'].should_receive(:close).zero_or_more_times + plugin.refresh_watchers end + mock.proxy(plugin).detach_watcher_after_rotate_wait(plugin.instance_variable_get(:@tails)['test/plugin/data/2010/01/20100102-030405.log']) + Timecop.freeze(2010, 1, 2, 3, 4, 6) do - flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| - watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do - flexmock('TailWatcher') do |watcher| - watcher.should_receive(:attach).once - watcher.should_receive(:unwatched=).zero_or_more_times - watcher.should_receive(:line_buffer).zero_or_more_times - end - end - plugin.refresh_watchers - end + mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new('test/plugin/data/2010/01/20100102-030406.log', anything, anything, true, 1000, anything, anything, anything, anything, false).once + plugin.refresh_watchers flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| watcherclass.should_receive(:new).never