From 8f725d4267ab21631213806ab556f1ac89ef37bc Mon Sep 17 00:00:00 2001 From: YanYunyang <313169664@qq.com> Date: Fri, 5 Jul 2024 11:37:54 +0800 Subject: [PATCH] [ISSUE #8358] The worker thread has been converted into a daemon thread. --- .../rocketmq/client/impl/factory/MQClientInstance.java | 10 +++------- .../client/latency/LatencyFaultToleranceImpl.java | 9 ++------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index c9fd3c83e04..d0c0724aa24 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -60,6 +60,7 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; @@ -127,13 +128,8 @@ public class MQClientInstance { private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap<>(); private final Set brokerSupportV2HeartbeatSet = new HashSet<>(); private final ConcurrentMap brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread")); - private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "MQClientFactoryFetchRemoteConfigScheduledThread"); - } - }); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MQClientFactoryScheduledThread")); + private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MQClientFactoryFetchRemoteConfigScheduledThread")); private final PullMessageService pullMessageService; private final RebalanceService rebalanceService; private final DefaultMQProducer defaultMQProducer; diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java index db8bbd66ef2..08f018dd075 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -25,9 +25,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.common.ThreadLocalIndex; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -39,12 +39,7 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); private volatile boolean startDetectorEnable = false; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "LatencyFaultToleranceScheduledThread"); - } - }); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("LatencyFaultToleranceScheduledThread")); private final Resolver resolver;