diff --git a/BUILD.bazel b/BUILD.bazel index 358527c3149..ba33a9e6123 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -44,5 +44,6 @@ java_library( "@maven//:org_awaitility_awaitility", "@maven//:org_openjdk_jmh_jmh_core", "@maven//:org_openjdk_jmh_jmh_generator_annprocess", + "@maven//:org_mockito_mockito_junit_jupiter", ], ) diff --git a/WORKSPACE b/WORKSPACE index 8230edef5c0..e1f7743302a 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -111,6 +111,7 @@ maven_install( "com.alipay.sofa:jraft-core:1.3.14", "com.alipay.sofa:hessian:3.3.6", "io.netty:netty-tcnative-boringssl-static:2.0.48.Final", + "org.mockito:mockito-junit-jupiter:4.11.0", ], fetch_sources = True, repositories = [ diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java index 6b178b96573..c1e970fa6eb 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/builder/DefaultAuthenticationContextBuilder.java @@ -98,12 +98,12 @@ public DefaultAuthenticationContext build(Metadata metadata, GeneratedMessageV3 @Override public DefaultAuthenticationContext build(ChannelHandlerContext context, RemotingCommand request) { HashMap fields = request.getExtFields(); - if (MapUtils.isEmpty(fields)) { - throw new AuthenticationException("authentication field is null."); - } DefaultAuthenticationContext result = new DefaultAuthenticationContext(); result.setChannelId(context.channel().id().asLongText()); result.setRpcCode(String.valueOf(request.getCode())); + if (MapUtils.isEmpty(fields)) { + return result; + } if (!fields.containsKey(SessionCredentials.ACCESS_KEY)) { return result; } diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java index 6eabe69f456..39620ca8d25 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/manager/AuthenticationMetadataManagerImpl.java @@ -207,14 +207,14 @@ private void handleException(Exception e, CompletableFuture result) { } private AuthenticationMetadataProvider getAuthenticationMetadataProvider() { - if (authorizationMetadataProvider == null) { + if (authenticationMetadataProvider == null) { throw new IllegalStateException("The authenticationMetadataProvider is not configured"); } return authenticationMetadataProvider; } private AuthorizationMetadataProvider getAuthorizationMetadataProvider() { - if (authenticationMetadataProvider == null) { + if (authorizationMetadataProvider == null) { throw new IllegalStateException("The authorizationMetadataProvider is not configured"); } return authorizationMetadataProvider; diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java index 482b02db030..98e7ede7ee3 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/DefaultAuthenticationProvider.java @@ -68,7 +68,7 @@ protected HandlerChain> ne .addNext(new DefaultAuthenticationHandler(this.authConfig, metadataService)); } - private void doAuditLog(DefaultAuthenticationContext context, Throwable ex) { + protected void doAuditLog(DefaultAuthenticationContext context, Throwable ex) { if (StringUtils.isBlank(context.getUsername())) { return; } diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java index daa039162b4..d6d1556ca20 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclException; @@ -160,6 +161,9 @@ public List build(ChannelHandlerContext context, Re List result = new ArrayList<>(); try { HashMap fields = command.getExtFields(); + if (MapUtils.isEmpty(fields)) { + return result; + } Subject subject = null; if (fields.containsKey(SessionCredentials.ACCESS_KEY)) { subject = User.of(fields.get(SessionCredentials.ACCESS_KEY)); diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java index 15fb5a5b85b..75111030328 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/DefaultAuthorizationProvider.java @@ -78,7 +78,7 @@ protected HandlerChain> new .addNext(new AclAuthorizationHandler(authConfig, metadataService)); } - private void doAuditLog(DefaultAuthorizationContext context, Throwable ex) { + protected void doAuditLog(DefaultAuthorizationContext context, Throwable ex) { if (context.getSubject() == null) { return; } diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index b2ee2549bcc..785b7657740 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -95,6 +95,10 @@ java_library( GenTestRules( name = "GeneratedTestRules", test_files = glob(["src/test/java/**/*Test.java"]), + exclude_tests = [ + # These tests are extremely slow and flaky, exclude them before they are properly fixed. + "src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest", + ], deps = [ ":tests", ], diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index a9dcc0af1f8..76224db5cb5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -408,7 +408,7 @@ public BrokerController( this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); - this.brokerStatsManager.setProduerStateGetter(new BrokerStatsManager.StateGetter() { + this.brokerStatsManager.setProducerStateGetter(new BrokerStatsManager.StateGetter() { @Override public boolean online(String instanceId, String group, String topic) { if (getTopicConfigManager().getTopicConfigTable().containsKey(NamespaceUtil.wrapNamespace(instanceId, topic))) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 362caf9ca68..a1a6f5bf6ce 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -455,6 +455,7 @@ public boolean rejectRequest() { private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + long startTime = System.currentTimeMillis(); final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); @@ -514,8 +515,10 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext LOGGER.error("Update / create topic failed for [{}]", request, e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(e.getMessage()); + return response; } - + long executionTime = System.currentTimeMillis() - startTime; + LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime); return response; } @@ -1450,6 +1453,7 @@ public void onException(Throwable e) { private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + long startTime = System.currentTimeMillis(); final RemotingCommand response = RemotingCommand.createResponseCommand(null); LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}", @@ -1462,6 +1466,8 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c response.setCode(ResponseCode.SUCCESS); response.setRemark(null); + long executionTime = System.currentTimeMillis() - startTime; + LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime); return response; } @@ -2907,7 +2913,7 @@ private RemotingCommand updateUser(ChannelHandlerContext ctx, return this.brokerController.getAuthenticationMetadataManager().updateUser(old); }).thenAccept(nil -> response.setCode(ResponseCode.SUCCESS)) .exceptionally(ex -> { - LOGGER.error("delete user {} error", requestHeader.getUsername(), ex); + LOGGER.error("update user {} error", requestHeader.getUsername(), ex); return handleAuthException(response, ex); }) .join(); @@ -3154,7 +3160,7 @@ private boolean validateSlave(RemotingCommand response) { } private boolean validateBlackListConfigExist(Properties properties) { - for (String blackConfig:configBlackList) { + for (String blackConfig : configBlackList) { if (properties.containsKey(blackConfig)) { return true; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index ef7e4f67894..e13b36df910 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -224,7 +224,7 @@ public boolean load() { result = result && this.correctDelayOffset(); return result; } - + public boolean loadWhenSyncDelayOffset() { boolean result = super.load(); result = result && this.parseDelayLevel(); @@ -377,7 +377,7 @@ public void run() { if (isStarted()) { this.executeOnTimeUp(); } - } catch (Exception e) { + } catch (Throwable e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeUp exception", e); this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD); diff --git a/client/BUILD.bazel b/client/BUILD.bazel index e491cfcef0c..9b6fbc298c2 100644 --- a/client/BUILD.bazel +++ b/client/BUILD.bazel @@ -49,7 +49,8 @@ java_library( "@maven//:io_netty_netty_all", "@maven//:io_opentracing_opentracing_api", "@maven//:io_opentracing_opentracing_mock", - "@maven//:org_awaitility_awaitility", + "@maven//:org_awaitility_awaitility", + "@maven//:org_mockito_mockito_junit_jupiter", ], resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"]) ) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java index 75e5d1c218b..6f63a6fc607 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java @@ -42,7 +42,7 @@ public List allocate(String consumerGroup, String currentCID, List int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { - result.add(mqAll.get((startIndex + i) % mqAll.size())); + result.add(mqAll.get(startIndex + i)); } return result; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 2f18c610c14..e46c651f928 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -288,8 +288,8 @@ private void processReplyMessage(MessageExt replyMsg) { } } else { String bornHost = replyMsg.getBornHostString(); - logger.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s", - correlationId, bornHost)); + logger.warn("receive reply message, but not matched any request, CorrelationId: {} , reply from host: {}", + correlationId, bornHost); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index b1d07b85f78..bcfe29bd4f6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -181,7 +181,7 @@ public Set fetchSubscribeMessageQueues(String topic) throws MQClie e); } - throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); + throw new MQClientException("Unknown why, Can not find Message Queue for this topic, " + topic, null); } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 701af5fcde0..082a36266da 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1168,7 +1168,7 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm index = sortMap.get(queueIdKey).indexOf(offset); msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index); if (msgQueueOffset != offset) { - log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s", + log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, messageExt); } messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, @@ -1181,7 +1181,7 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset()); msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index); if (msgQueueOffset != messageExt.getQueueOffset()) { - log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s", msgQueueOffset, messageExt); + log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, messageExt); } messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index ea6c8072b57..b151fefbbb3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -169,11 +169,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setSpentTimeMills(System.currentTimeMillis() - beginTime); @@ -410,11 +410,11 @@ public void run() { } status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { - log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, - messageQueue), e); + messageQueue, e); hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 627bca1d232..3ca465da70d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -85,6 +85,7 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag)); } + @Override public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -96,10 +97,11 @@ public void run() { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } - }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); + }, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } + @Override public void shutdown(long awaitTerminateMillis) { this.stopped = true; this.scheduledExecutorService.shutdown(); @@ -181,11 +183,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setAutoCommit(context.isAutoCommit()); @@ -497,11 +499,11 @@ public void run() { status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { - log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, - messageQueue), e); + messageQueue, e); hasException = true; } finally { this.processQueue.getConsumeLock().readLock().unlock(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java index 9d7b5069ae2..891409492a8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java @@ -153,11 +153,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessagePopConcurrentlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setSpentTimeMills(System.currentTimeMillis() - beginTime); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java index 5dc5b060b3a..a0903de5fe5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java @@ -175,11 +175,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(UtilAll.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(e), ConsumeMessagePopOrderlyService.this.consumerGroup, msgs, - mq), e); + mq, e); } result.setAutoCommit(context.isAutoCommit()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 78dafd0ff72..a3276cd7823 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -1237,18 +1237,16 @@ private boolean isSetEqual(Set set1, Set set2) { return true; } - if (set1 == null || set2 == null || set1.size() != set2.size() || set1.size() == 0) { + if (set1 == null || set2 == null || set1.size() != set2.size()) { return false; } - Iterator iter = set2.iterator(); - boolean isEqual = true; - while (iter.hasNext()) { - if (!set1.contains(iter.next())) { - isEqual = false; + for (MessageQueue messageQueue : set2) { + if (!set1.contains(messageQueue)) { + return false; } } - return isEqual; + return true; } public AssignedMessageQueue getAssignedMessageQueue() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 05e672c4e07..414fa236158 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -525,7 +525,7 @@ private boolean updateProcessQueueTableInRebalance(final String topic, final Set } this.removeDirtyOffset(mq); - ProcessQueue pq = createProcessQueue(topic); + ProcessQueue pq = createProcessQueue(); pq.setLocked(true); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { @@ -779,8 +779,6 @@ public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final Pop public abstract PopProcessQueue createPopProcessQueue(); - public abstract ProcessQueue createProcessQueue(String topicName); - public void removeProcessQueue(final MessageQueue mq) { ProcessQueue prev = this.processQueueTable.remove(mq); if (prev != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 335d89b7877..330772f22bb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -177,7 +177,4 @@ public PopProcessQueue createPopProcessQueue() { return null; } - public ProcessQueue createProcessQueue(String topicName) { - return createProcessQueue(); - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 1b5f9766174..e0b682868ab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -103,8 +103,4 @@ public PopProcessQueue createPopProcessQueue() { return null; } - public ProcessQueue createProcessQueue(String topicName) { - return createProcessQueue(); - } - } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index f28890d306f..fe2f19b2f9a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -140,10 +140,6 @@ public boolean clientRebalance(String topic) { return defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || defaultMQPushConsumerImpl.isConsumeOrderly() || MessageModel.BROADCASTING.equals(messageModel); } - public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) { - return true; - } - @Override public ConsumeType consumeType() { return ConsumeType.CONSUME_PASSIVELY; @@ -288,11 +284,6 @@ public ProcessQueue createProcessQueue() { return new ProcessQueue(); } - @Override - public ProcessQueue createProcessQueue(String topicName) { - return createProcessQueue(); - } - @Override public PopProcessQueue createPopProcessQueue() { return new PopProcessQueue(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 227f3346d0d..b4ebf692736 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -125,8 +125,8 @@ public class MQClientInstance { private final ConcurrentMap> brokerAddrTable = new ConcurrentHashMap<>(); private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap<>(); - private final Set brokerSupportV2HeartbeatSet = new HashSet(); - private final ConcurrentMap brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap(); + private final Set brokerSupportV2HeartbeatSet = new HashSet<>(); + private final ConcurrentMap brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread")); private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override @@ -1070,7 +1070,7 @@ public boolean doRebalance() { balanced = false; } } catch (Throwable e) { - log.error("doRebalance exception", e); + log.error("doRebalance for consumer group [{}] exception", entry.getKey(), e); } } } @@ -1161,7 +1161,7 @@ public FindBrokerResult findBrokerAddressInSubscribe( Entry entry = map.entrySet().iterator().next(); brokerAddr = entry.getValue(); slave = entry.getKey() != MixAll.MASTER_ID; - found = true; + found = brokerAddr != null; } } @@ -1222,8 +1222,7 @@ public String findBrokerAddrByTopic(final String topic) { if (topicRouteData != null) { List brokers = topicRouteData.getBrokerDatas(); if (!brokers.isEmpty()) { - int index = random.nextInt(brokers.size()); - BrokerData bd = brokers.get(index % brokers.size()); + BrokerData bd = brokers.get(random.nextInt(brokers.size())); return bd.selectBrokerAddr(); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index ea039a82747..c9a41cc0fad 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -571,7 +571,7 @@ public void run() { class BackpressureSendCallBack implements SendCallback { public boolean isSemaphoreAsyncSizeAcquired = false; - public boolean isSemaphoreAsyncNumAcquired = false; + public boolean isSemaphoreAsyncNumbAcquired = false; public int msgLen; private final SendCallback sendCallback; @@ -584,7 +584,7 @@ public void onSuccess(SendResult sendResult) { if (isSemaphoreAsyncSizeAcquired) { semaphoreAsyncSendSize.release(msgLen); } - if (isSemaphoreAsyncNumAcquired) { + if (isSemaphoreAsyncNumbAcquired) { semaphoreAsyncSendNum.release(); } sendCallback.onSuccess(sendResult); @@ -595,7 +595,7 @@ public void onException(Throwable e) { if (isSemaphoreAsyncSizeAcquired) { semaphoreAsyncSendSize.release(msgLen); } - if (isSemaphoreAsyncNumAcquired) { + if (isSemaphoreAsyncNumbAcquired) { semaphoreAsyncSendNum.release(); } sendCallback.onException(e); @@ -607,16 +607,16 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final throws MQClientException, InterruptedException { ExecutorService executor = this.getAsyncSenderExecutor(); boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode(); - boolean isSemaphoreAsyncNumAcquired = false; + boolean isSemaphoreAsyncNumbAcquired = false; boolean isSemaphoreAsyncSizeAcquired = false; int msgLen = msg.getBody() == null ? 1 : msg.getBody().length; try { if (isEnableBackpressureForAsyncMode) { long costTime = System.currentTimeMillis() - beginStartTime; - isSemaphoreAsyncNumAcquired = timeout - costTime > 0 + isSemaphoreAsyncNumbAcquired = timeout - costTime > 0 && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS); - if (!isSemaphoreAsyncNumAcquired) { + if (!isSemaphoreAsyncNumbAcquired) { sendCallback.onException( new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout")); return; @@ -631,7 +631,7 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final } } sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired; - sendCallback.isSemaphoreAsyncNumAcquired = isSemaphoreAsyncNumAcquired; + sendCallback.isSemaphoreAsyncNumbAcquired = isSemaphoreAsyncNumbAcquired; sendCallback.msgLen = msgLen; executor.submit(runnable); } catch (RejectedExecutionException e) { @@ -762,7 +762,7 @@ private SendResult sendDefaultImpl( } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); - log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, resend at once, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); log.warn(msg.toString()); exception = e; continue; @@ -775,7 +775,7 @@ private SendResult sendDefaultImpl( // Otherwise, isolate this broker. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true); } - log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, resend at once, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); } @@ -784,7 +784,7 @@ private SendResult sendDefaultImpl( } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false); - log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, resend at once, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); } @@ -801,7 +801,7 @@ private SendResult sendDefaultImpl( } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); - log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e); + log.warn("sendKernelImpl exception, throw exception, InvokeID: {}, RT: {}ms, Broker: {}", invokeID, endTimestamp - beginTimestampPrev, mq, e); if (log.isDebugEnabled()) { log.debug(msg.toString()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java index f629fe44a87..db8bbd66ef2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java @@ -55,6 +55,7 @@ public LatencyFaultToleranceImpl(Resolver resolver, ServiceDetector serviceDetec this.serviceDetector = serviceDetector; } + @Override public void detectByOneRound() { for (Map.Entry item : this.faultItemTable.entrySet()) { FaultItem brokerItem = item.getValue(); @@ -77,6 +78,7 @@ public void detectByOneRound() { } } + @Override public void startDetector() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -92,6 +94,7 @@ public void run() { }, 3, 3, TimeUnit.SECONDS); } + @Override public void shutdown() { this.scheduledExecutorService.shutdown(); } @@ -128,6 +131,7 @@ public boolean isAvailable(final String name) { return true; } + @Override public boolean isReachable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { @@ -141,10 +145,12 @@ public void remove(final String name) { this.faultItemTable.remove(name); } + @Override public boolean isStartDetectorEnable() { return startDetectorEnable; } + @Override public void setStartDetectorEnable(boolean startDetectorEnable) { this.startDetectorEnable = startDetectorEnable; } @@ -177,10 +183,12 @@ public String toString() { '}'; } + @Override public void setDetectTimeout(final int detectTimeout) { this.detectTimeout = detectTimeout; } + @Override public void setDetectInterval(final int detectInterval) { this.detectInterval = detectInterval; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 9b7cf90c057..9dd7c79393e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -72,6 +72,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ResponseCode.TOPIC_NOT_EXIST, ResponseCode.SERVICE_NOT_AVAILABLE, ResponseCode.SYSTEM_ERROR, + ResponseCode.SYSTEM_BUSY, ResponseCode.NO_PERMISSION, ResponseCode.NO_BUYER_ID, ResponseCode.NOT_IN_CURRENT_UNIT diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index d44f22616f4..1fe19773a5a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -153,6 +153,7 @@ public void setNamespaceV2(String namespaceV2) { this.namespaceV2 = namespaceV2; } + @Override public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); @@ -330,7 +331,7 @@ class TraceDataSegment { private int currentMsgKeySize; private final String traceTopicName; private final String regionId; - private final List traceTransferBeanList = new ArrayList(); + private final List traceTransferBeanList = new ArrayList<>(); TraceDataSegment(String traceTopicName, String regionId) { this.traceTopicName = traceTopicName; @@ -345,7 +346,7 @@ public void addTraceTransferBean(TraceTransferBean traceTransferBean) { this.currentMsgKeySize = traceTransferBean.getTransKey().stream() .reduce(currentMsgKeySize, (acc, x) -> acc + x.length(), Integer::sum); if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 1000 || currentMsgKeySize >= MAX_MSG_KEY_SIZE) { - List dataToSend = new ArrayList(traceTransferBeanList); + List dataToSend = new ArrayList<>(traceTransferBeanList); AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend); traceExecutor.submit(asyncDataSendTask); this.clear(); @@ -356,7 +357,7 @@ public void sendAllData() { if (this.traceTransferBeanList.isEmpty()) { return; } - List dataToSend = new ArrayList(traceTransferBeanList); + List dataToSend = new ArrayList<>(traceTransferBeanList); AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend); traceExecutor.submit(asyncDataSendTask); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 24e39f56689..65237bc8f76 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -63,6 +63,8 @@ import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.quality.Strictness; +import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; @@ -74,11 +76,13 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class DefaultLitePullConsumerTest { @Spy private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @@ -743,7 +747,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { } }); - when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + doAnswer(x -> new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index dc892a3548b..97d8d04e648 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -83,7 +83,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Matchers; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; @@ -387,7 +387,7 @@ public Object answer(InvocationOnMock mock) throws Throwable { callback.operationSucceed(responseFuture.getResponseCommand()); return null; } - }).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class)); + }).when(remotingClient).invokeAsync(ArgumentMatchers.anyString(), ArgumentMatchers.any(RemotingCommand.class), ArgumentMatchers.anyLong(), ArgumentMatchers.any(InvokeCallback.class)); SendMessageContext sendMessageContext = new SendMessageContext(); sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer())); msg.getProperties().put("MSG_TYPE", "reply"); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImplTest.java new file mode 100644 index 00000000000..3986c497eb5 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImplTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; + + +public class DefaultLitePullConsumerImplTest { + private final DefaultLitePullConsumerImpl consumer = new DefaultLitePullConsumerImpl(new DefaultLitePullConsumer(), null); + + private static Method isSetEqualMethod; + + @BeforeClass + public static void initReflectionMethod() throws NoSuchMethodException { + Class consumerClass = DefaultLitePullConsumerImpl.class; + Method testMethod = consumerClass.getDeclaredMethod("isSetEqual", Set.class, Set.class); + testMethod.setAccessible(true); + isSetEqualMethod = testMethod; + } + + + /** + * The two empty sets should be equal + */ + @Test + public void testIsSetEqual1() throws InvocationTargetException, IllegalAccessException { + Set set1 = new HashSet<>(); + Set set2 = new HashSet<>(); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertTrue(equalResult); + } + + + /** + * When a set has elements and one does not, the two sets are not equal + */ + @Test + public void testIsSetEqual2() throws InvocationTargetException, IllegalAccessException { + Set set1 = new HashSet<>(); + set1.add(new MessageQueue("testTopic","testBroker",111)); + Set set2 = new HashSet<>(); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertFalse(equalResult); + } + + /** + * The two null sets should be equal + */ + @Test + public void testIsSetEqual3() throws InvocationTargetException, IllegalAccessException { + Set set1 = null; + Set set2 = null; + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertTrue(equalResult); + } + + @Test + public void testIsSetEqual4() throws InvocationTargetException, IllegalAccessException { + Set set1 = null; + Set set2 = new HashSet<>(); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertFalse(equalResult); + } + + @Test + public void testIsSetEqual5() throws InvocationTargetException, IllegalAccessException { + Set set1 = new HashSet<>(); + set1.add(new MessageQueue("testTopic","testBroker",111)); + Set set2 = new HashSet<>(); + set2.add(new MessageQueue("testTopic","testBroker",111)); + boolean equalResult = (boolean) isSetEqualMethod.invoke(consumer, set1, set2); + Assert.assertTrue(equalResult); + } + +} diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java new file mode 100644 index 00000000000..752bc98eabd --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.impl.mqclient; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.utils.FutureUtils; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; + +@RunWith(MockitoJUnitRunner.class) +public class MQClientAPIExtTest { + MQClientAPIExt mqClientAPIExt; + @Mock + NettyRemotingClient remotingClientMock; + + @Before + public void before() { + mqClientAPIExt = Mockito.spy(new MQClientAPIExt(new ClientConfig(), new NettyClientConfig(), null, null)); + Mockito.when(mqClientAPIExt.getRemotingClient()).thenReturn(remotingClientMock); + Mockito.when(remotingClientMock.invoke(anyString(), any(), anyLong())).thenReturn(FutureUtils.completeExceptionally(new RemotingTimeoutException("addr"))); + } + + @Test + public void sendMessageAsync() { + String topic = "test"; + Message msg = new Message(topic, "test".getBytes()); + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setProducerGroup("test"); + requestHeader.setDefaultTopic("test"); + requestHeader.setDefaultTopicQueueNums(1); + requestHeader.setQueueId(0); + requestHeader.setSysFlag(0); + requestHeader.setBornTimestamp(0L); + requestHeader.setFlag(0); + requestHeader.setProperties("test"); + requestHeader.setReconsumeTimes(0); + requestHeader.setUnitMode(false); + requestHeader.setBatch(false); + CompletableFuture future = mqClientAPIExt.sendMessageAsync("127.0.0.1:10911", "test", msg, requestHeader, 10); + assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingTimeoutException.class); + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 5e997596194..099f0d8d560 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -52,8 +52,8 @@ public boolean load() { private boolean loadBak() { String fileName = null; try { - fileName = this.configFilePath(); - String jsonString = MixAll.file2String(fileName + ".bak"); + fileName = this.configFilePath() + ".bak"; + String jsonString = MixAll.file2String(fileName); if (jsonString != null && jsonString.length() > 0) { this.decode(jsonString); log.info("load " + fileName + " OK"); diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java index 9680acec74d..5e581a34eec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java @@ -50,7 +50,8 @@ public static TopicMessageType parseFromMessageProperty(Map mess return TopicMessageType.TRANSACTION; } else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null - || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) { + || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null + || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) { return TopicMessageType.DELAY; } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) { return TopicMessageType.FIFO; diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java index d38281bf83a..71c796b283a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java @@ -55,10 +55,10 @@ public void run() { } public void printAtMinutes() { - log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d", + log.info("[{}] [{}] Stats Every 5 Minutes, Value: {}", this.statsName, this.statsKey, - this.value.get())); + this.value.get()); } public AtomicLong getValue() { diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java new file mode 100644 index 00000000000..67525ae8087 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.attribute; + +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.common.message.MessageConst; +import org.junit.Assert; +import org.junit.Test; + +public class TopicMessageTypeTest { + @Test + public void testParseFromMessageProperty() { + Map properties = new HashMap<>(); + + // TRANSACTION + properties.put(MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + Assert.assertEquals(TopicMessageType.TRANSACTION, TopicMessageType.parseFromMessageProperty(properties)); + + // DELAY + properties.clear(); + properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3"); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + properties.clear(); + properties.put(MessageConst.PROPERTY_TIMER_DELIVER_MS, System.currentTimeMillis() + 10000 + ""); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + properties.clear(); + properties.put(MessageConst.PROPERTY_TIMER_DELAY_SEC, 10 + ""); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + properties.clear(); + properties.put(MessageConst.PROPERTY_TIMER_DELAY_MS, 10000 + ""); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + + // FIFO + properties.clear(); + properties.put(MessageConst.PROPERTY_SHARDING_KEY, "sharding_key"); + Assert.assertEquals(TopicMessageType.FIFO, TopicMessageType.parseFromMessageProperty(properties)); + + // NORMAL + properties.clear(); + Assert.assertEquals(TopicMessageType.NORMAL, TopicMessageType.parseFromMessageProperty(properties)); + } +} diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java index a032b7b6211..be487849ce5 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java @@ -569,7 +569,7 @@ public void handle(long term, MemberState.Role role) { break; } tryTimes++; - log.error(String.format("Controller leader append initial log failed, try %d times", tryTimes)); + log.error("Controller leader append initial log failed, try {} times", tryTimes); if (tryTimes % 3 == 0) { log.warn("Controller leader append initial log failed too many times, please wait a while"); } diff --git a/docs/cn/BrokerContainer.md b/docs/cn/BrokerContainer.md index 400c331fc8e..f55674349af 100644 --- a/docs/cn/BrokerContainer.md +++ b/docs/cn/BrokerContainer.md @@ -72,7 +72,7 @@ sh mqbrokercontainer -c broker-container.conf ``` mqbrokercontainer脚本路径为distribution/bin/mqbrokercontainer。 -## 运行时增加或较少Broker +## 运行时增加或减少Broker 当BrokerContainer进程启动后,也可以通过Admin命令来增加或减少Broker。 diff --git a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java index bfbddf49135..25f8cfdecb8 100644 --- a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java +++ b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java @@ -68,6 +68,7 @@ public void testRegister() { e.printStackTrace(); assertThat(Boolean.FALSE).isTrue(); } + FilterFactory.INSTANCE.unRegister("Nothing"); } @Test diff --git a/pom.xml b/pom.xml index 6307ae18fe4..a72cf473f3a 100644 --- a/pom.xml +++ b/pom.xml @@ -146,6 +146,7 @@ 4.13.2 3.22.0 3.10.0 + 4.11.0 2.0.9 4.1.0 0.30 @@ -840,6 +841,12 @@ ${mockito-core.version} test + + org.mockito + mockito-junit-jupiter + ${mockito-junit-jupiter.version} + test + org.awaitility awaitility @@ -1097,6 +1104,12 @@ ${mockito-core.version} test + + org.mockito + mockito-junit-jupiter + ${mockito-junit-jupiter.version} + test + org.awaitility awaitility diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java index 09ddacde1c4..2cdd92ba5be 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.client.ProducerManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -72,6 +73,10 @@ public class ProxyClientRemotingProcessorTest { @Test public void testTransactionCheck() throws Exception { + // Temporarily skip this test on the Mac system as it is flaky + if (MixAll.isMac()) { + return; + } CompletableFuture> proxyRelayResultFuture = new CompletableFuture<>(); when(proxyRelayService.processCheckTransactionState(any(), any(), any(), any())) .thenReturn(new RelayData<>( @@ -123,7 +128,7 @@ public void testTransactionCheck() throws Exception { } }); } - await().atMost(Duration.ofSeconds(1)).until(() -> count.get() == 100); + await().atMost(Duration.ofSeconds(3)).until(() -> count.get() == 100); verify(observer, times(2)).onNext(any()); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index 25ae1509a95..a01c356f779 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -227,7 +227,7 @@ public void testRenewExceedMaxRenewTimes() { Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())))) .thenReturn(ackResultFuture); - await().atMost(Duration.ofSeconds(1)).until(() -> { + await().atMost(Duration.ofSeconds(3)).until(() -> { receiptHandleManager.scheduleRenewTask(); try { ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index ede6005f541..1bc5e57db52 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -631,7 +631,7 @@ private Channel getAndCreateChannel(final String addr) throws InterruptedExcepti if (channelFuture == null) { return null; } - return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel(); + return channelFuture.awaitUninterruptibly().channel(); } private ChannelFuture getAndCreateNameserverChannelAsync() throws InterruptedException { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java index ec2b51e0b96..bb5c3074b44 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionData.java @@ -65,7 +65,8 @@ public void setVersion(long version) { this.version = version; } - @Override public boolean equals(Object o) { + @Override + public boolean equals(Object o) { if (this == o) { return true; } @@ -73,11 +74,12 @@ public void setVersion(long version) { return false; } SimpleSubscriptionData that = (SimpleSubscriptionData) o; - return version == that.version && Objects.equals(topic, that.topic); + return Objects.equals(topic, that.topic) && Objects.equals(expressionType, that.expressionType) && Objects.equals(expression, that.expression); } - @Override public int hashCode() { - return Objects.hash(topic, version); + @Override + public int hashCode() { + return Objects.hash(topic, expressionType, expression); } @Override public String toString() { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionDataTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionDataTest.java new file mode 100644 index 00000000000..fa8e4ba6ae4 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/subscription/SimpleSubscriptionDataTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.subscription; + +import com.google.common.collect.Sets; +import java.util.Set; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SimpleSubscriptionDataTest { + @Test + public void testNotEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-2"; + SimpleSubscriptionData simpleSubscriptionData1 = new SimpleSubscriptionData(topic, expressionType, expression1, 1); + SimpleSubscriptionData simpleSubscriptionData2 = new SimpleSubscriptionData(topic, expressionType, expression2, 1); + assertThat(simpleSubscriptionData1.equals(simpleSubscriptionData2)).isFalse(); + } + + @Test + public void testEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-1"; + SimpleSubscriptionData simpleSubscriptionData1 = new SimpleSubscriptionData(topic, expressionType, expression1, 1); + SimpleSubscriptionData simpleSubscriptionData2 = new SimpleSubscriptionData(topic, expressionType, expression2, 1); + assertThat(simpleSubscriptionData1.equals(simpleSubscriptionData2)).isTrue(); + } + + @Test + public void testSetNotEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-2"; + Set set1 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression1, 1)); + Set set2 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression2, 1)); + assertThat(set1.equals(set2)).isFalse(); + } + + @Test + public void testSetEqual() { + String topic = "test-topic"; + String expressionType = "TAG"; + String expression1 = "test-expression-1"; + String expression2 = "test-expression-1"; + Set set1 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression1, 1)); + Set set2 = Sets.newHashSet(new SimpleSubscriptionData(topic, expressionType, expression2, 1)); + assertThat(set1.equals(set2)).isTrue(); + } +} \ No newline at end of file diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 489d7b4fbce..c165d333fd0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -142,7 +142,7 @@ public class BrokerStatsManager { private MomentStatsItemSet momentStatsItemSetFallTime; private final StatisticsManager accountStatManager = new StatisticsManager(); - private StateGetter produerStateGetter; + private StateGetter producerStateGetter; private StateGetter consumerStateGetter; private BrokerConfig brokerConfig; @@ -270,7 +270,7 @@ public boolean online(StatisticsItem item) { String kind = item.getStatKind(); if (ACCOUNT_SEND.equals(kind) || ACCOUNT_SEND_REJ.equals(kind)) { - return produerStateGetter.online(instanceId, group, topic); + return producerStateGetter.online(instanceId, group, topic); } else if (ACCOUNT_RCV.equals(kind) || ACCOUNT_SEND_BACK.equals(kind) || ACCOUNT_SEND_BACK_TO_DLQ.equals(kind) || ACCOUNT_REV_REJ.equals(kind)) { return consumerStateGetter.online(instanceId, group, topic); } @@ -296,12 +296,12 @@ public MomentStatsItemSet getMomentStatsItemSetFallTime() { return momentStatsItemSetFallTime; } - public StateGetter getProduerStateGetter() { - return produerStateGetter; + public StateGetter getProducerStateGetter() { + return producerStateGetter; } - public void setProduerStateGetter(StateGetter produerStateGetter) { - this.produerStateGetter = produerStateGetter; + public void setProducerStateGetter(StateGetter producerStateGetter) { + this.producerStateGetter = producerStateGetter; } public StateGetter getConsumerStateGetter() { diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java index a7046bca7da..67a781aacc2 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java @@ -58,7 +58,7 @@ public RMQPopConsumer(String nsAddr, String topic, String subExpression, @Override public void start() { client = ConsumerFactory.getRMQPopClient(); - log.info(String.format("consumer[%s] started!", consumerGroup)); + log.info("consumer[{}] started!", consumerGroup); } @Override diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index d3d5de9e271..47a8db3c9a7 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -117,8 +117,7 @@ public static boolean createSub(String nameSrvAddr, String clusterName, String c for (String addr : masterSet) { try { mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config); - log.info(String.format("create subscription group %s to %s success.\n", consumerId, - addr)); + log.info("create subscription group {} to {} success.", consumerId, addr); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000 * 1); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index d2713919509..b64cda33420 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -287,8 +287,7 @@ public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, consumer.setDebug(); } mqClients.add(consumer); - log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup, - topic, subExpression)); + log.info("consumer[{}] start,topic[{}],subExpression[{}]", consumerGroup, topic, subExpression); return consumer; } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java index 71b2bbaeb08..fd5ab108ecb 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopSubCheckIT.java @@ -64,7 +64,7 @@ public void tearDown() { @Test public void testNormalPopAck() throws Exception { String topic = initTopic(); - log.info(String.format("use topic: %s; group: %s !", topic, group)); + log.info("use topic: {}; group: {} !", topic, group); RMQNormalProducer producer = getProducer(NAMESRV_ADDR, topic); producer.getProducer().setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE); diff --git a/tieredstore/README.md b/tieredstore/README.md index 64014b94992..ccb3bbe0d13 100644 --- a/tieredstore/README.md +++ b/tieredstore/README.md @@ -12,8 +12,8 @@ This article is a cookbook for RocketMQ tiered storage. Use the following steps to easily use tiered storage -1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.MessageStoreExtend` in your `broker.conf`. -2. Configure your backend service provider. change `tieredBackendServiceProvider` to your storage medium implement. We give a default implement: POSIX provider, and you need to change `tieredStoreFilepath` to the mount point of storage medium for tiered storage. +1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.TieredMessageStore` in your `broker.conf`. +2. Configure your backend service provider. change `tieredBackendServiceProvider` to your storage medium implement. We give a default implement: POSIX provider, and you need to change `tieredStoreFilePath` to the mount point of storage medium for tiered storage. 3. Start the broker and enjoy! ## Configuration @@ -22,10 +22,10 @@ The following are some core configurations, for more details, see [TieredMessage | Configuration | Default value | Unit | Function | | ------------------------------- |---------------------------------------------------------------| ----------- | ------------------------------------------------------------------------------- | -| messageStorePlugIn | | | Set to org.apache.rocketmq.tieredstore.MessageStoreExtend to use tiered storage | +| messageStorePlugIn | | | Set to org.apache.rocketmq.tieredstore.TieredMessageStore to use tiered storage | | tieredMetadataServiceProvider | org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore | | Select your metadata provider | | tieredBackendServiceProvider | org.apache.rocketmq.tieredstore.provider.PosixFileSegment | | Select your backend service provider | -| tieredStoreFilepath | | | Select the directory using for tiered storage, only for POSIX provider. | +| tieredStoreFilePath | | | Select the directory using for tiered storage, only for POSIX provider. | | tieredStorageLevel | NOT_IN_DISK | | The options are DISABLE, NOT_IN_DISK, NOT_IN_MEM, FORCE | | tieredStoreFileReservedTime | 72 | hour | Default topic TTL in tiered storage | | tieredStoreGroupCommitCount | 2500 | | The number of messages that trigger one batch transfer | diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java index 8a319ed3899..6ac0939571f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java @@ -60,4 +60,17 @@ public CompletableFuture getMinOffsetFromFileAsync() { return firstOffset.get(); }); } + + @Override + public void destroyExpiredFile(long expireTimestamp) { + long beforeOffset = this.getMinOffset(); + super.destroyExpiredFile(expireTimestamp); + long afterOffset = this.getMinOffset(); + + if (beforeOffset != afterOffset) { + log.info("CommitLog min cq offset reset, filePath={}, offset={}, expireTimestamp={}, change={}-{}", + filePath, firstOffset.get(), expireTimestamp, beforeOffset, afterOffset); + firstOffset.set(GET_OFFSET_ERROR); + } + } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java index 630276a97f6..09500bf6da8 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/DefaultMetadataStore.java @@ -164,7 +164,10 @@ public QueueMetadata getQueue(MessageQueue mq) { @Override public void iterateQueue(String topic, Consumer callback) { - queueMetadataTable.get(topic).values().forEach(callback); + ConcurrentMap metadataConcurrentMap = queueMetadataTable.get(topic); + if (metadataConcurrentMap != null) { + metadataConcurrentMap.values().forEach(callback); + } } @Override diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java index 7e030d305eb..1e912690b2f 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java @@ -93,19 +93,33 @@ public void getMinOffsetFromFileAsyncTest() { for (int i = 6; i < 9; i++) { ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer(); byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i); - Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L)); + Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i)); } Assert.assertEquals(-1L, flatFile.getMinOffsetFromFileAsync().join().longValue()); // append some messages for (int i = 9; i < 30; i++) { + if (i == 20) { + flatFile.commitAsync().join(); + flatFile.rollingNewFile(flatFile.getAppendOffset()); + } ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer(); byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i); - Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L)); + Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i)); } flatFile.commitAsync().join(); Assert.assertEquals(6L, flatFile.getMinOffsetFromFile()); Assert.assertEquals(6L, flatFile.getMinOffsetFromFileAsync().join().longValue()); + + // recalculate min offset here + flatFile.destroyExpiredFile(20L); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFile()); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue()); + + // clean expired file again + flatFile.destroyExpiredFile(20L); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFile()); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue()); } } \ No newline at end of file diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java index 79647932dae..2a007af4e9d 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java @@ -46,7 +46,7 @@ public void init() { storeConfig = new MessageStoreConfig(); storeConfig.setStorePathRootDir(storePath); storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName()); - storeConfig.setBrokerName(storeConfig.getBrokerName()); + storeConfig.setBrokerName("brokerName"); metadataStore = new DefaultMetadataStore(storeConfig); }