Skip to content

Commit

Permalink
Merge pull request #1722 from fluent/lock-buffers-in-order-of-metadata
Browse files Browse the repository at this point in the history
Lock buffers in order of metadata
  • Loading branch information
repeatedly authored Nov 2, 2017
2 parents 9752b76 + 2369a4a commit 75fef8f
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 2 deletions.
72 changes: 71 additions & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,74 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
def empty?
timekey.nil? && tag.nil? && variables.nil?
end

def cmp_variables(v1, v2)
if v1.nil? && v2.nil?
return 0
elsif v1.nil? # v2 is non-nil
return -1
elsif v2.nil? # v1 is non-nil
return 1
end
# both of v1 and v2 are non-nil
v1_sorted_keys = v1.keys.sort
v2_sorted_keys = v2.keys.sort
if v1_sorted_keys != v2_sorted_keys
if v1_sorted_keys.size == v2_sorted_keys.size
v1_sorted_keys <=> v2_sorted_keys
else
v1_sorted_keys.size <=> v2_sorted_keys.size
end
else
v1_sorted_keys.each do |k|
a = v1[k]
b = v2[k]
if a && b && a != b
return a <=> b
elsif a && b || (!a && !b) # same value (including both are nil)
next
elsif a # b is nil
return 1
else # a is nil (but b is non-nil)
return -1
end
end

0
end
end

def <=>(o)
timekey2 = o.timekey
tag2 = o.tag
variables2 = o.variables
if (!!timekey ^ !!timekey2) || (!!tag ^ !!tag2) || (!!variables ^ !!variables2)
# One has value in a field, but another doesn't have value in same field
# This case occurs very rarely
if timekey == timekey2 # including the case of nil == nil
if tag == tag2
cmp_variables(variables, variables2)
elsif tag.nil?
-1
elsif tag2.nil?
1
else
tag <=> tag2
end
elsif timekey.nil?
-1
elsif timekey2.nil?
1
else
timekey <=> timekey2
end
else
# objects have values in same field pairs (comparison with non-nil and nil doesn't occur here)
(timekey <=> timekey2 || 0).nonzero? || # if `a <=> b` is nil, then both are nil
(tag <=> tag2 || 0).nonzero? ||
cmp_variables(variables, variables2)
end
end
end

# for tests
Expand Down Expand Up @@ -203,7 +271,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
chunks_to_enqueue = []

begin
metadata_and_data.each do |metadata, data|
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
Expand Down
3 changes: 2 additions & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,8 @@ def create_chunk_es(metadata, es)
assert{ @p.stage[@dm1].size == 2 }
assert !@p.stage[@dm1].rollbacked

@p.stage[@dm1].failing = true
meta_list = [@dm0, @dm1, @dm2, @dm3].sort
@p.stage[meta_list.last].failing = true

assert_raise(FluentPluginBufferTest::DummyMemoryChunkError) do
@p.write({ @dm2 => [row], @dm3 => [row], @dm0 => [row, row, row], @dm1 => [row, row] })
Expand Down
89 changes: 89 additions & 0 deletions test/plugin/test_metadata.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
require_relative '../helper'
require 'fluent/plugin/buffer'

class BufferMetadataTest < Test::Unit::TestCase

def meta(timekey=nil, tag=nil, variables=nil)
Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
end

setup do
Fluent::Test.setup
end

sub_test_case 'about metadata' do
test 'comparison of variables should be stable' do
m = meta(nil, nil, nil)
# different sets of keys
assert_equal(-1, m.cmp_variables({}, {a: 1}))
assert_equal(1, m.cmp_variables({a: 1}, {}))
assert_equal(1, m.cmp_variables({c: 1}, {a: 1}))
assert_equal(-1, m.cmp_variables({a: 1}, {a: 1, b: 2}))
assert_equal(1, m.cmp_variables({a: 1, c: 1}, {a: 1, b: 2}))
assert_equal(1, m.cmp_variables({a: 1, b: 0, c: 1}, {a: 1, b: 2}))
# same set of keys
assert_equal(-1, m.cmp_variables({a: 1}, {a: 2}))
assert_equal(-1, m.cmp_variables({a: 1, b: 0}, {a: 1, b: 1}))
assert_equal(-1, m.cmp_variables({a: 1, b: 1, c: 100}, {a: 1, b: 1, c: 200}))
assert_equal(-1, m.cmp_variables({b: 1, c: 100, a: 1}, {a: 1, b: 1, c: 200})) # comparison sorts keys
assert_equal(-1, m.cmp_variables({a: nil}, {a: 1}))
assert_equal(-1, m.cmp_variables({a: 1, b: nil}, {a: 1, b: 1}))
end

