-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add SplitDiscovery for use in split enumerator #14
Conversation
updated
TopicPath topicPath = TopicPath.parse(proto.getTopic()); | ||
long partitionCount = | ||
currentSplits.stream() | ||
.filter(s -> s.subscriptionPath().equals(subscriptionPath)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd argue you should fail if any of the subscription paths are wrong, but this is a nit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm happy with that, done
currentSplits.stream() | ||
.filter(s -> s.subscriptionPath().equals(subscriptionPath)) | ||
.mapToLong(a -> a.partition().value()) | ||
.max() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is wrong. the max existing partition number != the count. partitions are 0 indexed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, good catch. Fixed this and the test
} | ||
|
||
static SingleSubscriptionSplitDiscovery fromCheckpoint( | ||
SplitEnumeratorCheckpoint.Discovery proto, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you just put the saved partition count in this proto directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately we can't. Checkpointing can be interleaved between split discovery and when the splits are actually turned over to the assigner. If we checkpoint the partition count, we might lose some splits if a checkpoint is taken after the splits are discovered but before they're given to the assigner.
...main/java/com/google/cloud/pubsublite/flink/enumerator/SingleSubscriptionSplitDiscovery.java
Outdated
Show resolved
Hide resolved
Map<Partition, Offset> cursorMap; | ||
try { | ||
cursorMap = cursorClient.listPartitionCursors(subscriptionPath).get(); | ||
} catch (ExecutionException | InterruptedException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwable t
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
adminClient, cursorClient, topicPath, subscriptionPath, partitionCount); | ||
} | ||
|
||
public List<SubscriptionPartitionSplit> discoverSplits() throws ApiException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do methods on this class need to be synchronized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so -- discover splits is the only method which modifies state and it's never called concurrently, but at the same time, there's no performance consideration or anything, so I just made them synchronized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixes required
SubscriptionPath subscriptionPath = SubscriptionPath.parse(proto.getSubscription()); | ||
TopicPath topicPath = TopicPath.parse(proto.getTopic()); | ||
long partitionCount = 0; | ||
for (SubscriptionPartitionSplit s : currentSplits) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you put them in a Set while going through this loop, and check on line 75 that every partition < partitionCount is present?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
newPartitionCount = adminClient.getTopicPartitionCount(topicPath).get(); | ||
} catch (Throwable t) { | ||
throw ExtractStatus.toCanonical(t).underlying; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. I'd just use a try{ block around the whole function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
import java.util.List; | ||
|
||
interface SplitDiscovery extends AutoCloseable { | ||
List<SubscriptionPartitionSplit> discoverSplits() throws ApiException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. discoverNewSplits
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
updated
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> ☕️