
MessagePackEventStream returns only first event #2099

Closed
osela opened this issue Aug 9, 2018 · 8 comments · Fixed by #2116

@osela
Contributor

osela commented Aug 9, 2018

After calling any? or first() on an instance of MessagePackEventStream which contains multiple events, iterating over it using each returns only the first event and the stream size is 0.

Here is a minimal reproduction:

require 'fluent/event'

time = Fluent::EventTime.now
array_stream = Fluent::ArrayEventStream.new([[time, {'a' => 1}], [time, {'a' => 2}]])
mp_stream = Fluent::MessagePackEventStream.new(array_stream.to_msgpack_stream)

puts mp_stream.any?

puts mp_stream.size # Prints 0

mp_stream.each do |time|
  puts time # Executes only once
end

Commenting out the call to any? makes it work as expected. Am I missing something here?
I noticed this while trying to debug a specific filter placed directly after a forward input.
I'm using fluentd 1.2.4.

@fujimotos
Member

AFAIK, MessagePackEventStream is designed to parse the given msgpack
payload lazily, and because of this design it does not work well with any? or first.

If you really want to use these methods, you first need to call ensure_unpacked!:

require 'fluent/event'

time = Fluent::EventTime.now
array_stream = Fluent::ArrayEventStream.new([[time, {'a' => 1}], [time, {'a' => 2}]])
mp_stream = Fluent::MessagePackEventStream.new(array_stream.to_msgpack_stream)

mp_stream.ensure_unpacked!

puts mp_stream.any?

puts mp_stream.size # Prints 2

mp_stream.each do |time|
  puts time # Executes twice, once per event
end

The above code should work just as you expect.

@osela
Contributor Author

osela commented Aug 10, 2018

@fujimotos I understand that ensure_unpacked! would solve the issue in my example, but I still think the behaviour is problematic.

MessagePackEventStream is an EventStream and includes the Enumerable mixin, so as a user I expect it to behave the same as all other EventStream implementations and support the Enumerable methods. Requiring different behaviour based on whether the stream is MessagePackEventStream or not breaks some basic OO design principles.

From my issue it may seem like I'm "picking" on MessagePackEventStream and specifically trying to find ways to break it, but we ran into this in practice. We used a fairly popular custom filter plugin, fluent-plugin-kubernetes_metadata_filter, directly after an in_forward input (which produces instances of MessagePackEventStream), and it processed only the first event in each stream. The plugin simply calls first on the stream before using each.

@fujimotos
Member

fujimotos commented Aug 15, 2018

@osela OK, I've got your point.

MessagePackEventStream is an EventStream and includes the Enumerable mixin, so as a user I expect it to behave the same as all other EventStream implementations and support the Enumerable methods. Requiring different behaviour based on whether the stream is MessagePackEventStream or not breaks some basic OO design principles.

I actually agree with you on that point.

Given that plugin developers use (or misuse) these methods this way in the wild, I agree
that it is better to fix the abstraction leak and make these methods
work as expected.

In this particular case, the problem is that first and any? poison
the internal cache of MessagePackEventStream, and the remaining events become
"unreadable" as a result, which is not desirable behaviour.

Maybe we can implement first and any? on MessagePackEventStream
and have them call ensure_unpacked! internally. This should make these methods
work as intended.
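One way to picture this proposal is the toy Ruby sketch below. PatchedLazyStream is a hypothetical stand-in, not fluentd's class: a plain array of [time, record] pairs replaces the msgpack payload, each keeps the cache-while-yielding shape of MessagePackEventStream#each, and ensure_unpacked! is modeled on the intent described in this thread rather than copied from fluentd. Overriding first and any? fixes those two calls, but an Enumerable method that is not overridden, such as take, can still break out of each early and leave the cache half-filled.

```ruby
# Hypothetical toy stand-in for MessagePackEventStream (not fluentd's code).
# A plain array of [time, record] pairs replaces the msgpack payload.
class PatchedLazyStream
  include Enumerable

  attr_reader :size

  def initialize(data)
    @data = data
    @size = 0
    @unpacked_times = nil
    @unpacked_records = nil
  end

  # Unpacks every event in one pass and records the size.
  def ensure_unpacked!
    return if @unpacked_times && @unpacked_records
    @unpacked_times = []
    @unpacked_records = []
    @data.each do |time, record|
      @unpacked_times << time
      @unpacked_records << record
    end
    @size = @unpacked_times.size
  end

  # Lazy each: fills the cache while yielding, so an early exit leaves it partial.
  def each(&block)
    if @unpacked_times
      @unpacked_times.each_with_index { |time, i| block.call(time, @unpacked_records[i]) }
    else
      @unpacked_times = []
      @unpacked_records = []
      @data.each do |time, record|
        @unpacked_times << time
        @unpacked_records << record
        block.call(time, record)
      end
      @size = @unpacked_times.size
    end
    nil
  end

  # The proposed fix: unpack everything first, then defer to Enumerable.
  def first(*args)
    ensure_unpacked!
    super
  end

  def any?(*args, &block)
    ensure_unpacked!
    super
  end
end

ok = PatchedLazyStream.new([[1, { 'a' => 1 }], [2, { 'a' => 2 }]])
ok.any?
p ok.size     # => 2

leaky = PatchedLazyStream.new([[1, { 'a' => 1 }], [2, { 'a' => 2 }]])
leaky.take(1) # take is not overridden, so each still stops after one event
p leaky.size  # => 0
```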

@osela
Contributor Author

osela commented Aug 15, 2018

@fujimotos It's not just first and any?, but any Enumerable method that may exit early before iterating all the items (take and many more).

Would you mind sharing some insight regarding the usage of MessagePackEventStream? Specifically, what are the scenarios in which unpacking only a subset of the items is needed?

@fujimotos
Copy link
Member

Specifically, what are the scenarios in which unpacking only a subset of the items is needed?

I actually don't know of any such case.

My basic stance here is that, since this class is intentionally designed to defer unpacking
the blob as late as possible (in spite of the obvious drawbacks discussed here), the original
author must have had good reasons for doing so (perhaps performance-related ones).

I'm not saying the current design is super convincing (for example, its child class
CompressedMessagePackEventStream does decompress the whole blob in one go,
so why can't MessagePackEventStream do the same?), but as long as it is intentional, I'd
like to respect the current design as much as possible.

@osela
Contributor Author

osela commented Aug 28, 2018

@fujimotos I investigated the issue further and checked the commit that introduced the code, and I'm fairly certain it's a bug rather than the intended behavior.

The problem is in the each method:

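def each(&block)
  if @unpacked_times
    # Replays the cache, even if it was only partially filled.
    @unpacked_times.each_with_index do |time, i|
      block.call(time, @unpacked_records[i])
    end
  else
    @unpacked_times = []
    @unpacked_records = []
    msgpack_unpacker.feed_each(@data) do |time, record|
      @unpacked_times << time
      @unpacked_records << record
      # Enumerable methods such as any? and first can exit each from here.
      block.call(time, record)
    end
    # Skipped after such an early exit, so @size stays 0.
    @size = @unpacked_times.size
  end
  nil
end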

The any? method (like the other Enumerable methods) calls each internally from C code. Many of these methods stop iterating as soon as the block has been called enough times (for any?, just once), so each exits immediately. This means @unpacked_times holds only a subset of the records, and @size is still 0 because the @size = @unpacked_times.size line is never executed.
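The mechanism can be reproduced without fluentd or msgpack. The sketch below is a hypothetical toy, not fluentd's code: LazyStream copies the structure of the each method quoted above, but iterates a plain array of [time, record] pairs where the real class calls msgpack_unpacker.feed_each(@data), and its size simply returns @size. Every yielded [time, record] pair is truthy, so any? breaks out of each after the first event.

```ruby
# Hypothetical toy stand-in for MessagePackEventStream (not fluentd's code).
# A plain array of [time, record] pairs replaces the msgpack payload.
class LazyStream
  include Enumerable

  def initialize(data)
    @data = data
    @size = 0
    @unpacked_times = nil
    @unpacked_records = nil
  end

  def size
    @size
  end

  # Same structure as the each method quoted above.
  def each(&block)
    if @unpacked_times
      # Replays whatever was cached, even if the cache is only partial.
      @unpacked_times.each_with_index do |time, i|
        block.call(time, @unpacked_records[i])
      end
    else
      @unpacked_times = []
      @unpacked_records = []
      @data.each do |time, record| # stands in for msgpack_unpacker.feed_each(@data)
        @unpacked_times << time
        @unpacked_records << record
        block.call(time, record) # any?, first, take, etc. can break out of each here
      end
      @size = @unpacked_times.size # never reached after an early exit
    end
    nil
  end
end

stream = LazyStream.new([[1, { 'a' => 1 }], [2, { 'a' => 2 }]])
stream.any?                     # breaks out of each after the first event
p stream.size                   # => 0
p stream.map { |time, _| time } # => [1]
```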

I guess the code was written this way to avoid looping over the data twice, once for unpacking and again for returning it from each. But again, I think it's unintentional.

I'm going to create a pull request to make each call ensure_unpacked!.
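A minimal sketch of that change, again on a hypothetical toy class rather than fluentd's actual code (a plain array of [time, record] pairs stands in for the msgpack payload): each first calls ensure_unpacked!, which unpacks everything and sets @size, and only then yields from the cache, so no caller block ever runs inside the unpacking loop. The ensure_unpacked! body mirrors the intent described in this thread, not necessarily fluentd's exact implementation.

```ruby
# Hypothetical toy stand-in for MessagePackEventStream (not fluentd's code).
# A plain array of [time, record] pairs replaces the msgpack payload.
class UnpackFirstStream
  include Enumerable

  def initialize(data)
    @data = data
    @size = 0
    @unpacked_times = nil
    @unpacked_records = nil
  end

  def size
    @size
  end

  # Unpacks every event in one pass and records the size; no caller block runs here.
  def ensure_unpacked!
    return if @unpacked_times && @unpacked_records
    @unpacked_times = []
    @unpacked_records = []
    @data.each do |time, record|
      @unpacked_times << time
      @unpacked_records << record
    end
    @size = @unpacked_times.size
  end

  # Always iterates a complete cache, so an early exit from any?, first,
  # take, etc. can no longer leave the stream half-unpacked.
  def each(&block)
    ensure_unpacked!
    @unpacked_times.each_with_index do |time, i|
      block.call(time, @unpacked_records[i])
    end
    nil
  end
end

stream = UnpackFirstStream.new([[1, { 'a' => 1 }], [2, { 'a' => 2 }]])
stream.any?
p stream.size                   # => 2
p stream.map { |time, _| time } # => [1, 2]
```

The cost of this design is that the events are walked twice on the first call, once to unpack and once to yield, which is the double loop the original code appears to have been avoiding.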

What do you think? @tagomoris, do you have some input by any chance?

@github-actions

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days

@github-actions github-actions bot added the stale label Jan 26, 2021
@github-actions

This issue was automatically closed because of stale in 30 days
