From b7480e1a890040479fb7c39385d811df1508f61a Mon Sep 17 00:00:00 2001 From: YalikWang <34478654+YalikWang@users.noreply.github.com> Date: Tue, 16 Apr 2024 10:03:13 +0800 Subject: [PATCH] [Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log (#6668) --- .../source/RocketMqConsumerThread.java | 30 ++++++ .../rocketmq/source/RocketMqSourceReader.java | 94 +++++++++---------- .../e2e/connector/rocketmq/RocketMqIT.java | 45 +++++++++ ...rocketmq-source_tex_with_offset_check.conf | 72 ++++++++++++++ 4 files changed, 189 insertions(+), 52 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java index bfd34c30362..fcdef8bb949 100644 --- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java +++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java @@ -23,7 +23,10 @@ import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageQueue; +import java.util.Collections; +import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -33,6 +36,11 @@ public class RocketMqConsumerThread implements Runnable { private final ConsumerMetadata metadata; private final LinkedBlockingQueue> tasks; + private MessageQueue assignedMessageQueue; + + /** It is different 
from the committed offset,just means the last offset that has been polled */ + private long lastPolledOffset = -2; + public RocketMqConsumerThread(ConsumerMetadata metadata) { this.metadata = metadata; this.tasks = new LinkedBlockingQueue<>(); @@ -70,4 +78,26 @@ public void run() { public LinkedBlockingQueue> getTasks() { return tasks; } + + public void assign(RocketMqSourceSplit sourceSplit) throws MQClientException { + boolean messageQueueChanged = + assignedMessageQueue == null + || !Objects.equals(assignedMessageQueue, sourceSplit.getMessageQueue()); + if (messageQueueChanged) { + this.assignedMessageQueue = sourceSplit.getMessageQueue(); + consumer.assign(Collections.singleton(assignedMessageQueue)); + } + if (messageQueueChanged || lastPolledOffset != sourceSplit.getStartOffset() - 1) { + if (sourceSplit.getStartOffset() >= 0) { + Long committedOffset = consumer.committed(assignedMessageQueue); + if (!Objects.equals(committedOffset, sourceSplit.getStartOffset())) { + consumer.seek(assignedMessageQueue, sourceSplit.getStartOffset()); + } + } + } + } + + public void markLastPolledOffset(long offset) { + this.lastPolledOffset = offset; + } } diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java index 2beef96f1bc..90bc8f32315 100644 --- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java +++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.rocketmq.source; import org.apache.seatunnel.shade.com.google.common.collect.Maps; -import 
org.apache.seatunnel.shade.com.google.common.collect.Sets; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Boundedness; @@ -37,6 +36,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -50,7 +50,7 @@ public class RocketMqSourceReader implements SourceReader sourceSplits; private final Map> checkpointOffsets; @@ -65,7 +65,7 @@ public class RocketMqSourceReader implements SourceReader deserializationSchema, - SourceReader.Context context) { + Context context) { this.metadata = metadata; this.context = context; this.sourceSplits = new HashSet<>(); @@ -115,21 +115,16 @@ public void pollNext(Collector output) throws Exception { sourceSplit -> { CompletableFuture completableFuture = new CompletableFuture<>(); try { - consumerThreads - .get(sourceSplit.getMessageQueue()) + RocketMqConsumerThread rocketMqConsumerThread = + consumerThreads.get(sourceSplit.getMessageQueue()); + rocketMqConsumerThread .getTasks() .put( consumer -> { try { - Set messageQueues = - Sets.newHashSet( - sourceSplit.getMessageQueue()); - consumer.assign(messageQueues); - if (sourceSplit.getStartOffset() >= 0) { - consumer.seek( - sourceSplit.getMessageQueue(), - sourceSplit.getStartOffset()); - } + rocketMqConsumerThread.assign(sourceSplit); + MessageQueue assignedMessageQueue = + sourceSplit.getMessageQueue(); List records = consumer.poll( metadata.getBaseConfig() @@ -141,47 +136,36 @@ public void pollNext(Collector output) throws Exception { sourceSplit.getStartOffset(), sourceSplit.getEndOffset()); } - Map> groupRecords = + List messages = records.stream() - .collect( - Collectors.groupingBy( - record -> - new MessageQueue( - record - .getTopic(), - record - .getBrokerName(), - record - .getQueueId()))); - for (MessageQueue messageQueue : messageQueues) { - if 
(!groupRecords.containsKey(messageQueue)) { - continue; - } - List messages = - groupRecords.get(messageQueue); - for (MessageExt record : messages) { - deserializationSchema.deserialize( - record.getBody(), output); - if (Boundedness.BOUNDED.equals( - context.getBoundedness()) - && record.getQueueOffset() - >= sourceSplit - .getEndOffset()) { - break; - } - } - long lastOffset = -1; - if (!messages.isEmpty()) { - lastOffset = - messages.get(messages.size() - 1) - .getQueueOffset(); - sourceSplit.setStartOffset(lastOffset); - } - - if (lastOffset >= sourceSplit.getEndOffset()) { - sourceSplit.setEndOffset(lastOffset); + .filter( + record -> + isQueueMatch( + assignedMessageQueue, + record)) + .collect(Collectors.toList()); + long lastOffset = -1; + for (MessageExt record : messages) { + deserializationSchema.deserialize( + record.getBody(), output); + lastOffset = record.getQueueOffset(); + if (Boundedness.BOUNDED.equals( + context.getBoundedness()) + && record.getQueueOffset() + >= sourceSplit.getEndOffset()) { + break; } } + if (lastOffset >= 0) { + // set start offset for next poll cycleLife + sourceSplit.setStartOffset(lastOffset + 1); + rocketMqConsumerThread.markLastPolledOffset( + lastOffset); + } + if (lastOffset >= sourceSplit.getEndOffset()) { + // just for bounded mode + sourceSplit.setEndOffset(lastOffset); + } } catch (Exception e) { completableFuture.completeExceptionally(e); } @@ -200,6 +184,12 @@ record -> } } + private boolean isQueueMatch(MessageQueue assignedMessageQueue, MessageExt record) { + return Objects.equals(assignedMessageQueue.getTopic(), record.getTopic()) + && Objects.equals(assignedMessageQueue.getBrokerName(), record.getBrokerName()) + && Objects.equals(assignedMessageQueue.getQueueId(), record.getQueueId()); + } + @Override public List snapshotState(long checkpointId) throws Exception { List pendingSplit = diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java index aba0a9f2c07..01d7c77683f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java @@ -37,7 +37,9 @@ import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.engine.common.Constant; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; @@ -68,10 +70,12 @@ import java.time.Duration; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import static org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT; @@ -214,6 +218,27 @@ public void testSourceRocketMqTextToConsole(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "flink and spark won't commit offset when batch job finished") + public void testSourceRocketMqTextToConsoleWithOffsetCheck(TestContainer container) + throws IOException, InterruptedException { + 
DefaultSeaTunnelRowSerializer serializer = + new DefaultSeaTunnelRowSerializer( + "test_topic_text_offset_check", + SEATUNNEL_ROW_TYPE, + SchemaFormat.TEXT, + DEFAULT_FIELD_DELIMITER); + generateTestData( + row -> serializer.serializeRow(row), "test_topic_text_offset_check", 0, 10); + Container.ExecResult execResult = + container.executeJob("/rocketmq-source_tex_with_offset_check.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + checkOffsetNoDiff("test_topic_text_offset_check", "SeaTunnel-Consumer-Group"); + } + @TestTemplate public void testSourceRocketMqJsonToConsole(TestContainer container) throws IOException, InterruptedException { @@ -375,6 +400,26 @@ private Map getRocketMqConsumerData(String topicName) { return data; } + private void checkOffsetNoDiff(String topicName, String consumerGroup) { + RocketMqBaseConfiguration config = newConfiguration(); + config.setGroupId(consumerGroup); + List> offsetTopics = + RocketMqAdminUtil.offsetTopics(config, Arrays.asList(topicName)); + Map offsetMap = offsetTopics.get(0); + Set messageQueues = offsetMap.keySet(); + Map currentOffsets = + RocketMqAdminUtil.currentOffsets(config, Arrays.asList(topicName), messageQueues); + for (Map.Entry offsetEntry : offsetMap.entrySet()) { + MessageQueue messageQueue = offsetEntry.getKey(); + long maxOffset = offsetEntry.getValue().getMaxOffset(); + Long consumeOffset = currentOffsets.get(messageQueue); + Assertions.assertEquals( + maxOffset, + consumeOffset, + "Offset different,maxOffset=" + maxOffset + ",consumeOffset=" + consumeOffset); + } + } + public RocketMqBaseConfiguration newConfiguration() { return RocketMqBaseConfiguration.newBuilder() .groupId(ROCKETMQ_GROUP) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf new file 
mode 100644 index 00000000000..094660837a6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 1000 + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Rocketmq { + name.srv.addr = "rocketmq-e2e:9876" + topics = "test_topic_text_offset_check" + result_table_name = "rocketmq_table" + consumer.group = "SeaTunnel-Consumer-Group" + schema = { + fields { + id = bigint + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + format = text + # The default field delimiter is "," + field_delimiter = "," + } +} + +transform { +} + 
+sink { + Console { + source_table_name = "rocketmq_table" + } +} \ No newline at end of file