diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index c9fd3c83e04..d0c0724aa24 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -60,6 +60,7 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; @@ -127,13 +128,8 @@ public class MQClientInstance { private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap<>(); private final Set brokerSupportV2HeartbeatSet = new HashSet<>(); private final ConcurrentMap brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread")); - private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "MQClientFactoryFetchRemoteConfigScheduledThread"); - } - }); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MQClientFactoryScheduledThread")); + private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MQClientFactoryFetchRemoteConfigScheduledThread")); private final PullMessageService pullMessageService; private final RebalanceService rebalanceService; private final DefaultMQProducer defaultMQProducer; diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java index db8bbd66ef2..08f018dd075 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -25,9 +25,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.common.ThreadLocalIndex; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -39,12 +39,7 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); private volatile boolean startDetectorEnable = false; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "LatencyFaultToleranceScheduledThread"); - } - }); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("LatencyFaultToleranceScheduledThread")); private final Resolver resolver;