Skip to content

Commit

Permalink
Merge branch 'develop' into fix_typo_v2
Browse files Browse the repository at this point in the history
# Conflicts:
#	client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
  • Loading branch information
hakusai22 committed May 22, 2024
2 parents fd11848 + 94bb64f commit e7d4545
Show file tree
Hide file tree
Showing 57 changed files with 480 additions and 116 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ java_library(
"@maven//:org_awaitility_awaitility",
"@maven//:org_openjdk_jmh_jmh_core",
"@maven//:org_openjdk_jmh_jmh_generator_annprocess",
"@maven//:org_mockito_mockito_junit_jupiter",
],
)
1 change: 1 addition & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ maven_install(
"com.alipay.sofa:jraft-core:1.3.14",
"com.alipay.sofa:hessian:3.3.6",
"io.netty:netty-tcnative-boringssl-static:2.0.48.Final",
"org.mockito:mockito-junit-jupiter:4.11.0",
],
fetch_sources = True,
repositories = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ public DefaultAuthenticationContext build(Metadata metadata, GeneratedMessageV3
@Override
public DefaultAuthenticationContext build(ChannelHandlerContext context, RemotingCommand request) {
HashMap<String, String> fields = request.getExtFields();
if (MapUtils.isEmpty(fields)) {
throw new AuthenticationException("authentication field is null.");
}
DefaultAuthenticationContext result = new DefaultAuthenticationContext();
result.setChannelId(context.channel().id().asLongText());
result.setRpcCode(String.valueOf(request.getCode()));
if (MapUtils.isEmpty(fields)) {
return result;
}
if (!fields.containsKey(SessionCredentials.ACCESS_KEY)) {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,14 @@ private void handleException(Exception e, CompletableFuture<?> result) {
}

private AuthenticationMetadataProvider getAuthenticationMetadataProvider() {
if (authorizationMetadataProvider == null) {
if (authenticationMetadataProvider == null) {
throw new IllegalStateException("The authenticationMetadataProvider is not configured");
}
return authenticationMetadataProvider;
}

private AuthorizationMetadataProvider getAuthorizationMetadataProvider() {
if (authenticationMetadataProvider == null) {
if (authorizationMetadataProvider == null) {
throw new IllegalStateException("The authorizationMetadataProvider is not configured");
}
return authorizationMetadataProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected HandlerChain<DefaultAuthenticationContext, CompletableFuture<Void>> ne
.addNext(new DefaultAuthenticationHandler(this.authConfig, metadataService));
}

private void doAuditLog(DefaultAuthenticationContext context, Throwable ex) {
protected void doAuditLog(DefaultAuthenticationContext context, Throwable ex) {
if (StringUtils.isBlank(context.getUsername())) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.HashMap;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclException;
Expand Down Expand Up @@ -160,6 +161,9 @@ public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, Re
List<DefaultAuthorizationContext> result = new ArrayList<>();
try {
HashMap<String, String> fields = command.getExtFields();
if (MapUtils.isEmpty(fields)) {
return result;
}
Subject subject = null;
if (fields.containsKey(SessionCredentials.ACCESS_KEY)) {
subject = User.of(fields.get(SessionCredentials.ACCESS_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected HandlerChain<DefaultAuthorizationContext, CompletableFuture<Void>> new
.addNext(new AclAuthorizationHandler(authConfig, metadataService));
}

private void doAuditLog(DefaultAuthorizationContext context, Throwable ex) {
protected void doAuditLog(DefaultAuthorizationContext context, Throwable ex) {
if (context.getSubject() == null) {
return;
}
Expand Down
4 changes: 4 additions & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ java_library(
GenTestRules(
name = "GeneratedTestRules",
test_files = glob(["src/test/java/**/*Test.java"]),
exclude_tests = [
# These tests are extremely slow and flaky, exclude them before they are properly fixed.
"src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest",
],
deps = [
":tests",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public BrokerController(
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);

this.brokerStatsManager.setProduerStateGetter(new BrokerStatsManager.StateGetter() {
this.brokerStatsManager.setProducerStateGetter(new BrokerStatsManager.StateGetter() {
@Override
public boolean online(String instanceId, String group, String topic) {
if (getTopicConfigManager().getTopicConfigTable().containsKey(NamespaceUtil.wrapNamespace(instanceId, topic))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ public boolean rejectRequest() {

private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
long startTime = System.currentTimeMillis();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
Expand Down Expand Up @@ -514,8 +515,10 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
LOGGER.error("Update / create topic failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
return response;
}

long executionTime = System.currentTimeMillis() - startTime;
LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime);
return response;
}

Expand Down Expand Up @@ -1450,6 +1453,7 @@ public void onException(Throwable e) {

private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
long startTime = System.currentTimeMillis();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}",
Expand All @@ -1462,6 +1466,8 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c

response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
long executionTime = System.currentTimeMillis() - startTime;
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime);
return response;
}

Expand Down Expand Up @@ -2907,7 +2913,7 @@ private RemotingCommand updateUser(ChannelHandlerContext ctx,
return this.brokerController.getAuthenticationMetadataManager().updateUser(old);
}).thenAccept(nil -> response.setCode(ResponseCode.SUCCESS))
.exceptionally(ex -> {
LOGGER.error("delete user {} error", requestHeader.getUsername(), ex);
LOGGER.error("update user {} error", requestHeader.getUsername(), ex);
return handleAuthException(response, ex);
})
.join();
Expand Down Expand Up @@ -3154,7 +3160,7 @@ private boolean validateSlave(RemotingCommand response) {
}

private boolean validateBlackListConfigExist(Properties properties) {
for (String blackConfig:configBlackList) {
for (String blackConfig : configBlackList) {
if (properties.containsKey(blackConfig)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public boolean load() {
result = result && this.correctDelayOffset();
return result;
}

public boolean loadWhenSyncDelayOffset() {
boolean result = super.load();
result = result && this.parseDelayLevel();
Expand Down Expand Up @@ -377,7 +377,7 @@ public void run() {
if (isStarted()) {
this.executeOnTimeUp();
}
} catch (Exception e) {
} catch (Throwable e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeUp exception", e);
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
Expand Down
3 changes: 2 additions & 1 deletion client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ java_library(
"@maven//:io_netty_netty_all",
"@maven//:io_opentracing_opentracing_api",
"@maven//:io_opentracing_opentracing_mock",
"@maven//:org_awaitility_awaitility",
"@maven//:org_awaitility_awaitility",
"@maven//:org_mockito_mockito_junit_jupiter",
],
resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public List<MessageQueue> allocate(String consumerGroup, String currentCID, List
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
result.add(mqAll.get(startIndex + i));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ private void processReplyMessage(MessageExt replyMsg) {
}
} else {
String bornHost = replyMsg.getBornHostString();
logger.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",
correlationId, bornHost));
logger.warn("receive reply message, but not matched any request, CorrelationId: {} , reply from host: {}",
correlationId, bornHost);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
e);
}

throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
throw new MQClientException("Unknown why, Can not find Message Queue for this topic, " + topic, null);
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm
index = sortMap.get(queueIdKey).indexOf(offset);
msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
if (msgQueueOffset != offset) {
log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s",
log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}",
msgQueueOffset, messageExt);
}
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
Expand All @@ -1181,7 +1181,7 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm
index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
if (msgQueueOffset != messageExt.getQueueOffset()) {
log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s", msgQueueOffset, messageExt);
log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, messageExt);
}
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(UtilAll.exceptionSimpleDesc(e));

log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
mq), e);
mq, e);
}

result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
Expand Down Expand Up @@ -410,11 +410,11 @@ public void run() {
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
messageQueue, e);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
}

@Override
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
Expand All @@ -96,10 +97,11 @@ public void run() {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}

@Override
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
Expand Down Expand Up @@ -181,11 +183,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(UtilAll.exceptionSimpleDesc(e));

log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
mq), e);
mq, e);
}

result.setAutoCommit(context.isAutoCommit());
Expand Down Expand Up @@ -497,11 +499,11 @@ public void run() {

status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
messageQueue, e);
hasException = true;
} finally {
this.processQueue.getConsumeLock().readLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(UtilAll.exceptionSimpleDesc(e));

log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessagePopConcurrentlyService.this.consumerGroup,
msgs,
mq), e);
mq, e);
}

result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, Strin
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(UtilAll.exceptionSimpleDesc(e));

log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
log.warn("consumeMessageDirectly exception: {} Group: {} Msgs: {} MQ: {}",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessagePopOrderlyService.this.consumerGroup,
msgs,
mq), e);
mq, e);
}

result.setAutoCommit(context.isAutoCommit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,18 +1237,16 @@ private boolean isSetEqual(Set<MessageQueue> set1, Set<MessageQueue> set2) {
return true;
}

if (set1 == null || set2 == null || set1.size() != set2.size() || set1.size() == 0) {
if (set1 == null || set2 == null || set1.size() != set2.size()) {
return false;
}

Iterator<MessageQueue> iter = set2.iterator();
boolean isEqual = true;
while (iter.hasNext()) {
if (!set1.contains(iter.next())) {
isEqual = false;
for (MessageQueue messageQueue : set2) {
if (!set1.contains(messageQueue)) {
return false;
}
}
return isEqual;
return true;
}

public AssignedMessageQueue getAssignedMessageQueue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ private boolean updateProcessQueueTableInRebalance(final String topic, final Set
}

this.removeDirtyOffset(mq);
ProcessQueue pq = createProcessQueue(topic);
ProcessQueue pq = createProcessQueue();
pq.setLocked(true);
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
Expand Down Expand Up @@ -779,8 +779,6 @@ public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final Pop

public abstract PopProcessQueue createPopProcessQueue();

public abstract ProcessQueue createProcessQueue(String topicName);

public void removeProcessQueue(final MessageQueue mq) {
ProcessQueue prev = this.processQueueTable.remove(mq);
if (prev != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,4 @@ public PopProcessQueue createPopProcessQueue() {
return null;
}

public ProcessQueue createProcessQueue(String topicName) {
return createProcessQueue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,4 @@ public PopProcessQueue createPopProcessQueue() {
return null;
}

public ProcessQueue createProcessQueue(String topicName) {
return createProcessQueue();
}

}
Loading

0 comments on commit e7d4545

Please sign in to comment.