feat: integration test for the psl sink and source #22
6c930f1 to f7008c6
.collect(Collectors.toList()))
.get();

while (CollectSink.values().size() < 100) {
Fewer magic constants, if you could. I think INTEGER_STRINGS.size()?
Yeah that's much clearer
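A minimal sketch of the suggested change, for illustration only. The real INTEGER_STRINGS and CollectSink are not shown in this thread, so the list contents, the `received` set, and the `awaitAll` helper with its timeout are all assumptions rather than the PR's code:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class AwaitAllSketch {
  // Hypothetical stand-in for the test's input list.
  static final List<String> INTEGER_STRINGS =
      IntStream.range(0, 100).mapToObj(Integer::toString).collect(Collectors.toList());

  // Hypothetical stand-in for the sink's static storage.
  static final Set<String> received = Collections.synchronizedSet(new HashSet<>());

  // Polls until every expected value has arrived, or gives up after timeoutMillis.
  // The bound comes from the input list, so the loop stays correct if the list changes.
  static boolean awaitAll(long timeoutMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (received.size() < INTEGER_STRINGS.size()) {
      if (System.currentTimeMillis() > deadline) {
        return false;
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```

The timeout is an addition that the original loop does not have; it only keeps a failing test from hanging forever.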
Publisher<MessageMetadata> publisher = getPublisher();

StreamExecutionEnvironment env =
put in SetUp
Done
// A testing sink which stores messages in a static map to prevent them from being lost when
// the sink is serialized.
private static class CollectSink implements SinkFunction<String>, Serializable {
Make this its own class; it's independently useful.
Will do in another PR.
// the sink is serialized.
private static class CollectSink implements SinkFunction<String>, Serializable {
// Note: doesn't store duplicates.
private static final Set<String> collector = Collections.synchronizedSet(new HashSet<>());
Nit: instead, I'd use a private static HashSet and add synchronized to the methods.
I'm going to NACK this. It's a bit awkward since the class has both instance methods and class methods. I think using synchronized invites someone to hold an object-level lock when they should be holding the class lock.
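To illustrate the concern, here is a small sketch, not the PR's code. `LockDemo` and `CollectorSketch` are hypothetical names, and the sink interface is left out so the example compiles without Flink. It shows that a `synchronized` instance method does not hold the class lock, and that a synchronized wrapper around the static set sidesteps the question:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class LockDemo {
  // Locks on `this`, so the class lock is NOT held here.
  synchronized boolean instanceHoldsClassLock() {
    return Thread.holdsLock(LockDemo.class);
  }

  // Locks on LockDemo.class.
  static synchronized boolean staticHoldsClassLock() {
    return Thread.holdsLock(LockDemo.class);
  }
}

final class CollectorSketch {
  // Static state shared by every instance; the wrapper synchronizes each call on one lock.
  private static final Set<String> collector = Collections.synchronizedSet(new HashSet<>());

  // Instance method, standing in for a sink's per-record callback.
  void invoke(String value) {
    collector.add(value);
  }

  // Class method a test could use to read results.
  static List<String> values() {
    // Hold the set's own lock while copying so the snapshot is consistent.
    synchronized (collector) {
      return new ArrayList<>(collector);
    }
  }

  static void clear() {
    collector.clear();
  }
}
```

With the wrapper, instance and class methods go through the same lock without either one having to pick between `this` and the class object.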
f7008c6 to 19dda49