support Cumulative ack #183
Comments
The cumulative ack was left out on purpose and the reasoning was (in weighted order):
|
I would like to chime in that there is a use case where this would be most useful: batch consumers. For example, say I want to build a report or bundle from a topic every 6h/12h/24h. To do this with a normal subscription, I would need to retain a potentially unbounded set of messages (or at least message IDs) in memory to ack them all when complete, which is wasteful. The other method (and the one I am currently using) is to use a reader; however, this requires me to store my last offset in external state between job runs. With cumulative ack, I could accomplish this entirely within the Pulsar ecosystem by simply resuming my subscription and then exiting the loop after an idle period or when I begin to receive messages within a certain small distance of job start time. |
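A minimal sketch of the batch-consumer loop described in this comment, under stated assumptions: `msg`, `cumulativeAcker`, `recordingAcker`, `runBatch` and `demoBatch` are hypothetical names invented for illustration and are not part of pulsar-client-go. The loop stops at the first message published within `margin` of the job start and then issues a single cumulative ack for the last processed message, so no message IDs have to be kept in memory.

```go
package main

import "time"

// msg is a hypothetical stand-in for a consumed message; it is NOT the
// pulsar-client-go Message type.
type msg struct {
	ID          int64
	PublishTime time.Time
}

// cumulativeAcker is a hypothetical one-method interface used only by this sketch.
type cumulativeAcker interface {
	AckCumulative(m msg) error
}

// recordingAcker is a fake acker that records how it was called.
type recordingAcker struct {
	calls  int
	lastID int64
}

func (r *recordingAcker) AckCumulative(m msg) error {
	r.calls++
	r.lastID = m.ID
	return nil
}

// runBatch processes messages until it reaches one published within margin
// of jobStart (or later), then acknowledges everything processed so far with
// one cumulative ack. It returns the number of messages processed.
func runBatch(msgs []msg, jobStart time.Time, margin time.Duration, a cumulativeAcker) (int, error) {
	cutoff := jobStart.Add(-margin)
	processed := 0
	for _, m := range msgs {
		if !m.PublishTime.Before(cutoff) {
			break // too close to the job start: leave it for the next run
		}
		// Application-specific processing of m would go here.
		processed++
	}
	if processed == 0 {
		return 0, nil
	}
	return processed, a.AckCumulative(msgs[processed-1])
}

// demoBatch runs the loop over three sample messages.
func demoBatch() (int, *recordingAcker) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	msgs := []msg{
		{ID: 1, PublishTime: start.Add(-3 * time.Hour)},
		{ID: 2, PublishTime: start.Add(-2 * time.Hour)},
		{ID: 3, PublishTime: start.Add(-time.Second)},
	}
	acker := &recordingAcker{}
	processed, _ := runBatch(msgs, start, time.Minute, acker)
	return processed, acker
}
```

In `demoBatch`, the third message falls inside the one-minute margin, so only the first two are processed and one ack call covers both.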
@flowchartsman In your example there would be no need to keep the message IDs; you could use a Reader and do SeekByTime():
|
Yes, you're right. I suppose I could just say "this job runs at somewhere around midnight and starts from 6 pm, and then again at 6 am and starts from midnight" and then break when I reach messages on or after my "run" time. I wasn't correctly recalling the seek-to-time functionality because we had to avoid using it initially, due to stability issues arising from starting with the latest message ID and seeking backwards. If those are resolved now, you're right, that's probably best. |
Cumulative commit seems to be a reasonable feature. |
Hi, @merlimat , I think cumulative ack needs to be supported in the Go client.
In the Java client, I don't think this point is a problem, because in its default ack mode the ack requests are pushed to a pending queue and flushed asynchronously. In the Go client, however, only one message is acknowledged per acknowledgment request, which means cumulative ack would reduce the number of ack RPCs. pulsar-client-go/pulsar/consumer_partition.go Lines 684 to 695 in 48c39ee
Only two more methods need to be added to the interface.
// Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
AckCumulative(Message) error
// Acknowledge the reception of all the messages in the stream up to (and including) the provided message, identified by its MessageID
AckIDCumulative(MessageID) error
I think the implementation of cumulative ack is not complicated. It just needs to modify the ack command, which add
In some scenarios, the lack of cumulative ack causes users a lot of trouble.
As @xiaofan-luan shows, cumulative ack gives users a way to decide when their messages are acknowledged. For example, suppose a user needs to consume 10000 messages continuously and wants to discard all earlier messages because the 8000th message is wrong. With cumulative ack, the user can call cumulative ack directly with the 8000th message and start receiving a new message sequence. I don't think caching message IDs is a good idea here. If introducing cumulative ack is acceptable, I will implement it. @RobertIndie @nodece Could you give some comments? Thanks. |
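To make the 8000-of-10000 example concrete, here is a toy model of acknowledgement state on a single exclusive subscription. It is only a sketch: `ackState` and its methods are hypothetical names, IDs are plain integers starting at 1, and this is not the broker's or the client's actual bookkeeping. It shows that one cumulative ack covers every ID up to the given one, while individual acks need one request per message.

```go
package main

// ackState is a hypothetical, simplified model of which message IDs
// have been acknowledged on one subscription.
type ackState struct {
	markDelete int64          // every ID <= markDelete counts as acknowledged
	individual map[int64]bool // IDs above markDelete acknowledged one by one
	requests   int            // number of ack requests issued so far
}

func newAckState() *ackState {
	return &ackState{individual: make(map[int64]bool)}
}

// ackIndividual acknowledges exactly one message.
func (s *ackState) ackIndividual(id int64) {
	s.requests++
	if id <= s.markDelete {
		return
	}
	s.individual[id] = true
	// Advance markDelete across any contiguous run of acknowledged IDs.
	for s.individual[s.markDelete+1] {
		s.markDelete++
		delete(s.individual, s.markDelete)
	}
}

// ackCumulative acknowledges every message up to and including id.
func (s *ackState) ackCumulative(id int64) {
	s.requests++
	if id <= s.markDelete {
		return
	}
	s.markDelete = id
	for k := range s.individual {
		if k <= id {
			delete(s.individual, k)
		}
	}
}

// isAcked reports whether id has been acknowledged by either path.
func (s *ackState) isAcked(id int64) bool {
	return id <= s.markDelete || s.individual[id]
}
```

Calling `ackCumulative(8000)` on a fresh state marks IDs 1 through 8000 as acknowledged with a single request; reaching the same state through `ackIndividual` takes 8000 requests.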
Hi @Gleiphir2769 , Thanks for bringing this up. Overall, I'm +1 for implementing this feature.
Right, supporting cumulative ack will also improve performance. But we could also implement an ack group tracker to aggregate several acks (individual or cumulative) into one RPC. This could improve the performance of individual ack as well.
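The ack group tracker idea can be sketched roughly as follows. This is an illustration under assumptions, not the design that was later merged: `ackGroup`, `maxSize` and the `send` callback are hypothetical names, and this version flushes only on size or on an explicit `flush` call, whereas a real tracker would presumably also flush on a timer.

```go
package main

// ackGroup is a hypothetical buffer that aggregates acknowledgements and
// hands them to send in batches, so that many acks share one RPC.
type ackGroup struct {
	maxSize    int     // flush once this many individual acks are pending
	pending    []int64 // individual acks not yet sent
	cumulative int64   // highest pending cumulative ack; 0 means none
	send       func(individual []int64, cumulative int64)
}

// add queues an individual ack and flushes when the buffer is full.
func (g *ackGroup) add(id int64) {
	g.pending = append(g.pending, id)
	if len(g.pending) >= g.maxSize {
		g.flush()
	}
}

// addCumulative queues a cumulative ack; only the highest ID matters,
// because it implies every lower one.
func (g *ackGroup) addCumulative(id int64) {
	if id > g.cumulative {
		g.cumulative = id
	}
}

// flush sends everything pending in a single call to send.
func (g *ackGroup) flush() {
	if len(g.pending) == 0 && g.cumulative == 0 {
		return
	}
	g.send(g.pending, g.cumulative)
	g.pending = nil
	g.cumulative = 0
}
```

With `maxSize` set to 100, acknowledging 250 messages individually and then calling `flush` results in three calls to `send` instead of 250.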
This case makes sense to me. +1 for implementing this feature. |
Master Issue: #183

### Motivation

Cumulative acknowledgement is a useful feature. Users can use this feature to ack messages in the stream up to (and including) provided message. Issue #183 shows more details.

### Modifications

- Add two api `AckCumulative` and `AckIDCumulative` for `Consumer`.

``` golang
// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message.
AckCumulative(msg Message) error

// AckIDCumulative the reception of all the messages in the stream up to (and including)
// the provided message, identified by its MessageID
AckIDCumulative(msgID MessageID) error
```

- Add the `AckCumulative` and `AckIDCumulative` implementation for `consumer`, `multiTopicConsumer`, `regexConsumer` and `mockConsumer`.
- Add the unit test `TestConsumerNoBatchCumulativeAck` `TestConsumerBatchCumulativeAck` `TestCumulativeAckWithResponse` for cumulative ack in `consumer_test.go`.
I think the ack grouping feature is already supported by PR #957. |
Closed this as implemented in #903 |
pulsar-client-go uses CommandAck_Individual now, and since we can subscribe to a topic in exclusive mode, adding CommandAck_Cumulative would be useful.
It does not seem very difficult; is it perhaps already on your schedule?