Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15561: Client support for new SubscriptionPattern based subscription #15188

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
61037a8
created SubscriptionPattern for new regex-subscription functionalities
Jan 9, 2024
423b6a2
added logic in HeartBeatRequestManager to send topic regex to broker
Jan 11, 2024
a90da13
added functions that use SubscriptionPattern to subscribe to topic
Jan 13, 2024
4df71e9
fixed checkStyle errors in SubscriptionPattern
Jan 13, 2024
819c5ec
Retrigger CI
Jan 14, 2024
8e8e6c6
Re-trigger tests
Jan 15, 2024
e6a1bd5
added SubscribedTopicRegex to ConsumerGroupdHeartbeatRequestData, ref…
Feb 2, 2024
38c08d3
reverted unwanted log level changes in log4j.properties and added sub…
Feb 2, 2024
20a1835
Merge branch 'trunk' into KAFKA-15561
Phuc-Hong-Tran Feb 4, 2024
1f718ff
refactored subscribeInternal in AsyncKafkaConsumer
Phuc-Hong-Tran Feb 6, 2024
366bc72
refactored toString method to include subscriptionPattern in Subscrip…
Feb 9, 2024
5841899
Added commments to explain subscribe methods in KafkaConsumer that u…
Feb 15, 2024
c66c474
Added tests relating to SubscriptionPattern for SubscriptionState, As…
Feb 16, 2024
a48a84d
Refactored AsyncKafkaConsumerTest
Feb 17, 2024
df300c7
Fixed consumerGroupHeartbeatRequestData RPC versioning
Feb 19, 2024
aa0b1d1
Added comment to explain what was added in ConsumerGroupHeartbeatRequ…
Feb 19, 2024
bbf604d
Revert changes on subscribe methods in subscriptionState
Feb 19, 2024
53fb53d
Fix unwanted spacing in OffsetsRequestManagerTest.java
Phuc-Hong-Tran Mar 5, 2024
0d6caa1
Merge branch 'trunk' into KAFKA-15561
Phuc-Hong-Tran Mar 8, 2024
695b58b
Update SubscriptionPattern's documentation
Phuc-Hong-Tran Mar 8, 2024
b5eb9c3
Revert "Fix unwanted spacing in OffsetsRequestManagerTest.java"
Mar 8, 2024
ec26dcb
Refactored AsyncKafkaConsumerTest, moved some pattern validations int…
Mar 13, 2024
443739a
fixed styleCheck in SubscriptionPattern
Mar 13, 2024
d035c7d
Revert changes in MockConsumer
Mar 13, 2024
3d3ff82
Revert "fixed styleCheck in SubscriptionPattern"
Mar 13, 2024
0f8b7d3
revert previous changes
Mar 13, 2024
d228f6d
fixed spotbugs failures in client package
Mar 13, 2024
969e0a6
Included logic to check for null input in subscribe method that use S…
Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ public interface Consumer<K, V> extends Closeable {
*/
void subscribe(Pattern pattern);

/**
* @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener)
*/
void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);

/**
* @see KafkaConsumer#subscribe(SubscriptionPattern)
*/
void subscribe(SubscriptionPattern pattern);

/**
* @see KafkaConsumer#unsubscribe()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,51 @@ public void subscribe(Pattern pattern) {
delegate.subscribe(pattern);
}

/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against all topics existing at the time of check.
* This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
* the max metadata age, the consumer will refresh metadata more often and check for matching topics.
* <p>
* See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
* use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
* is a change to the topics matching the provided pattern and when consumer group membership changes.
* Group rebalances only take place during an active call to {@link #poll(Duration)}.
*
* @param pattern SubscriptionPattern to subscribe to
* @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
* subscribed topics
* @throws IllegalArgumentException If pattern or listener is null
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
delegate.subscribe(pattern, callback);
}

/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done on the broker against topics existing at the time of check.
* <p>
* This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be "shorthand".

* uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
* {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
* to be reset. You should also provide your own listener if you are doing your own offset
* management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
*
* @param pattern SubscriptionPattern to subscribe to
* @throws IllegalArgumentException If pattern is null
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(SubscriptionPattern pattern) {
delegate.subscribe(pattern);
}

/**
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
* This also clears any partitions directly assigned through {@link #assign(Collection)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ public synchronized void subscribe(Pattern pattern) {
subscribe(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be very easy to adapt the implementation of subscribe(Pattern pattern, ConsumerRebalanceListener callback) for this rather than leaving it blank.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. This is necessary for full coverage of the client-side changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield, just a question, why do we need to adapt the implementation of subscribe(Pattern pattern, ConsumerRebalanceListener callback) while the SubscriptionPattern is to be resolved on the broker, not locally like Pattern. If we go that route the MockConsumer won't really simulate how the AsyncConsumer works

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind about the details of the implementation here, but I expect you do need a mocked implemented of this new method in order to complete the PR.

}

@Override
public void subscribe(SubscriptionPattern pattern) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.

}

@Override
public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
if (listener == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

/**
* A class that hold a regular expression which compatible with Google's RE2J engine. Visit
* <a href="https://github.com/google/re2j">this repository</a> for details on RE2J engine.
*/

