From ac59c03e64f6ecefd637723283f5f96808def766 Mon Sep 17 00:00:00 2001 From: rongtong Date: Mon, 6 May 2024 19:47:37 +0800 Subject: [PATCH 01/26] [ISSUE #8095] Fix some flaky tests on Mac's workflow (#8083) --- BUILD.bazel | 1 + WORKSPACE | 1 + broker/BUILD.bazel | 4 ++++ client/BUILD.bazel | 3 ++- .../consumer/DefaultLitePullConsumerTest.java | 6 +++++- .../rocketmq/client/impl/MQClientAPIImplTest.java | 4 ++-- pom.xml | 13 +++++++++++++ .../mqclient/ProxyClientRemotingProcessorTest.java | 7 ++++++- .../receipt/DefaultReceiptHandleManagerTest.java | 2 +- 9 files changed, 35 insertions(+), 6 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 358527c3149..ba33a9e6123 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -44,5 +44,6 @@ java_library( "@maven//:org_awaitility_awaitility", "@maven//:org_openjdk_jmh_jmh_core", "@maven//:org_openjdk_jmh_jmh_generator_annprocess", + "@maven//:org_mockito_mockito_junit_jupiter", ], ) diff --git a/WORKSPACE b/WORKSPACE index 8230edef5c0..e1f7743302a 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -111,6 +111,7 @@ maven_install( "com.alipay.sofa:jraft-core:1.3.14", "com.alipay.sofa:hessian:3.3.6", "io.netty:netty-tcnative-boringssl-static:2.0.48.Final", + "org.mockito:mockito-junit-jupiter:4.11.0", ], fetch_sources = True, repositories = [ diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index b2ee2549bcc..785b7657740 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -95,6 +95,10 @@ java_library( GenTestRules( name = "GeneratedTestRules", test_files = glob(["src/test/java/**/*Test.java"]), + exclude_tests = [ + # These tests are extremely slow and flaky, exclude them before they are properly fixed. + "src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest", + ], deps = [ ":tests", ], diff --git a/client/BUILD.bazel b/client/BUILD.bazel index e491cfcef0c..9b6fbc298c2 100644 --- a/client/BUILD.bazel +++ b/client/BUILD.bazel @@ -49,7 +49,8 @@ java_library( "@maven//:io_netty_netty_all", "@maven//:io_opentracing_opentracing_api", "@maven//:io_opentracing_opentracing_mock", - "@maven//:org_awaitility_awaitility", + "@maven//:org_awaitility_awaitility", + "@maven//:org_mockito_mockito_junit_jupiter", ], resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"]) ) diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 24e39f56689..65237bc8f76 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -63,6 +63,8 @@ import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.quality.Strictness; +import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; @@ -74,11 +76,13 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class DefaultLitePullConsumerTest { @Spy private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @@ -743,7 +747,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { } }); - when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + doAnswer(x -> new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index dc892a3548b..97d8d04e648 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -83,7 +83,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Matchers; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; @@ -387,7 +387,7 @@ public Object answer(InvocationOnMock mock) throws Throwable { callback.operationSucceed(responseFuture.getResponseCommand()); return null; } - }).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class)); + }).when(remotingClient).invokeAsync(ArgumentMatchers.anyString(), ArgumentMatchers.any(RemotingCommand.class), ArgumentMatchers.anyLong(), ArgumentMatchers.any(InvokeCallback.class)); SendMessageContext sendMessageContext = new SendMessageContext(); sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer())); msg.getProperties().put("MSG_TYPE", "reply"); diff --git a/pom.xml b/pom.xml index 6307ae18fe4..a72cf473f3a 100644 --- a/pom.xml +++ b/pom.xml @@ -146,6 +146,7 @@ 4.13.2 3.22.0 3.10.0 + 4.11.0 2.0.9 4.1.0 0.30 @@ -840,6 +841,12 @@ ${mockito-core.version} test + + org.mockito + mockito-junit-jupiter + ${mockito-junit-jupiter.version} + test + org.awaitility awaitility @@ -1097,6 +1104,12 @@ ${mockito-core.version} test + + org.mockito + mockito-junit-jupiter + ${mockito-junit-jupiter.version} + test + org.awaitility awaitility diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java index 09ddacde1c4..2cdd92ba5be 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.client.ProducerManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -72,6 +73,10 @@ public class ProxyClientRemotingProcessorTest { @Test public void testTransactionCheck() throws Exception { + // Temporarily skip this test on the Mac system as it is flaky + if (MixAll.isMac()) { + return; + } CompletableFuture> proxyRelayResultFuture = new CompletableFuture<>(); when(proxyRelayService.processCheckTransactionState(any(), any(), any(), any())) .thenReturn(new RelayData<>( @@ -123,7 +128,7 @@ public void testTransactionCheck() throws Exception { } }); } - await().atMost(Duration.ofSeconds(1)).until(() -> count.get() == 100); + await().atMost(Duration.ofSeconds(3)).until(() -> count.get() == 100); verify(observer, times(2)).onNext(any()); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index 25ae1509a95..a01c356f779 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -227,7 +227,7 @@ public void testRenewExceedMaxRenewTimes() { Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())))) .thenReturn(ackResultFuture); - await().atMost(Duration.ofSeconds(1)).until(() -> { + await().atMost(Duration.ofSeconds(3)).until(() -> { receiptHandleManager.scheduleRenewTask(); try { ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); From a15088cbd33d665a457472e0f513507dc89d8a8d Mon Sep 17 00:00:00 2001 From: cnScarb Date: Tue, 7 May 2024 09:49:43 +0800 Subject: [PATCH 02/26] [ISSUE #8096] fix log placeholder --- .../rocketmq/client/impl/ClientRemotingProcessor.java | 4 ++-- .../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 4 ++-- .../impl/consumer/ConsumeMessageConcurrentlyService.java | 8 ++++---- .../impl/consumer/ConsumeMessageOrderlyService.java | 8 ++++---- .../consumer/ConsumeMessagePopConcurrentlyService.java | 4 ++-- .../impl/consumer/ConsumeMessagePopOrderlyService.java | 4 ++-- .../client/impl/producer/DefaultMQProducerImpl.java | 8 ++++---- .../org/apache/rocketmq/common/stats/MomentStatsItem.java | 4 ++-- .../rocketmq/controller/impl/DLedgerController.java | 2 +- .../apache/rocketmq/test/client/rmq/RMQPopConsumer.java | 2 +- .../org/apache/rocketmq/test/util/MQAdminTestUtils.java | 3 +-- .../test/java/org/apache/rocketmq/test/base/BaseConf.java | 3 +-- .../rocketmq/test/client/consumer/pop/PopSubCheckIT.java | 2 +- 13 files changed, 27 insertions(+), 29 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 2f18c610c14..e46c651f928 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -288,8 +288,8 @@ private void processReplyMessage(MessageExt replyMsg) { } } else { String bornHost = replyMsg.getBornHostString(); - logger.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s", - correlationId, bornHost)); + logger.warn("receive reply message, but not matched any request, CorrelationId: {} , reply from host: {}", + correlationId, bornHost); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 0c58affa34a..9b15279cb62 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1168,7 +1168,7 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm index = sortMap.get(queueIdKey).indexOf(offset); msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index); if (msgQueueOffset != offset) { - log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s", + log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, messageExt); } messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, @@ -1181,7 +1181,7 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset()); msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index); if (msgQueueOffset != messageExt.getQueueOffset()) { - log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s", msgQueueOffset, messageExt); + log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, messageExt); } messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index ea6c8072b57..b151fefbbb3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -169,11 +169,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setSpentTimeMills(System.currentTimeMillis() - beginTime); @@ -410,11 +410,11 @@ public void run() { } status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { - log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, - messageQueue), e); + messageQueue, e); hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index cab4fe5d69f..36d686048ce 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -181,11 +181,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setAutoCommit(context.isAutoCommit()); @@ -497,11 +497,11 @@ public void run() { status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { - log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, - messageQueue), e); + messageQueue, e); hasException = true; } finally { this.processQueue.getConsumeLock().readLock().unlock(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java index a61454f5955..3713d1aba4d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java @@ -153,11 +153,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessagePopConcurrentlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setSpentTimeMills(System.currentTimeMillis() - beginTime); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java index ae6adfea5df..4eab1ccf664 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java @@ -175,11 +175,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessagePopOrderlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setAutoCommit(context.isAutoCommit()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index d171411d023..5b7bd2dc9dc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -762,7 +762,7 @@ private SendResult sendDefaultImpl( } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); - log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, resend at once, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); log.warn(msg.toString()); exception = e; continue; @@ -775,7 +775,7 @@ private SendResult sendDefaultImpl( // Otherwise, isolate this broker. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true); } - log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, resend at once, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); } @@ -784,7 +784,7 @@ private SendResult sendDefaultImpl( } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false); - log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, resend at once, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); } @@ -801,7 +801,7 @@ private SendResult sendDefaultImpl( } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); - log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, throw exception, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java index d38281bf83a..71c796b283a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java @@ -55,10 +55,10 @@ public void run() { } public void printAtMinutes() { - log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d", + log.info("[{}] [{}] Stats Every 5 Minutes, Value: {}", this.statsName, this.statsKey, - this.value.get())); + this.value.get()); } public AtomicLong getValue() { diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java index a032b7b6211..be487849ce5 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java @@ -569,7 +569,7 @@ public void handle(long term, MemberState.Role role) { break; } tryTimes++; - log.error(String.format("Controller leader append initial log failed, try %d times", tryTimes)); + log.error("Controller leader append initial log failed, try {} times", tryTimes); if (tryTimes % 3 == 0) { log.warn("Controller leader append initial log failed too many times, please wait a while"); } diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java index a7046bca7da..67a781aacc2 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java @@ -58,7 +58,7 @@ public RMQPopConsumer(String nsAddr, String topic, String subExpression, @Override public void start() { client = ConsumerFactory.getRMQPopClient(); - log.info(String.format("consumer[%s] started!", consumerGroup)); + log.info("consumer[{}] started!", consumerGroup); } @Override diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index d3d5de9e271..47a8db3c9a7 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -117,8 +117,7 @@ public static boolean createSub(String nameSrvAddr, String clusterName, String c for (String addr : masterSet) { try { mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config); - log.info(String.format("create subscription group %s to %s success.\n", consumerId, - addr)); + log.info("create subscription group {} to {} success.", consumerId, addr); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000 * 1); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index d2713919509..b64cda33420 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -287,8 +287,7 @@ public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, consumer.setDebug(); } mqClients.add(consumer); - log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup, - topic, subExpression)); + log.info("consumer[{}] start,topic[{}],subExpression[{}]", consumerGroup, topic, subExpression); return consumer; } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java index e2a657f4343..3034876846f 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java @@ -64,7 +64,7 @@ public void tearDown() { @Test public void testNormalPopAck() throws Exception { String topic = initTopic(); - log.info(String.format("use topic: %s; group: %s !", topic, group)); + log.info("use topic: {}; group: {} !", topic, group); RMQNormalProducer producer = getProducer(NAMESRV_ADDR, topic); producer.getProducer().setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE); From 0f0324a7dd9b2e994aeb4a4f5c8631f8465daae5 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 7 May 2024 09:52:48 +0800 Subject: [PATCH 03/26] [ISSUE #8076] Fix correct min cq offset when delete tiered storage CommitLog (#8082) --- .../tieredstore/file/FlatCommitLogFile.java | 13 +++++++++++++ .../file/FlatCommitLogFileTest.java | 18 ++++++++++++++++-- .../tieredstore/file/FlatFileStoreTest.java | 2 +- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java index 8a319ed3899..6ac0939571f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java @@ -60,4 +60,17 @@ public CompletableFuture getMinOffsetFromFileAsync() { return firstOffset.get(); }); } + + @Override + public void destroyExpiredFile(long expireTimestamp) { + long beforeOffset = this.getMinOffset(); + super.destroyExpiredFile(expireTimestamp); + long afterOffset = this.getMinOffset(); + + if (beforeOffset != afterOffset) { + log.info("CommitLog min cq offset reset, filePath={}, offset={}, expireTimestamp={}, change={}-{}", + filePath, firstOffset.get(), expireTimestamp, beforeOffset, afterOffset); + firstOffset.set(GET_OFFSET_ERROR); + } + } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java index 7e030d305eb..1e912690b2f 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java @@ -93,19 +93,33 @@ public void getMinOffsetFromFileAsyncTest() { for (int i = 6; i < 9; i++) { ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer(); byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i); - Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L)); + Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i)); } Assert.assertEquals(-1L, flatFile.getMinOffsetFromFileAsync().join().longValue()); // append some messages for (int i = 9; i < 30; i++) { + if (i == 20) { + flatFile.commitAsync().join(); + flatFile.rollingNewFile(flatFile.getAppendOffset()); + } ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer(); byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i); - Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L)); + Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i)); } flatFile.commitAsync().join(); Assert.assertEquals(6L, flatFile.getMinOffsetFromFile()); Assert.assertEquals(6L, flatFile.getMinOffsetFromFileAsync().join().longValue()); + + // recalculate min offset here + flatFile.destroyExpiredFile(20L); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFile()); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue()); + + // clean expired file again + flatFile.destroyExpiredFile(20L); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFile()); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue()); } } \ No newline at end of file diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java index 79647932dae..2a007af4e9d 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java @@ -46,7 +46,7 @@ public void init() { storeConfig = new MessageStoreConfig(); storeConfig.setStorePathRootDir(storePath); storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName()); - storeConfig.setBrokerName(storeConfig.getBrokerName()); + storeConfig.setBrokerName("brokerName"); metadataStore = new DefaultMetadataStore(storeConfig); } From d05ff5ef6871839fc202612136bb21a24e67998a Mon Sep 17 00:00:00 2001 From: Liu Shengzhong Date: Tue, 7 May 2024 10:44:26 +0800 Subject: [PATCH 04/26] [ISSUE #8098] Fix parsing delay message type from property --- .../common/attribute/TopicMessageType.java | 3 +- .../attribute/TopicMessageTypeTest.java | 60 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java index 9680acec74d..5e581a34eec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java @@ -50,7 +50,8 @@ public static TopicMessageType parseFromMessageProperty(Map mess return TopicMessageType.TRANSACTION; } else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null - || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) { + || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null + || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) { return TopicMessageType.DELAY; } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) { return TopicMessageType.FIFO; diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java new file mode 100644 index 00000000000..67525ae8087 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.attribute; + +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.common.message.MessageConst; +import org.junit.Assert; +import org.junit.Test; + +public class TopicMessageTypeTest { + @Test + public void testParseFromMessageProperty() { + Map properties = new HashMap<>(); + + // TRANSACTION + properties.put(MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + Assert.assertEquals(TopicMessageType.TRANSACTION, TopicMessageType.parseFromMessageProperty(properties)); + + // DELAY + properties.clear(); + properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3"); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + properties.clear(); + properties.put(MessageConst.PROPERTY_TIMER_DELIVER_MS, System.currentTimeMillis() + 10000 + ""); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + properties.clear(); + properties.put(MessageConst.PROPERTY_TIMER_DELAY_SEC, 10 + ""); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + properties.clear(); + properties.put(MessageConst.PROPERTY_TIMER_DELAY_MS, 10000 + ""); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + // FIFO + properties.clear(); + properties.put(MessageConst.PROPERTY_SHARDING_KEY, "sharding_key"); + Assert.assertEquals(TopicMessageType.FIFO, TopicMessageType.parseFromMessageProperty(properties)); + + // NORMAL + properties.clear(); + Assert.assertEquals(TopicMessageType.NORMAL, TopicMessageType.parseFromMessageProperty(properties)); + } +} From 1a2fc17b5e64061464f32023fcdb247eec943ce3 Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 7 May 2024 14:54:32 +0800 Subject: [PATCH 05/26] [ISSUE #8100] Expose print audit log function (#8101) --- .../authentication/provider/DefaultAuthenticationProvider.java | 2 +- .../authorization/provider/DefaultAuthorizationProvider.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java index 482b02db030..98e7ede7ee3 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java @@ -68,7 +68,7 @@ protected HandlerChain> ne .addNext(new DefaultAuthenticationHandler(this.authConfig, metadataService)); } - private void doAuditLog(DefaultAuthenticationContext context, Throwable ex) { + protected void doAuditLog(DefaultAuthenticationContext context, Throwable ex) { if (StringUtils.isBlank(context.getUsername())) { return; } diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java index 15fb5a5b85b..75111030328 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java @@ -78,7 +78,7 @@ protected HandlerChain> new .addNext(new AclAuthorizationHandler(authConfig, metadataService)); } - private void doAuditLog(DefaultAuthorizationContext context, Throwable ex) { + protected void doAuditLog(DefaultAuthorizationContext context, Throwable ex) { if (context.getSubject() == null) { return; } From dcf892a15662da858f0c6dadc93da272b18a0f00 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Tue, 7 May 2024 16:46:26 +0800 Subject: [PATCH 06/26] Fix SimpleSubscriptionData equal (#8104) --- .../subscription/SimpleSubscriptionData.java | 10 +-- .../SimpleSubscriptionDataTest.java | 70 +++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionDataTest.java diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java index ec2b51e0b96..bb5c3074b44 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java @@ -65,7 +65,8 @@ public void setVersion(long version) { this.version = version; } - @Override public boolean equals(Object o) { + @Override + public boolean equals(Object o) { if (this == o) { return true; } @@ -73,11 +74,12 @@ public void setVersion(long version) { return false; } SimpleSubscriptionData that = (SimpleSubscriptionData) o; - return version == that.version && Objects.equals(topic, that.topic); + return Objects.equals(topic, that.topic) && Objects.equals(expressionType, that.expressionType) && Objects.equals(expression, that.expression); } - @Override public int hashCode() { - return Objects.hash(topic, version); + @Override + public int hashCode() { + return Objects.hash(topic, expressionType, expression); } @Override public String toString() { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionDataTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionDataTest.java new file mode 100644 index 00000000000..fa8e4ba6ae4 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionDataTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.subscription; + +import com.google.common.collect.Sets; +import java.util.Set; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SimpleSubscriptionDataTest { + @Test + public void testNotEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-2"; + SimpleSubscriptionData simpleSubscriptionData1 = new SimpleSubscriptionData(topic, expressionType, expression1, 1); + SimpleSubscriptionData simpleSubscriptionData2 = new SimpleSubscriptionData(topic, expressionType, expression2, 1); + assertThat(simpleSubscriptionData1.equals(simpleSubscriptionData2)).isFalse(); + } + + @Test + public void testEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-1"; + SimpleSubscriptionData simpleSubscriptionData1 = new SimpleSubscriptionData(topic, expressionType, expression1, 1); + SimpleSubscriptionData simpleSubscriptionData2 = new SimpleSubscriptionData(topic, expressionType, expression2, 1); + assertThat(simpleSubscriptionData1.equals(simpleSubscriptionData2)).isTrue(); + } + + @Test + public void testSetNotEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-2"; + Set set1 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression1, 1)); + Set set2 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression2, 1)); + assertThat(set1.equals(set2)).isFalse(); + } + + @Test + public void testSetEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-1"; + Set set1 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression1, 1)); + Set set2 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression2, 1)); + assertThat(set1.equals(set2)).isTrue(); + } +} \ No newline at end of file From d9742a6cc80f0a0fe306850a2abd3ec605597b5c Mon Sep 17 00:00:00 2001 From: zzl <87265072+zhiliatom@users.noreply.github.com> Date: Wed, 8 May 2024 11:59:33 +0800 Subject: [PATCH 07/26] [ISSUE #8105] fix typo about udpate user (#8106) --- .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 362caf9ca68..78a5ba92eed 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2907,7 +2907,7 @@ private RemotingCommand updateUser(ChannelHandlerContext ctx, return this.brokerController.getAuthenticationMetadataManager().updateUser(old); }).thenAccept(nil -> response.setCode(ResponseCode.SUCCESS)) .exceptionally(ex -> { - LOGGER.error("delete user {} error", requestHeader.getUsername(), ex); + LOGGER.error("update user {} error", requestHeader.getUsername(), ex); return handleAuthException(response, ex); }) .join(); From 634092a16ff915627ebb6798ef99fd656bbf0894 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 8 May 2024 13:59:46 +0800 Subject: [PATCH 08/26] [ISSUE #5923] Fix tiered store README.md error about Configuration (#8110) --- tieredstore/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tieredstore/README.md b/tieredstore/README.md index edc229e1041..d420494461a 100644 --- a/tieredstore/README.md +++ b/tieredstore/README.md @@ -13,7 +13,7 @@ This article is a cookbook for RocketMQ tiered storage. Use the following steps to easily use tiered storage 1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.MessageStoreExtend` in your `broker.conf`. -2. Configure your backend service provider. change `tieredBackendServiceProvider` to your storage medium implement. We give a default implement: POSIX provider, and you need to change `tieredStoreFilepath` to the mount point of storage medium for tiered storage. +2. Configure your backend service provider. change `tieredBackendServiceProvider` to your storage medium implement. We give a default implement: POSIX provider, and you need to change `tieredStoreFilePath` to the mount point of storage medium for tiered storage. 3. Start the broker and enjoy! ## Configuration @@ -25,7 +25,7 @@ The following are some core configurations, for more details, see [TieredMessage | messageStorePlugIn | | | Set to org.apache.rocketmq.tieredstore.MessageStoreExtend to use tiered storage | | tieredMetadataServiceProvider | org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore | | Select your metadata provider | | tieredBackendServiceProvider | org.apache.rocketmq.tieredstore.provider.PosixFileSegment | | Select your backend service provider | -| tieredStoreFilepath | | | Select the directory using for tiered storage, only for POSIX provider. | +| tieredStoreFilePath | | | Select the directory using for tiered storage, only for POSIX provider. | | tieredStorageLevel | NOT_IN_DISK | | The options are DISABLE, NOT_IN_DISK, NOT_IN_MEM, FORCE | | tieredStoreFileReservedTime | 72 | hour | Default topic TTL in tiered storage | | tieredStoreGroupCommitCount | 2500 | | The number of messages that trigger one batch transfer | From 52c5d89244d57bf1625b09b494c2762200c75303 Mon Sep 17 00:00:00 2001 From: Kaiyao Ke <47203510+kaiyaok2@users.noreply.github.com> Date: Wed, 8 May 2024 01:00:40 -0500 Subject: [PATCH 09/26] [ISSUE #8092] Fixed non-idempotent test --- .../src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java index bfbddf49135..25f8cfdecb8 100644 --- a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java +++ b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java @@ -68,6 +68,7 @@ public void testRegister() { e.printStackTrace(); assertThat(Boolean.FALSE).isTrue(); } + FilterFactory.INSTANCE.unRegister("Nothing"); } @Test From f0c243d741dd0c7d47716efe557bc5a0e50ba29a Mon Sep 17 00:00:00 2001 From: Willhow <65004897+Willhow-Gao@users.noreply.github.com> Date: Wed, 8 May 2024 14:01:31 +0800 Subject: [PATCH 10/26] [ISSUE #8090] Optimize isSetEqual for DefaultLitePullConsumerImpl (#8091) * [ISSUE #8090]1.optimize isSetEqual method and add Unit tests; 2.fix a typo in MQAdminImpl * remove author message * Modify code style --- .../rocketmq/client/impl/MQAdminImpl.java | 2 +- .../consumer/DefaultLitePullConsumerImpl.java | 12 +-- .../DefaultLitePullConsumerImplTest.java | 98 +++++++++++++++++++ 3 files changed, 104 insertions(+), 8 deletions(-) create mode 100644 client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImplTest.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index b1d07b85f78..bcfe29bd4f6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -181,7 +181,7 @@ public Set fetchSubscribeMessageQueues(String topic) throws MQClie e); } - throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); + throw new MQClientException("Unknown why, Can not find Message Queue for this topic, " + topic, null); } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 78dafd0ff72..a3276cd7823 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -1237,18 +1237,16 @@ private boolean isSetEqual(Set set1, Set set2) { return true; } - if (set1 == null || set2 == null || set1.size() != set2.size() || set1.size() == 0) { + if (set1 == null || set2 == null || set1.size() != set2.size()) { return false; } - Iterator iter = set2.iterator(); - boolean isEqual = true; - while (iter.hasNext()) { - if (!set1.contains(iter.next())) { - isEqual = false; + for (MessageQueue messageQueue : set2) { + if (!set1.contains(messageQueue)) { + return false; } } - return isEqual; + return true; } public AssignedMessageQueue getAssignedMessageQueue() { diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImplTest.java new file mode 100644 index 00000000000..3986c497eb5 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImplTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; + + +public class DefaultLitePullConsumerImplTest { + private final DefaultLitePullConsumerImpl consumer = new DefaultLitePullConsumerImpl(new DefaultLitePullConsumer(), null); + + private static Method isSetEqualMethod; + + @BeforeClass + public static void initReflectionMethod() throws NoSuchMethodException { + Class consumerClass = DefaultLitePullConsumerImpl.class; + Method testMethod = consumerClass.getDeclaredMethod("isSetEqual", Set.class, Set.class); + testMethod.setAccessible(true); + isSetEqualMethod = testMethod; + } + + + /** + * The two empty sets should be equal + */ + @Test + public void testIsSetEqual1() throws InvocationTargetException, IllegalAccessException { + Set set1 = new HashSet<>(); + Set set2 = new HashSet<>(); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertTrue(equalResult); + } + + + /** + * When a set has elements and one does not, the two sets are not equal + */ + @Test + public void testIsSetEqual2() throws InvocationTargetException, IllegalAccessException { + Set set1 = new HashSet<>(); + set1.add(new MessageQueue("testTopic","testBroker",111)); + Set set2 = new HashSet<>(); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertFalse(equalResult); + } + + /** + * The two null sets should be equal + */ + @Test + public void testIsSetEqual3() throws InvocationTargetException, IllegalAccessException { + Set set1 = null; + Set set2 = null; + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertTrue(equalResult); + } + + @Test + public void testIsSetEqual4() throws InvocationTargetException, IllegalAccessException { + Set set1 = null; + Set set2 = new HashSet<>(); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertFalse(equalResult); + } + + @Test + public void testIsSetEqual5() throws InvocationTargetException, IllegalAccessException { + Set set1 = new HashSet<>(); + set1.add(new MessageQueue("testTopic","testBroker",111)); + Set set2 = new HashSet<>(); + set2.add(new MessageQueue("testTopic","testBroker",111)); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertTrue(equalResult); + } + +} From 5dae822053ef813b1406274843522b2f126af184 Mon Sep 17 00:00:00 2001 From: zzl <87265072+zhiliatom@users.noreply.github.com> Date: Wed, 8 May 2024 14:05:33 +0800 Subject: [PATCH 11/26] [ISSUE #8108] Fix check MetadataProvider when enable acl2.0 (#8109) --- .../manager/AuthenticationMetadataManagerImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java index 6eabe69f456..39620ca8d25 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java @@ -207,14 +207,14 @@ private void handleException(Exception e, CompletableFuture result) { } private AuthenticationMetadataProvider getAuthenticationMetadataProvider() { - if (authorizationMetadataProvider == null) { + if (authenticationMetadataProvider == null) { throw new IllegalStateException("The authenticationMetadataProvider is not configured"); } return authenticationMetadataProvider; } private AuthorizationMetadataProvider getAuthorizationMetadataProvider() { - if (authenticationMetadataProvider == null) { + if (authorizationMetadataProvider == null) { throw new IllegalStateException("The authorizationMetadataProvider is not configured"); } return authorizationMetadataProvider; From ad027560780c2af801e4bcb447d85bf0df62071a Mon Sep 17 00:00:00 2001 From: bxfjb <48467309+bxfjb@users.noreply.github.com> Date: Wed, 8 May 2024 15:39:49 +0800 Subject: [PATCH 12/26] [ISSUE #8049] fix tiered store delete empty topic NPE (#8050) Co-authored-by: zhaoyuhan --- .../rocketmq/tieredstore/metadata/DefaultMetadataStore.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java index 630276a97f6..09500bf6da8 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java @@ -164,7 +164,10 @@ public QueueMetadata getQueue(MessageQueue mq) { @Override public void iterateQueue(String topic, Consumer callback) { - queueMetadataTable.get(topic).values().forEach(callback); + ConcurrentMap metadataConcurrentMap = queueMetadataTable.get(topic); + if (metadataConcurrentMap != null) { + metadataConcurrentMap.values().forEach(callback); + } } @Override From 89c1509014b15c2ce91161356cf09a18c945a926 Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Thu, 9 May 2024 11:35:04 +0800 Subject: [PATCH 13/26] [ISSUE #8046] Fix authentication context build for no extFields (#8102) --- .../builder/DefaultAuthenticationContextBuilder.java | 6 +++--- .../builder/DefaultAuthorizationContextBuilder.java | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java index 6b178b96573..c1e970fa6eb 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java @@ -98,12 +98,12 @@ public DefaultAuthenticationContext build(Metadata metadata, GeneratedMessageV3 @Override public DefaultAuthenticationContext build(ChannelHandlerContext context, RemotingCommand request) { HashMap fields = request.getExtFields(); - if (MapUtils.isEmpty(fields)) { - throw new AuthenticationException("authentication field is null."); - } DefaultAuthenticationContext result = new DefaultAuthenticationContext(); result.setChannelId(context.channel().id().asLongText()); result.setRpcCode(String.valueOf(request.getCode())); + if (MapUtils.isEmpty(fields)) { + return result; + } if (!fields.containsKey(SessionCredentials.ACCESS_KEY)) { return result; } diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java index daa039162b4..d6d1556ca20 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclException; @@ -160,6 +161,9 @@ public List build(ChannelHandlerContext context, Re List result = new ArrayList<>(); try { HashMap fields = command.getExtFields(); + if (MapUtils.isEmpty(fields)) { + return result; + } Subject subject = null; if (fields.containsKey(SessionCredentials.ACCESS_KEY)) { subject = User.of(fields.get(SessionCredentials.ACCESS_KEY)); From e5b357c3dfd9066c6bd363e44533371d195e26e6 Mon Sep 17 00:00:00 2001 From: cserwen Date: Fri, 10 May 2024 11:10:42 +0800 Subject: [PATCH 14/26] [ISSUE #5838] retry to send when broker returns SYSTEM_BUSY (#5845) --- .../org/apache/rocketmq/client/producer/DefaultMQProducer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 0abf925a82a..b350ba074db 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -72,6 +72,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ResponseCode.TOPIC_NOT_EXIST, ResponseCode.SERVICE_NOT_AVAILABLE, ResponseCode.SYSTEM_ERROR, + ResponseCode.SYSTEM_BUSY, ResponseCode.NO_PERMISSION, ResponseCode.NO_BUYER_ID, ResponseCode.NOT_IN_CURRENT_UNIT From 159a603219e363e5f991a0c85213f6dc13f0a9be Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Fri, 10 May 2024 14:16:07 +0800 Subject: [PATCH 15/26] [ISSUE #5923] Fix tiered store README.md error (#8115) --- tieredstore/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tieredstore/README.md b/tieredstore/README.md index d420494461a..baeb56acc36 100644 --- a/tieredstore/README.md +++ b/tieredstore/README.md @@ -12,7 +12,7 @@ This article is a cookbook for RocketMQ tiered storage. Use the following steps to easily use tiered storage -1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.MessageStoreExtend` in your `broker.conf`. +1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.TieredMessageStore` in your `broker.conf`. 2. Configure your backend service provider. change `tieredBackendServiceProvider` to your storage medium implement. We give a default implement: POSIX provider, and you need to change `tieredStoreFilePath` to the mount point of storage medium for tiered storage. 3. Start the broker and enjoy! @@ -22,7 +22,7 @@ The following are some core configurations, for more details, see [TieredMessage | Configuration | Default value | Unit | Function | | ------------------------------- |---------------------------------------------------------------| ----------- | ------------------------------------------------------------------------------- | -| messageStorePlugIn | | | Set to org.apache.rocketmq.tieredstore.MessageStoreExtend to use tiered storage | +| messageStorePlugIn | | | Set to org.apache.rocketmq.tieredstore.TieredMessageStore to use tiered storage | | tieredMetadataServiceProvider | org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore | | Select your metadata provider | | tieredBackendServiceProvider | org.apache.rocketmq.tieredstore.provider.PosixFileSegment | | Select your backend service provider | | tieredStoreFilePath | | | Select the directory using for tiered storage, only for POSIX provider. | From 609196890765b89116e12745279d287b91eae2ee Mon Sep 17 00:00:00 2001 From: Colin Lee Date: Mon, 13 May 2024 17:29:51 +0800 Subject: [PATCH 16/26] [ISSUE #8124] Avoid scheduled tasks exiting because of unknown exceptions Signed-off-by: Colin Lee Co-authored-by: lirui --- .../rocketmq/broker/schedule/ScheduleMessageService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index ef7e4f67894..e13b36df910 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -224,7 +224,7 @@ public boolean load() { result = result && this.correctDelayOffset(); return result; } - + public boolean loadWhenSyncDelayOffset() { boolean result = super.load(); result = result && this.parseDelayLevel(); @@ -377,7 +377,7 @@ public void run() { if (isStarted()) { this.executeOnTimeUp(); } - } catch (Exception e) { + } catch (Throwable e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeUp exception", e); this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD); From d943456bcaa8317b8d3ae55f9a1117a5bf8478ed Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Tue, 14 May 2024 10:40:42 +0800 Subject: [PATCH 17/26] Add unit test for MQClientAPIExtTest (#8080) --- .../impl/mqclient/MQClientAPIExtTest.java | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java new file mode 100644 index 00000000000..752bc98eabd --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.impl.mqclient; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.utils.FutureUtils; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; + +@RunWith(MockitoJUnitRunner.class) +public class MQClientAPIExtTest { + MQClientAPIExt mqClientAPIExt; + @Mock + NettyRemotingClient remotingClientMock; + + @Before + public void before() { + mqClientAPIExt = Mockito.spy(new MQClientAPIExt(new ClientConfig(), new NettyClientConfig(), null, null)); + Mockito.when(mqClientAPIExt.getRemotingClient()).thenReturn(remotingClientMock); + Mockito.when(remotingClientMock.invoke(anyString(), any(), anyLong())).thenReturn(FutureUtils.completeExceptionally(new RemotingTimeoutException("addr"))); + } + + @Test + public void sendMessageAsync() { + String topic = "test"; + Message msg = new Message(topic, "test".getBytes()); + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setProducerGroup("test"); + requestHeader.setDefaultTopic("test"); + requestHeader.setDefaultTopicQueueNums(1); + requestHeader.setQueueId(0); + requestHeader.setSysFlag(0); + requestHeader.setBornTimestamp(0L); + requestHeader.setFlag(0); + requestHeader.setProperties("test"); + requestHeader.setReconsumeTimes(0); + requestHeader.setUnitMode(false); + requestHeader.setBatch(false); + CompletableFuture future = mqClientAPIExt.sendMessageAsync("127.0.0.1:10911", "test", msg, requestHeader, 10); + assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingTimeoutException.class); + } +} \ No newline at end of file From 502e2d798e57fd4f40d16a90c44679af5ee7f986 Mon Sep 17 00:00:00 2001 From: hiyo <77013030+miles-ton@users.noreply.github.com> Date: Tue, 14 May 2024 14:45:29 +0800 Subject: [PATCH 18/26] [ISSUE #8118] Remove redundant mod in client (#8119) --- .../consumer/rebalance/AllocateMessageQueueAveragely.java | 2 +- .../apache/rocketmq/client/impl/factory/MQClientInstance.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java index 75e5d1c218b..6f63a6fc607 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java @@ -42,7 +42,7 @@ public List allocate(String consumerGroup, String currentCID, List int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { - result.add(mqAll.get((startIndex + i) % mqAll.size())); + result.add(mqAll.get(startIndex + i)); } return result; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 227f3346d0d..f964869ac24 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1222,8 +1222,7 @@ public String findBrokerAddrByTopic(final String topic) { if (topicRouteData != null) { List brokers = topicRouteData.getBrokerDatas(); if (!brokers.isEmpty()) { - int index = random.nextInt(brokers.size()); - BrokerData bd = brokers.get(index % brokers.size()); + BrokerData bd = brokers.get(random.nextInt(brokers.size())); return bd.selectBrokerAddr(); } } From 03b16aadce572b3908dc09b99347fcf7e6c6ac7c Mon Sep 17 00:00:00 2001 From: fujian-zfj <2573259572@qq.com> Date: Tue, 14 May 2024 14:47:37 +0800 Subject: [PATCH 19/26] [ISSUE #8061] Fix npe in netty remoting client (#8064) --- .../org/apache/rocketmq/remoting/netty/NettyRemotingClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index ede6005f541..1bc5e57db52 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -631,7 +631,7 @@ private Channel getAndCreateChannel(final String addr) throws InterruptedExcepti if (channelFuture == null) { return null; } - return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel(); + return channelFuture.awaitUninterruptibly().channel(); } private ChannelFuture getAndCreateNameserverChannelAsync() throws InterruptedException { From 2fdafd232a45ffd4a72239782248e2d1a91d6abc Mon Sep 17 00:00:00 2001 From: hiyo <77013030+miles-ton@users.noreply.github.com> Date: Wed, 15 May 2024 10:01:21 +0800 Subject: [PATCH 20/26] [ISSUE #8136] Replace with createProcessQueue and remove createProcessQueue(topic) in client (#8139) --- .../apache/rocketmq/client/impl/consumer/RebalanceImpl.java | 4 +--- .../rocketmq/client/impl/consumer/RebalanceLitePullImpl.java | 3 --- .../rocketmq/client/impl/consumer/RebalancePullImpl.java | 4 ---- .../rocketmq/client/impl/consumer/RebalancePushImpl.java | 5 ----- 4 files changed, 1 insertion(+), 15 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 53addc5f50c..711df3a9f08 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -525,7 +525,7 @@ private boolean updateProcessQueueTableInRebalance(final String topic, final Set } this.removeDirtyOffset(mq); - ProcessQueue pq = createProcessQueue(topic); + ProcessQueue pq = createProcessQueue(); pq.setLocked(true); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { @@ -779,8 +779,6 @@ public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final Pop public abstract PopProcessQueue createPopProcessQueue(); - public abstract ProcessQueue createProcessQueue(String topicName); - public void removeProcessQueue(final MessageQueue mq) { ProcessQueue prev = this.processQueueTable.remove(mq); if (prev != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 335d89b7877..330772f22bb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -177,7 +177,4 @@ public PopProcessQueue createPopProcessQueue() { return null; } - public ProcessQueue createProcessQueue(String topicName) { - return createProcessQueue(); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 1b5f9766174..e0b682868ab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -103,8 +103,4 @@ public PopProcessQueue createPopProcessQueue() { return null; } - public ProcessQueue createProcessQueue(String topicName) { - return createProcessQueue(); - } - } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index f28890d306f..59e087c0e04 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -288,11 +288,6 @@ public ProcessQueue createProcessQueue() { return new ProcessQueue(); } - @Override - public ProcessQueue createProcessQueue(String topicName) { - return createProcessQueue(); - } - @Override public PopProcessQueue createPopProcessQueue() { return new PopProcessQueue(); From 8150e13a29f9d5ea7bf8814960389837650164af Mon Sep 17 00:00:00 2001 From: Willhow <65004897+Willhow-Gao@users.noreply.github.com> Date: Thu, 16 May 2024 09:50:25 +0800 Subject: [PATCH 21/26] [ISSUE #8145] Optimize some code style in client module (#8146) --- .../ConsumeMessageOrderlyService.java | 8 ++++-- .../impl/consumer/RebalancePushImpl.java | 4 --- .../client/impl/factory/MQClientInstance.java | 6 ++-- .../impl/producer/DefaultMQProducerImpl.java | 28 +++++++++---------- .../latency/LatencyFaultToleranceImpl.java | 8 ++++++ .../client/trace/AsyncTraceDispatcher.java | 7 +++-- 6 files changed, 34 insertions(+), 27 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 36d686048ce..3ca465da70d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -85,6 +85,7 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag)); } + @Override public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -96,10 +97,11 @@ public void run() { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } - }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); + }, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } + @Override public void shutdown(long awaitTerminateMillis) { this.stopped = true; this.scheduledExecutorService.shutdown(); @@ -201,8 +203,8 @@ public void submitConsumeRequest( final List msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, - final boolean dispathToConsume) { - if (dispathToConsume) { + final boolean dispatchToConsume) { + if (dispatchToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 59e087c0e04..fe2f19b2f9a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -140,10 +140,6 @@ public boolean clientRebalance(String topic) { return defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || defaultMQPushConsumerImpl.isConsumeOrderly() || MessageModel.BROADCASTING.equals(messageModel); } - public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) { - return true; - } - @Override public ConsumeType consumeType() { return ConsumeType.CONSUME_PASSIVELY; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f964869ac24..1ff35a00d12 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -125,8 +125,8 @@ public class MQClientInstance { private final ConcurrentMap> brokerAddrTable = new ConcurrentHashMap<>(); private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap<>(); - private final Set brokerSupportV2HeartbeatSet = new HashSet(); - private final ConcurrentMap brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap(); + private final Set brokerSupportV2HeartbeatSet = new HashSet<>(); + private final ConcurrentMap brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread")); private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override @@ -1161,7 +1161,7 @@ public FindBrokerResult findBrokerAddressInSubscribe( Entry entry = map.entrySet().iterator().next(); brokerAddr = entry.getValue(); slave = entry.getKey() != MixAll.MASTER_ID; - found = true; + found = brokerAddr != null; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 5b7bd2dc9dc..6268bcc0a17 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -570,8 +570,8 @@ public void run() { } class BackpressureSendCallBack implements SendCallback { - public boolean isSemaphoreAsyncSizeAquired = false; - public boolean isSemaphoreAsyncNumAquired = false; + public boolean isSemaphoreAsyncSizeAcquired = false; + public boolean isSemaphoreAsyncNumbAcquired = false; public int msgLen; private final SendCallback sendCallback; @@ -581,10 +581,10 @@ public BackpressureSendCallBack(final SendCallback sendCallback) { @Override public void onSuccess(SendResult sendResult) { - if (isSemaphoreAsyncSizeAquired) { + if (isSemaphoreAsyncSizeAcquired) { semaphoreAsyncSendSize.release(msgLen); } - if (isSemaphoreAsyncNumAquired) { + if (isSemaphoreAsyncNumbAcquired) { semaphoreAsyncSendNum.release(); } sendCallback.onSuccess(sendResult); @@ -592,10 +592,10 @@ public void onSuccess(SendResult sendResult) { @Override public void onException(Throwable e) { - if (isSemaphoreAsyncSizeAquired) { + if (isSemaphoreAsyncSizeAcquired) { semaphoreAsyncSendSize.release(msgLen); } - if (isSemaphoreAsyncNumAquired) { + if (isSemaphoreAsyncNumbAcquired) { semaphoreAsyncSendNum.release(); } sendCallback.onException(e); @@ -607,31 +607,31 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final throws MQClientException, InterruptedException { ExecutorService executor = this.getAsyncSenderExecutor(); boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode(); - boolean isSemaphoreAsyncNumAquired = false; - boolean isSemaphoreAsyncSizeAquired = false; + boolean isSemaphoreAsyncNumbAcquired = false; + boolean isSemaphoreAsyncSizeAcquired = false; int msgLen = msg.getBody() == null ? 1 : msg.getBody().length; try { if (isEnableBackpressureForAsyncMode) { long costTime = System.currentTimeMillis() - beginStartTime; - isSemaphoreAsyncNumAquired = timeout - costTime > 0 + isSemaphoreAsyncNumbAcquired = timeout - costTime > 0 && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS); - if (!isSemaphoreAsyncNumAquired) { + if (!isSemaphoreAsyncNumbAcquired) { sendCallback.onException( new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout")); return; } costTime = System.currentTimeMillis() - beginStartTime; - isSemaphoreAsyncSizeAquired = timeout - costTime > 0 + isSemaphoreAsyncSizeAcquired = timeout - costTime > 0 && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS); - if (!isSemaphoreAsyncSizeAquired) { + if (!isSemaphoreAsyncSizeAcquired) { sendCallback.onException( new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout")); return; } } - sendCallback.isSemaphoreAsyncSizeAquired = isSemaphoreAsyncSizeAquired; - sendCallback.isSemaphoreAsyncNumAquired = isSemaphoreAsyncNumAquired; + sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired; + sendCallback.isSemaphoreAsyncNumbAcquired = isSemaphoreAsyncNumbAcquired; sendCallback.msgLen = msgLen; executor.submit(runnable); } catch (RejectedExecutionException e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java index f629fe44a87..db8bbd66ef2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -55,6 +55,7 @@ public LatencyFaultToleranceImpl(Resolver resolver, ServiceDetector serviceDetec this.serviceDetector = serviceDetector; } + @Override public void detectByOneRound() { for (Map.Entry item : this.faultItemTable.entrySet()) { FaultItem brokerItem = item.getValue(); @@ -77,6 +78,7 @@ public void detectByOneRound() { } } + @Override public void startDetector() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -92,6 +94,7 @@ public void run() { }, 3, 3, TimeUnit.SECONDS); } + @Override public void shutdown() { this.scheduledExecutorService.shutdown(); } @@ -128,6 +131,7 @@ public boolean isAvailable(final String name) { return true; } + @Override public boolean isReachable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { @@ -141,10 +145,12 @@ public void remove(final String name) { this.faultItemTable.remove(name); } + @Override public boolean isStartDetectorEnable() { return startDetectorEnable; } + @Override public void setStartDetectorEnable(boolean startDetectorEnable) { this.startDetectorEnable = startDetectorEnable; } @@ -177,10 +183,12 @@ public String toString() { '}'; } + @Override public void setDetectTimeout(final int detectTimeout) { this.detectTimeout = detectTimeout; } + @Override public void setDetectInterval(final int detectInterval) { this.detectInterval = detectInterval; } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index d44f22616f4..1fe19773a5a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -153,6 +153,7 @@ public void setNamespaceV2(String namespaceV2) { this.namespaceV2 = namespaceV2; } + @Override public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); @@ -330,7 +331,7 @@ class TraceDataSegment { private int currentMsgKeySize; private final String traceTopicName; private final String regionId; - private final List traceTransferBeanList = new ArrayList(); + private final List traceTransferBeanList = new ArrayList<>(); TraceDataSegment(String traceTopicName, String regionId) { this.traceTopicName = traceTopicName; @@ -345,7 +346,7 @@ public void addTraceTransferBean(TraceTransferBean traceTransferBean) { this.currentMsgKeySize = traceTransferBean.getTransKey().stream() .reduce(currentMsgKeySize, (acc, x) -> acc + x.length(), Integer::sum); if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) { - List dataToSend = new ArrayList(traceTransferBeanList); + List dataToSend = new ArrayList<>(traceTransferBeanList); AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend); traceExecutor.submit(asyncDataSendTask); this.clear(); @@ -356,7 +357,7 @@ public void sendAllData() { if (this.traceTransferBeanList.isEmpty()) { return; } - List dataToSend = new ArrayList(traceTransferBeanList); + List dataToSend = new ArrayList<>(traceTransferBeanList); AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend); traceExecutor.submit(asyncDataSendTask); From c00fac3c770c436717f9aad1d5c7bc6591c734b2 Mon Sep 17 00:00:00 2001 From: oopooa <41882826+oopooa@users.noreply.github.com> Date: Thu, 16 May 2024 09:56:09 +0800 Subject: [PATCH 22/26] [ISSUE #8148] Fix variable typo --- .../org/apache/rocketmq/broker/BrokerController.java | 2 +- .../rocketmq/store/stats/BrokerStatsManager.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index a9dcc0af1f8..76224db5cb5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -408,7 +408,7 @@ public BrokerController( this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); - this.brokerStatsManager.setProduerStateGetter(new BrokerStatsManager.StateGetter() { + this.brokerStatsManager.setProducerStateGetter(new BrokerStatsManager.StateGetter() { @Override public boolean online(String instanceId, String group, String topic) { if (getTopicConfigManager().getTopicConfigTable().containsKey(NamespaceUtil.wrapNamespace(instanceId, topic))) { diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 489d7b4fbce..c165d333fd0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -142,7 +142,7 @@ public class BrokerStatsManager { private MomentStatsItemSet momentStatsItemSetFallTime; private final StatisticsManager accountStatManager = new StatisticsManager(); - private StateGetter produerStateGetter; + private StateGetter producerStateGetter; private StateGetter consumerStateGetter; private BrokerConfig brokerConfig; @@ -270,7 +270,7 @@ public boolean online(StatisticsItem item) { String kind = item.getStatKind(); if (ACCOUNT_SEND.equals(kind) || ACCOUNT_SEND_REJ.equals(kind)) { - return produerStateGetter.online(instanceId, group, topic); + return producerStateGetter.online(instanceId, group, topic); } else if (ACCOUNT_RCV.equals(kind) || ACCOUNT_SEND_BACK.equals(kind) || ACCOUNT_SEND_BACK_TO_DLQ.equals(kind) || ACCOUNT_REV_REJ.equals(kind)) { return consumerStateGetter.online(instanceId, group, topic); } @@ -296,12 +296,12 @@ public MomentStatsItemSet getMomentStatsItemSetFallTime() { return momentStatsItemSetFallTime; } - public StateGetter getProduerStateGetter() { - return produerStateGetter; + public StateGetter getProducerStateGetter() { + return producerStateGetter; } - public void setProduerStateGetter(StateGetter produerStateGetter) { - this.produerStateGetter = produerStateGetter; + public void setProducerStateGetter(StateGetter producerStateGetter) { + this.producerStateGetter = producerStateGetter; } public StateGetter getConsumerStateGetter() { From 9bdc9db9c46873f948fad2bdac5cc14aa1918f57 Mon Sep 17 00:00:00 2001 From: oopooa <41882826+oopooa@users.noreply.github.com> Date: Fri, 17 May 2024 08:35:23 +0800 Subject: [PATCH 23/26] [ISSUE #8155] Fix doc typo --- docs/cn/BrokerContainer.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cn/BrokerContainer.md b/docs/cn/BrokerContainer.md index 236439284be..a4de9889f27 100644 --- a/docs/cn/BrokerContainer.md +++ b/docs/cn/BrokerContainer.md @@ -72,7 +72,7 @@ sh mqbrokercontainer -c broker-container.conf ``` mqbrokercontainer脚本路径为distribution/bin/mqbrokercontainer。 -## 运行时增加或较少Broker +## 运行时增加或减少Broker 当BrokerContainer进程启动后,也可以通过Admin命令来增加或减少Broker。 From 256217fb6283f8b4c535045682aaef6346be2a87 Mon Sep 17 00:00:00 2001 From: superdev42 <138118491+superdev42@users.noreply.github.com> Date: Mon, 20 May 2024 09:57:21 +0800 Subject: [PATCH 24/26] [ISSUE#8142] Show time of create topic and subScriptionGroup (#8143) * show time of create topic and subScriptionGroup * show time of create topic and subScriptionGroup again * show time of create topic and subScriptionGroup changed --------- Co-authored-by: wengxiaolong <22260113@zju.edu.cn> --- .../broker/processor/AdminBrokerProcessor.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 78a5ba92eed..a1a6f5bf6ce 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -455,6 +455,7 @@ public boolean rejectRequest() { private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + long startTime = System.currentTimeMillis(); final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); @@ -514,8 +515,10 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext LOGGER.error("Update / create topic failed for [{}]", request, e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(e.getMessage()); + return response; } - + long executionTime = System.currentTimeMillis() - startTime; + LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime); return response; } @@ -1450,6 +1453,7 @@ public void onException(Throwable e) { private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + long startTime = System.currentTimeMillis(); final RemotingCommand response = RemotingCommand.createResponseCommand(null); LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}", @@ -1462,6 +1466,8 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c response.setCode(ResponseCode.SUCCESS); response.setRemark(null); + long executionTime = System.currentTimeMillis() - startTime; + LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime); return response; } @@ -3154,7 +3160,7 @@ private boolean validateSlave(RemotingCommand response) { } private boolean validateBlackListConfigExist(Properties properties) { - for (String blackConfig:configBlackList) { + for (String blackConfig : configBlackList) { if (properties.containsKey(blackConfig)) { return true; } From 0ad0244fe504d9e20a15e83222826f1172bdc4b3 Mon Sep 17 00:00:00 2001 From: hiyo <77013030+miles-ton@users.noreply.github.com> Date: Mon, 20 May 2024 10:47:59 +0800 Subject: [PATCH 25/26] [ISSUE #8164] Log more accurate for the MQClientInstance#doRebalance (#8165) --- .../apache/rocketmq/client/impl/factory/MQClientInstance.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 1ff35a00d12..b4ebf692736 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1070,7 +1070,7 @@ public boolean doRebalance() { balanced = false; } } catch (Throwable e) { - log.error("doRebalance exception", e); + log.error("doRebalance for consumer group [{}] exception", entry.getKey(), e); } } } From 94bb64f73160e309438fb000719fff0619355dd4 Mon Sep 17 00:00:00 2001 From: mxsm Date: Tue, 21 May 2024 08:32:20 +0800 Subject: [PATCH 26/26] [ISSUE #8162]Optimize the logging printout for the ConfigManager#loadBak method (#8163) --- .../main/java/org/apache/rocketmq/common/ConfigManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 5e997596194..099f0d8d560 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -52,8 +52,8 @@ public boolean load() { private boolean loadBak() { String fileName = null; try { - fileName = this.configFilePath(); - String jsonString = MixAll.file2String(fileName + ".bak"); + fileName = this.configFilePath() + ".bak"; + String jsonString = MixAll.file2String(fileName); if (jsonString != null && jsonString.length() > 0) { this.decode(jsonString); log.info("load " + fileName + " OK");