-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9449: Adds support for closing the producer's BufferPool. #7967
Conversation
retest this please |
@bdbyrne Thanks for the patch. I will take a closer look tomorrow. Would you mind creating a JIRA for this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Left a couple small comments.
clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
Outdated
Show resolved
Hide resolved
retest this please |
1 similar comment
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the patch!
The producer's BufferPool may block allocations if its memory limit has hit capacity. If the producer is closed, it's possible for the allocation waiters to wait for max.block.ms if progress cannot be made, even when force-closed (immediate), which can cause indefinite blocking if max.block.ms is particularly high. This patch fixes the problem by adding a `close()` method to `BufferPool`, which wakes up any waiters that have pending allocations and throws an exception. Reviewers: Jason Gustafson <[email protected]>
clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
Show resolved
Hide resolved
LGTM! Thanks for the quick PR @bdbyrne |
@@ -138,6 +147,9 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx | |||
recordWaitTime(timeNs); | |||
} | |||
|
|||
if (this.closed) | |||
throw new KafkaException("Producer closed while allocating memory"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bdbyrne @hachikuji Currently on Producer.send our javadoc mentioned "If a Kafka related error occurs that does not belong to the public API exceptions." for KafkaException and most callers default it to fatal. However if we consider the pattern where thread A blocked on send#bufferPool, and then thread B calls producer.close which would cause thread A to be unblocked by throwing a KafkaException to be a recommended pattern, should we use a different exception than KafkaException to differentiate it with other other fatal exceptions?
I'm thinking for Streams if we eventually want to move to this pattern, i.e. the stream thread blocked on producer.send
while the closing thread calls producer.close
then stream thread would throw KafkaException that in turn would be interpreted as fatal and then the stream thread tries to shutdown itself as "shutdown unclean" whereas here since we are indeed closing we should just proceed with "shutdown clean" --- of course this is still doable with some extra check but I'm wondering if such complexity would be universal for any callers like Streams.
Conflicts or compilation errors due to the fact that we temporarily reverted the commit that removes Scala 2.11 support: * AclCommand.scala: take upstream changes. * AclCommandTest.scala: take upstream changes. * TransactionCoordinatorTest.scala: don't use SAMs, but adjust mock call to putTransactionStateIfNotExists given new signature. * TransactionStateManagerTest: use Runnable instead of SAMs. * PartitionLockTest: use Runnable instead of SAMs. * docs/upgrade.html: take upstream changes excluding line that states that Scala 2.11 support has been removed. * apache-github/trunk: (28 commits) KAFKA-9457; Fix flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose (apache#7989) MINOR: Update AclCommand help message to match implementation (apache#7990) MINOR: Update introduction page in Kafka documentation MINOR: Use Math.min for StreamsPartitionAssignor#updateMinReceivedVersion method (apache#7954) KAFKA-9338; Fetch session should cache request leader epoch (apache#7970) KAFKA-9329; KafkaController::replicasAreValid should return error message (apache#7865) KAFKA-9449; Adds support for closing the producer's BufferPool. (apache#7967) MINOR: Handle expandIsr in PartitionLockTest and ensure read threads not blocked on write (apache#7973) MINOR: Fix typo in connect integration test class name (apache#7976) KAFKA-9218: MirrorMaker 2 can fail to create topics (apache#7745) KAFKA-8847; Deprecate and remove usage of supporting classes in kafka.security.auth (apache#7966) MINOR: Suppress DescribeConfigs Denied log during CreateTopics (apache#7971) [MINOR]: Fix typo in Fetcher comment (apache#7934) MINOR: Remove unnecessary call to `super` in `MetricConfig` constructor (apache#7975) MINOR: fix flaky StreamsUpgradeTestIntegrationTest (apache#7974) KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags (apache#7961) KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion (apache#7963) KAFKA-9410; Make groupId Optional in KafkaConsumer (apache#7943) MINOR: Removed accidental double negation in error message. (apache#7834) KAFKA-6144: IQ option to query standbys (apache#7962) ...
…he#7967) The producer's BufferPool may block allocations if its memory limit has hit capacity. If the producer is closed, it's possible for the allocation waiters to wait for max.block.ms if progress cannot be made, even when force-closed (immediate), which can cause indefinite blocking if max.block.ms is particularly high. This patch fixes the problem by adding a `close()` method to `BufferPool`, which wakes up any waiters that have pending allocations and throws an exception. Reviewers: Jason Gustafson <[email protected]>
Committer Checklist (excluded from commit message)