Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Fix typo and refactor new group coordinator tests #17072

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ private static boolean isGroupIdNotEmpty(String groupId) {

/**
* This is the handler commonly used by all the operations that requires to convert errors to
* coordinator errors. The handler also handles and log unexpected errors.
* coordinator errors. The handler also handles and logs unexpected errors.
*
* @param operationName The name of the operation.
* @param operationInput The operation's input for logging purposes.
Expand Down Expand Up @@ -1272,7 +1272,7 @@ private <IN, OUT> OUT handleOperationException(

/**
* This is the handler used by offset fetch operations to convert errors to coordinator errors.
* The handler also handles and log unexpected errors.
* The handler also handles and logs unexpected errors.
*
* @param operationName The name of the operation.
* @param request The OffsetFetchRequestGroup request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@

public class GroupCoordinatorServiceTest {

@FunctionalInterface
interface TriFunction<A, B, C, R> {
R apply(A a, B b, C c);
}

@SuppressWarnings("unchecked")
private CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> mockRuntime() {
return (CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord>) mock(CoordinatorRuntime.class);
Expand Down Expand Up @@ -1107,8 +1112,14 @@ public void testDescribeGroupsWhenNotStarted() throws ExecutionException, Interr
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@CsvSource({
"false, false",
"false, true",
"true, false",
"true, true",
})
public void testFetchOffsets(
boolean fetchAllOffsets,
boolean requireStable
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Expand All @@ -1124,10 +1135,13 @@ public void testFetchOffsets(

OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group")
.setGroupId("group");
if (!fetchAllOffsets) {
request
.setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))));
}

OffsetFetchResponseData.OffsetFetchResponseGroup response =
new OffsetFetchResponseData.OffsetFetchResponseGroup()
Expand All @@ -1140,20 +1154,22 @@ public void testFetchOffsets(

if (requireStable) {
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-offsets"),
ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think __consumer_offsets can replace by Topic.GROUP_METADATA_TOPIC_NAME, use constant is better than string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__consumer_offsets is used throughout the new group coordinator tests. I think it makes sense to update all instances in the new group coordinator at once and pushed an update for it.

ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response));
} else {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("fetch-offsets"),
ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response));
}

CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
TriFunction<RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> fetchOffsets =
fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = fetchOffsets.apply(
requestContext(ApiKeys.OFFSET_FETCH),
request,
requireStable
Expand All @@ -1163,8 +1179,14 @@ public void testFetchOffsets(
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@CsvSource({
"false, false",
"false, true",
"true, false",
"true, true",
})
public void testFetchOffsetsWhenNotStarted(
boolean fetchAllOffsets,
boolean requireStable
) throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Expand All @@ -1178,12 +1200,17 @@ public void testFetchOffsetsWhenNotStarted(

OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group")
.setGroupId("group");
if (!fetchAllOffsets) {
request
.setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))));
}

CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
TriFunction<RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> fetchOffsets =
fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = fetchOffsets.apply(
requestContext(ApiKeys.OFFSET_FETCH),
request,
requireStable
Expand All @@ -1199,13 +1226,19 @@ public void testFetchOffsetsWhenNotStarted(

@ParameterizedTest
@CsvSource({
"UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR",
"NOT_ENOUGH_REPLICAS, NOT_COORDINATOR",
"REQUEST_TIMED_OUT, NOT_COORDINATOR",
"NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR",
"KAFKA_STORAGE_ERROR, NOT_COORDINATOR",
"false, UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR",
"false, NOT_ENOUGH_REPLICAS, NOT_COORDINATOR",
"false, REQUEST_TIMED_OUT, NOT_COORDINATOR",
"false, NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR",
"false, KAFKA_STORAGE_ERROR, NOT_COORDINATOR",
"true, UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR",
"true, NOT_ENOUGH_REPLICAS, NOT_COORDINATOR",
"true, REQUEST_TIMED_OUT, NOT_COORDINATOR",
"true, NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR",
"true, KAFKA_STORAGE_ERROR, NOT_COORDINATOR",
})
public void testFetchOffsetsWithWrappedError(
boolean fetchAllOffsets,
Errors error,
Errors expectedError
) throws ExecutionException, InterruptedException {
Expand All @@ -1222,152 +1255,24 @@ public void testFetchOffsetsWithWrappedError(

OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group")
.setGroupId("group");
if (!fetchAllOffsets) {
request
.setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))));

when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception())));

CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
request,
true
);

assertEquals(
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("group")
.setErrorCode(expectedError.code()),
future.get()
);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchAllOffsets(
boolean requireStable
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime,
new GroupCoordinatorMetrics(),
createConfigManager()
);

service.startup(() -> 1);

OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group");

OffsetFetchResponseData.OffsetFetchResponseGroup response =
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("group")
.setTopics(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(100L)))));

if (requireStable) {
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-all-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response));
} else {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("fetch-all-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response));
}

CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchAllOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
request,
requireStable
);

assertEquals(response, future.get(5, TimeUnit.SECONDS));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchAllOffsetsWhenNotStarted(
boolean requireStable
) throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime,
new GroupCoordinatorMetrics(),
createConfigManager()
);

OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group");

CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchAllOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
request,
requireStable
);

assertEquals(
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("group")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
future.get()
);
}

@ParameterizedTest
@CsvSource({
"UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR",
"NOT_ENOUGH_REPLICAS, NOT_COORDINATOR",
"REQUEST_TIMED_OUT, NOT_COORDINATOR",
"NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR",
"KAFKA_STORAGE_ERROR, NOT_COORDINATOR",
})
public void testFetchAllOffsetsWithWrappedError(
Errors error,
Errors expectedError
) throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime,
new GroupCoordinatorMetrics(),
createConfigManager()
);

service.startup(() -> 1);

OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group");

when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-all-offsets"),
ArgumentMatchers.eq(fetchAllOffsets ? "fetch-all-offsets" : "fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception())));
)).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception())));

CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchAllOffsets(
TriFunction<RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> fetchOffsets =
fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = fetchOffsets.apply(
requestContext(ApiKeys.OFFSET_FETCH),
request,
true
Expand Down