Skip to content

Commit

Permalink
KAFKA-18060 new coordinator does not handle TxnOffsetCommitRequest wi…
Browse files Browse the repository at this point in the history
…th empty member id when using CONSUMER group (apache#17914)

There are two issues in KAFKA-18060:

1) New coordinator can't handle the TxnOffsetCommitRequest with empty member id, and TxnOffsetCommitRequest v0-v2 do definitely has an empty member ID, causing ConsumerGroup#validateOffsetCommit to throw an UnknownMemberIdException. This prevents the old producer from calling sendOffsetsToTransaction. Note that TxnOffsetCommitRequest versions v0-v2 are included in KIP-896, so it seems the new coordinator should support that operations

2) The deprecated API Producer#sendOffsetsToTransaction does not use v0-v2 to send TxnOffsetCommitRequest with an empty member ID. Unfortunately, it has been released for a while. Therefore, the new coordinator needs to handle TxnOffsetCommitRequest with an empty member ID for all versions.

Taken from the two issues above, we need to handle empty member id in all API versions when new coordinator are dealing with TxnOffsetCommitRequest.

Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
brandboat authored Dec 3, 2024
1 parent 180112a commit ac8b3df
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
Expand All @@ -35,7 +36,9 @@
import java.util.Optional;

import static org.apache.kafka.common.requests.TxnOffsetCommitRequest.getErrorResponse;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {

Expand Down Expand Up @@ -151,4 +154,18 @@ public void testGetErrorResponse() {

assertEquals(expectedResponse, getErrorResponse(builderWithGroupMetadata.data, Errors.UNKNOWN_MEMBER_ID));
}

@Test
public void testVersionSupportForGroupMetadata() {
for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
assertDoesNotThrow(() -> builder.build(version));
if (version >= 3) {
assertDoesNotThrow(() -> builderWithGroupMetadata.build(version));
} else {
assertEquals("Broker doesn't support group metadata commit API on version " + version +
", minimum supported request version is 3 which requires brokers to be on version 2.5 or above.",
assertThrows(UnsupportedVersionException.class, () -> builderWithGroupMetadata.build(version)).getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupR
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.junit.jupiter.api.Assertions.{assertEquals, fail}

Expand Down Expand Up @@ -59,6 +60,19 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
}
}

protected def createTransactionStateTopic(): Unit = {
val admin = cluster.admin()
try {
TestUtils.createTransactionStateTopicWithAdmin(
admin = admin,
brokers = brokers(),
controllers = controllerServers()
)
} finally {
admin.close()
}
}

protected def createTopic(
topic: String,
numPartitions: Int
Expand Down Expand Up @@ -194,6 +208,114 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
assertEquals(expectedResponse, response.data)
}

protected def commitTxnOffset(
groupId: String,
memberId: String,
generationId: Int,
producerId: Long,
producerEpoch: Short,
transactionalId: String,
topic: String,
partition: Int,
offset: Long,
expectedError: Errors,
version: Short = ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
): Unit = {
val request = new TxnOffsetCommitRequest.Builder(
new TxnOffsetCommitRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setGenerationId(generationId)
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setTransactionalId(transactionalId)
.setTopics(List(
new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName(topic)
.setPartitions(List(
new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
.setPartitionIndex(partition)
.setCommittedOffset(offset)
).asJava)
).asJava)
).build(version)

val expectedResponse = new TxnOffsetCommitResponseData()
.setTopics(List(
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
.setName(topic)
.setPartitions(List(
new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(expectedError.code)
).asJava)
).asJava)

val response = connectAndReceive[TxnOffsetCommitResponse](request)
assertEquals(expectedResponse, response.data)
}

protected def addOffsetsToTxn(
groupId: String,
producerId: Long,
producerEpoch: Short,
transactionalId: String,
version: Short = ApiKeys.ADD_OFFSETS_TO_TXN.latestVersion(isUnstableApiEnabled)
): Unit = {
val request = new AddOffsetsToTxnRequest.Builder(
new AddOffsetsToTxnRequestData()
.setTransactionalId(transactionalId)
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setGroupId(groupId)
).build(version)

val response = connectAndReceive[AddOffsetsToTxnResponse](request)
assertEquals(new AddOffsetsToTxnResponseData(), response.data)
}

protected def initProducerId(
transactionalId: String,
transactionTimeoutMs: Int = 60000,
producerIdAndEpoch: ProducerIdAndEpoch,
expectedError: Errors,
version: Short = ApiKeys.INIT_PRODUCER_ID.latestVersion(isUnstableApiEnabled)
): ProducerIdAndEpoch = {
val request = new InitProducerIdRequest.Builder(
new InitProducerIdRequestData()
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch))
.build(version)

val response = connectAndReceive[InitProducerIdResponse](request).data
assertEquals(expectedError.code, response.errorCode)
new ProducerIdAndEpoch(response.producerId, response.producerEpoch)
}

protected def endTxn(
producerId: Long,
producerEpoch: Short,
transactionalId: String,
isTransactionV2Enabled: Boolean,
committed: Boolean,
expectedError: Errors,
version: Short = ApiKeys.END_TXN.latestVersion(isUnstableApiEnabled)
): Unit = {
val request = new EndTxnRequest.Builder(
new EndTxnRequestData()
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setTransactionalId(transactionalId)
.setCommitted(committed),
isUnstableApiEnabled,
isTransactionV2Enabled
).build(version)

assertEquals(expectedError, connectAndReceive[EndTxnResponse](request).error)
}

protected def fetchOffsets(
groupId: String,
memberId: String,
Expand Down
Loading

0 comments on commit ac8b3df

Please sign in to comment.