From 61037a817aaf9956f217f04ccf50a32bcac4e41a Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Tue, 9 Jan 2024 23:13:12 +1100 Subject: [PATCH 01/26] created SubscriptionPattern for new regex-subscription functionalities --- .../kafka/clients/consumer/Consumer.java | 4 +++ .../kafka/clients/consumer/KafkaConsumer.java | 12 ++++++++- .../kafka/clients/consumer/MockConsumer.java | 10 ++++++++ .../clients/consumer/SubscriptionPattern.java | 13 ++++++++++ .../internals/AsyncKafkaConsumer.java | 25 +++++++++++++++++++ .../internals/LegacyKafkaConsumer.java | 11 ++++++++ .../consumer/internals/SubscriptionState.java | 10 ++++++++ 7 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 4418501a39aad..82fea13ce9b81 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -72,6 +72,10 @@ public interface Consumer extends Closeable { */ void subscribe(Pattern pattern); + void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback); + + void subscribe(SubscriptionPattern pattern); + /** * @see KafkaConsumer#unsubscribe() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 82e1f8b93da3c..247e33dcc5880 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -754,7 +754,17 @@ public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } - /** + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + + } + + @Override + public void subscribe(SubscriptionPattern pattern) { + + } + + /** * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}. * This also clears any partitions directly assigned through {@link #assign(Collection)}. * diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index ee6001ddf034e..47663db789ebd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -140,6 +140,16 @@ public synchronized void subscribe(Pattern pattern) { subscribe(pattern, Optional.empty()); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + + } + + @Override + public void subscribe(SubscriptionPattern pattern) { + + } + @Override public void subscribe(Collection topics, final ConsumerRebalanceListener listener) { if (listener == null) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java new file mode 100644 index 0000000000000..d086fa8ab07cf --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -0,0 +1,13 @@ +package org.apache.kafka.clients.consumer; + +public class SubscriptionPattern { + final private String pattern; + + public SubscriptionPattern(final String pattern) { + this.pattern = pattern; + } + + public String pattern() { + return this.pattern; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index ad3e9131f1373..a5b4c80c09d8d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; @@ -1660,6 +1661,16 @@ public void subscribe(Pattern pattern) { subscribeInternal(pattern, Optional.empty()); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + + } + + @Override + public void subscribe(SubscriptionPattern pattern) { + + } + @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { if (listener == null) @@ -1724,6 +1735,20 @@ private void subscribeInternal(Pattern pattern, Optional listener){ + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.pattern().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + } finally { + release(); + } + } + private void subscribeInternal(Collection topics, Optional listener) { acquireAndEnsureOpen(); try { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java index bcc8cb40f2d91..81467568950dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java @@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; @@ -494,6 +495,16 @@ public void subscribe(Pattern pattern) { subscribeInternal(pattern, Optional.empty()); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + + } + + @Override + public void subscribe(SubscriptionPattern pattern) { + + } + /** * Internal helper method for {@link #subscribe(Pattern)} and * {@link #subscribe(Pattern, ConsumerRebalanceListener)} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 601726bd9dfd2..92ba1d2cf39dc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.PartitionStates; @@ -84,6 +85,8 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; + private SubscriptionPattern subscriptionPattern; + /* the list of topics the user has requested */ private Set subscription; @@ -136,6 +139,7 @@ public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultReset this.assignment = new PartitionStates<>(); this.groupSubscription = new HashSet<>(); this.subscribedPattern = null; + this.subscriptionPattern = null; this.subscriptionType = SubscriptionType.NONE; } @@ -174,6 +178,12 @@ public synchronized void subscribe(Pattern pattern, Optional listener) { + registerRebalanceListener(listener); + setSubscriptionType(SubscriptionType.AUTO_PATTERN); + this.subscriptionPattern = pattern; + } + public synchronized boolean subscribeFromPattern(Set topics) { if (subscriptionType != SubscriptionType.AUTO_PATTERN) throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " + From 423b6a2fd283e65ff11970c4e25c86e8146bc1e1 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Thu, 11 Jan 2024 23:04:34 +1100 Subject: [PATCH 02/26] added logic in HeartBeatRequestManager to send topic regex to broker --- .../consumer/internals/HeartbeatRequestManager.java | 8 +++++++- .../clients/consumer/internals/SubscriptionState.java | 4 ++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 11846eedeafdc..88820a65311d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -535,7 +535,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } } else { // SubscribedTopicRegex - only sent if has changed since the last heartbeat - // - not supported yet + String subscriptionTopicRegex = subscriptions.subscriptionPattern().pattern(); + if(!subscriptionTopicRegex.equals(sentFields.subscribedTopicRegex)){ + data.setSubscribedTopicRegex(subscriptionTopicRegex); + sentFields.subscribedTopicRegex = subscriptionTopicRegex; + } } // ServerAssignor - only sent if has changed since the last heartbeat @@ -578,6 +582,7 @@ static class SentFields { private String instanceId = null; private int rebalanceTimeoutMs = -1; private TreeSet subscribedTopicNames = null; + private String subscribedTopicRegex = null; private String serverAssignor = null; private TreeSet topicPartitions = null; @@ -587,6 +592,7 @@ void reset() { instanceId = null; rebalanceTimeoutMs = -1; subscribedTopicNames = null; + subscribedTopicRegex = null; serverAssignor = null; topicPartitions = null; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 92ba1d2cf39dc..4aba300dc9fba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -338,6 +338,10 @@ public synchronized Set subscription() { return Collections.emptySet(); } + public synchronized SubscriptionPattern subscriptionPattern() { + return this.subscriptionPattern; + } + public synchronized Set pausedPartitions() { return collectPartitions(TopicPartitionState::isPaused); } From a90da133a94f5b824168d12ae45e605d6daf54be Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Sat, 13 Jan 2024 22:26:56 +1100 Subject: [PATCH 03/26] added functions that use SubscriptionPattern to subscribe to topic --- .../clients/consumer/internals/AsyncKafkaConsumer.java | 8 ++++++-- .../consumer/internals/HeartbeatRequestManager.java | 8 +------- .../clients/consumer/internals/SubscriptionState.java | 1 + 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index a5b4c80c09d8d..2aa4eb3815e63 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1662,13 +1662,16 @@ public void subscribe(Pattern pattern) { } @Override - public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) { + if (listener == null) + throw new IllegalArgumentException("RebalanceListener cannot be null"); + subscribeInternal(pattern, Optional.of(listener)); } @Override public void subscribe(SubscriptionPattern pattern) { - + subscribeInternal(pattern, Optional.empty()); } @Override @@ -1744,6 +1747,7 @@ private void subscribeInternal(SubscriptionPattern pattern, Optional subscribedTopicNames = null; - private String subscribedTopicRegex = null; private String serverAssignor = null; private TreeSet topicPartitions = null; @@ -592,7 +587,6 @@ void reset() { instanceId = null; rebalanceTimeoutMs = -1; subscribedTopicNames = null; - subscribedTopicRegex = null; serverAssignor = null; topicPartitions = null; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4aba300dc9fba..d859721b5817d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -85,6 +85,7 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; + /* we should rename this to something more specific */ private SubscriptionPattern subscriptionPattern; /* the list of topics the user has requested */ From 4df71e974bf819cc146f8e1fd2a323a486c7d129 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Sun, 14 Jan 2024 00:11:36 +1100 Subject: [PATCH 04/26] fixed checkStyle errors in SubscriptionPattern --- .../kafka/clients/consumer/KafkaConsumer.java | 16 +++------- .../clients/consumer/SubscriptionPattern.java | 31 ++++++++++++++----- .../internals/AsyncKafkaConsumer.java | 2 +- .../internals/OffsetsRequestManagerTest.java | 1 - 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 247e33dcc5880..94b1f6962fa52 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -753,18 +753,12 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {} + @Override + public void subscribe(SubscriptionPattern pattern) {} - @Override - public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { - - } - - @Override - public void subscribe(SubscriptionPattern pattern) { - - } - - /** + /** * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}. * This also clears any partitions directly assigned through {@link #assign(Collection)}. * diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index d086fa8ab07cf..40c6170caebba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -1,13 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.clients.consumer; public class SubscriptionPattern { - final private String pattern; + final private String pattern; + public SubscriptionPattern(final String pattern) { + this.pattern = pattern; + } - public SubscriptionPattern(final String pattern) { - this.pattern = pattern; - } - - public String pattern() { - return this.pattern; - } + public String pattern() { + return this.pattern; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 2aa4eb3815e63..45e88e7864f5e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1738,7 +1738,7 @@ private void subscribeInternal(Pattern pattern, Optional listener){ + private void subscribeInternal(SubscriptionPattern pattern, Optional listener) { acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index 5ca034d636029..a6342e96f687d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -86,7 +86,6 @@ import static org.mockito.Mockito.when; public class OffsetsRequestManagerTest { - private OffsetsRequestManager requestManager; private ConsumerMetadata metadata; private SubscriptionState subscriptionState; From 819c5ec6d0b75f859f47700f3f9d61c575dde7ae Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Sun, 14 Jan 2024 18:34:06 +1100 Subject: [PATCH 05/26] Retrigger CI --- .../kafka/clients/consumer/internals/SubscriptionState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index d859721b5817d..5c2d265428d91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -85,7 +85,7 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; - /* we should rename this to something more specific */ + /* we should rename this to something more specific. */ private SubscriptionPattern subscriptionPattern; /* the list of topics the user has requested */ From 8e8e6c69a04d82aeb359ed65d797a6fe6ef705fd Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Mon, 15 Jan 2024 12:33:47 +1100 Subject: [PATCH 06/26] Re-trigger tests --- .../kafka/clients/consumer/internals/SubscriptionState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 5c2d265428d91..d859721b5817d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -85,7 +85,7 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; - /* we should rename this to something more specific. */ + /* we should rename this to something more specific */ private SubscriptionPattern subscriptionPattern; /* the list of topics the user has requested */ From e6a1bd54150ce10fdcd94137a696a0642ed225a0 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Fri, 2 Feb 2024 14:34:46 +0700 Subject: [PATCH 07/26] added SubscribedTopicRegex to ConsumerGroupdHeartbeatRequestData, refactored KafkaConsumer so that it will call the right function when SubscriptionPattern is used, added log to LegacyKafkaConsumer indicating it will not support SubscriptionPattern --- .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 8 ++++++-- .../consumer/internals/HeartbeatRequestManager.java | 8 +++++++- .../clients/consumer/internals/LegacyKafkaConsumer.java | 4 ++-- .../clients/consumer/internals/SubscriptionState.java | 3 ++- .../common/message/ConsumerGroupHeartbeatRequest.json | 2 ++ clients/src/test/resources/log4j.properties | 2 +- core/src/test/resources/log4j.properties | 2 +- 7 files changed, 21 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 94b1f6962fa52..f508162d42b2e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -754,9 +754,13 @@ public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } @Override - public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {} + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + delegate.subscribe(pattern, callback); + } @Override - public void subscribe(SubscriptionPattern pattern) {} + public void subscribe(SubscriptionPattern pattern) { + delegate.subscribe(pattern); + } /** * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 11846eedeafdc..6984b63328ad0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; @@ -535,7 +536,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } } else { // SubscribedTopicRegex - only sent if has changed since the last heartbeat - // - not supported yet + String subscriptionRegex = this.subscriptions.subscriptionPattern().pattern(); + if (!subscriptionRegex.equals(sentFields.subscribedTopicRegex)) { + data.setSubscribedTopicRegex(subscriptionRegex); + sentFields.subscribedTopicRegex = subscriptionRegex; + } } // ServerAssignor - only sent if has changed since the last heartbeat @@ -578,6 +583,7 @@ static class SentFields { private String instanceId = null; private int rebalanceTimeoutMs = -1; private TreeSet subscribedTopicNames = null; + private String subscribedTopicRegex = null; private String serverAssignor = null; private TreeSet topicPartitions = null; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java index 81467568950dd..5a1775c76b47f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java @@ -497,12 +497,12 @@ public void subscribe(Pattern pattern) { @Override public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { - + log.warn("Operation not supported in new consumer group protocol"); } @Override public void subscribe(SubscriptionPattern pattern) { - + log.warn("Operation not supported in new consumer group protocol"); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index d859721b5817d..3e2cbd058e8e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -85,7 +85,7 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; - /* we should rename this to something more specific */ + /* RE2J compatible regex */ private SubscriptionPattern subscriptionPattern; /* the list of topics the user has requested */ @@ -317,6 +317,7 @@ public synchronized void unsubscribe() { this.groupSubscription = Collections.emptySet(); this.assignment.clear(); this.subscribedPattern = null; + this.subscriptionPattern = null; this.subscriptionType = SubscriptionType.NONE; this.assignmentId++; } diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json index 71c6e2e25025e..95c26421ce26a 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json @@ -35,6 +35,8 @@ "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, + { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", diff --git a/clients/src/test/resources/log4j.properties b/clients/src/test/resources/log4j.properties index 0992580eca1d8..0eee976fae025 100644 --- a/clients/src/test/resources/log4j.properties +++ b/clients/src/test/resources/log4j.properties @@ -18,6 +18,6 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.org.apache.kafka=ERROR +log4j.logger.org.apache.kafka=DEBUG # We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index f7fb7364a3c38..ffcd7c0170735 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -18,7 +18,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=WARN +log4j.logger.kafka=DEBUG log4j.logger.org.apache.kafka=WARN From 38c08d3fcf8768fa8ae7d1cb3b7fc8cb98645902 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Fri, 2 Feb 2024 18:07:38 +0700 Subject: [PATCH 08/26] reverted unwanted log level changes in log4j.properties and added subscriptionRegex into HeartbeatRequestManagerTest --- .../clients/consumer/internals/HeartbeatRequestManager.java | 1 - .../consumer/internals/HeartbeatRequestManagerTest.java | 4 ++++ clients/src/test/resources/log4j.properties | 2 +- core/src/test/resources/log4j.properties | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 6984b63328ad0..ef2e27bf05cec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index e44414b0677b9..30de6ef00e8c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -312,6 +312,7 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { assertEquals(memberEpoch, heartbeatRequest.data().memberEpoch()); assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, heartbeatRequest.data().rebalanceTimeoutMs()); assertEquals(subscribedTopics, heartbeatRequest.data().subscribedTopicNames()); + assertEquals(null, heartbeatRequest.data().subscribedTopicRegex()); assertEquals(DEFAULT_GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId()); assertEquals(DEFAULT_REMOTE_ASSIGNOR, heartbeatRequest.data().serverAssignor()); } @@ -470,6 +471,7 @@ public void testHeartbeatState() { assertNull(data.instanceId()); assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); + assertEquals(null, data.subscribedTopicRegex()); assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); @@ -484,6 +486,7 @@ public void testHeartbeatState() { assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); + assertNull(data.subscribedTopicRegex()); assertNull(data.serverAssignor()); assertNull(data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); @@ -501,6 +504,7 @@ public void testHeartbeatState() { assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); + assertNull(data.subscribedTopicRegex()); assertNull(data.serverAssignor()); assertNull(data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); diff --git a/clients/src/test/resources/log4j.properties b/clients/src/test/resources/log4j.properties index 0eee976fae025..0992580eca1d8 100644 --- a/clients/src/test/resources/log4j.properties +++ b/clients/src/test/resources/log4j.properties @@ -18,6 +18,6 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.org.apache.kafka=DEBUG +log4j.logger.org.apache.kafka=ERROR # We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index ffcd7c0170735..f7fb7364a3c38 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -18,7 +18,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=DEBUG +log4j.logger.kafka=WARN log4j.logger.org.apache.kafka=WARN From 1f718ff40ac810e8c5838ce1cafe16b7ef33110a Mon Sep 17 00:00:00 2001 From: Phuc-Hong-Tran <44060007+Phuc-Hong-Tran@users.noreply.github.com> Date: Tue, 6 Feb 2024 18:09:54 +0700 Subject: [PATCH 09/26] refactored subscribeInternal in AsyncKafkaConsumer Co-authored-by: Bruno Cadonna --- .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 2bccd562ffc6e..f53b8c26fa1e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1752,7 +1752,7 @@ private void subscribeInternal(SubscriptionPattern pattern, Optional Date: Fri, 9 Feb 2024 20:14:17 +0700 Subject: [PATCH 10/26] refactored toString method to include subscriptionPattern in SubscriptionState --- .../consumer/internals/SubscriptionState.java | 1 + .../kafka/api/PlaintextConsumerTest.scala | 47 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index b7a9572aa98fb..86a145b25af8d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -112,6 +112,7 @@ public synchronized String toString() { return "SubscriptionState{" + "type=" + subscriptionType + ", subscribedPattern=" + subscribedPattern + + ", subscriptionPattern=" + subscriptionPattern + ", subscription=" + String.join(",", subscription) + ", groupSubscription=" + String.join(",", groupSubscription) + ", defaultResetStrategy=" + defaultResetStrategy + diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 23d522c41ae4d..5f7a181d2549f 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -492,6 +492,53 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(0, consumer.assignment().size) } + def testRE2JPatternSubscription(quorum: String, groupProtocol: String): Unit = { + val numRecords = 10000 + val producer = createProducer() + sendRecords(producer, numRecords, tp) + + val topic1 = "tblablac" // matches subscribed pattern + createTopic(topic1, 2, brokerCount) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) + + val topic2 = "tblablak" // does not match subscribed pattern + createTopic(topic2, 2, brokerCount) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 0)) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1)) + + val topic3 = "tblab1" // does not match subscribed pattern + createTopic(topic3, 2, brokerCount) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0)) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1)) + + val consumer = createConsumer() + assertEquals(0, consumer.assignment().size) + + val pattern = Pattern.compile("t.*c") + consumer.subscribe(pattern, new TestConsumerReassignmentListener) + + var assignment = Set( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1)) + awaitAssignment(consumer, assignment) + + val topic4 = "tsomec" // matches subscribed pattern + createTopic(topic4, 2, brokerCount) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0)) + sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1)) + + assignment ++= Set( + new TopicPartition(topic4, 0), + new TopicPartition(topic4, 1)) + awaitAssignment(consumer, assignment) + + consumer.unsubscribe() + assertEquals(0, consumer.assignment().size) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitMetadata(quorum: String, groupProtocol: String): Unit = { From 5841899db85df47ee7c6bac96e898a54297feebf Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Thu, 15 Feb 2024 18:59:46 +0700 Subject: [PATCH 11/26] Added commments to explain subscribe methods in KafkaConsumer that use SubscriptionPattern and what SubscriptionPattern is used for --- .../kafka/clients/consumer/Consumer.java | 6 +++ .../kafka/clients/consumer/KafkaConsumer.java | 37 +++++++++++++++++++ .../clients/consumer/SubscriptionPattern.java | 5 +++ 3 files changed, 48 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 82fea13ce9b81..a69f7abcc915c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -72,8 +72,14 @@ public interface Consumer extends Closeable { */ void subscribe(Pattern pattern); + /** + * @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener) + */ void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback); + /** + * @see KafkaConsumer#subscribe(SubscriptionPattern) + */ void subscribe(SubscriptionPattern pattern); /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2f163f2cf5a26..52993c111de84 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -753,10 +753,47 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } + + /** + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done periodically against all topics existing at the time of check. + * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering + * the max metadata age, the consumer will refresh metadata more often and check for matching topics. + *

+ * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the + * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there + * is a change to the topics matching the provided pattern and when consumer group membership changes. + * Group rebalances only take place during an active call to {@link #poll(Duration)}. + * + * @param pattern SubscriptionPattern to subscribe to + * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the + * subscribed topics + * @throws IllegalArgumentException If pattern or listener is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy + */ @Override public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { delegate.subscribe(pattern, callback); } + + /** + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done on the broker against topics existing at the time of check. + *

+ * This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which + * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer + * {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets + * to be reset. You should also provide your own listener if you are doing your own offset + * management since the listener gives you an opportunity to commit offsets before a rebalance finishes. + * + * @param pattern SubscriptionPattern to subscribe to + * @throws IllegalArgumentException If pattern is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy + */ @Override public void subscribe(SubscriptionPattern pattern) { delegate.subscribe(pattern); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index 40c6170caebba..34148249a2ac7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.clients.consumer; +/** + * A class that hold a regular expression which compatible with Google's RE2J engine. Visit + * this repository for details on RE2J engine. + */ + public class SubscriptionPattern { final private String pattern; public SubscriptionPattern(final String pattern) { From c66c474941af1083a8a7921a6d30c670fa3930d4 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Fri, 16 Feb 2024 18:05:07 +0700 Subject: [PATCH 12/26] Added tests relating to SubscriptionPattern for SubscriptionState, AsyncKafkaConsumer --- .../kafka/clients/consumer/KafkaConsumer.java | 70 +++++++++---------- .../clients/consumer/KafkaConsumerTest.java | 15 ++++ .../internals/AsyncKafkaConsumerTest.java | 18 +++++ .../internals/SubscriptionStateTest.java | 11 +++ .../kafka/api/PlaintextConsumerTest.scala | 47 ------------- 5 files changed, 79 insertions(+), 82 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 52993c111de84..1f4eb9530dd0e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -754,46 +754,46 @@ public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } - /** - * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. - * The pattern matching will be done periodically against all topics existing at the time of check. - * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering - * the max metadata age, the consumer will refresh metadata more often and check for matching topics. - *

- * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the - * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there - * is a change to the topics matching the provided pattern and when consumer group membership changes. - * Group rebalances only take place during an active call to {@link #poll(Duration)}. - * - * @param pattern SubscriptionPattern to subscribe to - * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the - * subscribed topics - * @throws IllegalArgumentException If pattern or listener is null - * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called - * previously (without a subsequent call to {@link #unsubscribe()}), or if not - * configured at-least one partition assignment strategy - */ + /** + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done periodically against all topics existing at the time of check. + * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering + * the max metadata age, the consumer will refresh metadata more often and check for matching topics. + *

+ * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the + * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there + * is a change to the topics matching the provided pattern and when consumer group membership changes. + * Group rebalances only take place during an active call to {@link #poll(Duration)}. + * + * @param pattern SubscriptionPattern to subscribe to + * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the + * subscribed topics + * @throws IllegalArgumentException If pattern or listener is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy + */ @Override public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { delegate.subscribe(pattern, callback); } - /** - * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. - * The pattern matching will be done on the broker against topics existing at the time of check. - *

- * This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which - * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer - * {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets - * to be reset. You should also provide your own listener if you are doing your own offset - * management since the listener gives you an opportunity to commit offsets before a rebalance finishes. - * - * @param pattern SubscriptionPattern to subscribe to - * @throws IllegalArgumentException If pattern is null - * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called - * previously (without a subsequent call to {@link #unsubscribe()}), or if not - * configured at-least one partition assignment strategy - */ + /** + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done on the broker against topics existing at the time of check. + *

+ * This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which + * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer + * {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets + * to be reset. You should also provide your own listener if you are doing your own offset + * management since the listener gives you an opportunity to commit offsets before a rebalance finishes. + * + * @param pattern SubscriptionPattern to subscribe to + * @throws IllegalArgumentException If pattern is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy + */ @Override public void subscribe(SubscriptionPattern pattern) { delegate.subscribe(pattern); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index dc5b5ff2f9069..3dcdd9ba04ce9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -485,6 +485,13 @@ public void testSubscriptionOnNullPattern(GroupProtocol groupProtocol) { assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((Pattern) null)); } + @ParameterizedTest + @EnumSource(value = GroupProtocol.class, names = "CONSUMER") + public void testSubscriptionOnNullSubscriptionPattern(GroupProtocol groupProtocol) { + consumer = newConsumer(groupProtocol, groupId); + assertThrows(IllegalArgumentException.class, + () -> consumer.subscribe((SubscriptionPattern) null)); + } @ParameterizedTest @EnumSource(GroupProtocol.class) @@ -494,6 +501,14 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) { () -> consumer.subscribe(Pattern.compile(""))); } + @ParameterizedTest + @EnumSource(value = GroupProtocol.class, names = "CONSUMER") + public void testSubscriptionOnEmptySubscriptionPattern(GroupProtocol groupProtocol) { + consumer = newConsumer(groupProtocol, groupId); + assertThrows(IllegalArgumentException.class, + () -> consumer.subscribe(new SubscriptionPattern(""))); + } + @ParameterizedTest @EnumSource(GroupProtocol.class) public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 691467d29fbd5..8bf10a6627db4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.consumer.RoundRobinAssignor; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; @@ -582,6 +583,23 @@ public void testPartitionRevocationOnClose() { assertTrue(subscriptions.assignedPartitions().isEmpty()); assertEquals(1, listener.revokedCount); } + @Test + public void testSubscribeToSubscriptionPattern() { + MockRebalanceListener listener = new MockRebalanceListener(); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + + SubscriptionPattern pattern = new SubscriptionPattern("t*"); + consumer.subscribe(pattern, listener); + assertEquals(subscriptions.subscriptionPattern(), pattern); + } @Test public void testFailedPartitionRevocationOnClose() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index fe6cc5f028171..b72fc487fb1a4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; @@ -186,6 +187,16 @@ public void partitionAssignmentChangeOnPatternSubscription() { assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); } + @Test + public void testSubscriptionPatternInclusionInSubscriptionState() { + state.subscribe(new SubscriptionPattern("*t"), Optional.of(rebalanceListener)); + assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.numAssignedPartitions()); + + state.unsubscribe(); + assertEquals(state.subscriptionPattern(), null); + assertEquals(0, state.numAssignedPartitions()); + } @Test public void verifyAssignmentId() { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 5f7a181d2549f..23d522c41ae4d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -492,53 +492,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(0, consumer.assignment().size) } - def testRE2JPatternSubscription(quorum: String, groupProtocol: String): Unit = { - val numRecords = 10000 - val producer = createProducer() - sendRecords(producer, numRecords, tp) - - val topic1 = "tblablac" // matches subscribed pattern - createTopic(topic1, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) - - val topic2 = "tblablak" // does not match subscribed pattern - createTopic(topic2, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1)) - - val topic3 = "tblab1" // does not match subscribed pattern - createTopic(topic3, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1)) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - val pattern = Pattern.compile("t.*c") - consumer.subscribe(pattern, new TestConsumerReassignmentListener) - - var assignment = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - awaitAssignment(consumer, assignment) - - val topic4 = "tsomec" // matches subscribed pattern - createTopic(topic4, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1)) - - assignment ++= Set( - new TopicPartition(topic4, 0), - new TopicPartition(topic4, 1)) - awaitAssignment(consumer, assignment) - - consumer.unsubscribe() - assertEquals(0, consumer.assignment().size) - } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitMetadata(quorum: String, groupProtocol: String): Unit = { From a48a84def3fc31a10b8d9ea68b218e6558ae335b Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Sat, 17 Feb 2024 10:17:55 +0700 Subject: [PATCH 13/26] Refactored AsyncKafkaConsumerTest --- .../clients/consumer/internals/AsyncKafkaConsumerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8bf10a6627db4..3f7baa2811cc1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -584,7 +584,7 @@ public void testPartitionRevocationOnClose() { assertEquals(1, listener.revokedCount); } @Test - public void testSubscribeToSubscriptionPattern() { + public void testSubscribeUsingSubscriptionPattern() { MockRebalanceListener listener = new MockRebalanceListener(); SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); consumer = newConsumer( From df300c7dd860d9a0cde79b5a22c9c9c00641f8a1 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Mon, 19 Feb 2024 20:18:58 +0700 Subject: [PATCH 14/26] Fixed consumerGroupHeartbeatRequestData RPC versioning --- .../common/message/ConsumerGroupHeartbeatRequest.json | 5 +++-- .../clients/consumer/internals/SubscriptionStateTest.java | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json index 95c26421ce26a..96e0b4227493d 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json @@ -18,7 +18,8 @@ "type": "request", "listeners": ["zkBroker", "broker"], "name": "ConsumerGroupHeartbeatRequest", - "validVersions": "0", + "latestVersionUnstable": true, + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", @@ -35,7 +36,7 @@ "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, - { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null", "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index b72fc487fb1a4..0ec003804a25d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -189,9 +189,12 @@ public void partitionAssignmentChangeOnPatternSubscription() { } @Test public void testSubscriptionPatternInclusionInSubscriptionState() { - state.subscribe(new SubscriptionPattern("*t"), Optional.of(rebalanceListener)); + SubscriptionPattern pattern = new SubscriptionPattern("*t"); + + state.subscribe(pattern, Optional.of(rebalanceListener)); assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); + assertTrue(state.subscriptionPattern().equals(pattern)); state.unsubscribe(); assertEquals(state.subscriptionPattern(), null); From aa0b1d10c675cd457c126f3b60cd7181cda1df40 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Mon, 19 Feb 2024 21:13:18 +0700 Subject: [PATCH 15/26] Added comment to explain what was added in ConsumerGroupHeartbeatRequest.json --- .../consumer/internals/SubscriptionState.java | 12 ++++++++++++ .../message/ConsumerGroupHeartbeatRequest.json | 2 ++ 2 files changed, 14 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 86a145b25af8d..2ab099a1b6f38 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -177,12 +177,20 @@ public synchronized boolean subscribe(Set topics, Optional listener) { registerRebalanceListener(listener); setSubscriptionType(SubscriptionType.AUTO_PATTERN); + + if(!this.subscriptionPattern.equals(null)) { + this.subscriptionPattern = null; + } this.subscribedPattern = pattern; } public synchronized void subscribe(SubscriptionPattern pattern, Optional listener) { registerRebalanceListener(listener); setSubscriptionType(SubscriptionType.AUTO_PATTERN); + + if(!this.subscribedPattern.equals(null)) { + this.subscribedPattern = null; + } this.subscriptionPattern = pattern; } @@ -345,6 +353,10 @@ public synchronized SubscriptionPattern subscriptionPattern() { return this.subscriptionPattern; } + public synchronized Pattern subscribedPattern() { + return this.subscribedPattern; + } + public synchronized Set pausedPartitions() { return collectPartitions(TopicPartitionState::isPaused); } diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json index 96e0b4227493d..e8f4613b44cbe 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//Version 1 added SubscribedTopicRegex to the request for KIP-848 + { "apiKey": 68, "type": "request", From bbf604d753f0909a36906a1de1f09930b672808d Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Mon, 19 Feb 2024 21:44:59 +0700 Subject: [PATCH 16/26] Revert changes on subscribe methods in subscriptionState --- .../consumer/internals/SubscriptionState.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 2ab099a1b6f38..86a145b25af8d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -177,20 +177,12 @@ public synchronized boolean subscribe(Set topics, Optional listener) { registerRebalanceListener(listener); setSubscriptionType(SubscriptionType.AUTO_PATTERN); - - if(!this.subscriptionPattern.equals(null)) { - this.subscriptionPattern = null; - } this.subscribedPattern = pattern; } public synchronized void subscribe(SubscriptionPattern pattern, Optional listener) { registerRebalanceListener(listener); setSubscriptionType(SubscriptionType.AUTO_PATTERN); - - if(!this.subscribedPattern.equals(null)) { - this.subscribedPattern = null; - } this.subscriptionPattern = pattern; } @@ -353,10 +345,6 @@ public synchronized SubscriptionPattern subscriptionPattern() { return this.subscriptionPattern; } - public synchronized Pattern subscribedPattern() { - return this.subscribedPattern; - } - public synchronized Set pausedPartitions() { return collectPartitions(TopicPartitionState::isPaused); } From 53fb53d79cede76766edf742e4e52d35da73ed40 Mon Sep 17 00:00:00 2001 From: Phuc-Hong-Tran <44060007+Phuc-Hong-Tran@users.noreply.github.com> Date: Tue, 5 Mar 2024 11:09:15 +1100 Subject: [PATCH 17/26] Fix unwanted spacing in OffsetsRequestManagerTest.java --- .../clients/consumer/internals/OffsetsRequestManagerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index a6342e96f687d..0998fba272f9e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -86,6 +86,7 @@ import static org.mockito.Mockito.when; public class OffsetsRequestManagerTest { + private OffsetsRequestManager requestManager; private ConsumerMetadata metadata; private SubscriptionState subscriptionState; @@ -954,4 +955,4 @@ private ListOffsetsResponse buildListOffsetsResponse( return new ListOffsetsResponse(responseData); } -} \ No newline at end of file +} From 695b58b75b853eb978356a95e90e97055d27404e Mon Sep 17 00:00:00 2001 From: Phuc-Hong-Tran <44060007+Phuc-Hong-Tran@users.noreply.github.com> Date: Fri, 8 Mar 2024 21:44:38 +1100 Subject: [PATCH 18/26] Update SubscriptionPattern's documentation Co-authored-by: Bruno Cadonna --- .../apache/kafka/clients/consumer/SubscriptionPattern.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index 34148249a2ac7..378c6c3d22577 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -17,8 +17,10 @@ package org.apache.kafka.clients.consumer; /** - * A class that hold a regular expression which compatible with Google's RE2J engine. Visit - * this repository for details on RE2J engine. + * Represents a regular expression used to subscribe to topics. The pattern + * must be a Google RE2/J compatible pattern. Visit + * + * @see RE2/J regular expression engine */ public class SubscriptionPattern { From b5eb9c39b6ebc68575de571c99ce0e037c8e82f0 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Fri, 8 Mar 2024 21:36:02 +1100 Subject: [PATCH 19/26] Revert "Fix unwanted spacing in OffsetsRequestManagerTest.java" This reverts commit 53fb53d79cede76766edf742e4e52d35da73ed40. --- .../clients/consumer/internals/OffsetsRequestManagerTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index 84828ed7c252d..b5fbd831892fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -86,7 +86,6 @@ import static org.mockito.Mockito.when; public class OffsetsRequestManagerTest { - private OffsetsRequestManager requestManager; private ConsumerMetadata metadata; private SubscriptionState subscriptionState; @@ -955,4 +954,4 @@ private ListOffsetsResponse buildListOffsetsResponse( return new ListOffsetsResponse(responseData); } -} +} \ No newline at end of file From ec26dcbf0272ee06a6ab1c35be7583379d27a16e Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Wed, 13 Mar 2024 13:40:38 +1100 Subject: [PATCH 20/26] Refactored AsyncKafkaConsumerTest, moved some pattern validations into SubscriptionPattern, refactored subscribeInternal for AsyncKafkaConsumer --- .../kafka/clients/consumer/KafkaConsumer.java | 16 ++++++---------- .../clients/consumer/SubscriptionPattern.java | 8 +++++++- .../consumer/internals/AsyncKafkaConsumer.java | 4 ---- .../internals/HeartbeatRequestManager.java | 6 +----- .../consumer/internals/LegacyKafkaConsumer.java | 4 ++-- .../consumer/internals/SubscriptionState.java | 6 ++++++ .../message/ConsumerGroupHeartbeatRequest.json | 5 +---- .../clients/consumer/KafkaConsumerTest.java | 4 ++-- .../internals/AsyncKafkaConsumerTest.java | 4 ++-- .../internals/HeartbeatRequestManagerTest.java | 4 ---- .../internals/SubscriptionStateTest.java | 5 +++++ 11 files changed, 32 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1f4eb9530dd0e..0181b58fd988a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -716,9 +716,7 @@ public void subscribe(Collection topics) { * the max metadata age, the consumer will refresh metadata more often and check for matching topics. *

* See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the - * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there - * is a change to the topics matching the provided pattern and when consumer group membership changes. - * Group rebalances only take place during an active call to {@link #poll(Duration)}. + * use of the {@link ConsumerRebalanceListener}. * * @param pattern Pattern to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the @@ -756,14 +754,11 @@ public void subscribe(Pattern pattern) { /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. - * The pattern matching will be done periodically against all topics existing at the time of check. - * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering - * the max metadata age, the consumer will refresh metadata more often and check for matching topics. + * The pattern matching will be done on the broker against topics existing at the time of check + * and matching topic(s) will be returned to the client. *

* See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the - * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there - * is a change to the topics matching the provided pattern and when consumer group membership changes. - * Group rebalances only take place during an active call to {@link #poll(Duration)}. + * use of the {@link ConsumerRebalanceListener}. * * @param pattern SubscriptionPattern to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the @@ -780,7 +775,8 @@ public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener cal /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. - * The pattern matching will be done on the broker against topics existing at the time of check. + * The pattern matching will be done on the broker against topics existing at the time of check + * and matching topic(s) will be returned to the client. *

* This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index 378c6c3d22577..ed9392b5547ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -26,7 +26,13 @@ public class SubscriptionPattern { final private String pattern; public SubscriptionPattern(final String pattern) { - this.pattern = pattern; + if(pattern.equals("") || pattern == null) { + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + } else { + this.pattern = pattern; + } + } public String pattern() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index a4a782c576499..8cc0d57bc9c76 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1775,10 +1775,6 @@ private void subscribeInternal(SubscriptionPattern pattern, Optional topics, Optional listener) { registerRebalanceListener(listener); setSubscriptionType(SubscriptionType.AUTO_PATTERN); + this.subscriptionPattern = null; this.subscribedPattern = pattern; } public synchronized void subscribe(SubscriptionPattern pattern, Optional listener) { registerRebalanceListener(listener); setSubscriptionType(SubscriptionType.AUTO_PATTERN); + this.subscribedPattern = null; this.subscriptionPattern = pattern; } @@ -345,6 +347,10 @@ public synchronized SubscriptionPattern subscriptionPattern() { return this.subscriptionPattern; } + public synchronized Pattern subscribedPattern() { + return this.subscribedPattern; + } + public synchronized Set pausedPartitions() { return collectPartitions(TopicPartitionState::isPaused); } diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json index e8f4613b44cbe..83e444bfa6873 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json @@ -20,8 +20,7 @@ "type": "request", "listeners": ["zkBroker", "broker"], "name": "ConsumerGroupHeartbeatRequest", - "latestVersionUnstable": true, - "validVersions": "0-1", + "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", @@ -38,8 +37,6 @@ "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, - { "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null", - "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c1ff7e0f409a5..1b32ac5bbe447 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -485,7 +485,7 @@ public void testSubscriptionOnNullPattern(GroupProtocol groupProtocol) { () -> consumer.subscribe((Pattern) null)); } @ParameterizedTest - @EnumSource(value = GroupProtocol.class, names = "CONSUMER") + @EnumSource(value = GroupProtocol.class) public void testSubscriptionOnNullSubscriptionPattern(GroupProtocol groupProtocol) { consumer = newConsumer(groupProtocol, groupId); assertThrows(IllegalArgumentException.class, @@ -501,7 +501,7 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) { } @ParameterizedTest - @EnumSource(value = GroupProtocol.class, names = "CONSUMER") + @EnumSource(value = GroupProtocol.class) public void testSubscriptionOnEmptySubscriptionPattern(GroupProtocol groupProtocol) { consumer = newConsumer(groupProtocol, groupId); assertThrows(IllegalArgumentException.class, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index b89f0790fa862..5052958669357 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -695,7 +695,7 @@ public void testPartitionRevocationOnClose() { @Test public void testSubscribeUsingSubscriptionPattern() { MockRebalanceListener listener = new MockRebalanceListener(); - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = mock(SubscriptionState.class); consumer = newConsumer( mock(FetchBuffer.class), mock(ConsumerInterceptors.class), @@ -707,7 +707,7 @@ public void testSubscribeUsingSubscriptionPattern() { SubscriptionPattern pattern = new SubscriptionPattern("t*"); consumer.subscribe(pattern, listener); - assertEquals(subscriptions.subscriptionPattern(), pattern); + verify(subscriptions).subscribe(pattern, Optional.of(listener)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 7e9f8f3c5981e..8e05e505be471 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -320,7 +320,6 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { assertEquals(memberEpoch, heartbeatRequest.data().memberEpoch()); assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, heartbeatRequest.data().rebalanceTimeoutMs()); assertEquals(subscribedTopics, heartbeatRequest.data().subscribedTopicNames()); - assertEquals(null, heartbeatRequest.data().subscribedTopicRegex()); assertEquals(DEFAULT_GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId()); assertEquals(DEFAULT_REMOTE_ASSIGNOR, heartbeatRequest.data().serverAssignor()); } @@ -400,7 +399,6 @@ public void testHeartbeatState() { assertNull(data.instanceId()); assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); - assertEquals(null, data.subscribedTopicRegex()); assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); @@ -415,7 +413,6 @@ public void testHeartbeatState() { assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); - assertNull(data.subscribedTopicRegex()); assertNull(data.serverAssignor()); assertNull(data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); @@ -433,7 +430,6 @@ public void testHeartbeatState() { assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); - assertNull(data.subscribedTopicRegex()); assertNull(data.serverAssignor()); assertNull(data.topicPartitions()); membershipManager.onHeartbeatRequestSent(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 0ec003804a25d..f905877d5c71f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -142,6 +142,7 @@ public void partitionAssignmentChangeOnPatternSubscription() { // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); + assertEquals(state.subscriptionPattern(), null); state.subscribeFromPattern(Collections.singleton(topic)); // assigned partitions should remain unchanged @@ -168,6 +169,7 @@ public void partitionAssignmentChangeOnPatternSubscription() { // assigned partitions should remain unchanged assertEquals(singleton(t1p0), state.assignedPartitions()); assertEquals(1, state.numAssignedPartitions()); + assertEquals(state.subscriptionPattern(), null); state.subscribeFromPattern(singleton(topic)); // assigned partitions should remain unchanged @@ -186,6 +188,7 @@ public void partitionAssignmentChangeOnPatternSubscription() { // assigned partitions should immediately change assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); + assertEquals(state.subscriptionPattern(), null); } @Test public void testSubscriptionPatternInclusionInSubscriptionState() { @@ -194,11 +197,13 @@ public void testSubscriptionPatternInclusionInSubscriptionState() { state.subscribe(pattern, Optional.of(rebalanceListener)); assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); + assertEquals(state.subscribedPattern(), null); assertTrue(state.subscriptionPattern().equals(pattern)); state.unsubscribe(); assertEquals(state.subscriptionPattern(), null); assertEquals(0, state.numAssignedPartitions()); + assertEquals(state.subscribedPattern(), null); } @Test From 443739aee78526ab022ccb5786a93711c4b0e869 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Wed, 13 Mar 2024 14:21:12 +1100 Subject: [PATCH 21/26] fixed styleCheck in SubscriptionPattern --- .../org/apache/kafka/clients/consumer/SubscriptionPattern.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index ed9392b5547ed..e19272748812b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -26,7 +26,7 @@ public class SubscriptionPattern { final private String pattern; public SubscriptionPattern(final String pattern) { - if(pattern.equals("") || pattern == null) { + if (pattern.equals("") || pattern == null) { throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty")); } else { From d035c7dcb160ef777c98a16179f8c5383ea6cf15 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Wed, 13 Mar 2024 14:28:42 +1100 Subject: [PATCH 22/26] Revert changes in MockConsumer --- .../apache/kafka/clients/consumer/MockConsumer.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index d73837211a964..27faa80c65421 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -141,16 +141,6 @@ public synchronized void subscribe(Pattern pattern) { subscribe(pattern, Optional.empty()); } - @Override - public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { - - } - - @Override - public void subscribe(SubscriptionPattern pattern) { - - } - @Override public void subscribe(Collection topics, final ConsumerRebalanceListener listener) { if (listener == null) From 3d3ff82d61ff960fac8750bbe2ef8ce47b0555a5 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Wed, 13 Mar 2024 14:35:51 +1100 Subject: [PATCH 23/26] Revert "fixed styleCheck in SubscriptionPattern" This reverts commit 443739aee78526ab022ccb5786a93711c4b0e869. --- .../org/apache/kafka/clients/consumer/SubscriptionPattern.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index e19272748812b..ed9392b5547ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -26,7 +26,7 @@ public class SubscriptionPattern { final private String pattern; public SubscriptionPattern(final String pattern) { - if (pattern.equals("") || pattern == null) { + if(pattern.equals("") || pattern == null) { throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty")); } else { From 0f8b7d3d56a1c83a56493756dd49cc3e6a05668c Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Wed, 13 Mar 2024 14:37:07 +1100 Subject: [PATCH 24/26] revert previous changes --- .../apache/kafka/clients/consumer/MockConsumer.java | 10 ++++++++++ .../kafka/clients/consumer/SubscriptionPattern.java | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 27faa80c65421..d73837211a964 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -141,6 +141,16 @@ public synchronized void subscribe(Pattern pattern) { subscribe(pattern, Optional.empty()); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + + } + + @Override + public void subscribe(SubscriptionPattern pattern) { + + } + @Override public void subscribe(Collection topics, final ConsumerRebalanceListener listener) { if (listener == null) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index ed9392b5547ed..e19272748812b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -26,7 +26,7 @@ public class SubscriptionPattern { final private String pattern; public SubscriptionPattern(final String pattern) { - if(pattern.equals("") || pattern == null) { + if (pattern.equals("") || pattern == null) { throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty")); } else { From d228f6dc30ef561d7e78d6059db3ea5d084a8415 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Wed, 13 Mar 2024 15:18:07 +1100 Subject: [PATCH 25/26] fixed spotbugs failures in client package --- .../org/apache/kafka/clients/consumer/SubscriptionPattern.java | 2 +- .../clients/consumer/internals/HeartbeatRequestManager.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java index e19272748812b..987b9cf049a6b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -26,7 +26,7 @@ public class SubscriptionPattern { final private String pattern; public SubscriptionPattern(final String pattern) { - if (pattern.equals("") || pattern == null) { + if (pattern == null || pattern.equals("")) { throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty")); } else { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 3f715cae04974..02cc0b05fb6b8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -596,7 +596,6 @@ static class SentFields { private String instanceId = null; private int rebalanceTimeoutMs = -1; private TreeSet subscribedTopicNames = null; - private String subscribedTopicRegex = null; private String serverAssignor = null; private TreeSet topicPartitions = null; From 969e0a622f1bfdbcadcef690696fb469b36ccc68 Mon Sep 17 00:00:00 2001 From: "phuchong.tran" Date: Fri, 15 Mar 2024 10:57:43 +1100 Subject: [PATCH 26/26] Included logic to check for null input in subscribe method that use SubscriptionPattern --- .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 8cc0d57bc9c76..171bbe5ae3742 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1775,6 +1775,9 @@ private void subscribeInternal(SubscriptionPattern pattern, Optional