Skip to content

Commit

Permalink
KAFKA-15561 [1/N]: Introduce new subscribe api for RE2J regex (#17897)
Browse files Browse the repository at this point in the history
Reviewers: David Jacot <[email protected]>
  • Loading branch information
lianetm authored Nov 22, 2024
1 parent c2352f8 commit 0d7c765
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public interface Consumer<K, V> extends Closeable {
*/
void subscribe(Pattern pattern);

/**
* @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener)
*/
void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);

/**
* @see KafkaConsumer#subscribe(SubscriptionPattern)
*/
void subscribe(SubscriptionPattern pattern);

/**
* @see KafkaConsumer#unsubscribe()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -755,6 +756,55 @@ public void subscribe(Pattern pattern) {
delegate.subscribe(pattern);
}

/**
* Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions.
* The pattern matching will be done periodically against all topics. This is only supported under the
* CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}).
* <p>
* If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will be
* eventually thrown on a call to {@link #poll(Duration)} following this call to subscribe.
* <p>
* See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
* use of the {@link ConsumerRebalanceListener}. Generally, rebalances are triggered when there
* is a change to the topics matching the provided pattern and when consumer group membership changes.
* Group rebalances only take place during an active call to {@link #poll(Duration)}.
*
* @param pattern Pattern to subscribe to, that must be compatible with Google RE2/J.
* @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
* subscribed topics.
* @throws IllegalArgumentException If pattern is null or empty, or if the listener is null.
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}).
*/
@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
delegate.subscribe(pattern, listener);
}

/**
* Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics. This is only supported under the
* CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG})
* <p>
* If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will be
* eventually thrown on a call to {@link #poll(Duration)} following this call to subscribe.
* <p>
* This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which
* uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
* {@link #subscribe(Pattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
* to be reset. You should also provide your own listener if you are doing your own offset
* management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
*
* @param pattern Pattern to subscribe to, that must be compatible with Google RE2/J.
* @throws IllegalArgumentException If pattern is null or empty.
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}).
*/
@Override
public void subscribe(SubscriptionPattern pattern) {
delegate.subscribe(pattern);
}

/**
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
* This also clears any partitions directly assigned through {@link #assign(Collection)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public synchronized void subscribe(Pattern pattern) {
subscribe(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
}

@Override
public void subscribe(SubscriptionPattern pattern) {
throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
}

@Override
public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
if (listener == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import java.util.Objects;

/**
* Represents a regular expression compatible with Google RE2/J, used to subscribe to topics.
* This just keeps the String representation of the pattern, and all validations to ensure
* it is RE2/J compatible are delegated to the broker.
*/
public class SubscriptionPattern {

/**
* String representation the regular expression, compatible with RE2/J.
*/
private final String pattern;

public SubscriptionPattern(String pattern) {
this.pattern = pattern;
}

/**
* @return Regular expression pattern compatible with RE2/J.
*/
public String pattern() {
return this.pattern;
}

@Override
public String toString() {
return pattern;
}

@Override
public int hashCode() {
return pattern.hashCode();
}

@Override
public boolean equals(Object obj) {
return obj instanceof SubscriptionPattern &&
Objects.equals(pattern, ((SubscriptionPattern) obj).pattern);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
Expand Down Expand Up @@ -1795,6 +1796,16 @@ public void subscribe(Pattern pattern) {
subscribeInternal(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
subscribeToRegex(pattern, Optional.ofNullable(callback));
}

@Override
public void subscribe(SubscriptionPattern pattern) {
subscribeToRegex(pattern, Optional.empty());
}

@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
if (listener == null)
Expand Down Expand Up @@ -1857,6 +1868,29 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
}
}

/**
* Subscribe to the RE2/J pattern. This will generate an event to update the pattern in the
* subscription, so it's included in a next heartbeat request sent to the broker. No validation of the pattern is
* performed by the client (other than null/empty checks).
*/
private void subscribeToRegex(SubscriptionPattern pattern,
Optional<ConsumerRebalanceListener> listener) {
maybeThrowInvalidGroupIdException();
throwIfSubscriptionPatternIsInvalid(pattern);
log.info("Subscribing to regular expression {}", pattern);

// TODO: generate event to update subscribed regex so it's included in the next HB.
}

private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern subscriptionPattern) {
if (subscriptionPattern == null) {
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
}
if (subscriptionPattern.pattern().isEmpty()) {
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be empty");
}
}

private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
Expand Down Expand Up @@ -520,6 +521,18 @@ public void subscribe(Pattern pattern) {
subscribeInternal(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using" +
"the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG));
}

@Override
public void subscribe(SubscriptionPattern pattern) {
throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using" +
"the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG));
}

/**
* Internal helper method for {@link #subscribe(Pattern)} and
* {@link #subscribe(Pattern, ConsumerRebalanceListener)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
Expand Down Expand Up @@ -3609,6 +3610,17 @@ public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws Inter
"Expected " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");
}

@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testSubscribeToRe2jPatternNotSupportedForClassicConsumer(GroupProtocol groupProtocol) {
KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(groupProtocol, time, mock(NetworkClient.class), subscription,
mock(ConsumerMetadata.class));
assertThrows(UnsupportedOperationException.class, () ->
consumer.subscribe(new SubscriptionPattern("t*")));
assertThrows(UnsupportedOperationException.class, () ->
consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class)));
}

private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
Expand Down Expand Up @@ -1829,6 +1830,19 @@ public void testSeekToEnd() {
assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy());
}

@Test
public void testSubscribeToRe2JPatternValidation() {
consumer = newConsumer();

Throwable t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null));
assertEquals("Topic pattern to subscribe to cannot be null", t.getMessage());

t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("")));
assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage());

assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*")));
}

private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
Expand Down

0 comments on commit 0d7c765

Please sign in to comment.