From 0219906df332853eb9f839c63f61c2495dc445ed Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Sat, 21 Oct 2017 21:10:39 +0900 Subject: [PATCH 1/4] add methods to sort metadata, and these tests --- lib/fluent/plugin/buffer.rb | 66 ++++++++++++++++++++++++++ test/plugin/test_metadata.rb | 89 ++++++++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 test/plugin/test_metadata.rb diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 954d07c7c2..71099c0e42 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -62,6 +62,72 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than def empty? timekey.nil? && tag.nil? && variables.nil? end + + def cmp_variables(v1, v2) + if v1.nil? && v2.nil? + return 0 + elsif v1.nil? # v2 is non-nil + return -1 + elsif v2.nil? # v1 is non-nil + return 1 + end + # both of v1 and v2 are non-nil + if v1.keys.sort != v2.keys.sort + if v1.keys.size == v2.keys.size + v1.keys.sort <=> v2.keys.sort + else + v1.keys.size <=> v2.keys.size + end + else + v1.keys.sort.each do |k| + a = v1[k] + b = v2[k] + if a && b && a != b + return a <=> b + elsif a && b || (!a && !b) # same value (including both are nil) + next + elsif a # b is nil + return 1 + else # a is nil (but b is non-nil) + return -1 + end + end + + 0 + end + end + + def <=>(o) + timekey2 = o.timekey + tag2 = o.tag + variables2 = o.variables + if (!!timekey ^ !!timekey2) || (!!tag ^ !!tag2) || (!!variables ^ !!variables2) + # One has value in a field, but another doesn't have value in same field + # This case occurs very rarely + if timekey == timekey2 # including the case of nil == nil + if tag == tag2 + cmp_variables(variables, variables2) + elsif tag.nil? + -1 + elsif tag2.nil? + 1 + else + tag <=> tag2 + end + elsif timekey.nil? + -1 + elsif timekey2.nil? + 1 + else + timekey <=> timekey2 + end + else + # objects have values in same field pairs (comparison with non-nil and nil doesn't occur here) + (timekey <=> timekey2 || 0).nonzero? || # if `a <=> b` is nil, then both are nil + (tag <=> tag2 || 0).nonzero? || + cmp_variables(variables, variables2) + end + end end # for tests diff --git a/test/plugin/test_metadata.rb b/test/plugin/test_metadata.rb new file mode 100644 index 0000000000..58f0357b20 --- /dev/null +++ b/test/plugin/test_metadata.rb @@ -0,0 +1,89 @@ +require_relative '../helper' +require 'fluent/plugin/buffer' + +class BufferMetadataTest < Test::Unit::TestCase + + def meta(timekey=nil, tag=nil, variables=nil) + Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables) + end + + setup do + Fluent::Test.setup + end + + sub_test_case 'about metadata' do + test 'comparison of variables should be stable' do + m = meta(nil, nil, nil) + # different sets of keys + assert_equal(-1, m.cmp_variables({}, {a: 1})) + assert_equal(1, m.cmp_variables({a: 1}, {})) + assert_equal(1, m.cmp_variables({c: 1}, {a: 1})) + assert_equal(-1, m.cmp_variables({a: 1}, {a: 1, b: 2})) + assert_equal(1, m.cmp_variables({a: 1, c: 1}, {a: 1, b: 2})) + assert_equal(1, m.cmp_variables({a: 1, b: 0, c: 1}, {a: 1, b: 2})) + # same set of keys + assert_equal(-1, m.cmp_variables({a: 1}, {a: 2})) + assert_equal(-1, m.cmp_variables({a: 1, b: 0}, {a: 1, b: 1})) + assert_equal(-1, m.cmp_variables({a: 1, b: 1, c: 100}, {a: 1, b: 1, c: 200})) + assert_equal(-1, m.cmp_variables({b: 1, c: 100, a: 1}, {a: 1, b: 1, c: 200})) # comparison sorts keys + assert_equal(-1, m.cmp_variables({a: nil}, {a: 1})) + assert_equal(-1, m.cmp_variables({a: 1, b: nil}, {a: 1, b: 1})) + end + + test 'comparison of metadata should be stable' do + n = Time.now.to_i + + assert_equal(0, meta(nil, nil, nil) <=> meta(nil, nil, nil)) + assert_equal(0, meta(n, nil, nil) <=> meta(n, nil, nil)) + assert_equal(0, meta(nil, "t1", nil) <=> meta(nil, "t1", nil)) + assert_equal(0, meta(nil, nil, {}) <=> meta(nil, nil, {})) + assert_equal(0, meta(nil, nil, {a: "1"}) <=> meta(nil, nil, {a: "1"})) + assert_equal(0, meta(n, nil, {}) <=> meta(n, nil, {})) + assert_equal(0, meta(n, "t1", {}) <=> meta(n, "t1", {})) + assert_equal(0, meta(n, "t1", {a: "x", b: 10}) <=> meta(n, "t1", {a: "x", b: 10})) + + # timekey is 1st comparison key + assert_equal(-1, meta(n - 300, nil, nil) <=> meta(n - 270, nil, nil)) + assert_equal(1, meta(n + 1, "a", nil) <=> meta(n - 1, "b", nil)) + assert_equal(-1, meta(n - 1, nil, {a: 100}) <=> meta(n + 1, nil, {})) + + # tag is 2nd + assert_equal(-1, meta(nil, "a", {}) <=> meta(nil, "b", {})) + assert_equal(-1, meta(n, "a", {}) <=> meta(n, "b", {})) + assert_equal(1, meta(nil, "x", {a: 1}) <=> meta(nil, "t", {})) + assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {})) + assert_equal(1, meta(nil, "x", {a: 1}) <=> meta(nil, "t", {a: 1})) + assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {a: 2})) + assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {a: 10, b: 1})) + + # variables is the last + assert_equal(-1, meta(nil, nil, {}) <=> meta(nil, nil, {a: 1})) + assert_equal(-1, meta(n, "t", {}) <=> meta(n, "t", {a: 1})) + assert_equal(1, meta(n, "t", {a: 1}) <=> meta(n, "t", {})) + assert_equal(-1, meta(n, "t", {a: 1}) <=> meta(n, "t", {a: 2})) + assert_equal(-1, meta(n, "t", {a: 1}) <=> meta(n, "t", {a: 1, b: 1})) + assert_equal(1, meta(nil, nil, {b: 1}) <=> meta(nil, nil, {a: 1})) + assert_equal(1, meta(n, "t", {b: 1}) <=> meta(n, "t", {a: 1})) + end + + test 'metadata can be sorted' do + n = Time.now.to_i + m0 = meta(nil, nil, nil) + m1 = meta(n - 1, nil, nil) + m2 = meta(n - 1, "a", nil) + m3 = meta(n - 1, "a", {a: 1}) + m4 = meta(n - 1, "a", {a: 100}) + m5 = meta(n - 1, "a", {a: 100, b: 1}) + m6 = meta(n - 1, "aa", nil) + m7 = meta(n - 1, "aa", {a: 1}) + m8 = meta(n - 1, "b", nil) + m9 = meta(n, nil, nil) + m10 = meta(n + 1, nil, {a: 1}) + expected = [m0, m1, m2, m3, m4, m5, m6, m7, m8, m9, m10].freeze + ary = expected.dup + 100.times do + assert_equal expected, ary.shuffle.sort + end + end + end +end From 703ec2ce7a4fc18e20bb36ed2965caac73c8d3d3 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Sat, 21 Oct 2017 21:12:04 +0900 Subject: [PATCH 2/4] sort metadata to be locked, not to make dead locks w/ other threads --- lib/fluent/plugin/buffer.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 71099c0e42..10216bbb83 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -269,7 +269,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) chunks_to_enqueue = [] begin - metadata_and_data.each do |metadata, data| + # sort metadata to get lock of chunks in same order with other threads + metadata_and_data.keys.sort.each do |metadata| + data = metadata_and_data[metadata] write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize| chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads operated_chunks << chunk From 8766438d8ef84b7868c7e069481230f0ce3c996a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 31 Oct 2017 19:16:00 +0900 Subject: [PATCH 3/4] assign sorted keys list to local variable not to re-compute it --- lib/fluent/plugin/buffer.rb | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 10216bbb83..7bad8f9fbe 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -72,14 +72,16 @@ def cmp_variables(v1, v2) return 1 end # both of v1 and v2 are non-nil - if v1.keys.sort != v2.keys.sort - if v1.keys.size == v2.keys.size - v1.keys.sort <=> v2.keys.sort + v1_sorted_keys = v1.keys.sort + v2_sorted_keys = v2.keys.sort + if v1_sorted_keys != v2_sorted_keys + if v1_sorted_keys.size == v2_sorted_keys.size + v1_sorted_keys <=> v2_sorted_keys else - v1.keys.size <=> v2.keys.size + v1_sorted_keys.size <=> v2_sorted_keys.size end else - v1.keys.sort.each do |k| + v1_sorted_keys.each do |k| a = v1[k] b = v2[k] if a && b && a != b From 2369a4a604f8c8589699916d88b61554085f6214 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 31 Oct 2017 19:16:34 +0900 Subject: [PATCH 4/4] sort metadata list to occur failure at the last metadata of test target --- test/plugin/test_buffer.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 431cb370e6..728bc9b84d 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -851,7 +851,8 @@ def create_chunk_es(metadata, es) assert{ @p.stage[@dm1].size == 2 } assert !@p.stage[@dm1].rollbacked - @p.stage[@dm1].failing = true + meta_list = [@dm0, @dm1, @dm2, @dm3].sort + @p.stage[meta_list.last].failing = true assert_raise(FluentPluginBufferTest::DummyMemoryChunkError) do @p.write({ @dm2 => [row], @dm3 => [row], @dm0 => [row, row, row], @dm1 => [row, row] })