Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable key value byte stitching in PulsarMessageBatch #8897

Merged
merged 5 commits into from
Jul 7, 2022

Conversation

icefury71
Copy link
Contributor

@icefury71 icefury71 commented Jun 16, 2022

Add a config flag for Pulsar stream connector to enable key and value byte array stitching support in PulsarMessageBatch. This is important when ingesting from a Pulsar topic with a key value schema where the message data only includes value bytes and key bytes have to be retrieved separately.

By stitching, we allow higher layers (eg: decoder) to access both key and value in the same byte array.

Custom Decoder

When this flag is enabled, a custom decoder will be needed to extract Pinot GenericRow from the incoming key and value byte arrays. This involves decoupling the individual byte arrays and then using a corresponding decoder (eg: Avro or Json) to extract fields from the decoded key and value.

Release Notes

Adding support to stitch key and value bytes together in PulsarMessageBatch, controlled by a flag. Custom decoders are needed when this flag is enabled.

@icefury71 icefury71 requested review from KKcorps and npawar June 16, 2022 04:56
@codecov-commenter
Copy link

codecov-commenter commented Jun 16, 2022

Codecov Report

Merging #8897 (243f917) into master (f3bde9f) will decrease coverage by 6.61%.
The diff coverage is 65.10%.

❗ Current head 243f917 differs from pull request most recent head 19008be. Consider uploading reports for the commit 19008be to get more accurate results

@@             Coverage Diff              @@
##             master    #8897      +/-   ##
============================================
- Coverage     69.60%   62.99%   -6.62%     
+ Complexity     4997     4865     -132     
============================================
  Files          1806     1770      -36     
  Lines         94202    92714    -1488     
  Branches      14050    13943     -107     
============================================
- Hits          65571    58404    -7167     
- Misses        24072    30071    +5999     
+ Partials       4559     4239     -320     
Flag Coverage Δ
integration1 ?
integration2 ?
unittests1 66.26% <65.01%> (-0.12%) ⬇️
unittests2 15.01% <15.25%> (-0.42%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...roker/requesthandler/GrpcBrokerRequestHandler.java 0.00% <0.00%> (-80.96%) ⬇️
...main/java/org/apache/pinot/client/PinotDriver.java 0.00% <0.00%> (-59.62%) ⬇️
...ava/org/apache/pinot/client/utils/DriverUtils.java 4.39% <0.00%> (-6.72%) ⬇️
...inot/common/function/scalar/DateTimeFunctions.java 95.00% <ø> (-0.24%) ⬇️
.../java/org/apache/pinot/common/utils/DataTable.java 95.12% <ø> (ø)
...e/pinot/common/utils/FileUploadDownloadClient.java 18.18% <ø> (-40.91%) ⬇️
...pache/pinot/common/utils/grpc/GrpcQueryClient.java 0.00% <0.00%> (-80.86%) ⬇️
...roller/api/resources/PinotTaskRestletResource.java 0.00% <0.00%> (-4.06%) ⬇️
...lix/core/minion/PinotHelixTaskResourceManager.java 2.42% <0.00%> (-37.52%) ⬇️
...he/pinot/core/common/datablock/DataBlockUtils.java 84.21% <ø> (ø)
... and 555 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f3bde9f...19008be. Read the comment docs.

* Stitch key and value bytes together using a simple format:
* 4 bytes for key length + key bytes + 4 bytes for value length + value bytes
*/
private byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is the case for other stream connectors too. I don't think the decoders have access the message key or message headers today.

A more elegant approach maybe to use MessageBatch<StreamMessageType>, where StreamMessageType can contain payload and metadata. But I suspect you want to avoid a more invasive code change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that particular change will touch too many classes.

Copy link
Contributor

@npawar npawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change lgtm, but you might want to callout here and tag for release notes, that existing decoders won't work, along with guidelines on how to use this in that case?

@icefury71 icefury71 added the release-notes Referenced by PRs that need attention when compiling the next release notes label Jun 23, 2022
@icefury71
Copy link
Contributor Author

this change lgtm, but you might want to callout here and tag for release notes, that existing decoders won't work, along with guidelines on how to use this in that case?

Done

@KKcorps KKcorps merged commit 1bcc032 into apache:master Jul 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants