From 66e618e6d93ccc2928e378b8b5eb702723797bf7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 25 Nov 2015 19:03:15 +0900 Subject: [PATCH] fix bug not to protect in-memory-buffer for multi thread emitting / run-loop --- lib/fluent/process.rb | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/lib/fluent/process.rb b/lib/fluent/process.rb index f2b04e9058..d49a41973f 100644 --- a/lib/fluent/process.rb +++ b/lib/fluent/process.rb @@ -232,26 +232,36 @@ def initialize(w, interval) @w = w @interval = interval @buffer = {} + @mutex = Mutex.new Thread.new(&method(:run)) end def emit(tag, es) - if ms = @buffer[tag] - ms << es.to_msgpack_stream - else - @buffer[tag] = es.to_msgpack_stream + stream = es.to_msgpack_stream + @mutex.synchronize do + if @buffer[tag] + @buffer[tag] << stream + else + @buffer[tag] = stream + end end end def run while true sleep @interval - @buffer.keys.each {|tag| - if ms = @buffer.delete(tag) - [tag, ms].to_msgpack(@w) - #@w.write [tag, ms].to_msgpack + + pairs = [] + @mutex.synchronize do + @buffer.keys.each do |tag| + if msg = @buffer.delete(tag) + pairs << [tag, ms] + end end - } + end + pairs.each do |pair| + pair.to_msgpack(@w) + end end rescue $log.error "error on forwerder thread", :error=>$!.to_s