Skip to content

Commit

Permalink
Add #slice and correct #size method to classes of event stream.
Browse files Browse the repository at this point in the history
To implement it efficiently, caching of unpacked objects was introduced in MessagePackEventStream.
This makes it possible to slice, iterate and duplicate the stream efficiently once it has been iterated or #size
has been called.
  • Loading branch information
tagomoris committed Jul 19, 2016
1 parent a353a36 commit 55eda5f
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 7 deletions.
68 changes: 61 additions & 7 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def repeatable?
false
end

# Abstract hook: concrete event stream classes override this to return a
# sub-stream of +num+ events starting at +index+. The base implementation
# always raises NotImplementedError.
def slice(index, num)
  raise(NotImplementedError, "DO NOT USE THIS CLASS directly.")
end

def each(&block)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end
Expand Down Expand Up @@ -124,6 +128,10 @@ def empty?
@entries.empty?
end

# Returns a new ArrayEventStream holding up to +num+ entries starting at +index+.
#
# Array#slice returns nil when +index+ lies beyond the end of the array or
# +num+ is negative; fall back to an empty array so the returned stream never
# wraps nil.
def slice(index, num)
  ArrayEventStream.new(@entries.slice(index, num) || [])
end

def each(&block)
@entries.each(&block)
nil
Expand Down Expand Up @@ -167,6 +175,10 @@ def empty?
@time_array.empty?
end

# Returns a new MultiEventStream holding up to +num+ events starting at +index+,
# cut from the time and record arrays with the same index and length.
#
# Array#slice returns nil when +index+ lies beyond the end of the array or
# +num+ is negative; fall back to empty arrays so the returned stream never
# wraps nil.
def slice(index, num)
  times = @time_array.slice(index, num) || []
  records = @record_array.slice(index, num) || []
  MultiEventStream.new(times, records)
end

def each(&block)
time_array = @time_array
record_array = @record_array
Expand All @@ -178,32 +190,74 @@ def each(&block)
end

class MessagePackEventStream < EventStream
# Keep cached_unpacker argument for existence plugins
def initialize(data, cached_unpacker = nil, size = 0)
# https://github.com/msgpack/msgpack-ruby/issues/119

# Keep cached_unpacker argument for existing plugins
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
@data = data
@size = size
@unpacked_times = unpacked_times
@unpacked_records = unpacked_records
end

def empty?
# This is not correct, but actual number of records will be shown after iteration, and
# "size" argument is always 0 currently (because forward protocol doesn't tell it to destination)
false
@data.empty?
end

def dup
MessagePackEventStream.new(@data.dup, @size)
if @unpacked_times
MessagePackEventStream.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
else
MessagePackEventStream.new(@data.dup, nil, @size)
end
end

# Returns the number of events in this stream.
#
# A stored @size of 0 is not trusted: the events are unpacked first, which
# refreshes @size. If the stream really is empty, that unpacking is cheap.
def size
  return @size unless @size == 0

  ensure_unpacked!
  @size
end

# This stream type always reports itself as repeatable.
def repeatable?
  true
end

# Unpacks @data into the @unpacked_times / @unpacked_records caches, unless
# both caches already exist, and refreshes @size from the unpacked count.
#
# The arrays are built locally and only published to the instance variables
# after unpacking finishes. If the unpacker raises part-way through, no
# partial cache is left behind for later calls to mistake for a complete one.
def ensure_unpacked!
  return if @unpacked_times && @unpacked_records

  times = []
  records = []
  msgpack_unpacker.feed_each(@data) do |time, record|
    times << time
    records << record
  end
  @unpacked_times = times
  @unpacked_records = records
  # @size should always be updated right after unpacking: the number of
  # objects actually unpacked is authoritative, not the size given at construction.
  @size = times.size
end

# Returns a MultiEventStream holding up to +num+ events starting at +index+.
#
# The result is a MultiEventStream rather than a MessagePackEventStream
# because there is no reason to serve msgpack-serialized binary for data that
# has already been unpacked.
#
# Array#slice returns nil when +index+ lies beyond the end of the array or
# +num+ is negative; fall back to empty arrays so the returned stream never
# wraps nil.
def slice(index, num)
  ensure_unpacked!
  times = @unpacked_times.slice(index, num) || []
  records = @unpacked_records.slice(index, num) || []
  MultiEventStream.new(times, records)
end

