feat: add the DeserializingSplitReader #11
src/main/java/com/google/cloud/pubsublite/flink/reader/DeserializingSplitReader.java
src/main/java/com/google/cloud/pubsublite/flink/MessageTimestampExtractor.java
   * @param message The pub/sub lite message
   * @return The deserialized message as an object (null if the message cannot be deserialized).
   */
  T deserialize(SequencedMessage message) throws Exception;
Mark as @Nullable
Done
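For illustration, a minimal sketch of what the annotated contract might look like. `FakeMessage`, `DeserializationSchemaSketch`, `IntegerSchema`, and the locally declared `Nullable` annotation are hypothetical stand-ins so the example compiles on its own; the actual PR uses `SequencedMessage` and presumably a library-provided `@Nullable`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.nio.charset.StandardCharsets;

// Local stand-in for a library @Nullable annotation (assumption, for self-containment).
@Target(ElementType.METHOD)
@interface Nullable {}

// Hypothetical stand-in for the Pub/Sub Lite SequencedMessage type.
final class FakeMessage {
  final byte[] data;

  FakeMessage(byte[] data) {
    this.data = data;
  }
}

// Sketch of the schema interface with the return value marked nullable.
interface DeserializationSchemaSketch<T> {
  /** Returns the deserialized value, or null if the message cannot be deserialized. */
  @Nullable
  T deserialize(FakeMessage message) throws Exception;
}

// Example implementation: parses the payload as an integer, null on bad input.
final class IntegerSchema implements DeserializationSchemaSketch<Integer> {
  @Override
  @Nullable
  public Integer deserialize(FakeMessage message) {
    try {
      String text = new String(message.data, StandardCharsets.UTF_8).trim();
      return Integer.valueOf(text);
    } catch (NumberFormatException e) {
      // Signal "cannot be deserialized" with null, as the Javadoc describes.
      return null;
    }
  }
}
```

Marking the return value makes the null case visible to callers and to static analysis tools, instead of leaving it only in the Javadoc.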
String split = entry.getKey();
SequencedMessage message = entry.getValue();
try {
  T value = schema.deserialize(message);
mark as @Nullable
Done
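One plausible way a reader loop could treat a nullable result is to skip it, sketched below. This is an assumption about the handling, not the PR's actual code: `NullableDecoder` and `SplitRecordCollector` are hypothetical names, raw messages are modeled as strings, and the original `try`/`catch` is not reproduced because its body is not shown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical decoder whose result may be null when a message cannot be deserialized.
interface NullableDecoder<T> {
  T decode(String raw) throws Exception;
}

final class SplitRecordCollector {
  // Groups successfully decoded values by split id, skipping null results.
  static <T> Map<String, List<T>> collect(
      List<Map.Entry<String, String>> batch, NullableDecoder<T> decoder) throws Exception {
    Map<String, List<T>> bySplit = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : batch) {
      String split = entry.getKey();
      String message = entry.getValue();
      T value = decoder.decode(message); // may be null
      if (value == null) {
        continue; // nothing to emit for this message
      }
      bySplit.computeIfAbsent(split, k -> new ArrayList<>()).add(value);
    }
    return bySplit;
  }
}
```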
public abstract Offset offset();

public abstract long timestamp();
use Instant until you absolutely need to convert to millis
Done
  };
}

long timestampMillis(SequencedMessage m) throws Exception;
change to "Instant timestamp" since we're just passing this to our own type. Only transform to millis at the last possible moment
Yeah, that's way better. I dunno what I was doing passing longs around. Done
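A rough sketch of the suggested shape, with the extractor and the record both carrying an `Instant` and the conversion to milliseconds happening only at the boundary. `FakeSequencedMessage`, `TimestampExtractorSketch`, and `RecordSketch` are hypothetical stand-ins, not the types in this PR.

```java
import java.time.Instant;

// Hypothetical stand-in for a message that carries a publish time.
final class FakeSequencedMessage {
  final long offset;
  final Instant publishTime;

  FakeSequencedMessage(long offset, Instant publishTime) {
    this.offset = offset;
    this.publishTime = publishTime;
  }
}

// Extractor returns an Instant rather than a long of epoch millis.
interface TimestampExtractorSketch {
  Instant timestamp(FakeSequencedMessage m) throws Exception;
}

// Internal record type keeps the Instant; no unit ambiguity while it is passed around.
final class RecordSketch {
  final long offset;
  final Instant timestamp;

  RecordSketch(long offset, Instant timestamp) {
    this.offset = offset;
    this.timestamp = timestamp;
  }

  // Convert to epoch millis only where a long is actually required by the consumer.
  long timestampMillis() {
    return timestamp.toEpochMilli();
  }
}
```

Keeping `Instant` in the internal types avoids confusion over whether a bare `long` holds seconds, millis, or micros.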
LGTM pending fixes
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> ☕️