Transactional Producer (full EOS, KIP-98) #2605
Conversation
CONFIGURATION.md
@@ -113,6 +113,8 @@ offset_commit_cb | C | | | |
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
transactional.id | P | | | high | The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is null, which means transactions cannot be used. Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor. <br>*Type: string*
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout must be at least 1000 ms larger than `message.timeout.ms` and `socket.timeout.ms`. <br>*Type: integer*
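The constraints the two new properties place on a configuration (transactional.id requires enable.idempotence, and transaction.timeout.ms must exceed message.timeout.ms and socket.timeout.ms by at least 1000 ms) can be sketched as a small validation helper. This is plain Python, not librdkafka code; only the property names are real, the `validate_txn_config` helper is hypothetical.

```python
# Hypothetical sketch: check the documented constraints on the
# transactional producer properties before handing the config to a client.
# Property names are real librdkafka keys; the helper itself is illustrative.

def validate_txn_config(conf):
    """Raise ValueError if the transactional settings are inconsistent."""
    if "transactional.id" in conf and not conf.get("enable.idempotence", False):
        # enable.idempotence must be enabled if a TransactionalId is configured
        raise ValueError("enable.idempotence must be enabled "
                         "when transactional.id is set")
    txn_timeout = conf.get("transaction.timeout.ms", 60000)
    # transaction.timeout.ms must be at least 1000 ms larger than
    # message.timeout.ms and socket.timeout.ms
    for dep in ("message.timeout.ms", "socket.timeout.ms"):
        if dep in conf and txn_timeout < conf[dep] + 1000:
            raise ValueError(
                "transaction.timeout.ms must be at least 1000 ms "
                "larger than %s" % dep)
    return conf

conf = validate_txn_config({
    "transactional.id": "my-txn-app",
    "enable.idempotence": True,
    "transaction.timeout.ms": 60000,
    "message.timeout.ms": 30000,
})
```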
'setting in the broker' -> 'setting on the broker', I think
INTRODUCTION.md
#### FIXME: misc
I think you probably want to do a search for 'FIXME'.
* committed successfully.
*
* The offsets should be the next message your application will consume,
* i.e., the last processed message's offset + 1 for each partition.
This is really dangerous - people are going to screw this up by default, because everyone expects the committed offset to be the last message successfully read (and that's what you naturally have in your code without doing any arithmetic, which people won't do unless they're thinking hard about it, which they won't be). When offsets are committed using the consumer, this isn't such a big deal, because the consumer does the calculation for you behind the scenes (for most variants). But in this case the user will be in the loop.
Given this, I think in the higher-level bindings we should have an overload of SendOffsetsToTransaction which accepts a consumer instance. The Java producer doesn't have this.
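The off-by-one concern discussed above can be illustrated with a tiny sketch (plain Python; `offsets_to_commit` is a hypothetical helper, not a librdkafka API): the offset sent to the transaction must be the last processed offset plus one.

```python
# Hypothetical helper illustrating the offset arithmetic: the offset
# committed for each partition must be the NEXT offset the application
# will consume, i.e. last processed offset + 1.

def offsets_to_commit(last_processed):
    """Map {(topic, partition): last processed offset} to commit offsets."""
    return {tp: off + 1 for tp, off in last_processed.items()}

# After processing offset 41 on partition 0 and offset 7 on partition 1:
commit = offsets_to_commit({("mytopic", 0): 41, ("mytopic", 1): 7})
# Committing 42 (not 41) means consumption resumes after the last
# processed message instead of re-reading it.
```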
I hear you, but we have existing offset interfaces that take the actual offset (commit, store_offsets), without doing +1, so we need to be careful with being consistent in this regard.
Best thing would be if users did not have to care about individual partitions and offsets, but instead, as you propose, provide a helper method taking a consumer.
In fact, we really need three things:
- set start-of-transaction snapshot/restore point
- commit/send-offsets for transaction
- rewind to snapshot/restore point on transaction abort
Which would allow you to do something along the lines of:
outer_loop:
while the_truth:
    producer.begin_transaction()
    consumer.set_restorepoint()
    try:
        while time_in_txn < max_time_per_txn:
            input = consumer.poll(remaining_time)
            if consumer.is_rebalancing():
                producer.abort_transaction()
                consumer.clear_restorepoint()
                continue outer_loop
            output = process(input)
            producer.produce(output)
        producer.commit_offsets(consumer)
        producer.commit_transaction()
    except AbortableError:
        producer.abort_transaction()
        consumer.rewind_to_restorepoint()
        continue outer_loop
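The restore-point semantics proposed above can be simulated with a stub consumer. Everything here is hypothetical: none of these methods exist in librdkafka today; the stub only demonstrates the rewind-on-abort behaviour the pseudocode relies on.

```python
# Toy in-memory consumer illustrating the proposed (hypothetical)
# restore-point API: snapshot the position at transaction start, and
# rewind to it if the transaction is aborted.

class StubConsumer:
    def __init__(self, messages):
        self._messages = list(messages)
        self._pos = 0
        self._restore = None

    def set_restorepoint(self):
        # Snapshot the current position at start of transaction.
        self._restore = self._pos

    def clear_restorepoint(self):
        self._restore = None

    def rewind_to_restorepoint(self):
        # On transaction abort, resume from the snapshot.
        assert self._restore is not None
        self._pos = self._restore

    def poll(self):
        msg = self._messages[self._pos]
        self._pos += 1
        return msg

c = StubConsumer(["m0", "m1", "m2"])
c.set_restorepoint()          # begin_transaction()
c.poll()                      # consume "m0"
c.poll()                      # consume "m1"
c.rewind_to_restorepoint()    # abort_transaction(): rewind
assert c.poll() == "m0"       # the aborted input is re-consumed
```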
It would be nice to expose rd_kafka_is_transactional for such cases so we can fail fast on eos consumer instantiation.
It's not terribly clear to me from the mega design doc what happens in the event of a rebalance when a transaction is still open. Do we need to abort, as implied by the snippet above, or is there a way for a consumer to pick up the last offset sent to a transaction for a given partition upon assignment?
It would be nice to expose rd_kafka_is_transactional for such cases so we can fail fast on eos consumer instantiation.
Can you elaborate on this? Where and how would it be used?
* (set \c enable.auto.commit to false on the consumer).
*
* @remark Logical and invalid offsets (such as RD_KAFKA_OFFSET_INVALID) in
* \p offsets will be ignored, if there are no valid offsets in
I haven't looked at the code yet to check the actual behavior, but I think this should fail immediately if there are any logical offsets (and this docstring should be updated accordingly).
The idea here is that you should be able to pass the current position() directly to this function; some partitions may not have been consumed at all (due to emptiness, no messages, being paused, whatever) and thus may have a logical current position.
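The documented behaviour (logical offsets in the passed set are simply ignored, so position() can be passed through unchanged) can be sketched as a filter. The constant values below match librdkafka's actual logical-offset constants; the `valid_offsets` helper itself is hypothetical.

```python
# Sketch of the documented "ignore logical offsets" behaviour.
# Constant values are librdkafka's real logical-offset sentinels;
# the filter function is a hypothetical illustration.
RD_KAFKA_OFFSET_BEGINNING = -2
RD_KAFKA_OFFSET_END = -1
RD_KAFKA_OFFSET_INVALID = -1001

def valid_offsets(offsets):
    """Keep only real (non-negative) offsets; logical ones are ignored."""
    return {tp: off for tp, off in offsets.items() if off >= 0}

# A position() snapshot where partition 1 was never consumed:
pos = {("mytopic", 0): 42, ("mytopic", 1): RD_KAFKA_OFFSET_INVALID}
# Only partition 0 would be included in the transactional offset commit.
```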
And add 'make unit' test target
They're not in the public header files so will not be picked up automatically.
…peration timeout)
This completes support for Kafka EOS as specified in KIP-98.
- Set broker up / down
- Create topic through mock API
- Explicitly set coordinator
- Return ERR_NOT_COORDINATOR when txn request is sent to non-coordinator.
..since that error is already properly handled throughout the code. The previous ERR__NODE_UPDATE would need special handling everywhere, and all the node update does is trigger a reconnect (which is a TRANSPORT failure, more or less).
Moves the fatal-induced producer queue purges to the rdkafka main thread to avoid locking issues in deep transaction call stacks.
This allows broker_any_up() to be used consistently along with broker_up_cnt
🎉 glad to see this merged! Thanks for the awesome work!
This is work in progress.
Current status:
Review guide:
Miscellaneous: