
feat: Add SplitDiscovery for use in split enumerator #14

Merged (4 commits, Jul 23, 2021)


palmere-google (Contributor)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

palmere-google requested a review from a team as a code owner July 20, 2021 03:12
google-cla bot added the "cla: yes" label (This human has signed the Contributor License Agreement.) Jul 20, 2021
TopicPath topicPath = TopicPath.parse(proto.getTopic());
long partitionCount =
    currentSplits.stream()
        .filter(s -> s.subscriptionPath().equals(subscriptionPath))
Contributor: I'd argue you should fail if any of the subscription paths are wrong, but this is a nit.

Contributor Author: I'm happy with that. Done.
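A minimal sketch of the fail-fast behavior agreed on here, assuming a hypothetical Split class in place of SubscriptionPartitionSplit and plain strings in place of SubscriptionPath: instead of filtering out splits that belong to another subscription, restoring a checkpoint throws as soon as one doesn't match.

```java
import java.util.List;

// Hypothetical stand-in for SubscriptionPartitionSplit: a subscription path plus a partition index.
final class Split {
  final String subscription;
  final long partition;

  Split(String subscription, long partition) {
    this.subscription = subscription;
    this.partition = partition;
  }
}

final class CheckpointValidation {
  // Throws on the first split whose subscription differs from the expected one,
  // rather than silently dropping it the way a stream filter would.
  static void requireSameSubscription(List<Split> splits, String expected) {
    for (Split s : splits) {
      if (!s.subscription.equals(expected)) {
        throw new IllegalStateException(
            "Split for " + s.subscription + " does not belong to " + expected);
      }
    }
  }

  private CheckpointValidation() {}
}
```

Failing loudly here surfaces a corrupted or mismatched checkpoint at restore time instead of quietly under-counting partitions.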

    currentSplits.stream()
        .filter(s -> s.subscriptionPath().equals(subscriptionPath))
        .mapToLong(a -> a.partition().value())
        .max()
Contributor: This is wrong. The max existing partition number != the count; partitions are 0-indexed.

Contributor Author: Oh, good catch. Fixed this and the test.

}

static SingleSubscriptionSplitDiscovery fromCheckpoint(
    SplitEnumeratorCheckpoint.Discovery proto,
Contributor: Can you just put the saved partition count in this proto directly?

Contributor Author: Unfortunately we can't. A checkpoint can be taken between split discovery and the moment the discovered splits are handed to the assigner. If we checkpointed the partition count and a checkpoint landed in that window, those splits would be lost: on restore, discovery would treat their partitions as already known, even though the assigner never received them.
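The race can be illustrated with a small, hypothetical simulation in which partition indexes stand in for splits. Suppose the assigner holds partitions 0 and 1, discovery has just found 2 and 3 (so the discovered count is 4), and a checkpoint is taken before those two reach the assigner. Restoring from a stored count of 4 finds nothing new, while a count derived from the checkpointed splits (max index 1, so count 2) rediscovers both. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointRace {
  // Partitions that discovery would report as new when it believes `knownCount`
  // partitions exist and the topic actually has `actualCount`.
  static List<Long> newPartitions(long knownCount, long actualCount) {
    List<Long> found = new ArrayList<>();
    for (long p = knownCount; p < actualCount; p++) {
      found.add(p);
    }
    return found;
  }

  public static void main(String[] args) {
    long actualCount = 4;
    long storedCount = 4;  // count recorded at checkpoint time, after discovery ran
    long derivedCount = 2; // max index in the checkpointed splits {0, 1}, plus one
    System.out.println("restore from stored count:  " + newPartitions(storedCount, actualCount));
    System.out.println("restore from derived count: " + newPartitions(derivedCount, actualCount));
  }
}
```

Running main prints an empty list for the stored count and [2, 3] for the derived count, which is why the count is recomputed from the splits rather than saved.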

Map<Partition, Offset> cursorMap;
try {
  cursorMap = cursorClient.listPartitionCursors(subscriptionPath).get();
} catch (ExecutionException | InterruptedException e) {
Contributor: Catch Throwable t instead.

Contributor Author: Done

    adminClient, cursorClient, topicPath, subscriptionPath, partitionCount);
}

public List<SubscriptionPartitionSplit> discoverSplits() throws ApiException {
Contributor: Do methods on this class need to be synchronized?

Contributor Author: I don't think so: discoverSplits is the only method that modifies state, and it's never called concurrently. Still, there's no performance concern either way, so I just made them synchronized.
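For illustration, a hypothetical discovery class in which both the mutating method and the accessor a checkpoint would read are synchronized on the same monitor. A LongSupplier stands in for the admin client's partition-count lookup, and none of these names come from the PR. The author notes discovery is never called concurrently; the two-thread main only shows what the lock would guard against if that ever changed.

```java
import java.util.function.LongSupplier;

// Hypothetical discovery state; the LongSupplier stands in for the admin client lookup.
final class SynchronizedDiscovery {
  private final LongSupplier partitionCountSource;
  private long knownPartitionCount;

  SynchronizedDiscovery(LongSupplier partitionCountSource, long knownPartitionCount) {
    this.partitionCountSource = partitionCountSource;
    this.knownPartitionCount = knownPartitionCount;
  }

  // Read-modify-write of knownPartitionCount; synchronized so two concurrent callers
  // cannot both report the same partitions as new.
  synchronized long discoverNewPartitionCount() {
    long current = partitionCountSource.getAsLong();
    long added = Math.max(0, current - knownPartitionCount);
    knownPartitionCount += added;
    return added;
  }

  // Same monitor, so a checkpoint reading the count sees the latest completed update.
  synchronized long knownPartitionCount() {
    return knownPartitionCount;
  }

  public static void main(String[] args) throws InterruptedException {
    SynchronizedDiscovery d = new SynchronizedDiscovery(() -> 100, 0);
    long[] results = new long[2];
    Thread a = new Thread(() -> results[0] = d.discoverNewPartitionCount());
    Thread b = new Thread(() -> results[1] = d.discoverNewPartitionCount());
    a.start();
    b.start();
    a.join();
    b.join();
    // One thread sees 100 new partitions and the other sees 0, so the sum is 100.
    System.out.println(results[0] + results[1]);
  }
}
```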

dpcollins-google (Contributor) left a comment:
Fixes required.

SubscriptionPath subscriptionPath = SubscriptionPath.parse(proto.getSubscription());
TopicPath topicPath = TopicPath.parse(proto.getTopic());
long partitionCount = 0;
for (SubscriptionPartitionSplit s : currentSplits) {
Contributor: Can you put them in a Set while going through this loop, and check on line 75 that every partition < partitionCount is present?

Contributor Author: Done
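A sketch of the requested check, assuming plain long partition indexes in place of SubscriptionPartitionSplit (the class and method names are hypothetical): collect the indexes into a Set while deriving the count as the maximum index plus one, since partitions are 0-indexed, then require every partition below that count to be present.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CheckpointRestore {
  // Derives the partition count from restored partition indexes and verifies that
  // every partition in [0, count) appears at least once.
  static long validatedPartitionCount(List<Long> partitions) {
    Set<Long> seen = new HashSet<>();
    long count = 0;
    for (long p : partitions) {
      seen.add(p);
      count = Math.max(count, p + 1); // partitions are 0-indexed: count = max index + 1
    }
    for (long p = 0; p < count; p++) {
      if (!seen.contains(p)) {
        throw new IllegalStateException("Checkpoint is missing partition " + p);
      }
    }
    return count;
  }

  private CheckpointRestore() {}
}
```

A checkpoint holding partitions {0, 2} is rejected because partition 1 is missing, which catches a gap that the max-plus-one count alone would hide.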

  newPartitionCount = adminClient.getTopicPartitionCount(topicPath).get();
} catch (Throwable t) {
  throw ExtractStatus.toCanonical(t).underlying;
}
Contributor: Nit: I'd just use a single try block around the whole function.

Contributor Author: Done
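A minimal sketch of the shape suggested here, combined with the catch (Throwable t) change: the blocking call and the state update sit in one try block, and every failure leaves through a single unchecked exception. The PartitionCountSource interface and TopicDiscovery class are hypothetical, and RuntimeException stands in for the exception that ExtractStatus.toCanonical(t).underlying produces in the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for the admin client's partition-count lookup.
interface PartitionCountSource {
  Future<Long> getTopicPartitionCount(String topic);
}

final class TopicDiscovery {
  private final PartitionCountSource admin;
  private final String topic;
  private long partitionCount;

  TopicDiscovery(PartitionCountSource admin, String topic, long partitionCount) {
    this.admin = admin;
    this.topic = topic;
    this.partitionCount = partitionCount;
  }

  synchronized List<Long> discoverNewPartitions() {
    try {
      // One try block covers the blocking call and the bookkeeping,
      // giving the method a single error-handling path.
      long newCount = admin.getTopicPartitionCount(topic).get();
      List<Long> added = new ArrayList<>();
      for (long p = partitionCount; p < newCount; p++) {
        added.add(p);
      }
      partitionCount = Math.max(partitionCount, newCount);
      return added;
    } catch (Throwable t) {
      // Catching Throwable handles ExecutionException, InterruptedException, and
      // unchecked exceptions alike; ExecutionException is unwrapped to expose its cause.
      if (t instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      Throwable cause =
          (t instanceof ExecutionException && t.getCause() != null) ? t.getCause() : t;
      throw new RuntimeException("Partition discovery failed", cause);
    }
  }
}
```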

import java.util.List;

interface SplitDiscovery extends AutoCloseable {
  List<SubscriptionPartitionSplit> discoverSplits() throws ApiException;
Contributor: Nit: rename to discoverNewSplits.

Contributor Author: Done

palmere-google merged commit b68d2b4 into master Jul 23, 2021
palmere-google deleted the palmere-dev-discovery branch July 23, 2021 15:43