diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 90c183c2c13e..c0ed66186b28 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
 ### Simple
 
 > This example reads the data of kafka's topic_1, topic_2, topic_3 and prints it to the client. If you have not yet installed and deployed SeaTunnel, follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy it. Then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
+> In batch mode, the enumerator fetches the latest offset for each partition during split generation and uses it as the stopping point.
 
 ```hocon
 # Defining the runtime environment
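
For reference, a minimal batch-mode job of the shape the added note describes. This is an illustrative sketch, not part of the patch: the broker address, topics, and schema are placeholder values.

```hocon
env {
  # BATCH makes the source bounded: each split stops at the latest offset
  # captured during enumeration, and the job finishes once all splits are read.
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "localhost:9092"   # placeholder
    topic = "topic_1,topic_2,topic_3"
    format = json
    schema = {
      fields {
        id = bigint
        name = string
      }
    }
  }
}

sink {
  Console {}
}
```
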
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 5688fde5b649..0ff99807f24f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -104,7 +104,11 @@ public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
     @Override
     public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(
             SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
-        return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null);
+        return new KafkaSourceSplitEnumerator(
+                kafkaSourceConfig,
+                enumeratorContext,
+                null,
+                getBoundedness() == Boundedness.UNBOUNDED);
     }
 
     @Override
@@ -112,7 +116,10 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
             SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
             KafkaSourceState checkpointState) {
         return new KafkaSourceSplitEnumerator(
-                kafkaSourceConfig, enumeratorContext, checkpointState);
+                kafkaSourceConfig,
+                enumeratorContext,
+                checkpointState,
+                getBoundedness() == Boundedness.UNBOUNDED);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 06ce4565c3b3..6d6c1ca96f9e 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -70,10 +70,13 @@ public class KafkaSourceSplitEnumerator
 
     private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();
 
+    private boolean isStreamingMode;
+
     KafkaSourceSplitEnumerator(
             KafkaSourceConfig kafkaSourceConfig,
             Context<KafkaSourceSplit> context,
-            KafkaSourceState sourceState) {
+            KafkaSourceState sourceState,
+            boolean isStreamingMode) {
         this.kafkaSourceConfig = kafkaSourceConfig;
         this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
         this.context = context;
@@ -81,10 +84,11 @@ public class KafkaSourceSplitEnumerator
         this.pendingSplit = new HashMap<>();
         this.adminClient = initAdminClient(this.kafkaSourceConfig.getProperties());
         this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
+        this.isStreamingMode = isStreamingMode;
     }
 
     @VisibleForTesting
-    protected KafkaSourceSplitEnumerator(
+    public KafkaSourceSplitEnumerator(
             AdminClient adminClient,
             Map<TopicPartition, KafkaSourceSplit> pendingSplit,
             Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
@@ -97,6 +101,16 @@ protected KafkaSourceSplitEnumerator(
         this.assignedSplit = assignedSplit;
     }
 
+    @VisibleForTesting
+    public KafkaSourceSplitEnumerator(
+            AdminClient adminClient,
+            Map<TopicPartition, KafkaSourceSplit> pendingSplit,
+            Map<TopicPartition, KafkaSourceSplit> assignedSplit,
+            boolean isStreamingMode) {
+        this(adminClient, pendingSplit, assignedSplit);
+        this.isStreamingMode = isStreamingMode;
+    }
+
     @Override
     public void open() {
         if (discoveryIntervalMillis > 0) {
@@ -204,7 +218,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
     private Map<TopicPartition, KafkaSourceSplit> convertToNextSplit(
             List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, Long> listOffsets =
+            Map<TopicPartition, Long> latestOffsets =
                     listOffsets(
                             splits.stream()
                                     .map(KafkaSourceSplit::getTopicPartition)
@@ -214,7 +228,10 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
             splits.forEach(
                     split -> {
                         split.setStartOffset(split.getEndOffset() + 1);
-                        split.setEndOffset(listOffsets.get(split.getTopicPartition()));
+                        split.setEndOffset(
+                                isStreamingMode
+                                        ? Long.MAX_VALUE
+                                        : latestOffsets.get(split.getTopicPartition()));
                     });
             return splits.stream()
                     .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
@@ -305,7 +322,10 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, InterruptedException {
                             // Obtain the corresponding topic TablePath from kafka topic
                             TablePath tablePath = topicMappingTablePathMap.get(partition.topic());
                             KafkaSourceSplit split = new KafkaSourceSplit(tablePath, partition);
-                            split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
+                            split.setEndOffset(
+                                    isStreamingMode
+                                            ? Long.MAX_VALUE
+                                            : latestOffsets.get(partition));
                             return split;
                         })
                 .collect(Collectors.toSet());
@@ -344,6 +364,7 @@ private static int getSplitOwner(TopicPartition tp, int numReaders) {
     private Map<TopicPartition, Long> listOffsets(
             Collection<TopicPartition> partitions, OffsetSpec offsetSpec)
             throws ExecutionException, InterruptedException {
+
         Map<TopicPartition, OffsetSpec> topicPartitionOffsets =
                 partitions.stream()
                         .collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
@@ -391,7 +412,8 @@ private void discoverySplits() throws ExecutionException, InterruptedException {
         assignSplit();
     }
 
-    private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
+    @VisibleForTesting
+    public void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
         getTopicInfo()
                 .forEach(
                         split -> {
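
The behavioral core of the patch is the end-offset rule: streaming (UNBOUNDED) jobs leave each split open-ended, while batch jobs pin it to the latest offset snapshotted at enumeration time. A minimal standalone sketch of that rule — the class and method names here are illustrative, not the connector's:

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

final class EndOffsetRule {

    private EndOffsetRule() {}

    // Streaming readers must be able to poll past any offset snapshot taken at
    // enumeration time, so the split stays open-ended (Long.MAX_VALUE). Batch
    // readers stop at the latest offset observed when the split was created.
    static long resolveEndOffset(
            boolean isStreamingMode,
            TopicPartition partition,
            Map<TopicPartition, Long> latestOffsets) {
        return isStreamingMode ? Long.MAX_VALUE : latestOffsets.get(partition);
    }
}
```

Spelling the rule out this way shows why both `convertToNextSplit` and `getTopicInfo` apply the same ternary: every code path that (re)creates a split must agree on the stopping semantics.
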
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
new file mode 100644
index 000000000000..00e059ecfe94
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+class KafkaSourceSplitEnumeratorTest {
+
+    AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
+
+    // prepare
+    TopicPartition partition = new TopicPartition("test", 0);
+
+    @BeforeEach
+    void init() {
+        Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
+                .thenReturn(
+                        new ListOffsetsResult(
+                                new HashMap<
+                                        TopicPartition,
+                                        KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
+                                    {
+                                        put(
+                                                partition,
+                                                KafkaFuture.completedFuture(
+                                                        new ListOffsetsResult.ListOffsetsResultInfo(
+                                                                0, 0, Optional.of(0))));
+                                    }
+                                }));
+        Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
+                .thenReturn(
+                        DescribeTopicsResult.ofTopicNames(
+                                new HashMap<String, KafkaFuture<TopicDescription>>() {
+                                    {
+                                        put(
+                                                partition.topic(),
+                                                KafkaFuture.completedFuture(
+                                                        new TopicDescription(
+                                                                partition.topic(),
+                                                                false,
+                                                                Collections.singletonList(
+                                                                        new TopicPartitionInfo(
+                                                                                0,
+                                                                                null,
+                                                                                Collections.emptyList(),
+                                                                                Collections.emptyList())))));
+                                    }
+                                }));
+    }
+
+    @Test
+    void addSplitsBack() {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+                new HashMap<TopicPartition, KafkaSourceSplit>() {
+                    {
+                        put(partition, new KafkaSourceSplit(null, partition));
+                    }
+                };
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
+        enumerator.addSplitsBack(splits, 1);
+        Assertions.assertTrue(pendingSplit.size() == splits.size());
+        Assertions.assertNull(assignedSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+    }
+
+    @Test
+    void addStreamingSplitsBack() {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
+                new HashMap<TopicPartition, KafkaSourceSplit>() {
+                    {
+                        put(partition, new KafkaSourceSplit(null, partition));
+                    }
+                };
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits =
+                Collections.singletonList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
+        enumerator.addSplitsBack(splits, 1);
+        Assertions.assertEquals(pendingSplit.size(), splits.size());
+        Assertions.assertNull(assignedSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+    }
+
+    @Test
+    void addStreamingSplits() throws ExecutionException, InterruptedException {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits =
+                Collections.singletonList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
+        enumerator.fetchPendingPartitionSplit();
+        Assertions.assertEquals(pendingSplit.size(), splits.size());
+        Assertions.assertNotNull(pendingSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
+    }
+
+    @Test
+    void addSplits() throws ExecutionException, InterruptedException {
+        // test
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        List<KafkaSourceSplit> splits =
+                Collections.singletonList(new KafkaSourceSplit(null, partition));
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false);
+        enumerator.fetchPendingPartitionSplit();
+        Assertions.assertEquals(pendingSplit.size(), splits.size());
+        Assertions.assertNotNull(pendingSplit.get(partition));
+        Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
+    }
+}
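
Note the new test lives in `org.apache.kafka.clients.admin`, presumably so it can reach package-visible pieces of the Kafka admin API such as `DescribeTopicsResult.ofTopicNames`. One caveat: because the stub reports a latest offset of 0, the batch-mode assertion `getEndOffset() == 0` would also pass for other paths that happen to yield 0. A hypothetical variation (not in this patch) that stubs a non-zero offset makes the batch stopping point unambiguous:

```java
// Hypothetical tweak to init(): report offset 42 as the partition's latest.
Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
        .thenReturn(
                new ListOffsetsResult(
                        Collections.singletonMap(
                                partition,
                                KafkaFuture.completedFuture(
                                        new ListOffsetsResult.ListOffsetsResultInfo(
                                                42, 0, Optional.of(0))))));
// The batch-mode test could then assert the split stops exactly at the latest offset:
// Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 42);
```
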
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
deleted file mode 100644
index 6a8de812d31c..000000000000
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-class KafkaSourceSplitEnumeratorTest {
-
-    @Test
-    void addSplitsBack() {
-        // prepare
-        TopicPartition partition = new TopicPartition("test", 0);
-
-        AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
-        Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
-                .thenReturn(
-                        new ListOffsetsResult(
-                                new HashMap<
-                                        TopicPartition,
-                                        KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
-                                    {
-                                        put(
-                                                partition,
-                                                KafkaFuture.completedFuture(
-                                                        new ListOffsetsResult.ListOffsetsResultInfo(
-                                                                0, 0, Optional.of(0))));
-                                    }
-                                }));
-
-        // test
-        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
-                new HashMap<TopicPartition, KafkaSourceSplit>() {
-                    {
-                        put(partition, new KafkaSourceSplit(null, partition));
-                    }
-                };
-        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
-        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
-        KafkaSourceSplitEnumerator enumerator =
-                new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
-        enumerator.addSplitsBack(splits, 1);
-        Assertions.assertTrue(pendingSplit.size() == splits.size());
-        Assertions.assertNull(assignedSplit.get(partition));
-    }
-}