test 'comparison of metadata should be stable' do
n = Time.now.to_i

assert_equal(0, meta(nil, nil, nil) <=> meta(nil, nil, nil))
assert_equal(0, meta(n, nil, nil) <=> meta(n, nil, nil))
assert_equal(0, meta(nil, "t1", nil) <=> meta(nil, "t1", nil))
assert_equal(0, meta(nil, nil, {}) <=> meta(nil, nil, {}))
assert_equal(0, meta(nil, nil, {a: "1"}) <=> meta(nil, nil, {a: "1"}))
assert_equal(0, meta(n, nil, {}) <=> meta(n, nil, {}))
assert_equal(0, meta(n, "t1", {}) <=> meta(n, "t1", {}))
assert_equal(0, meta(n, "t1", {a: "x", b: 10}) <=> meta(n, "t1", {a: "x", b: 10}))

# timekey is 1st comparison key
assert_equal(-1, meta(n - 300, nil, nil) <=> meta(n - 270, nil, nil))
assert_equal(1, meta(n + 1, "a", nil) <=> meta(n - 1, "b", nil))
assert_equal(-1, meta(n - 1, nil, {a: 100}) <=> meta(n + 1, nil, {}))

# tag is 2nd
assert_equal(-1, meta(nil, "a", {}) <=> meta(nil, "b", {}))
assert_equal(-1, meta(n, "a", {}) <=> meta(n, "b", {}))
assert_equal(1, meta(nil, "x", {a: 1}) <=> meta(nil, "t", {}))
assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {}))
assert_equal(1, meta(nil, "x", {a: 1}) <=> meta(nil, "t", {a: 1}))
assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {a: 2}))
assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {a: 10, b: 1}))

# variables is the last
assert_equal(-1, meta(nil, nil, {}) <=> meta(nil, nil, {a: 1}))
assert_equal(-1, meta(n, "t", {}) <=> meta(n, "t", {a: 1}))
assert_equal(1, meta(n, "t", {a: 1}) <=> meta(n, "t", {}))
assert_equal(-1, meta(n, "t", {a: 1}) <=> meta(n, "t", {a: 2}))
assert_equal(-1, meta(n, "t", {a: 1}) <=> meta(n, "t", {a: 1, b: 1}))
assert_equal(1, meta(nil, nil, {b: 1}) <=> meta(nil, nil, {a: 1}))
assert_equal(1, meta(n, "t", {b: 1}) <=> meta(n, "t", {a: 1}))
end

test 'metadata can be sorted' do
n = Time.now.to_i
m0 = meta(nil, nil, nil)
m1 = meta(n - 1, nil, nil)
m2 = meta(n - 1, "a", nil)
m3 = meta(n - 1, "a", {a: 1})
m4 = meta(n - 1, "a", {a: 100})
m5 = meta(n - 1, "a", {a: 100, b: 1})
m6 = meta(n - 1, "aa", nil)
m7 = meta(n - 1, "aa", {a: 1})
m8 = meta(n - 1, "b", nil)
m9 = meta(n, nil, nil)
m10 = meta(n + 1, nil, {a: 1})
expected = [m0, m1, m2, m3, m4, m5, m6, m7, m8, m9, m10].freeze
ary = expected.dup
100.times do
assert_equal expected, ary.shuffle.sort
end
end
end
end

0 comments on commit 75fef8f

Please sign in to comment.