def each(&block)
msgpack_unpacker.feed_each(@data, &block)
if @unpacked_times
@unpacked_times.each_with_index do |time, i|
block.call(time, @unpacked_records[i])
end
else
@unpacked_times = []
@unpacked_records = []
msgpack_unpacker.feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
block.call(time, record)
end
@size = @unpacked_times.size
end
nil
end

Expand Down
93 changes: 93 additions & 0 deletions test/test_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,35 @@ def setup
assert_true ArrayEventStream.new([]).empty?
end

test 'size' do
  # The fixture stream reports two events; a stream built from [] reports zero.
  empty_stream = ArrayEventStream.new([])
  assert_equal 2, @es.size
  assert_equal 0, empty_stream.size
end

test 'slice' do
  # A one-event slice starting at index 1 contains only the second event.
  tail = @es.slice(1, 1)
  assert_kind_of EventStream, tail
  assert_equal 1, tail.size
  tail.each { |time, record|
    assert_equal @times[1], time
    assert_equal 'v2', record['k']
    assert_equal 2, record['n']
  }

  # A slice covering both events yields them in their original order.
  whole = @es.slice(0, 2)
  assert_kind_of EventStream, whole
  assert_equal 2, whole.size
  position = 0
  whole.each { |time, record|
    expected = @records[position]
    assert_equal @times[position], time
    assert_equal expected['k'], record['k']
    assert_equal expected['n'], record['n']
    position += 1
  }
end

test 'each' do
i = 0
@es.each { |time, record|
Expand Down Expand Up @@ -164,6 +193,35 @@ def setup
assert_true MultiEventStream.new.empty?
end

test 'size' do
  # The fixture stream reports two events; a default-constructed stream reports zero.
  empty_stream = MultiEventStream.new
  assert_equal 2, @es.size
  assert_equal 0, empty_stream.size
end

test 'slice' do
  # A one-event slice starting at index 1 contains only the second event.
  tail = @es.slice(1, 1)
  assert_kind_of EventStream, tail
  assert_equal 1, tail.size
  tail.each { |time, record|
    assert_equal @times[1], time
    assert_equal 'v2', record['k']
    assert_equal 2, record['n']
  }

  # A slice covering both events yields them in their original order.
  whole = @es.slice(0, 2)
  assert_kind_of EventStream, whole
  assert_equal 2, whole.size
  position = 0
  whole.each { |time, record|
    expected = @records[position]
    assert_equal @times[position], time
    assert_equal expected['k'], record['k']
    assert_equal expected['n'], record['n']
    position += 1
  }
end

test 'each' do
i = 0
@es.each { |time, record|
Expand Down Expand Up @@ -204,16 +262,51 @@ def setup
assert_kind_of MessagePackEventStream, dupped
assert_not_equal @es.object_id, dupped.object_id
assert_duplicated_records @es, dupped

# After iteration of events (done in assert_duplicated_records),
# duplicated event stream still has unpacked objects and correct size
dupped = @es.dup
assert_equal 2, dupped.instance_eval{ @size }
end

test 'empty?' do
  assert_false @es.empty?
  # The second positional argument is the legacy cached_unpacker slot, so pass
  # nil there and give the size explicitly as the third argument.
  assert_true MessagePackEventStream.new('', nil, 0).empty?
end

test 'size' do
  # The fixture stream reports two events; a stream built from an empty string reports zero.
  empty_stream = MessagePackEventStream.new('')
  assert_equal 2, @es.size
  assert_equal 0, empty_stream.size
end

test 'repeatable?' do
  # The fixture stream must report itself as repeatable.
  repeatable = @es.repeatable?
  assert_true repeatable
end

test 'slice' do
  # A one-event slice starting at index 1 contains only the second event.
  tail = @es.slice(1, 1)
  assert_kind_of EventStream, tail
  assert_equal 1, tail.size
  tail.each { |time, record|
    assert_equal @times[1], time
    assert_equal 'v2', record['k']
    assert_equal 2, record['n']
  }

  # A slice covering both events yields them in their original order.
  whole = @es.slice(0, 2)
  assert_kind_of EventStream, whole
  assert_equal 2, whole.size
  position = 0
  whole.each { |time, record|
    expected = @records[position]
    assert_equal @times[position], time
    assert_equal expected['k'], record['k']
    assert_equal expected['n'], record['n']
    position += 1
  }
end

test 'each' do
i = 0
@es.each { |time, record|
Expand Down

0 comments on commit 55eda5f

Please sign in to comment.