[Fix][Kafka] Fix Kafka streaming mode not reading incremental data
Carl-Zhou-CN authored and hawk9821 committed Nov 18, 2024
1 parent ae8d475 commit 2a45253
Showing 5 changed files with 194 additions and 82 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/kafka.md
@@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
### Simple

> This example reads the data of kafka's topic_1, topic_2, topic_3 and prints it to the client. If you have not yet installed and deployed SeaTunnel, follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy it. Then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
> In batch mode, the enumerator fetches the latest offset of each partition while planning splits and uses it as the stopping point.
```hocon
# Defining the runtime environment
# ...
```
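
For the streaming case this commit fixes, a hedged sketch of an equivalent job in streaming mode (option names follow the connector docs above; the broker address, group id, and schema are placeholders):

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
}

source {
  Kafka {
    bootstrap.servers = "localhost:9092"   # placeholder broker
    topic = "topic_1,topic_2,topic_3"
    consumer.group = "seatunnel_group"     # placeholder group id
    format = json
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {}
}
```

With `job.mode = "STREAMING"`, the enumerator should not pin each split to the latest offset seen at planning time; that is exactly the behavior this commit repairs.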
@@ -104,15 +104,22 @@ public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
-        return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null);
+        return new KafkaSourceSplitEnumerator(
+                kafkaSourceConfig,
+                enumeratorContext,
+                null,
+                getBoundedness() == Boundedness.UNBOUNDED);
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
KafkaSourceState checkpointState) {
return new KafkaSourceSplitEnumerator(
-                kafkaSourceConfig, enumeratorContext, checkpointState);
+                kafkaSourceConfig,
+                enumeratorContext,
+                checkpointState,
+                getBoundedness() == Boundedness.UNBOUNDED);
}

@Override
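
Both factory methods now derive a streaming flag from the source's boundedness. A sketch of the assumed `getBoundedness()` behind that check, following the usual SeaTunnel pattern of mapping the configured job mode to boundedness (an assumption, not code from this commit):

```java
// Sketch under assumptions: Boundedness and JobMode come from the SeaTunnel
// API; streaming jobs report UNBOUNDED, so the enumerator is told to plan
// open-ended splits instead of stopping at the latest planned offset.
@Override
public Boundedness getBoundedness() {
    return JobMode.BATCH.equals(jobContext.getJobMode())
            ? Boundedness.BOUNDED
            : Boundedness.UNBOUNDED;
}
```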
@@ -70,21 +70,25 @@ public class KafkaSourceSplitEnumerator

private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();

+    private boolean isStreamingMode;
+
KafkaSourceSplitEnumerator(
KafkaSourceConfig kafkaSourceConfig,
Context<KafkaSourceSplit> context,
-            KafkaSourceState sourceState) {
+            KafkaSourceState sourceState,
+            boolean isStreamingMode) {
this.kafkaSourceConfig = kafkaSourceConfig;
this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
this.context = context;
this.assignedSplit = new HashMap<>();
this.pendingSplit = new HashMap<>();
this.adminClient = initAdminClient(this.kafkaSourceConfig.getProperties());
this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
+        this.isStreamingMode = isStreamingMode;
}

@VisibleForTesting
-    protected KafkaSourceSplitEnumerator(
+    public KafkaSourceSplitEnumerator(
AdminClient adminClient,
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
@@ -97,6 +101,16 @@ protected KafkaSourceSplitEnumerator(
this.assignedSplit = assignedSplit;
}

+    @VisibleForTesting
+    public KafkaSourceSplitEnumerator(
+            AdminClient adminClient,
+            Map<TopicPartition, KafkaSourceSplit> pendingSplit,
+            Map<TopicPartition, KafkaSourceSplit> assignedSplit,
+            boolean isStreamingMode) {
+        this(adminClient, pendingSplit, assignedSplit);
+        this.isStreamingMode = isStreamingMode;
+    }

@Override
public void open() {
if (discoveryIntervalMillis > 0) {
@@ -204,7 +218,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(
List<KafkaSourceSplit> splits) {
try {
-            Map<TopicPartition, Long> listOffsets =
+            Map<TopicPartition, Long> latestOffsets =
listOffsets(
splits.stream()
.map(KafkaSourceSplit::getTopicPartition)
@@ -214,7 +228,10 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
splits.forEach(
split -> {
split.setStartOffset(split.getEndOffset() + 1);
-                        split.setEndOffset(listOffsets.get(split.getTopicPartition()));
+                        split.setEndOffset(
+                                isStreamingMode
+                                        ? Long.MAX_VALUE
+                                        : latestOffsets.get(split.getTopicPartition()));
});
return splits.stream()
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
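
This is the core of the fix: when splits are handed back and re-planned, the old code pinned the new stop offset to the latest offset observed at that moment, so a streaming reader stopped there and never saw records produced afterwards. A hypothetical polling loop (`consumer`, `output`, and `deserialize` are illustrative names, not this connector's API) shows why `Long.MAX_VALUE` keeps incremental reads alive:

```java
// Hypothetical reader loop: with endOffset == Long.MAX_VALUE the guard
// below never trips, so newly produced records keep flowing downstream.
long nextOffset = split.getStartOffset();
while (nextOffset <= split.getEndOffset()) {
    for (ConsumerRecord<byte[], byte[]> record :
            consumer.poll(Duration.ofMillis(500))) {
        if (record.offset() > split.getEndOffset()) {
            return; // only reachable in batch mode, at the planned stop point
        }
        output.collect(deserialize(record));
        nextOffset = record.offset() + 1;
    }
}
```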
@@ -305,7 +322,10 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, InterruptedException {
// Obtain the corresponding topic TablePath from kafka topic
TablePath tablePath = topicMappingTablePathMap.get(partition.topic());
KafkaSourceSplit split = new KafkaSourceSplit(tablePath, partition);
-                            split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
+                            split.setEndOffset(
+                                    isStreamingMode
+                                            ? Long.MAX_VALUE
+                                            : latestOffsets.get(partition));
return split;
})
.collect(Collectors.toSet());
@@ -344,6 +364,7 @@ private static int getSplitOwner(TopicPartition tp, int numReaders) {
private Map<TopicPartition, Long> listOffsets(
Collection<TopicPartition> partitions, OffsetSpec offsetSpec)
throws ExecutionException, InterruptedException {

Map<TopicPartition, OffsetSpec> topicPartitionOffsets =
partitions.stream()
.collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
@@ -391,7 +412,8 @@ private void discoverySplits() throws ExecutionException, InterruptedException {
assignSplit();
}

-    private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
+    @VisibleForTesting
+    public void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
getTopicInfo()
.forEach(
split -> {
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

class KafkaSourceSplitEnumeratorTest {

AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
// prepare
TopicPartition partition = new TopicPartition("test", 0);

@BeforeEach
void init() {
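        // The stubs below make the mocked AdminClient report offset 0 as the
        // latest offset for the test partition, and describe topic "test" as
        // having a single partition 0, so the enumerator can plan splits
        // without a real broker.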

Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
.thenReturn(
new ListOffsetsResult(
new HashMap<
TopicPartition,
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
{
put(
partition,
KafkaFuture.completedFuture(
new ListOffsetsResult.ListOffsetsResultInfo(
0, 0, Optional.of(0))));
}
}));
Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
.thenReturn(
DescribeTopicsResult.ofTopicNames(
new HashMap<String, KafkaFuture<TopicDescription>>() {
{
put(
partition.topic(),
KafkaFuture.completedFuture(
new TopicDescription(
partition.topic(),
false,
Collections.singletonList(
new TopicPartitionInfo(
0,
null,
Collections
.emptyList(),
Collections
.emptyList())))));
}
}));
}

@Test
void addSplitsBack() {
// test
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>() {
{
put(partition, new KafkaSourceSplit(null, partition));
}
};
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
enumerator.addSplitsBack(splits, 1);
Assertions.assertTrue(pendingSplit.size() == splits.size());
Assertions.assertNull(assignedSplit.get(partition));
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
}

@Test
void addStreamingSplitsBack() {
// test
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>() {
{
put(partition, new KafkaSourceSplit(null, partition));
}
};
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
Collections.singletonList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
enumerator.addSplitsBack(splits, 1);
Assertions.assertEquals(pendingSplit.size(), splits.size());
Assertions.assertNull(assignedSplit.get(partition));
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
}

@Test
void addStreamingSplits() throws ExecutionException, InterruptedException {
// test
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>();
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
Collections.singletonList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
enumerator.fetchPendingPartitionSplit();
Assertions.assertEquals(pendingSplit.size(), splits.size());
Assertions.assertNotNull(pendingSplit.get(partition));
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
}

@Test
void addSplits() throws ExecutionException, InterruptedException {
// test
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>();
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
Collections.singletonList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false);
enumerator.fetchPendingPartitionSplit();
Assertions.assertEquals(pendingSplit.size(), splits.size());
Assertions.assertNotNull(pendingSplit.get(partition));
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
}
}

This file was deleted.
