Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port #1304 - Add event for recovering consumer #1305

Merged
merged 1 commit into from
Feb 23, 2023

Conversation

Zerpet
Copy link
Contributor

@Zerpet Zerpet commented Feb 23, 2023

Proposed Changes

Port of #1304 from 6.x branch to main. The port required some minor changes, as the internals have changed a bit since 6.x branched out.

Closes #1293.

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating
the PR. If you're unsure about any of them, don't hesitate to ask on the
mailing list. We're here to help! This is simply a reminder of what we are
going to look for before merging your code.

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

N/A

Related to #1293. In the context of Streams over AMQP,
the consumer must keep track of the consumer offset.
When a consumer subscribes to a stream-type queue, it
may provide a consumer argument to specify a "point"
in the stream to attach to.

This library records consumers, and their arguments,
for topology recovery purposes. When a consumer is
declared, it starts reading at an arbitrary point e.g.
offset=123. After receiving messages, the offset "moves"
forward. In the event of a connection recovery due to
e.g. network error, the consumer is re-declared with
the offset recorded when it was first declared i.e.
offset=123. This is not correct, because the consumer
has received some messages, and the offset value when
it was first declared is not accurate anymore.

This commit adds an event that fire when a consumer is
about to be recovered, but has not started recovering yet.
This event exposes the consumer arguments as a reference
type, allowing an event handler to update the consumer
offset, or any other consumer argument.

Signed-off-by: Aitor Perez Cedres <[email protected]>
@Zerpet Zerpet force-pushed the recoving-consumer-event branch from 8765be0 to e4974f6 Compare February 23, 2023 19:14
@Zerpet Zerpet marked this pull request as ready for review February 23, 2023 19:15
@Zerpet Zerpet added this to the 7.0.0 milestone Feb 23, 2023
@Zerpet
Copy link
Contributor Author

Zerpet commented Feb 23, 2023

Set milestone to 7.0.0 since this is merging to main. Feel free to change if that was not correct 🙂

@michaelklishin michaelklishin merged commit cbe6a8c into main Feb 23, 2023
@michaelklishin michaelklishin deleted the recoving-consumer-event branch February 23, 2023 19:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Recovery does not appear to save consumer arguments
2 participants