Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract record keys, headers and metadata from Stream sources #9224

Merged
merged 11 commits into from
Sep 28, 2022

Conversation

navina
Copy link
Contributor

@navina navina commented Aug 16, 2022

Design doc: https://docs.google.com/document/d/1kTUfBud1SBSh_703mvu6ybkbIwiKKH9eXpdcpEmhC2E/edit

This is an extension of PR #9096

Motivation

Most stream systems provide a message envelope, which encapsulates the record payload, along with record headers, keys and other system-specific metadata For e.g:

  1. Kafka allows keyed records and additionally, provides headers
  2. Kinesis requires keyed records and includes some additional metadata such as sequenceId etc
  3. Pulsar also supports keyed records and allows including arbitrary properties.
  4. Pubsub supports keyed messages, along with user-defined attributes and message metadata.

Today, Pinot drops everything from the payload, other than the record value itself. Hence, there needs to be a way to extract these values and present them in the Pinot table as regular columns (of course, it has to be defined in the pinot schema).

This can be very useful for the Pinot user as they don't have to "pre-process" the stream to make the record metadata available in the data payload. It also prevents custom solutions (such as this).

Context

Want to clarify the terminology here. Typically, in most streaming systems, a record is composed of the following:

  1. Record key - usually, a string, although kafka allows any type (today, pinot-kafka connector assumes the key to always be a key)
  2. Record value - actual data paylaod. Pinot extract only this value and decodes it.
  3. Record headers - these are user-defined record header that can be specific to the publishing application. Typically, headers are meant to be efficient and small. For example, in Kafka , it allows <String, byte[]>. technically, byte[] can be anything and we can make a call on whether to support arbitrary header value types or not.
  4. Record Metadata - these may or may not be included in the record payload and it is system-defined. For example, for message identifiers, kinesis has sequenceId, kafka has offset, pubsub has messageId etc. While these may not be useful for the user-facing application, it comes-in handy for debugging.

What does this PR do?

This PR attempts to extract key, header and other metadata from any supported streaming connector. This feature is opt-in, meaning it can be enabled by setting stream.$streamType.metadata.populate as true

please note: Documentation for this feature will follow after this PR is merged.

For Reviewers, things to discuss:

  1. In the current patch, the record key (when available) is extracted as __key column , where as headers are extracted as header$<HEADER_KEY_NAME> . Does this sound like a good convention to follow for all stream connectors -> Header columns will always be prefixed with header$ and any other metadata such as key or offset will be prefixed as __
  2. In MessageBatch, I have marked one of the methods as @Deprecated as I am hoping to eventually eliminate the need for typed interface there. The current changes are backwards compatible. Let me know if there is a better way.

@navina navina force-pushed the extract-kv-headers branch from fbc8001 to 921ca41 Compare August 17, 2022 20:34
@codecov-commenter
Copy link

codecov-commenter commented Aug 18, 2022

Codecov Report

Merging #9224 (92de7ca) into master (83b7f15) will increase coverage by 0.02%.
The diff coverage is 61.80%.

@@             Coverage Diff              @@
##             master    #9224      +/-   ##
============================================
+ Coverage     69.89%   69.91%   +0.02%     
- Complexity     4742     4822      +80     
============================================
  Files          1910     1914       +4     
  Lines        101787   101886      +99     
  Branches      15445    15457      +12     
============================================
+ Hits          71139    71231      +92     
- Misses        25628    25630       +2     
- Partials       5020     5025       +5     
Flag Coverage Δ
integration1 26.10% <18.05%> (+0.06%) ⬆️
integration2 24.79% <20.13%> (+0.08%) ⬆️
unittests1 67.18% <67.05%> (+0.03%) ⬆️
unittests2 15.54% <18.75%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...lugin/stream/kafka20/server/KafkaDataProducer.java 33.33% <0.00%> (-24.36%) ⬇️
...rg/apache/pinot/spi/stream/StreamDataProducer.java 32.35% <0.00%> (-11.65%) ⬇️
.../pinot/common/function/scalar/StringFunctions.java 73.14% <14.28%> (-4.31%) ⬇️
...inot/plugin/stream/kafka20/KafkaStreamMessage.java 50.00% <40.00%> (ø)
.../java/org/apache/pinot/spi/stream/RowMetadata.java 40.00% <40.00%> (ø)
...pinot/plugin/stream/kafka20/KafkaMessageBatch.java 68.75% <50.00%> (-24.11%) ⬇️
.../plugin/stream/kafka20/KafkaMetadataExtractor.java 82.35% <82.35%> (ø)
...apache/pinot/spi/stream/StreamDataDecoderImpl.java 82.60% <82.60%> (ø)
...in/stream/kafka20/KafkaPartitionLevelConsumer.java 72.72% <85.71%> (+1.29%) ⬆️
...manager/realtime/LLRealtimeSegmentDataManager.java 69.98% <100.00%> (-0.24%) ⬇️
... and 44 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@navina navina force-pushed the extract-kv-headers branch 2 times, most recently from 616bed1 to 881b29e Compare August 18, 2022 18:11
@navina navina marked this pull request as ready for review August 18, 2022 18:17
@walterddr
Copy link
Contributor

is there a backward compatibility story for this change.

  • if the header is broken but somehow the payload is ok. what's the behavior before/after this change
  • if header doesn't agree with the pre-process info, which one should we trust?
  • if payload doesn't agree with the pre-process info but align with header, which one should we trust?

another question ii what's the performance cost of skipping headers vs parsing headers? do we have any benchmarks?

@navina
Copy link
Contributor Author

navina commented Aug 18, 2022

if the header is broken but somehow the payload is ok. what's the behavior before/after this change

Not sure if I follow what you mean by " header is broken "? Today, if a record cannot be decoded, it throws an exception and stops ingestion. The behavior will remain the same.

if header doesn't agree with the pre-process info, which one should we trust?
if payload doesn't agree with the pre-process info but align with header, which one should we trust?

I think the PR description needs more details on what kind of pre-processing I was referring to. I was referring to pre-processing on the source that is outside of Pinot.

another question ii what's the performance cost of skipping headers vs parsing headers? do we have any benchmarks?

There shouldn't be any performance cost for parsing headers as typically, when the headers are returned via the client API, they are already decoded. If the user-specified headers are non-primitives, then it will add to the cost. But in my current implementation, I haven't even added support for a header value decoder. It assumes that header key is a string and the value is a primitive. this is a good question. I will add more details in the PR description.

@navina
Copy link
Contributor Author

navina commented Aug 19, 2022

Let's continue any design related discussions in the design doc
Thanks!

@navina navina force-pushed the extract-kv-headers branch from 881b29e to 6a5de4c Compare August 19, 2022 22:45
@npawar
Copy link
Contributor

npawar commented Sep 15, 2022

Design doc: https://docs.google.com/document/d/1kTUfBud1SBSh_703mvu6ybkbIwiKKH9eXpdcpEmhC2E/edit

This is an extension of PR #9096

Motivation

Most stream systems provide a message envelope, which encapsulates the record payload, along with record headers, keys and other system-specific metadata For e.g:

  1. Kafka allows keyed records and additionally, provides headers
  2. Kinesis requires keyed records and includes some additional metadata such as sequenceId etc
  3. Pulsar also supports keyed records and allows including arbitrary properties.
  4. Pubsub supports keyed messages, along with user-defined attributes and message metadata.

Today, Pinot drops everything from the payload, other than the record value itself. Hence, there needs to be a way to extract these values and present them in the Pinot table as regular columns (of course, it has to be defined in the pinot schema).

This can be very useful for the Pinot user as they don't have to "pre-process" the stream to make the record metadata available in the data payload. It also prevents custom solutions (such as this).

Context

Want to clarify the terminology here. Typically, in most streaming systems, a record is composed of the following:

  1. Record key - usually, a string, although kafka allows any type (today, pinot-kafka connector assumes the key to always be a key)
  2. Record value - actual data paylaod. Pinot extract only this value and decodes it.
  3. Record headers - these are user-defined record header that can be specific to the publishing application. Typically, headers are meant to be efficient and small. For example, in Kafka , it allows <String, byte[]>. technically, byte[] can be anything and we can make a call on whether to support arbitrary header value types or not.
  4. Record Metadata - these may or may not be included in the record payload and it is system-defined. For example, for message identifiers, kinesis has sequenceId, kafka has offset, pubsub has messageId etc. While these may not be useful for the user-facing application, it comes-in handy for debugging.

What does this PR do?

This PR attempts to extract key, header and other metadata from any supported streaming connector. This feature is opt-in, meaning it can be enabled by setting stream.$streamType.metadata.populate as true

please note:

  1. I am in the process of adding some unit tests. I have tested with a pinot realtime quickstart. Need to do some more cleanup.
  2. For whatever reason, the integration tests fail in the CI pipeline here, where as it runs fine on my laptop. Still fixing forward.
  3. Documentation for this feature will follow after this PR is merged.

For Reviewers, things to discuss:

  1. In the current patch, the record key (when available) is extracted as __key column , where as headers are extracted as header$<HEADER_KEY_NAME> . Does this sound like a good convention to follow for all stream connectors -> Header columns will always be prefixed with header$ and any other metadata such as key or offset will be prefixed as __
  2. In MessageBatch, I have marked one of the methods as @Deprecated as I am hoping to eventually eliminate the need for typed interface there. The current changes are backwards compatible. Let me know if there is a better way.

Would prefer if we're able to keep it all consistent in terms of the prefix (if going with __, then __key, __header$headerName, __metadata)

import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;


/**
* A class that provides metadata associated with the message of a stream, for e.g.,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a little confusing, since the design doc called out headers and metadata separately, but here we're creating a StreamMessageMetadata object that contains metadata and headers. We should at least update the javadoc if we're going with putting these 2 together in this class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. I will update the doc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer if we're able to keep it all consistent in terms of the prefix (if going with __, then __key, __header$headerName, __metadata)

Makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separated the metadata and headers in StreamMessageMetadata into 2 separate fields.

@navina navina force-pushed the extract-kv-headers branch 2 times, most recently from ff91597 to 97e2f30 Compare September 19, 2022 11:15
@navina navina requested a review from npawar September 20, 2022 04:51
@navina navina force-pushed the extract-kv-headers branch 2 times, most recently from 7092d29 to ef5549f Compare September 23, 2022 07:55
@navina
Copy link
Contributor Author

navina commented Sep 23, 2022

Drew out the class diagram for easy reference. RowMetadata seems redundant. But I think we can keep it around for now.

stream-message-apis

@mcvsubbu
Copy link
Contributor

We would like to review this. thanks

StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
if (decodedRow.getException() != null) {
// TODO: handle exception as we do today - do we silently drop the record or throw exception?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This area is a little wild. There were a couple of PRs before as well, that kind of made it even more messy (someone needed decoder exceptions to retry to fetch new schema). It will be great if you can add Javadocs as appropriate in the decoder interfaces that clearly indicates how the exceptions will be treated by this class.

Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like, we increment a metric today when an exception is thrown by the decoder itself or if null is returned. I am leaving this part unchanged for now.
In a follow-up PR, I think we can elegantly handle decode errors for all supported streams - my proposal would be to use a config to decide whether to re-throw exception or skip the record.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are defining the StreamDataDecoderResult now to contain exceptions, don't you think it is better than we introduce a class of exceptions there -- ones in which we need to retry and others in which we don't? Of course, we can always change it later.

I am fine if you are handling the exception stuff in another PR, and are open to changing the definition of StreamDataDecoderResult if it makes sense.

Copy link
Contributor Author

@navina navina Sep 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am open to introducing a class of retriable and non-retriable exceptions if we can clearly articulate them in a manner that is common to all decoders. As of today, I only see this to be the case in avro decoder. Hence, for now, I would leave this retry capability within the individual decoders and not surface it in LLRealtimeSegmentManager.

We can iterate on it if the need arises in the future.

*/
public final class StreamDataDecoderResult {
private final GenericRow _result;
private final Exception _exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have an exception declared here. I see that the decode() operation is guaranteed not to throw an exception. You may also want to add a comment on the decode() interface that it is NOT supposed to throw an exception (just to guard against someone changing it).

Also, instead of a generic exception, I suggest that we define some specific ones (or at least specific categories) -- retryable or not. See PR #9051

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have an exception declared here. I see that the decode() operation is guaranteed not to throw an exception. You may also want to add a comment on the decode() interface that it is NOT supposed to throw an exception (just to guard against someone changing it).

Added

Also, instead of a generic exception, I suggest that we define some specific ones (or at least specific categories) -- retryable or not. See PR #9051

I specifically don't want to add any "types" of exception. The reason is that typically, when decoding fails, the caller either ignores the record or aborts the process. It is very uncommon to try and "handle" decoding errors. That's why I preferred encapsulating the exception into a class, instead of throwing it. It is then, completely left to the caller to determine whether to handle specific exceptions or not. This leaves the interface more flexible for usage as well.

For #9051 and the like: The current StreamDecoder interface allows returning null value, which seems to the main source of confusion. The interface is defining how a specific caller should handle a null value when there can be more than 1 caller. At the time of writing #9051, there was only one caller for this method LLRealtimeSegmentDataManager. Now that we have one more, StreamDataDecoder, it is prone to error.

I will ensure that the existing functionalities are not affected or find a workaround. Thanks for pointing it out.

Copy link
Contributor

@npawar npawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm now.

adding StreamMessage concept and using it with kafka

move StreamMessage into pinot-spi

Use StreamDataDecoder interface for deocding a StreamMessage in LLRealtimeSegmentDataManager

verified key and header with realtime quick start

checkstyle

updating realtime quickstart to include headers and key parsing

fix failing tests

checkstyle

revert to continue using hlc in integration tests

added TODO for failing test; revert adding header in the integ test

nit

Deleting unused class: MessageAndOffset and MessageAndOffsetAndMetadata
addressing PR feedback
@npawar npawar merged commit 737d443 into apache:master Sep 28, 2022
61yao pushed a commit to 61yao/pinot that referenced this pull request Oct 3, 2022
…#9224)

* initial commit from kishore

adding StreamMessage concept and using it with kafka

move StreamMessage into pinot-spi

Use StreamDataDecoder interface for deocding a StreamMessage in LLRealtimeSegmentDataManager

verified key and header with realtime quick start

checkstyle

updating realtime quickstart to include headers and key parsing

fix failing tests

checkstyle

revert to continue using hlc in integration tests

added TODO for failing test; revert adding header in the integ test

nit

Deleting unused class: MessageAndOffset and MessageAndOffsetAndMetadata

* Adding metadata to RowMetadata to distinguish it from headers
Addressing feeback

* clear reuse genericrow

* Making offset as a part of KafkaStreamMessageMetadata; changed StreamMessage into a concrete class

* rename RowMetadataExtractor.java to KafkaMetadataExtractor.java

* adding unit test
addressing PR feedback

* Always populate offset and ingestion time in record time

* update the comment

* rename to recordIngestionTimeMs; Fixed unit test assertion in KafkaPartition consumer -  Kafka will always have metadata

* include kafka recordTimestamp in streamMessageMetadata object

* do not use Map.of
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants