Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9338; Fetch session should cache request leader epoch #7970

Merged
merged 4 commits into from
Jan 17, 2020

Conversation

hachikuji
Copy link
Contributor

Since the leader epoch was not maintained in the fetch session cache, no validation would be done except for the initial (full) fetch request. This patch adds the leader epoch to the session cache and addresses the testing gaps.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ijuma ijuma requested a review from cmccabe January 16, 2020 14:53
var maxBytes: Int,
var fetchOffset: Long,
var highWatermark: Long,
var leaderEpoch: Optional[Integer],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like rather than use None for older fetch requests, we could just initialize the leaderEpoch to the current leader epoch. That way, we could at least detect epoch changes. Although we'd have to figure out how to handle the error-- probably just by dropping the session.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate a little bit? I think the validation only has value if it comes from the client making the request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. It's an interesting thought. The problem is we don't have an error that would force a follower to go through the truncation phase. We could force a retry and we could drop the session, but the follower would still be able to continue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we file a separate JIRA for this idea?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more, I'm not sure it's worth it. Without true leader epoch awareness in the client, we don't have a way to force the client to refresh metadata and use a different leader, which is what we would need here. So maybe my initial idea was a bit half-baked-- sorry.

Copy link
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, what about you @cmccabe?

@cmccabe
Copy link
Contributor

cmccabe commented Jan 17, 2020

LGTM. Thanks

@hachikuji hachikuji merged commit faf46de into apache:trunk Jan 17, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 21, 2020
Conflicts or compilation errors due to the fact that we temporarily
reverted the commit that removes Scala 2.11 support:

* AclCommand.scala: take upstream changes.
* AclCommandTest.scala: take upstream changes.
* TransactionCoordinatorTest.scala: don't use SAMs, but adjust
mock call to putTransactionStateIfNotExists given new signature.
* TransactionStateManagerTest: use Runnable instead of SAMs.
* PartitionLockTest: use Runnable instead of SAMs.
* docs/upgrade.html: take upstream changes excluding line that
states that Scala 2.11 support has been removed.

* apache-github/trunk: (28 commits)
  KAFKA-9457; Fix flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose (apache#7989)
  MINOR: Update AclCommand help message to match implementation (apache#7990)
  MINOR: Update introduction page in Kafka documentation
  MINOR: Use Math.min for StreamsPartitionAssignor#updateMinReceivedVersion method (apache#7954)
  KAFKA-9338; Fetch session should cache request leader epoch (apache#7970)
  KAFKA-9329; KafkaController::replicasAreValid should return error message (apache#7865)
  KAFKA-9449; Adds support for closing the producer's BufferPool. (apache#7967)
  MINOR: Handle expandIsr in PartitionLockTest and ensure read threads not blocked on write (apache#7973)
  MINOR: Fix typo in connect integration test class name (apache#7976)
  KAFKA-9218: MirrorMaker 2 can fail to create topics (apache#7745)
  KAFKA-8847; Deprecate and remove usage of supporting classes in kafka.security.auth (apache#7966)
  MINOR: Suppress DescribeConfigs Denied log during CreateTopics (apache#7971)
  [MINOR]: Fix typo in Fetcher comment (apache#7934)
  MINOR: Remove unnecessary call to `super` in `MetricConfig` constructor (apache#7975)
  MINOR: fix flaky StreamsUpgradeTestIntegrationTest (apache#7974)
  KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags (apache#7961)
  KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion (apache#7963)
  KAFKA-9410; Make groupId Optional in KafkaConsumer (apache#7943)
  MINOR: Removed accidental double negation in error message. (apache#7834)
  KAFKA-6144: IQ option to query standbys (apache#7962)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants