From a1731f118f59c53807a8f2e09fee7600d2a27243 Mon Sep 17 00:00:00 2001 From: Harish Nelakurthi Date: Wed, 11 Dec 2019 22:54:10 -0800 Subject: [PATCH 1/2] Make stage_size & stage computation thread safe Signed-off-by: Harish Nelakurthi --- lib/fluent/plugin/buffer.rb | 51 +++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 58154a9d93..f0e10c5707 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -270,10 +270,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size } - staged_bytesize = 0 operated_chunks = [] unstaged_chunks = {} # metadata => [chunk, chunk, ...] chunks_to_enqueue = [] + staged_bytesizes_by_chunk = {} begin # sort metadata to get lock of chunks in same order with other threads @@ -283,7 +283,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads operated_chunks << chunk if chunk.staged? - staged_bytesize += adding_bytesize + staged_bytesizes_by_chunk[chunk] = 0 if staged_bytesizes_by_chunk[chunk].nil? + staged_bytesizes_by_chunk[chunk] += adding_bytesize elsif chunk.unstaged? unstaged_chunks[metadata] ||= [] unstaged_chunks[metadata] << chunk @@ -330,27 +331,39 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) # All locks about chunks are released. - synchronize do - # At here, staged chunks may be enqueued by other threads. - @stage_size += staged_bytesize - - chunks_to_enqueue.each do |c| - if c.staged? && (enqueue || chunk_size_full?(c)) - m = c.metadata - enqueue_chunk(m) - if unstaged_chunks[m] - u = unstaged_chunks[m].pop + # + # Now update the stage, stage_size with proper locking + # FIX FOR stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712 + # + staged_bytesizes_by_chunk.each do |chunk, bytesize| + chunk.synchronize do + if chunk.staged? + synchronize { @stage_size += bytesize } + log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } } + end + end + end + + chunks_to_enqueue.each do |c| + if c.staged? && (enqueue || chunk_size_full?(c)) + m = c.metadata + enqueue_chunk(m) + if unstaged_chunks[m] + u = unstaged_chunks[m].pop + u.synchronize do if u.unstaged? && !chunk_size_full?(u) - @stage[m] = u.staged! - @stage_size += u.bytesize + synchronize { + @stage[m] = u.staged! + @stage_size += u.bytesize + } end end - elsif c.unstaged? - enqueue_unstaged_chunk(c) - else - # previously staged chunk is already enqueued, closed or purged. - # no problem. end + elsif c.unstaged? + enqueue_unstaged_chunk(c) + else + # previously staged chunk is already enqueued, closed or purged. + # no problem. end end From 065751f5c370b70ba8ab5e12aeced0c4a6138eb8 Mon Sep 17 00:00:00 2001 From: Harish Nelakurthi Date: Tue, 17 Dec 2019 10:41:07 -0800 Subject: [PATCH 2/2] add stage_size only for last successful write to the chunk Signed-off-by: Harish Nelakurthi --- lib/fluent/plugin/buffer.rb | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index f0e10c5707..37e4d9852d 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -283,8 +283,13 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads operated_chunks << chunk if chunk.staged? - staged_bytesizes_by_chunk[chunk] = 0 if staged_bytesizes_by_chunk[chunk].nil? - staged_bytesizes_by_chunk[chunk] += adding_bytesize + # + # https://github.com/fluent/fluentd/issues/2712 + # write_once is supposed to write to a chunk only once + # but this block **may** run multiple times from write_step_by_step and previous write may be rollbacked + # So we should be counting the stage_size only for the last successful write + # + staged_bytesizes_by_chunk[chunk] = adding_bytesize elsif chunk.unstaged? unstaged_chunks[metadata] ||= [] unstaged_chunks[metadata] << chunk @@ -337,10 +342,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) # staged_bytesizes_by_chunk.each do |chunk, bytesize| chunk.synchronize do - if chunk.staged? - synchronize { @stage_size += bytesize } - log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } } - end + synchronize { @stage_size += bytesize } + log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } } end end