Skip to content

Commit

Permalink
[Fix][connector-rocketmq] commit a correct offset to broker & reduce …
Browse files Browse the repository at this point in the history
…ThreadInterruptedException log (#6668)
  • Loading branch information
YalikWang authored Apr 16, 2024
1 parent dba9953 commit b7480e1
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -33,6 +36,11 @@ public class RocketMqConsumerThread implements Runnable {
private final ConsumerMetadata metadata;
private final LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> tasks;

private MessageQueue assignedMessageQueue;

/** Different from the committed offset; this is just the offset of the last message polled. */
private long lastPolledOffset = -2;

public RocketMqConsumerThread(ConsumerMetadata metadata) {
this.metadata = metadata;
this.tasks = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -70,4 +78,26 @@ public void run() {
/** Returns the queue of pending tasks to be executed against the pull consumer. */
public LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> getTasks() {
return tasks;
}

/**
 * Assigns the split's message queue to the consumer and positions the consumer
 * at the split's start offset when necessary.
 *
 * @param sourceSplit the split describing the queue and the desired start offset
 * @throws MQClientException if the underlying consumer assign/committed/seek call fails
 */
public void assign(RocketMqSourceSplit sourceSplit) throws MQClientException {
    // Objects.equals handles the initial null assignedMessageQueue, so no
    // separate null check is needed.
    boolean messageQueueChanged =
            !Objects.equals(assignedMessageQueue, sourceSplit.getMessageQueue());
    if (messageQueueChanged) {
        this.assignedMessageQueue = sourceSplit.getMessageQueue();
        consumer.assign(Collections.singleton(assignedMessageQueue));
    }
    // Seek only when the requested start offset is valid (>= 0) and does not
    // simply continue from the offset we last polled; otherwise the consumer
    // is already positioned correctly.
    if ((messageQueueChanged || lastPolledOffset != sourceSplit.getStartOffset() - 1)
            && sourceSplit.getStartOffset() >= 0) {
        Long committedOffset = consumer.committed(assignedMessageQueue);
        // Avoid a redundant seek when the broker-side committed offset
        // already matches the requested start offset.
        if (!Objects.equals(committedOffset, sourceSplit.getStartOffset())) {
            consumer.seek(assignedMessageQueue, sourceSplit.getStartOffset());
        }
    }
}

/**
 * Records the queue offset of the last message polled from the assigned queue,
 * so {@code assign} can skip redundant seeks on consecutive polls.
 */
public void markLastPolledOffset(long offset) {
this.lastPolledOffset = offset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.apache.seatunnel.shade.com.google.common.collect.Sets;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
Expand All @@ -37,6 +36,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -50,7 +50,7 @@ public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMq

private static final long THREAD_WAIT_TIME = 500L;

private final SourceReader.Context context;
private final Context context;
private final ConsumerMetadata metadata;
private final Set<RocketMqSourceSplit> sourceSplits;
private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
Expand All @@ -65,7 +65,7 @@ public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMq
public RocketMqSourceReader(
ConsumerMetadata metadata,
DeserializationSchema<SeaTunnelRow> deserializationSchema,
SourceReader.Context context) {
Context context) {
this.metadata = metadata;
this.context = context;
this.sourceSplits = new HashSet<>();
Expand Down Expand Up @@ -115,21 +115,16 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
sourceSplit -> {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
try {
consumerThreads
.get(sourceSplit.getMessageQueue())
RocketMqConsumerThread rocketMqConsumerThread =
consumerThreads.get(sourceSplit.getMessageQueue());
rocketMqConsumerThread
.getTasks()
.put(
consumer -> {
try {
Set<MessageQueue> messageQueues =
Sets.newHashSet(
sourceSplit.getMessageQueue());
consumer.assign(messageQueues);
if (sourceSplit.getStartOffset() >= 0) {
consumer.seek(
sourceSplit.getMessageQueue(),
sourceSplit.getStartOffset());
}
rocketMqConsumerThread.assign(sourceSplit);
MessageQueue assignedMessageQueue =
sourceSplit.getMessageQueue();
List<MessageExt> records =
consumer.poll(
metadata.getBaseConfig()
Expand All @@ -141,47 +136,36 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
sourceSplit.getStartOffset(),
sourceSplit.getEndOffset());
}
Map<MessageQueue, List<MessageExt>> groupRecords =
List<MessageExt> messages =
records.stream()
.collect(
Collectors.groupingBy(
record ->
new MessageQueue(
record
.getTopic(),
record
.getBrokerName(),
record
.getQueueId())));
for (MessageQueue messageQueue : messageQueues) {
if (!groupRecords.containsKey(messageQueue)) {
continue;
}
List<MessageExt> messages =
groupRecords.get(messageQueue);
for (MessageExt record : messages) {
deserializationSchema.deserialize(
record.getBody(), output);
if (Boundedness.BOUNDED.equals(
context.getBoundedness())
&& record.getQueueOffset()
>= sourceSplit
.getEndOffset()) {
break;
}
}
long lastOffset = -1;
if (!messages.isEmpty()) {
lastOffset =
messages.get(messages.size() - 1)
.getQueueOffset();
sourceSplit.setStartOffset(lastOffset);
}

if (lastOffset >= sourceSplit.getEndOffset()) {
sourceSplit.setEndOffset(lastOffset);
.filter(
record ->
isQueueMatch(
assignedMessageQueue,
record))
.collect(Collectors.toList());
long lastOffset = -1;
for (MessageExt record : messages) {
deserializationSchema.deserialize(
record.getBody(), output);
lastOffset = record.getQueueOffset();
if (Boundedness.BOUNDED.equals(
context.getBoundedness())
&& record.getQueueOffset()
>= sourceSplit.getEndOffset()) {
break;
}
}
if (lastOffset >= 0) {
// set start offset for next poll cycleLife
sourceSplit.setStartOffset(lastOffset + 1);
rocketMqConsumerThread.markLastPolledOffset(
lastOffset);
}
if (lastOffset >= sourceSplit.getEndOffset()) {
// just for bounded mode
sourceSplit.setEndOffset(lastOffset);
}
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
Expand All @@ -200,6 +184,12 @@ record ->
}
}

/**
 * Checks whether a polled message belongs to the currently assigned queue, so
 * stray records from a previously assigned queue can be filtered out.
 *
 * @param assignedMessageQueue the queue this reader is currently assigned to
 * @param record a message returned by the consumer poll
 * @return true when topic, broker name and queue id all match
 */
private boolean isQueueMatch(MessageQueue assignedMessageQueue, MessageExt record) {
    return Objects.equals(assignedMessageQueue.getTopic(), record.getTopic())
            && Objects.equals(assignedMessageQueue.getBrokerName(), record.getBrokerName())
            // queue ids are primitive ints; compare directly instead of
            // autoboxing them through Objects.equals
            && assignedMessageQueue.getQueueId() == record.getQueueId();
}

@Override
public List<RocketMqSourceSplit> snapshotState(long checkpointId) throws Exception {
List<RocketMqSourceSplit> pendingSplit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.engine.common.Constant;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
Expand Down Expand Up @@ -68,10 +70,12 @@
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import static org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT;
Expand Down Expand Up @@ -214,6 +218,27 @@ public void testSourceRocketMqTextToConsole(TestContainer container)
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

@TestTemplate
@DisabledOnContainer(
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason = "flink and spark won't commit offset when batch job finished")
public void testSourceRocketMqTextToConsoleWithOffsetCheck(TestContainer container)
        throws IOException, InterruptedException {
    // Produce ten text-formatted rows, run the batch job, then verify the
    // consumer group committed its offsets all the way to the topic max.
    String topic = "test_topic_text_offset_check";
    DefaultSeaTunnelRowSerializer serializer =
            new DefaultSeaTunnelRowSerializer(
                    topic, SEATUNNEL_ROW_TYPE, SchemaFormat.TEXT, DEFAULT_FIELD_DELIMITER);
    generateTestData(serializer::serializeRow, topic, 0, 10);
    Container.ExecResult execResult =
            container.executeJob("/rocketmq-source_tex_with_offset_check.conf");
    Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
    checkOffsetNoDiff(topic, "SeaTunnel-Consumer-Group");
}

@TestTemplate
public void testSourceRocketMqJsonToConsole(TestContainer container)
throws IOException, InterruptedException {
Expand Down Expand Up @@ -375,6 +400,26 @@ private Map<String, String> getRocketMqConsumerData(String topicName) {
return data;
}

/**
 * Asserts that the given consumer group's committed offset equals the max
 * offset for every queue of the topic, i.e. the job consumed everything and
 * committed its position correctly.
 *
 * @param topicName topic whose queues are checked
 * @param consumerGroup consumer group whose committed offsets are read
 */
private void checkOffsetNoDiff(String topicName, String consumerGroup) {
    RocketMqBaseConfiguration config = newConfiguration();
    config.setGroupId(consumerGroup);
    List<Map<MessageQueue, TopicOffset>> offsetTopics =
            RocketMqAdminUtil.offsetTopics(config, Arrays.asList(topicName));
    Map<MessageQueue, TopicOffset> offsetMap = offsetTopics.get(0);
    Set<MessageQueue> messageQueues = offsetMap.keySet();
    Map<MessageQueue, Long> currentOffsets =
            RocketMqAdminUtil.currentOffsets(config, Arrays.asList(topicName), messageQueues);
    for (Map.Entry<MessageQueue, TopicOffset> offsetEntry : offsetMap.entrySet()) {
        MessageQueue messageQueue = offsetEntry.getKey();
        long maxOffset = offsetEntry.getValue().getMaxOffset();
        Long consumeOffset = currentOffsets.get(messageQueue);
        // Guard against a missing entry: without this, the boxed comparison
        // below would fail with an opaque NullPointerException instead of a
        // clear assertion message.
        Assertions.assertNotNull(
                consumeOffset, "No committed offset found for queue " + messageQueue);
        Assertions.assertEquals(
                maxOffset,
                consumeOffset,
                "Offset different,maxOffset=" + maxOffset + ",consumeOffset=" + consumeOffset);
    }
}

public RocketMqBaseConfiguration newConfiguration() {
return RocketMqBaseConfiguration.newBuilder()
.groupId(ROCKETMQ_GROUP)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

# Single-threaded batch job; checkpointing drives offset commits to the broker.
env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 1000

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

# Reads text-formatted rows from the e2e RocketMQ broker; the consumer group
# here is the one whose committed offsets the e2e test verifies afterwards.
source {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topics = "test_topic_text_offset_check"
result_table_name = "rocketmq_table"
consumer.group = "SeaTunnel-Consumer-Group"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
format = text
# The default field delimiter is ","
field_delimiter = ","
}
}

# No transformation needed; rows pass straight through to the sink.
transform {
}

sink {
Console {
source_table_name = "rocketmq_table"
}
}

0 comments on commit b7480e1

Please sign in to comment.