From b0447719ad1e667319e54d38fda41d6c2593a7b9 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Sun, 1 Sep 2024 00:56:32 +0100 Subject: [PATCH 1/3] MINOR: Fix typo and refactor new group coordinator offset fetch tests Merge the tests for fetchOffsets and fetchAllOffsets together into parameterized tests since they share the same structure. --- .../group/GroupCoordinatorService.java | 4 +- .../group/GroupCoordinatorServiceTest.java | 203 +++++------------- 2 files changed, 56 insertions(+), 151 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 7b093114734a7..5e3417ad174af 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -1217,7 +1217,7 @@ private static boolean isGroupIdNotEmpty(String groupId) { /** * This is the handler commonly used by all the operations that requires to convert errors to - * coordinator errors. The handler also handles and log unexpected errors. + * coordinator errors. The handler also handles and logs unexpected errors. * * @param operationName The name of the operation. * @param operationInput The operation's input for logging purposes. @@ -1272,7 +1272,7 @@ private OUT handleOperationException( /** * This is the handler used by offset fetch operations to convert errors to coordinator errors. - * The handler also handles and log unexpected errors. + * The handler also handles and logs unexpected errors. * * @param operationName The name of the operation. * @param request The OffsetFetchRequestGroup request. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 1056d80dd387a..201edb24d3714 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -114,6 +114,11 @@ public class GroupCoordinatorServiceTest { + @FunctionalInterface + interface TriFunction { + R apply(A a, B b, C c); + } + @SuppressWarnings("unchecked") private CoordinatorRuntime mockRuntime() { return (CoordinatorRuntime) mock(CoordinatorRuntime.class); @@ -1107,8 +1112,14 @@ public void testDescribeGroupsWhenNotStarted() throws ExecutionException, Interr } @ParameterizedTest - @ValueSource(booleans = {true, false}) + @CsvSource({ + "false, false", + "false, true", + "true, false", + "true, true", + }) public void testFetchOffsets( + boolean fetchAllOffsets, boolean requireStable ) throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -1124,10 +1135,13 @@ public void testFetchOffsets( OffsetFetchRequestData.OffsetFetchRequestGroup request = new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group") + .setGroupId("group"); + if (!fetchAllOffsets) { + request .setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics() .setName("foo") .setPartitionIndexes(Collections.singletonList(0)))); + } OffsetFetchResponseData.OffsetFetchResponseGroup response = new OffsetFetchResponseData.OffsetFetchResponseGroup() @@ -1140,20 +1154,22 @@ public void testFetchOffsets( if (requireStable) { when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-offsets"), + ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(response)); } else { when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("fetch-offsets"), + ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(response)); } - CompletableFuture future = service.fetchOffsets( + TriFunction> fetchOffsets = + fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets; + CompletableFuture future = fetchOffsets.apply( requestContext(ApiKeys.OFFSET_FETCH), request, requireStable @@ -1163,8 +1179,14 @@ public void testFetchOffsets( } @ParameterizedTest - @ValueSource(booleans = {true, false}) + @CsvSource({ + "false, false", + "false, true", + "true, false", + "true, true", + }) public void testFetchOffsetsWhenNotStarted( + boolean fetchAllOffsets, boolean requireStable ) throws ExecutionException, InterruptedException { CoordinatorRuntime runtime = mockRuntime(); @@ -1178,12 +1200,17 @@ public void testFetchOffsetsWhenNotStarted( OffsetFetchRequestData.OffsetFetchRequestGroup request = new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group") + .setGroupId("group"); + if (!fetchAllOffsets) { + request .setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics() .setName("foo") .setPartitionIndexes(Collections.singletonList(0)))); + } - CompletableFuture future = service.fetchOffsets( + TriFunction> fetchOffsets = + fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets; + CompletableFuture future = fetchOffsets.apply( requestContext(ApiKeys.OFFSET_FETCH), request, requireStable @@ -1199,13 +1226,19 @@ public void testFetchOffsetsWhenNotStarted( @ParameterizedTest @CsvSource({ - "UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR", - "NOT_ENOUGH_REPLICAS, NOT_COORDINATOR", - "REQUEST_TIMED_OUT, NOT_COORDINATOR", - "NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR", - "KAFKA_STORAGE_ERROR, NOT_COORDINATOR", + "false, UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR", + "false, NOT_ENOUGH_REPLICAS, NOT_COORDINATOR", + "false, REQUEST_TIMED_OUT, NOT_COORDINATOR", + "false, NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR", + "false, KAFKA_STORAGE_ERROR, NOT_COORDINATOR", + "true, UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR", + "true, NOT_ENOUGH_REPLICAS, NOT_COORDINATOR", + "true, REQUEST_TIMED_OUT, NOT_COORDINATOR", + "true, NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR", + "true, KAFKA_STORAGE_ERROR, NOT_COORDINATOR", }) public void testFetchOffsetsWithWrappedError( + boolean fetchAllOffsets, Errors error, Errors expectedError ) throws ExecutionException, InterruptedException { @@ -1222,152 +1255,24 @@ public void testFetchOffsetsWithWrappedError( OffsetFetchRequestData.OffsetFetchRequestGroup request = new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group") + .setGroupId("group"); + if (!fetchAllOffsets) { + request .setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics() .setName("foo") .setPartitionIndexes(Collections.singletonList(0)))); - - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), - ArgumentMatchers.eq(Duration.ofMillis(5000)), - ArgumentMatchers.any() - )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception()))); - - CompletableFuture future = service.fetchOffsets( - requestContext(ApiKeys.OFFSET_FETCH), - request, - true - ); - - assertEquals( - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId("group") - .setErrorCode(expectedError.code()), - future.get() - ); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testFetchAllOffsets( - boolean requireStable - ) throws ExecutionException, InterruptedException, TimeoutException { - CoordinatorRuntime runtime = mockRuntime(); - GroupCoordinatorService service = new GroupCoordinatorService( - new LogContext(), - createConfig(), - runtime, - new GroupCoordinatorMetrics(), - createConfigManager() - ); - - service.startup(() -> 1); - - OffsetFetchRequestData.OffsetFetchRequestGroup request = - new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group"); - - OffsetFetchResponseData.OffsetFetchResponseGroup response = - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId("group") - .setTopics(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") - .setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions() - .setPartitionIndex(0) - .setCommittedOffset(100L))))); - - if (requireStable) { - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-all-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), - ArgumentMatchers.eq(Duration.ofMillis(5000)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(response)); - } else { - when(runtime.scheduleReadOperation( - ArgumentMatchers.eq("fetch-all-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(response)); } - CompletableFuture future = service.fetchAllOffsets( - requestContext(ApiKeys.OFFSET_FETCH), - request, - requireStable - ); - - assertEquals(response, future.get(5, TimeUnit.SECONDS)); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testFetchAllOffsetsWhenNotStarted( - boolean requireStable - ) throws ExecutionException, InterruptedException { - CoordinatorRuntime runtime = mockRuntime(); - GroupCoordinatorService service = new GroupCoordinatorService( - new LogContext(), - createConfig(), - runtime, - new GroupCoordinatorMetrics(), - createConfigManager() - ); - - OffsetFetchRequestData.OffsetFetchRequestGroup request = - new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group"); - - CompletableFuture future = service.fetchAllOffsets( - requestContext(ApiKeys.OFFSET_FETCH), - request, - requireStable - ); - - assertEquals( - new OffsetFetchResponseData.OffsetFetchResponseGroup() - .setGroupId("group") - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), - future.get() - ); - } - - @ParameterizedTest - @CsvSource({ - "UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR", - "NOT_ENOUGH_REPLICAS, NOT_COORDINATOR", - "REQUEST_TIMED_OUT, NOT_COORDINATOR", - "NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR", - "KAFKA_STORAGE_ERROR, NOT_COORDINATOR", - }) - public void testFetchAllOffsetsWithWrappedError( - Errors error, - Errors expectedError - ) throws ExecutionException, InterruptedException { - CoordinatorRuntime runtime = mockRuntime(); - GroupCoordinatorService service = new GroupCoordinatorService( - new LogContext(), - createConfig(), - runtime, - new GroupCoordinatorMetrics(), - createConfigManager() - ); - - service.startup(() -> 1); - - OffsetFetchRequestData.OffsetFetchRequestGroup request = - new OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId("group"); - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("fetch-all-offsets"), + ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() - )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception()))); + )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception()))); - CompletableFuture future = service.fetchAllOffsets( + TriFunction> fetchOffsets = + fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets; + CompletableFuture future = fetchOffsets.apply( requestContext(ApiKeys.OFFSET_FETCH), request, true From 4b7e6ecd2097aa21df8e76d4c0aed590e46f2856 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 2 Sep 2024 20:16:21 +0100 Subject: [PATCH 2/3] Use Topic.GROUP_METADATA_TOPIC_NAME in new group coordinator tests --- .../group/GroupCoordinatorServiceTest.java | 89 ++++++++++--------- .../group/GroupMetadataManagerTest.java | 3 +- .../group/classic/ClassicGroupTest.java | 3 +- .../GroupCoordinatorMetricsShardTest.java | 7 +- .../metrics/GroupCoordinatorMetricsTest.java | 7 +- .../modern/consumer/ConsumerGroupTest.java | 5 +- 6 files changed, 60 insertions(+), 54 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 201edb24d3714..bccfa57397c16 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -189,7 +190,7 @@ public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedE when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("consumer-group-heartbeat"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture( @@ -241,7 +242,7 @@ public void testConsumerGroupHeartbeatWithException( when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("consumer-group-heartbeat"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(exception)); @@ -315,7 +316,7 @@ public void testOnElection() { service.onElection(5, 10); verify(runtime, times(1)).scheduleLoadOperation( - new TopicPartition("__consumer_offsets", 5), + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 5), 10 ); } @@ -338,7 +339,7 @@ public void testOnResignation() { service.onResignation(5, OptionalInt.of(10)); verify(runtime, times(1)).scheduleUnloadOperation( - new TopicPartition("__consumer_offsets", 5), + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 5), OptionalInt.of(10) ); } @@ -358,7 +359,7 @@ public void testOnResignationWithEmptyLeaderEpoch() { service.onResignation(5, OptionalInt.empty()); verify(runtime, times(1)).scheduleUnloadOperation( - new TopicPartition("__consumer_offsets", 5), + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 5), OptionalInt.empty() ); } @@ -382,7 +383,7 @@ public void testJoinGroup() { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-join"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture( @@ -417,7 +418,7 @@ public void testJoinGroupWithException() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-join"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(new IllegalStateException())); @@ -560,7 +561,7 @@ public void testSyncGroup() { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-sync"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture( @@ -594,7 +595,7 @@ public void testSyncGroupWithException() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-sync"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(new IllegalStateException())); @@ -688,7 +689,7 @@ public void testHeartbeat() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-heartbeat"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture( @@ -722,7 +723,7 @@ public void testHeartbeatCoordinatorNotAvailableException() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-heartbeat"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( @@ -756,7 +757,7 @@ public void testHeartbeatCoordinatorException() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-heartbeat"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( @@ -1000,14 +1001,14 @@ public void testDescribeGroups() throws Exception { when(runtime.scheduleReadOperation( ArgumentMatchers.eq("describe-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); CompletableFuture describedGroupFuture = new CompletableFuture<>(); when(runtime.scheduleReadOperation( ArgumentMatchers.eq("describe-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)), ArgumentMatchers.any() )).thenReturn(describedGroupFuture); @@ -1043,7 +1044,7 @@ public void testDescribeGroupsInvalidGroupId() throws Exception { when(runtime.scheduleReadOperation( ArgumentMatchers.eq("describe-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup))); @@ -1068,7 +1069,7 @@ public void testDescribeGroupCoordinatorLoadInProgress() throws Exception { when(runtime.scheduleReadOperation( ArgumentMatchers.eq("describe-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( new CoordinatorLoadInProgressException(null) @@ -1155,14 +1156,14 @@ public void testFetchOffsets( if (requireStable) { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(response)); } else { when(runtime.scheduleReadOperation( ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(response)); } @@ -1265,7 +1266,7 @@ public void testFetchOffsetsWithWrappedError( when(runtime.scheduleWriteOperation( ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception()))); @@ -1304,7 +1305,7 @@ public void testLeaveGroup() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-leave"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture( @@ -1346,7 +1347,7 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-leave"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( @@ -1425,14 +1426,14 @@ public void testConsumerGroupDescribe() throws InterruptedException, ExecutionEx when(runtime.scheduleReadOperation( ArgumentMatchers.eq("consumer-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); CompletableFuture describedGroupFuture = new CompletableFuture<>(); when(runtime.scheduleReadOperation( ArgumentMatchers.eq("consumer-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)), ArgumentMatchers.any() )).thenReturn(describedGroupFuture); @@ -1469,7 +1470,7 @@ public void testConsumerGroupDescribeInvalidGroupId() throws ExecutionException, when(runtime.scheduleReadOperation( ArgumentMatchers.eq("consumer-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup))); @@ -1494,7 +1495,7 @@ public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws Executio when(runtime.scheduleReadOperation( ArgumentMatchers.eq("consumer-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( new CoordinatorLoadInProgressException(null) @@ -1524,7 +1525,7 @@ public void testConsumerGroupDescribeCoordinatorNotActive() throws ExecutionExce ); when(runtime.scheduleReadOperation( ArgumentMatchers.eq("consumer-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( Errors.COORDINATOR_NOT_AVAILABLE.exception() @@ -1579,7 +1580,7 @@ public void testDeleteOffsets() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("delete-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(response)); @@ -1622,7 +1623,7 @@ public void testDeleteOffsetsInvalidGroupId() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("delete-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(response)); @@ -1670,7 +1671,7 @@ public void testDeleteOffsetsWithException( when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("delete-offsets"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(exception)); @@ -1751,7 +1752,7 @@ public void testDeleteGroups() throws Exception { when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("delete-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(resultCollection1)); @@ -1759,14 +1760,14 @@ public void testDeleteGroups() throws Exception { CompletableFuture resultCollectionFuture = new CompletableFuture<>(); when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("delete-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(resultCollectionFuture); when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("delete-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())); @@ -1800,7 +1801,7 @@ public void testDeleteGroupsWithException( when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("delete-groups"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(exception)); @@ -1965,7 +1966,7 @@ public void testCommitTransactionalOffsets() throws ExecutionException, Interrup when(runtime.scheduleTransactionalWriteOperation( ArgumentMatchers.eq("txn-commit-offset"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq("transactional-id"), ArgumentMatchers.eq(10L), ArgumentMatchers.eq((short) 5), @@ -2024,7 +2025,7 @@ public void testCommitTransactionalOffsetsWithWrappedError( when(runtime.scheduleTransactionalWriteOperation( ArgumentMatchers.eq("txn-commit-offset"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq("transactional-id"), ArgumentMatchers.eq(10L), ArgumentMatchers.eq((short) 5), @@ -2056,7 +2057,7 @@ public void testCompleteTransaction() throws ExecutionException, InterruptedExce when(runtime.scheduleTransactionCompletion( ArgumentMatchers.eq("write-txn-marker"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(100L), ArgumentMatchers.eq((short) 5), ArgumentMatchers.eq(10), @@ -2065,7 +2066,7 @@ public void testCompleteTransaction() throws ExecutionException, InterruptedExce )).thenReturn(CompletableFuture.completedFuture(null)); CompletableFuture future = service.completeTransaction( - new TopicPartition("__consumer_offsets", 0), + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), 100L, (short) 5, 10, @@ -2211,7 +2212,7 @@ public void testShareGroupHeartbeat() throws ExecutionException, InterruptedExce when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("share-group-heartbeat"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture( @@ -2249,7 +2250,7 @@ public void testShareGroupHeartbeatWithException( when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("share-group-heartbeat"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(exception)); @@ -2291,14 +2292,14 @@ public void testShareGroupDescribe() throws InterruptedException, ExecutionExcep when(runtime.scheduleReadOperation( ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); CompletableFuture describedGroupFuture = new CompletableFuture<>(); when(runtime.scheduleReadOperation( ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)), ArgumentMatchers.any() )).thenReturn(describedGroupFuture); @@ -2335,7 +2336,7 @@ public void testShareGroupDescribeInvalidGroupId() throws ExecutionException, In when(runtime.scheduleReadOperation( ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup))); @@ -2360,7 +2361,7 @@ public void testShareGroupDescribeCoordinatorLoadInProgress() throws ExecutionEx when(runtime.scheduleReadOperation( ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( new CoordinatorLoadInProgressException(null) @@ -2390,7 +2391,7 @@ public void testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti ); when(runtime.scheduleReadOperation( ArgumentMatchers.eq("share-group-describe"), - ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( Errors.COORDINATOR_NOT_AVAILABLE.exception() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index b500bf57adda9..1f67b2616f738 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnreleasedInstanceIdException; import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -3677,7 +3678,7 @@ public void testReplayGroupMetadataRecords(boolean useDefaultRebalanceTimeout) { new GroupCoordinatorMetricsShard( context.snapshotRegistry, Collections.emptyMap(), - new TopicPartition("__consumer_offsets", 0) + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) ), 1, Optional.of("consumer"), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java index 1c5dfaa10541b..b9284a366667b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import org.apache.kafka.common.message.JoinGroupResponseData; @@ -87,7 +88,7 @@ public class ClassicGroupTest { private final GroupCoordinatorMetricsShard metrics = new GroupCoordinatorMetricsShard( new SnapshotRegistry(logContext), Collections.emptyMap(), - new TopicPartition("__consumer_offsets", 0) + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) ); private ClassicGroup group = null; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java index 950c359294ae2..edafe7af1cfb2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -50,7 +51,7 @@ public void testTimelineGaugeCounters() { MetricsRegistry registry = new MetricsRegistry(); Metrics metrics = new Metrics(); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - TopicPartition tp = new TopicPartition("__consumer_offsets", 0); + TopicPartition tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); @@ -102,7 +103,7 @@ public void testTimelineGaugeCounters() { public void testGenericGroupStateTransitionMetrics() { MetricsRegistry registry = new MetricsRegistry(); Metrics metrics = new Metrics(); - TopicPartition tp = new TopicPartition("__consumer_offsets", 0); + TopicPartition tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(new SnapshotRegistry(new LogContext()), tp); coordinatorMetrics.activateMetricsShard(shard); @@ -156,7 +157,7 @@ public void testConsumerGroupStateTransitionMetrics() { MetricsRegistry registry = new MetricsRegistry(); Metrics metrics = new Metrics(); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - TopicPartition tp = new TopicPartition("__consumer_offsets", 0); + TopicPartition tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); coordinatorMetrics.activateMetricsShard(shard); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java index 31995a7175784..c6a4b3970bfa3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -124,8 +125,8 @@ public void aggregateShards() { GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); SnapshotRegistry snapshotRegistry0 = new SnapshotRegistry(new LogContext()); SnapshotRegistry snapshotRegistry1 = new SnapshotRegistry(new LogContext()); - TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0); - TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1); + TopicPartition tp0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); + TopicPartition tp1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1); GroupCoordinatorMetricsShard shard0 = coordinatorMetrics.newMetricsShard(snapshotRegistry0, tp0); GroupCoordinatorMetricsShard shard1 = coordinatorMetrics.newMetricsShard(snapshotRegistry1, tp1); coordinatorMetrics.activateMetricsShard(shard0); @@ -178,7 +179,7 @@ public void testGlobalSensors() { Metrics metrics = new Metrics(time); GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard( - new SnapshotRegistry(new LogContext()), new TopicPartition("__consumer_offsets", 0) + new SnapshotRegistry(new LogContext()), new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) ); shard.record(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, 10); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index 15d5e71bc3759..331463e82b22c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.LogContext; @@ -1107,7 +1108,7 @@ public void testAsListedGroup() { GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard( snapshotRegistry, Collections.emptyMap(), - new TopicPartition("__consumer_offsets", 0) + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) ); ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard); snapshotRegistry.idempotentCreateSnapshot(0); @@ -1321,7 +1322,7 @@ public void testIsInStatesCaseInsensitive() { GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard( snapshotRegistry, Collections.emptyMap(), - new TopicPartition("__consumer_offsets", 0) + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) ); ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard); snapshotRegistry.idempotentCreateSnapshot(0); From 871591aec8fd97875516d2be62bc6c843920739d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 9 Oct 2024 09:35:24 +0100 Subject: [PATCH 3/3] fixup: fix merge --- .../apache/kafka/coordinator/group/GroupMetadataManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4a64238270c31..86b9e19802415 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -35,7 +35,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnreleasedInstanceIdException; import org.apache.kafka.common.errors.UnsupportedAssignorException; -import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;