Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add the pubsub lite source and settings #17

Merged
merged 6 commits into from
Jul 27, 2021

Conversation

palmere-google
Copy link
Contributor

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

@palmere-google palmere-google requested a review from a team as a code owner July 23, 2021 20:23
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Jul 23, 2021
Copy link
Contributor

@dpcollins-google dpcollins-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a higher level comment. Can we separate out the user interface from the implementation? I.E. create an "internal" folder and put anything users shouldn't touch in it

SourceReaderContext readerContext) throws Exception {
PubsubLiteDeserializationSchema<OUT> schema = settings.deserializationSchema();
schema.open(
new DeserializationSchema.InitializationContext() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this not exist as a utility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. At least I didn't find other sources using a utility to do this

new PubsubLiteRecordEmitter<>(),
settings.getCursorCommitter(),
settings.getSplitReaderSupplier(),
new Configuration(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm is this correct? Is this supposed to be retrieved from somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is at least intentional

The only place the the configuration object is used is by the source reader base class here:

https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java#L26

I don't really want to expose these settings to users, so I'm just passing in an empty config so we'll use the default value

@palmere-google
Copy link
Contributor Author

This is a bit of a higher level comment. Can we separate out the user interface from the implementation? I.E. create an "internal" folder and put anything users shouldn't touch in it

Yeah, that makes sense. I'll move everything in a separate pr

@AutoValue
public abstract class PubsubLiteSourceSettings<OutputT> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteSourceSettings.class);
private static final long serialVersionUID = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pick a "random" value. I.E. pound the number keys

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public abstract Boundedness boundedness();

public abstract MessageTimestampExtractor timestampSelector();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add an // Optional comment on these that have defaults and add comments on each setting's getter and setter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public abstract FlowControlSettings flowControlSettings();

public abstract Boundedness boundedness();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make this optional and unbounded by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@palmere-google palmere-google merged commit 5e1bb41 into master Jul 27, 2021
@palmere-google palmere-google deleted the palmere-dev-source branch July 27, 2021 00:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants