
[Fix][Kafka] Fix in kafka streaming mode can not read incremental data #7871

Merged

merged 6 commits into apache:dev on Nov 16, 2024

Conversation

Carl-Zhou-CN
Member

@Carl-Zhou-CN Carl-Zhou-CN commented Oct 18, 2024

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Current test

Check list

@liunaijie liunaijie changed the title [Bugfix][Kafka] In kafak flow mode, stop offse should be Long.MAX_VALUE [Bugfix][Kafka] In kafka flow mode, stop offse should be Long.MAX_VALUE Oct 18, 2024
@hailin0
Member

hailin0 commented Oct 18, 2024

Please add test cases

@Hisoka-X Hisoka-X added this to the 2.3.9 milestone Oct 28, 2024
@weipengfei-sj

weipengfei-sj commented Oct 31, 2024

(screenshot of the affected code)

When running tasks in streaming mode, an empty map is returned, and a NullPointerException will occur at this location.

It also needs to be adjusted to the following format:

split.setEndOffset(
        latestOffsets.getOrDefault(
                split.getTopicPartition(), Long.MAX_VALUE));

@Carl-Zhou-CN
Member Author

> When running tasks in streaming mode, an empty map is returned, and a NullPointerException will occur at this location. It also needs to be adjusted to use `latestOffsets.getOrDefault(split.getTopicPartition(), Long.MAX_VALUE)`.

After the above modification, are there any remaining problems?


if (isStreamingMode) {
    return Collections.emptyMap();
}
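The failure mode discussed above can be reproduced with a minimal, self-contained sketch (class and method names here are hypothetical stand-ins, not SeaTunnel's actual API): when the enumerator returns an empty offsets map in streaming mode, `Map.get` yields `null`, and unboxing it into a `long` throws a NullPointerException, while `getOrDefault` falls back to `Long.MAX_VALUE` so the streaming split effectively has no stop offset.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal sketch of the getOrDefault fix (hypothetical names, not SeaTunnel's API). */
public class EndOffsetSketch {

    // Stand-in for a topic-partition key.
    record TopicPartition(String topic, int partition) {}

    /**
     * Buggy variant: Map.get returns null for a missing key, and unboxing
     * null into long throws a NullPointerException.
     */
    static long endOffsetBuggy(Map<TopicPartition, Long> latestOffsets, TopicPartition tp) {
        return latestOffsets.get(tp); // NPE when the map is empty
    }

    /** Fixed variant: a missing key falls back to Long.MAX_VALUE (no stop offset). */
    static long endOffsetFixed(Map<TopicPartition, Long> latestOffsets, TopicPartition tp) {
        return latestOffsets.getOrDefault(tp, Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        // In streaming mode the enumerator returns an empty offsets map.
        Map<TopicPartition, Long> latestOffsets = new HashMap<>();
        TopicPartition tp = new TopicPartition("topic_1", 0);
        System.out.println(endOffsetFixed(latestOffsets, tp)); // prints 9223372036854775807
    }
}
```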
Member

@Carl-Zhou-CN The fix is LGTM.

But I have a question about Kafka batch mode: there is no option to set an end offset, so how does it stop in batch mode?

Member Author


In batch processing mode, the latest offset of each partition is captured when the split is created, and consumption stops once it reaches that offset.
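The batch-mode termination described above can be sketched as follows (hypothetical names, not SeaTunnel's actual reader): the end offset is snapshotted at split-creation time, so records appended to the partition after planning are not consumed by that job.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of batch-mode stop behavior (hypothetical names, not SeaTunnel's API). */
public class BatchStopSketch {

    /**
     * Consume offsets from the log until reaching the end offset that was
     * snapshotted when the split was created; later records are ignored.
     */
    static List<Long> readBatch(List<Long> log, long endOffsetAtPlanning) {
        List<Long> consumed = new ArrayList<>();
        for (long offset : log) {
            if (offset >= endOffsetAtPlanning) {
                break; // snapshot reached: the batch job finishes
            }
            consumed.add(offset);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Offsets 3 and 4 were appended after the split was created.
        List<Long> log = List.of(0L, 1L, 2L, 3L, 4L);
        System.out.println(readBatch(log, 3)); // prints [0, 1, 2]
    }
}
```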

Member


OK, it looks like we need to update the doc; I did not find any related notes.

Member Author


I'll be adding test cases and documentation by the end of the week.

Member


(screenshot of the relevant code)

It only consumes data whose offset is less than the stop offset. If this record is in the middle of a poll result, how do we commit the offset?

Member Author


Only the offset of the record currently being consumed is committed.
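The mid-poll case raised above can be sketched like this (hypothetical names, not SeaTunnel's actual reader logic): records at or beyond the stop offset are skipped even if they arrived in the same poll, and the committed position is the offset after the last record actually consumed.

```java
import java.util.List;

/** Sketch of stopping mid-poll at the stop offset (hypothetical names). */
public class StopOffsetPollSketch {

    record Record(long offset, String value) {}

    /**
     * Consume only records below stopOffset from one poll result and return
     * the position to commit (last consumed offset + 1), or -1 if nothing
     * was consumed.
     */
    static long consumeUpTo(List<Record> poll, long stopOffset) {
        long lastConsumed = -1;
        for (Record r : poll) {
            if (r.offset() >= stopOffset) {
                break; // stop offset reached mid-poll; skip the rest
            }
            // process r ...
            lastConsumed = r.offset();
        }
        return lastConsumed < 0 ? -1 : lastConsumed + 1;
    }

    public static void main(String[] args) {
        List<Record> poll = List.of(
                new Record(5, "a"), new Record(6, "b"), new Record(7, "c"));
        // Stop offset 7: records 5 and 6 are consumed, record 7 is skipped.
        System.out.println(consumeUpTo(poll, 7)); // prints 7
    }
}
```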

liunaijie
liunaijie previously approved these changes Nov 14, 2024
@liunaijie
Member

> When running tasks in streaming mode, an empty map is returned, and a NullPointerException will occur at this location. It also needs to be adjusted to use `latestOffsets.getOrDefault(split.getTopicPartition(), Long.MAX_VALUE)`.

Good catch. @Carl-Zhou-CN, we also need to update here.

Or, in streaming mode, could we return Long.MAX_VALUE instead of an empty map?

@Carl-Zhou-CN
Copy link
Member Author

> Or, in streaming mode, could we return Long.MAX_VALUE instead of an empty map?

Yes, that was overlooked.

Hisoka-X
Hisoka-X previously approved these changes Nov 15, 2024
Member

@Hisoka-X Hisoka-X left a comment


LGTM.

@@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
### Simple

> This example reads the data of kafka's topic_1, topic_2, topic_3 and prints it to the client. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
> In batch mode, it will consume continuously until it reaches the maximum offset.
Member


Suggest moving this hint to https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/kafka.md?plain=1#L14

And "the maximum offset" is not clear; in batch mode, it will stop when it reaches the offset captured at startup.

Member Author


This PR does not alter the behavior of the Kafka source. I believe adding it above would not make a significant difference, and it would disrupt the structure of the documentation.

Member Author


But I updated the description to explain the process.

@liunaijie
Member

LGTM

@Hisoka-X Hisoka-X changed the title [Bugfix][Kafka] In kafka flow mode, stop offse should be Long.MAX_VALUE [Fix][Kafka] Fix in kafka streaming mode can not read incremental data Nov 16, 2024
@Hisoka-X Hisoka-X merged commit a0eeeb9 into apache:dev Nov 16, 2024
7 checks passed
fcb-xiaobo pushed a commit to fcb-xiaobo/seatunnel that referenced this pull request Nov 18, 2024
hawk9821 pushed a commit to hawk9821/seatunnel that referenced this pull request Nov 18, 2024