Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using msg_opaque in Producer::producer #115

Closed
ghost opened this issue May 9, 2014 · 8 comments
Closed

using msg_opaque in Producer::producer #115

ghost opened this issue May 9, 2014 · 8 comments

Comments

@ghost
Copy link

ghost commented May 9, 2014

Hallo,

Is it possible to explain how I can use the msg_opague in Producer::procuder to see if a message is send to kafka when request.required.acks=1 ?
For each message I have a callback function and if the message is delivered to kafka I want to call this callback function. I suppose that I have to use DeliveryReportCb and in RdKafka::DeliveryReportCb::dr_cb I have to call the callback function for that message.

Regards

@edenhill
Copy link
Contributor

edenhill commented May 9, 2014

Hi,

so what you do is:

  • set request.required.acks to anything but 0 (e.g., -1 or 1).
  • Register a DeliveryReportCb
  • ..:produce() your message, you can provide a pointer to your own stuff in msg_opaque.
  • Call ..:poll(), which will eventually trigger the automatic calling of your registered DeliveryReportCb when the message has either been delivered to the broker, or failed (usually by timeout). The msg_opaque as you specified to ..:produce() will be provided to you in the DeliveryReportCb.

Does that answer your question?

Note that you are allowed to call ..:produce() and ..poll() from different threads if you want to.

@ghost
Copy link
Author

ghost commented May 9, 2014

So for doing it asynchronous I need to use another thread that does the
poll() request

On Fri, May 9, 2014 at 2:15 PM, Magnus Edenhill [email protected]:

Hi,

so what you do is:

  • set request.required.acks to anything but 0 (e.g., -1 or 1).
  • Register a DeliveryReportCb
  • ..:produce() your message
  • Call ..:poll(), which will eventually trigger the automatic calling
    of your registered DeliveryReportCb when the message has either been
    delivered to the broker, or failed (usually by timeout).

Does that answer your question?

Note that you are allowed to call ..:produce() and ..poll() from
different threads if you want to.


Reply to this email directly or view it on GitHubhttps://github.com//issues/115#issuecomment-42659543
.

@edenhill
Copy link
Contributor

edenhill commented May 9, 2014

It is really up to your program design.
The produce() call is asynchronous, it will return immediately.

For programs that expect a steady flow of produce() calls I usually just put a poll() right after the produce(). That poll() call will not serve the deliveryreports of the message just produced, but likely from previous produce() calls.

E.g.:

  while (run) {
     produce();
     poll(0);
  }

   // Wait for outstanding messages to be delivered
  while (outq_len() > 0)
    poll(10);

@ghost
Copy link
Author

ghost commented May 12, 2014

Hello Magnus,

If I call the producer with a msg_opaque and I call Message::msg_opaque()
in the DeliveryReportCb then I get a wrong address back. I get an address
not equal to one of the msg_opaque that I have use in the send of the
producer

Chris

On Fri, May 9, 2014 at 2:33 PM, Magnus Edenhill [email protected]:

It is really up to your program design.
The produce() call is asynchronous, it will return immediately.

For programs that expect a steady flow of produce() calls I usually just
put a poll() right after the produce(). That poll() call will not serve the
deliveryreports of the message just produced, but likely from previous
produce() calls.

E.g.:

while (run) {
produce();
poll(0);
}

// Wait for outstanding messages to be delivered
while (outq_len() > 0)
poll(10);


Reply to this email directly or view it on GitHubhttps://github.com//issues/115#issuecomment-42660761
.

@ghost
Copy link
Author

ghost commented May 12, 2014

It seems that I always get the same pointer in the callback

On Mon, May 12, 2014 at 2:33 PM, Chris Herssens [email protected]:

Hello Magnus,

If I call the producer with a msg_opaque and I call Message::msg_opaque()
in the DeliveryReportCb then I get a wrong address back. I get an address
not equal to one of the msg_opaque that I have use in the send of the
producer

Chris

On Fri, May 9, 2014 at 2:33 PM, Magnus Edenhill [email protected]:

It is really up to your program design.
The produce() call is asynchronous, it will return immediately.

For programs that expect a steady flow of produce() calls I usually just
put a poll() right after the produce(). That poll() call will not serve the
deliveryreports of the message just produced, but likely from previous
produce() calls.

E.g.:

while (run) {
produce();
poll(0);
}

// Wait for outstanding messages to be delivered
while (outq_len() > 0)
poll(10);


Reply to this email directly or view it on GitHubhttps://github.com//issues/115#issuecomment-42660761
.

@edenhill
Copy link
Contributor

Thanks for spotting this bug, it has now been fixed in master branch.
Can you update and verify the fix?

Thank you

@ghost
Copy link
Author

ghost commented May 12, 2014

it works.
Thanks for the quick response and fix

On Mon, May 12, 2014 at 3:06 PM, Magnus Edenhill
[email protected]:

Thanks for spotting this bug, it has now been fixed in master branch.
Can you update and verify the fix?

Thank you


Reply to this email directly or view it on GitHubhttps://github.com//issues/115#issuecomment-42829087
.

@edenhill
Copy link
Contributor

Great!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant