Enable test for producing in forked process
This used to crash, but does not anymore. This now seems to be supported
in librdkafka, although I could not find a pull request that changes the
behaviour. The spec works flawlessly, so as long as it stays green we
can assume this works.

Fixes #19
thijsc committed Aug 17, 2019
1 parent 45055f1 commit 9882ce4
Showing 2 changed files with 43 additions and 56 deletions.
7 changes: 0 additions & 7 deletions README.md
@@ -72,13 +72,6 @@ end
delivery_handles.each(&:wait)
```

## Known issues

When using forked processes, such as with Unicorn, you currently need
to make sure that you create rdkafka instances after forking. Otherwise
they will not work and will crash your Ruby process when they are garbage
collected. See https://github.com/appsignal/rdkafka-ruby/issues/19
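
For reference, the workaround described above looks roughly like this; a
minimal sketch, assuming a Unicorn-style `after_fork` hook and a local
broker (illustrative only, not part of this diff):

```ruby
# config/unicorn.rb -- hypothetical example of creating rdkafka
# instances only after the worker process has forked.
after_fork do |server, worker|
  # Building the producer here, in the child, avoids the crash that
  # occurs when instances created before the fork are garbage collected.
  $producer = Rdkafka::Config.new(
    "bootstrap.servers" => "localhost:9092"
  ).producer
end
```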

## Development

A Docker Compose file is included to run Kafka and Zookeeper. To run
92 changes: 43 additions & 49 deletions spec/rdkafka/producer_spec.rb
@@ -280,55 +280,49 @@
end
end

# TODO this spec crashes if you create and use the producer before
# forking like so:
#
# @producer = producer
#
# This will be added as part of https://github.com/appsignal/rdkafka-ruby/issues/19
#it "should produce a message in a forked process" do
#  # Fork, produce a message, send the report of a pipe and
#  # wait for it in the main process.

#  reader, writer = IO.pipe

#  fork do
#    reader.close

#    handle = producer.produce(
#      topic: "produce_test_topic",
#      payload: "payload",
#      key: "key"
#    )

#    report = handle.wait(5)
#    producer.close

#    report_json = JSON.generate(
#      "partition" => report.partition,
#      "offset" => report.offset
#    )

#    writer.write(report_json)
#  end

#  writer.close

#  report_hash = JSON.parse(reader.read)
#  report = Rdkafka::Producer::DeliveryReport.new(
#    report_hash["partition"],
#    report_hash["offset"]
#  )

#  # Consume message and verify it's content
#  message = wait_for_message(
#    topic: "produce_test_topic",
#    delivery_report: report
#  )
#  expect(message.partition).to eq 1
#  expect(message.payload).to eq "payload"
#  expect(message.key).to eq "key"
#end
it "should produce a message in a forked process" do
# Fork, produce a message, send the report over a pipe and
# wait for and check the message in the main process.

reader, writer = IO.pipe

fork do
reader.close

handle = producer.produce(
topic: "produce_test_topic",
payload: "payload-forked",
key: "key-forked"
)

report = handle.wait(5)
producer.close

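    # The delivery handle can't cross the process boundary, so serialize
    # just the report's partition and offset for the parent.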
    report_json = JSON.generate(
      "partition" => report.partition,
      "offset" => report.offset
    )

    writer.write(report_json)
  end

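  # Close the parent's copy of the write end so the read below hits EOF
  # once the child has written its report and exited.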
  writer.close

  report_hash = JSON.parse(reader.read)
  report = Rdkafka::Producer::DeliveryReport.new(
    report_hash["partition"],
    report_hash["offset"]
  )

  # Consume the message and verify its content
  message = wait_for_message(
    topic: "produce_test_topic",
    delivery_report: report
  )
  expect(message.partition).to eq 0
  expect(message.payload).to eq "payload-forked"
  expect(message.key).to eq "key-forked"
end
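
The pipe-and-JSON round trip is the load-bearing detail here: only the
report's partition and offset travel back to the parent, which rebuilds a
DeliveryReport so wait_for_message can locate the produced message, since
Ruby objects don't cross process boundaries.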

it "should raise an error when producing fails" do
expect(Rdkafka::Bindings).to receive(:rd_kafka_producev).and_return(20)

1 comment on commit 9882ce4

@mensfeld (Member) commented on 9882ce4 · Aug 19, 2019

This commit also crashes the specs on 2.7.0-dev1.
