diff --git a/.github/workflows/bazel.yml b/.github/workflows/bazel.yml index 7652b93048c..af674592bb9 100644 --- a/.github/workflows/bazel.yml +++ b/.github/workflows/bazel.yml @@ -19,4 +19,4 @@ jobs: - name: Build run: bazel build --config=remote //... - name: Run Tests - run: bazel test --config=remote --nocache_test_results //... \ No newline at end of file + run: bazel test --config=remote //... \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 1afb9113eb4..e593a17c98e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private MessageListener messageListener; + /** + * Listener to call if message queue assignment is changed. + */ + private MessageQueueListener messageQueueListener; + /** * Offset Storage */ @@ -987,4 +992,12 @@ public boolean isClientRebalance() { public void setClientRebalance(boolean clientRebalance) { this.clientRebalance = clientRebalance; } + + public MessageQueueListener getMessageQueueListener() { + return messageQueueListener; + } + + public void setMessageQueueListener(MessageQueueListener messageQueueListener) { + this.messageQueueListener = messageQueueListener; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index f4a8eda23a4..81e06ee4176 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -29,20 +29,22 @@ */ public interface MQConsumer extends MQAdmin { /** - * If consuming failure,message will be send back to the brokers,and delay consuming some time + * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after + * interval specified in delay level. */ @Deprecated void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** - * If consuming failure,message will be send back to the broker,and delay consuming some time + * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after + * interval specified in delay level. */ void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** - * Fetch message queues from consumer cache according to the topic + * Fetch message queues from consumer cache pertaining to the given topic. * * @param topic message topic * @return queue set diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java index 63795a6eeb0..74510f4c3ea 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java @@ -26,8 +26,7 @@ public interface MessageQueueListener { /** * @param topic message topic * @param mqAll all queues in this message topic - * @param mqDivided collection of queues,assigned to the current consumer + * @param mqAssigned collection of queues, assigned to the current consumer */ - void messageQueueChanged(final String topic, final Set mqAll, - final Set mqDivided); + void messageQueueChanged(final String topic, final Set mqAll, final Set mqAssigned); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index e57579321cb..cfb89b5c887 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PopCallback; import org.apache.rocketmq.client.consumer.PopResult; @@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long queueMaxSpanFlowControlTimes = 0; //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h - private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; + private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; private static final int MAX_POP_INVISIBLE_TIME = 300000; private static final int MIN_POP_INVISIBLE_TIME = 5000; @@ -1553,4 +1554,11 @@ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenExcept int[] getPopDelayLevel() { return popDelayLevel; } + + public MessageQueueListener getMessageQueueListener() { + if (null == defaultMQPushConsumer) { + return null; + } + return defaultMQPushConsumer.getMessageQueueListener(); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index df509f37161..f9cf429c69f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -20,6 +20,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,7 +53,7 @@ public RebalancePushImpl(String consumerGroup, MessageModel messageModel, @Override public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { - /** + /* * When rebalance result changed, should update subscription's version to notify broker. * Fix: inconsistency subscription may lead to consumer miss messages. */ @@ -82,6 +83,11 @@ public void messageQueueChanged(String topic, Set mqAll, Set()); setCmdVersion(cmd); + cmd.makeCustomHeaderToNet(); return cmd; } diff --git a/remoting/BUILD.bazel b/remoting/BUILD.bazel index db8b24301d0..072148bc08c 100644 --- a/remoting/BUILD.bazel +++ b/remoting/BUILD.bazel @@ -65,6 +65,7 @@ java_library( "@maven//:io_opentelemetry_opentelemetry_sdk_metrics", "@maven//:org_apache_tomcat_annotations_api", "@maven//:org_apache_commons_commons_lang3", + "@maven//:org_jetbrains_annotations", ], resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"]) ) diff --git a/store/BUILD.bazel b/store/BUILD.bazel index bf594aaa69a..4b046c68eb0 100644 --- a/store/BUILD.bazel +++ b/store/BUILD.bazel @@ -79,6 +79,7 @@ GenTestRules( "src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest", "src/test/java/org/apache/rocketmq/store/MappedFileQueueTest", "src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest", + "src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest", ], test_files = glob(["src/test/java/**/*Test.java"]), deps = [ diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java index b2c9b06589b..684b718ae5d 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.test.client.consumer.balance; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.test.base.BaseConf; @@ -112,4 +114,19 @@ public void test3ConsumerAndCrashOne() { consumer2.getListener().getAllUndupMsgBody()).size()); assertThat(balance).isEqualTo(true); } + + @Test + public void testMessageQueueListener() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener()); + // Register message queue listener + consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown()); + + // Without message queue listener + RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListener()); + + Assert.assertTrue(latch.await(30, TimeUnit.SECONDS)); + } } diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel index 8b6705ac283..e16fca90d07 100644 --- a/tieredstore/BUILD.bazel +++ b/tieredstore/BUILD.bazel @@ -41,6 +41,7 @@ java_library( "@maven//:org_apache_tomcat_annotations_api", "@maven//:com_alibaba_fastjson", "@maven//:org_apache_rocketmq_rocketmq_rocksdb", + "@maven//:commons_collections_commons_collections", ], ) diff --git a/tools/BUILD.bazel b/tools/BUILD.bazel index 9ccc115335d..05d88f7b002 100644 --- a/tools/BUILD.bazel +++ b/tools/BUILD.bazel @@ -39,6 +39,7 @@ java_library( "@maven//:commons_collections_commons_collections", "@maven//:io_github_aliyunmq_rocketmq_slf4j_api", "@maven//:io_github_aliyunmq_rocketmq_logback_classic", + "@maven//:org_apache_rocketmq_rocketmq_rocksdb", ], )