public class SubscriptionPattern {
final private String pattern;
public SubscriptionPattern(final String pattern) {
this.pattern = pattern;
}

public String pattern() {
return this.pattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
Expand Down Expand Up @@ -1666,6 +1667,19 @@ public void subscribe(Pattern pattern) {
subscribeInternal(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");

subscribeInternal(pattern, Optional.of(listener));
}

@Override
public void subscribe(SubscriptionPattern pattern) {
subscribeInternal(pattern, Optional.empty());
}

@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
if (listener == null)
Expand Down Expand Up @@ -1730,6 +1744,21 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
}
}

private void subscribeInternal(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
if (pattern == null || pattern.pattern().isEmpty())
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
"null" : "empty"));
throwIfNoAssignorsConfigured();
log.info("Subscribed to pattern: '{}'", pattern.pattern());
subscriptions.subscribe(pattern, listener);
Comment on lines +1781 to +1782
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've made a lot of effort to move any mutation of the SubscriptionState object to the background thread. Is there any reason we should not follow the same pattern (pun intended) here? If there's a good reason not to, we definitely should add some documentary comments to that effect.

} finally {
release();
}
}

private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
}
} else {
// SubscribedTopicRegex - only sent if has changed since the last heartbeat
// - not supported yet
String subscriptionRegex = this.subscriptions.subscriptionPattern().pattern();
if (!subscriptionRegex.equals(sentFields.subscribedTopicRegex)) {
data.setSubscribedTopicRegex(subscriptionRegex);
sentFields.subscribedTopicRegex = subscriptionRegex;
}
}

