diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 4201395578390..2c8376e5ccd8a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -73,6 +73,16 @@ public interface Consumer extends Closeable { */ void subscribe(Pattern pattern); + /** + * @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener) + */ + void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback); + + /** + * @see KafkaConsumer#subscribe(SubscriptionPattern) + */ + void subscribe(SubscriptionPattern pattern); + /** * @see KafkaConsumer#unsubscribe() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 8ba5cff7c0874..92e4cf2a55078 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRegularExpression; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Deserializer; @@ -755,6 +756,55 @@ public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } + /** + * Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions. + * The pattern matching will be done periodically against all topics. This is only supported under the + * CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}). + *

+ * If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will + * eventually be thrown on a call to {@link #poll(Duration)} following this call to subscribe. + *

+ * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the + * use of the {@link ConsumerRebalanceListener}. Generally, rebalances are triggered when there + * is a change to the topics matching the provided pattern and when consumer group membership changes. + * Group rebalances only take place during an active call to {@link #poll(Duration)}. + * + * @param pattern Pattern to subscribe to, that must be compatible with Google RE2/J. + * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the + * subscribed topics. + * @throws IllegalArgumentException If pattern is null or empty, or if the listener is null. + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}). + */ + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) { + delegate.subscribe(pattern, listener); + } + + /** + * Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions. + * The pattern matching will be done periodically against topics. This is only supported under the + * CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}) + *

+ * If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will + * eventually be thrown on a call to {@link #poll(Duration)} following this call to subscribe. + *

+ * This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which + * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer + * {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets + * to be reset. You should also provide your own listener if you are doing your own offset + * management since the listener gives you an opportunity to commit offsets before a rebalance finishes. + * + * @param pattern Pattern to subscribe to, that must be compatible with Google RE2/J. + * @throws IllegalArgumentException If pattern is null or empty. + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}). + */ + @Override + public void subscribe(SubscriptionPattern pattern) { + delegate.subscribe(pattern); + } + /** * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}. * This also clears any partitions directly assigned through {@link #assign(Collection)}. 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 3c29c749acfaa..4bdffc48c237c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -143,6 +143,16 @@ public synchronized void subscribe(Pattern pattern) { subscribe(pattern, Optional.empty()); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet"); + } + + @Override + public void subscribe(SubscriptionPattern pattern) { + throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet"); + } + @Override public void subscribe(Collection topics, final ConsumerRebalanceListener listener) { if (listener == null) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java new file mode 100644 index 0000000000000..d6e168b8da179 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import java.util.Objects; + +/** + * Represents a regular expression compatible with Google RE2/J, used to subscribe to topics. + * This just keeps the String representation of the pattern, and all validations to ensure + * it is RE2/J compatible are delegated to the broker. + */ +public class SubscriptionPattern { + + /** + * String representation the regular expression, compatible with RE2/J. + */ + private final String pattern; + + public SubscriptionPattern(String pattern) { + this.pattern = pattern; + } + + /** + * @return Regular expression pattern compatible with RE2/J. 
+ */ + public String pattern() { + return this.pattern; + } + + @Override + public String toString() { + return pattern; + } + + @Override + public int hashCode() { + return pattern.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof SubscriptionPattern && + Objects.equals(pattern, ((SubscriptionPattern) obj).pattern); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index ba7eed19f11fd..3ab84fd7aec0a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; @@ -1795,6 +1796,16 @@ public void subscribe(Pattern pattern) { subscribeInternal(pattern, Optional.empty()); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + subscribeToRegex(pattern, Optional.ofNullable(callback)); + } + + @Override + public void subscribe(SubscriptionPattern pattern) { + subscribeToRegex(pattern, Optional.empty()); + } + @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { if (listener == null) @@ -1857,6 +1868,29 @@ private void subscribeInternal(Pattern pattern, Optional listener) { + maybeThrowInvalidGroupIdException(); + throwIfSubscriptionPatternIsInvalid(pattern); + 
log.info("Subscribing to regular expression {}", pattern); + + // TODO: generate event to update subscribed regex so it's included in the next HB. + } + + private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern subscriptionPattern) { + if (subscriptionPattern == null) { + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); + } + if (subscriptionPattern.pattern().isEmpty()) { + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be empty"); + } + } + private void subscribeInternal(Collection topics, Optional listener) { acquireAndEnsureOpen(); try { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index e423c26176349..a3ac5e5698b35 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; @@ -520,6 +521,18 @@ public void subscribe(Pattern pattern) { subscribeInternal(pattern, Optional.empty()); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using " + + "the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG)); + } + + @Override + public void subscribe(SubscriptionPattern pattern) { 
+ throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using " + + "the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG)); + } + /** * Internal helper method for {@link #subscribe(Pattern)} and * {@link #subscribe(Pattern, ConsumerRebalanceListener)} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 7d122c2986c0d..2deaec2efea36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; @@ -3609,6 +3610,17 @@ public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws Inter "Expected " + (groupProtocol == GroupProtocol.CLASSIC ? 
"JoinGroup" : "Heartbeat") + " request"); } + @ParameterizedTest + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") + public void testSubscribeToRe2jPatternNotSupportedForClassicConsumer(GroupProtocol groupProtocol) { + KafkaConsumer consumer = newConsumerNoAutoCommit(groupProtocol, time, mock(NetworkClient.class), subscription, + mock(ConsumerMetadata.class)); + assertThrows(UnsupportedOperationException.class, () -> + consumer.subscribe(new SubscriptionPattern("t*"))); + assertThrows(UnsupportedOperationException.class, () -> + consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class))); + } + private boolean requestGenerated(MockClient client, ApiKeys apiKey) { return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8eb8ec4c85bd5..91e4cae0f98a5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; @@ -1829,6 +1830,19 @@ public void testSeekToEnd() { assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); } + @Test + public void testSubscribeToRe2JPatternValidation() { + consumer = 
newConsumer(); + + Throwable t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null)); + assertEquals("Topic pattern to subscribe to cannot be null", t.getMessage()); + + t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern(""))); + assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage()); + + assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*"))); + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3);