
Producing not working + crash for forked processes #19

Closed
thijsc opened this issue Feb 7, 2018 · 5 comments

@thijsc
Collaborator

thijsc commented Feb 7, 2018

A librdkafka (C) instance does not survive a fork. Producing and polling do not work, and rd_kafka_destroy trips a failing assertion that crashes the process when it is called via https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/config.rb#L149

A fix to not let rd_kafka_destroy crash in this scenario was added in librdkafka: confluentinc/librdkafka@8c67e42

We should move to that release when it's out at the end of February. For the failing produces and polls I see a few options:

  • Raise an exception if our PID changed
  • Recreate the rdkafka instance if our PID changed
  • Add a fork hook that you need to call after forking, possibly with a Unicorn integration
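
A minimal sketch of the first option (all names here are hypothetical, not part of the current API): wrap the producer, remember the PID that created it, and raise instead of silently hanging when it is used after a fork.

require "rdkafka"

# Hypothetical wrapper illustrating the "raise if our PID changed" option.
class ForkSafeProducer
  class ForkedInstanceError < RuntimeError; end

  def initialize(rdkafka_producer)
    @producer = rdkafka_producer
    @creation_pid = Process.pid # remember which process created the client
  end

  def produce(**args)
    ensure_not_forked!
    @producer.produce(**args)
  end

  private

  # librdkafka clients do not survive a fork, so fail loudly.
  def ensure_not_forked!
    return if Process.pid == @creation_pid
    raise ForkedInstanceError,
          "rdkafka client was created in process #{@creation_pid} " \
          "and cannot be used in forked process #{Process.pid}"
  end
end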
@thijsc thijsc added the bug label Feb 7, 2018
@thijsc thijsc self-assigned this Feb 7, 2018
@thijsc thijsc added this to the Feature complete milestone Feb 7, 2018
@thijsc thijsc closed this as completed in 9882ce4 Aug 17, 2019
@agis

agis commented Oct 24, 2019

@thijsc I'm trying to produce from a forked process, but apparently nothing is written to the socket (i.e. nothing is produced).

require "rdkafka"

puts Rdkafka::LIBRDKAFKA_VERSION
puts Rdkafka::VERSION

config = {:"bootstrap.servers" => "kafka-a.vm.skroutz.gr:9092"}
producer = Rdkafka::Config.new(config).producer

fork do
  puts "Producing message"

  producer.produce(topic: "test-rb", payload: "foo").wait

  puts "this is never printed"
end

puts Process.wait

The script blocks at the call to wait until the default timeout (60 seconds) is reached:

$ ruby producer.rb
1.2.0
0.7.0
Producing message

^CTraceback (most recent call last):
        6: from producer.rb:9:in `<main>'
        5: from producer.rb:9:in `fork'
        4: from producer.rb:12:in `block in <main>'
        3: from /home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:49:in `wait'
        2: from /home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:49:in `loop'
        1: from /home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:54:in `block in wait'
/home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:54:in `sleep': Interrupt
Traceback (most recent call last):
        1: from producer.rb:17:in `<main>'
producer.rb:17:in `wait': Interrupt

The forked process writes nothing to the socket (verified with tcpdump) and the consumer never sees any messages.

The forked process seems to be stuck in a loop of ppoll and read syscalls (output from strace):

ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)

Is my script supposed to work or am I missing something?

Thanks!

P.S. I'm on Debian Linux with Ruby 2.6.3p62.

@thijsc
Collaborator Author

thijsc commented Oct 24, 2019

This indeed does not work; rdkafka does not survive a fork. It will work if you move producer = Rdkafka::Config.new(config).producer into the fork block.
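
For reference, a version of your script that should work, with the producer created inside the child (broker address taken from your example):

require "rdkafka"

config = {:"bootstrap.servers" => "kafka-a.vm.skroutz.gr:9092"}

fork do
  # Create the client after forking so librdkafka's background threads
  # and sockets belong to the child process.
  producer = Rdkafka::Config.new(config).producer

  puts "Producing message"
  producer.produce(topic: "test-rb", payload: "foo").wait
  puts "Message delivered"
end

Process.wait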

I'm still not sure how to make this clear and user-friendly, any thoughts on that?

@agis

agis commented Oct 24, 2019

So there is no way to reuse a producer object across multiple forks?

9882ce4 gave me the impression that it should work (isn't that spec essentially doing the same thing?). If it doesn't, then I guess that part of the README shouldn't have been removed?

Creating the producer after the fork defeats the purpose of what I'm trying to achieve: create a producer in the parent and reuse it across the children (e.g. this would be an ideal use case for Resque).

@agis

agis commented Oct 24, 2019

After reading @edenhill's comment in https://github.com/edenhill/librdkafka/blob/master/tests/0079-fork.c#L37-L42, it's apparent that this use case is not possible in librdkafka. Perhaps we should bring back the relevant README section stating that the client must be created after forking?

@thijsc
Collaborator Author

thijsc commented Oct 24, 2019

Creating the producer after the fork defeats the purpose of what I'm trying to achieve: create a producer in the parent and reuse it across the children (e.g. this would be an ideal use case for Resque).

That's impossible, I'm afraid. When you fork, you create a separate Unix process that does not share state with the original one. Even if the producer survived the fork, it would still be a copy living in a separate process, not a reused instance. I actually wrote a blog post about this a few years ago that might be good reading :-).

You'd have to build some setup with Unix sockets, for example, to communicate between the processes and reuse a single producer.
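
A minimal sketch of that idea (broker address and topic reused from the example above, everything else hypothetical): forked children hand payloads to the parent over a pipe, and only the parent ever touches the librdkafka client.

require "rdkafka"
require "json"

reader, writer = IO.pipe

child_pids = 2.times.map do |i|
  fork do
    reader.close
    # Children never touch the producer; they only hand off payloads.
    writer.puts({topic: "test-rb", payload: "from child #{i}"}.to_json)
    writer.close
  end
end

writer.close # parent's copy, so the reader sees EOF once the children exit

# The producer lives only in the parent, so fork safety is not an issue.
config = {:"bootstrap.servers" => "kafka-a.vm.skroutz.gr:9092"}
producer = Rdkafka::Config.new(config).producer

reader.each_line do |line|
  message = JSON.parse(line)
  producer.produce(topic: message["topic"], payload: message["payload"]).wait
end

child_pids.each { |pid| Process.wait(pid) }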