// ServerAssignor - only sent if has changed since the last heartbeat
Expand Down Expand Up @@ -591,6 +595,7 @@ static class SentFields {
private String instanceId = null;
private int rebalanceTimeoutMs = -1;
private TreeSet<String> subscribedTopicNames = null;
private String subscribedTopicRegex = null;
private String serverAssignor = null;
private TreeSet<String> topicPartitions = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
Expand Down Expand Up @@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) {
subscribeInternal(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
log.warn("Operation not supported in new consumer group protocol");
}

@Override
public void subscribe(SubscriptionPattern pattern) {
log.warn("Operation not supported in new consumer group protocol");
}

/**
* Internal helper method for {@link #subscribe(Pattern)} and
* {@link #subscribe(Pattern, ConsumerRebalanceListener)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.PartitionStates;
Expand Down Expand Up @@ -84,6 +85,9 @@ private enum SubscriptionType {
/* the pattern user has requested */
private Pattern subscribedPattern;

/* RE2J compatible regex */
private SubscriptionPattern subscriptionPattern;
Comment on lines 85 to +89
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about having a class that contains either a SubscriptionPattern or a Pattern?
Something like :

public class JavaPatternOrSubscriptionPattern {

    private final Pattern javaPattern;

    private final SubscriptionPattern subscriptionPattern;

    private JavaPatternOrSubscriptionPattern(final Pattern pattern) {
        this.pattern = pattern;
    } 

    private JavaPatternOrSubscriptionPattern(final SubscriptionPattern pattern) {
        this.subscriptionPattern = pattern;
    } 

    public static JavaPatternOrSubscriptionPattern javaPattern(final Pattern pattern) {
        return new JavaPatternOrSubscriptionPattern(pattern);
    }

    public static JavaPatternOrSubscriptionPattern subscriptionPattern(final SubscriptionPattern pattern) {
        return new JavaPatternOrSubscriptionPattern(pattern);
    }

    public String pattern() {
        return subscriptionPattern != null ? subscriptionPattern.pattern() : javaPattern.pattern();
    }

    public String toString() {
       return "pattern = " + pattern();
    } 

   ...
} 

In such a way we would encapsulate the code that ensures that there is only of both set.

Copy link
Contributor Author

@Phuc-Hong-Tran Phuc-Hong-Tran Mar 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a bit overkill to have an abstraction like this, the Pattern and SubscriptionPattern are already abstractions for the underneath regex string. What do you guys think @lianetm @dajac?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, we should adopt this class.

Copy link
Contributor Author

@Phuc-Hong-Tran Phuc-Hong-Tran Mar 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm, I think adding this class would be a bit much when it only encapsulate the logic to check whether pattern or subscriptionPattern is set

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @cadonna. It seems very odd to me having subscribedPattern and subscriptionPattern. I understand that there are old patterns and new patterns, but really they are achieving the same thing. Hiding the difference inside subscribedPattern seems sensible to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield I understand, will change the subscribedPattern accordingly


/* the list of topics the user has requested */
private Set<String> subscription;

Expand All @@ -108,6 +112,7 @@ public synchronized String toString() {
return "SubscriptionState{" +
"type=" + subscriptionType +
", subscribedPattern=" + subscribedPattern +
", subscriptionPattern=" + subscriptionPattern +
", subscription=" + String.join(",", subscription) +
", groupSubscription=" + String.join(",", groupSubscription) +
", defaultResetStrategy=" + defaultResetStrategy +
Expand Down Expand Up @@ -136,6 +141,7 @@ public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultReset
this.assignment = new PartitionStates<>();
this.groupSubscription = new HashSet<>();
this.subscribedPattern = null;
this.subscriptionPattern = null;
this.subscriptionType = SubscriptionType.NONE;
}

Expand Down Expand Up @@ -174,6 +180,12 @@ public synchronized void subscribe(Pattern pattern, Optional<ConsumerRebalanceLi
this.subscribedPattern = pattern;
}

public synchronized void subscribe(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
registerRebalanceListener(listener);
setSubscriptionType(SubscriptionType.AUTO_PATTERN);
this.subscriptionPattern = pattern;
}

public synchronized boolean subscribeFromPattern(Set<String> topics) {
if (subscriptionType != SubscriptionType.AUTO_PATTERN)
throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " +
Expand Down Expand Up @@ -306,6 +318,7 @@ public synchronized void unsubscribe() {
this.groupSubscription = Collections.emptySet();
this.assignment.clear();
this.subscribedPattern = null;
this.subscriptionPattern = null;
this.subscriptionType = SubscriptionType.NONE;
this.assignmentId++;
}
Expand All @@ -328,6 +341,10 @@ public synchronized Set<String> subscription() {
return Collections.emptySet();
}

public synchronized SubscriptionPattern subscriptionPattern() {
return this.subscriptionPattern;
}

public synchronized Set<TopicPartition> pausedPartitions() {
return collectPartitions(TopicPartitionState::isPaused);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
"about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
{ "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,13 @@ public void testSubscriptionOnNullPattern(GroupProtocol groupProtocol) {
assertThrows(IllegalArgumentException.class,
() -> consumer.subscribe((Pattern) null));
}
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CONSUMER")
public void testSubscriptionOnNullSubscriptionPattern(GroupProtocol groupProtocol) {
consumer = newConsumer(groupProtocol, groupId);
assertThrows(IllegalArgumentException.class,
() -> consumer.subscribe((SubscriptionPattern) null));
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
Expand All @@ -494,6 +501,14 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
() -> consumer.subscribe(Pattern.compile("")));
}

@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CONSUMER")
public void testSubscriptionOnEmptySubscriptionPattern(GroupProtocol groupProtocol) {
consumer = newConsumer(groupProtocol, groupId);
assertThrows(IllegalArgumentException.class,
() -> consumer.subscribe(new SubscriptionPattern("")));
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
Expand Down Expand Up @@ -582,6 +583,23 @@ public void testPartitionRevocationOnClose() {
assertTrue(subscriptions.assignedPartitions().isEmpty());
assertEquals(1, listener.revokedCount);
}
@Test
public void testSubscribeToSubscriptionPattern() {
MockRebalanceListener listener = new MockRebalanceListener();
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
consumer = newConsumer(
mock(FetchBuffer.class),
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");

SubscriptionPattern pattern = new SubscriptionPattern("t*");
consumer.subscribe(pattern, listener);
assertEquals(subscriptions.subscriptionPattern(), pattern);
}

@Test
public void testFailedPartitionRevocationOnClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) {
assertEquals(memberEpoch, heartbeatRequest.data().memberEpoch());
assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, heartbeatRequest.data().rebalanceTimeoutMs());
assertEquals(subscribedTopics, heartbeatRequest.data().subscribedTopicNames());
assertEquals(null, heartbeatRequest.data().subscribedTopicRegex());
assertEquals(DEFAULT_GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId());
assertEquals(DEFAULT_REMOTE_ASSIGNOR, heartbeatRequest.data().serverAssignor());
}
Expand Down Expand Up @@ -477,6 +478,7 @@ public void testHeartbeatState() {
assertNull(data.instanceId());
assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
assertEquals(Collections.emptyList(), data.subscribedTopicNames());
assertEquals(null, data.subscribedTopicRegex());
assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
assertEquals(Collections.emptyList(), data.topicPartitions());
membershipManager.onHeartbeatRequestSent();
Expand All @@ -491,6 +493,7 @@ public void testHeartbeatState() {
assertNull(data.instanceId());
assertEquals(-1, data.rebalanceTimeoutMs());
assertNull(data.subscribedTopicNames());
assertNull(data.subscribedTopicRegex());
assertNull(data.serverAssignor());
assertNull(data.topicPartitions());
membershipManager.onHeartbeatRequestSent();
Expand All @@ -508,6 +511,7 @@ public void testHeartbeatState() {
assertNull(data.instanceId());
assertEquals(-1, data.rebalanceTimeoutMs());
assertEquals(Collections.singletonList(topic), data.subscribedTopicNames());
assertNull(data.subscribedTopicRegex());
assertNull(data.serverAssignor());
assertNull(data.topicPartitions());
membershipManager.onHeartbeatRequestSent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import static org.mockito.Mockito.when;

public class OffsetsRequestManagerTest {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I find it's better to avoid changes in unrelated files, even if minor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please revert this change?

private OffsetsRequestManager requestManager;
private ConsumerMetadata metadata;
private SubscriptionState subscriptionState;
Expand Down
Loading