From 9db5ed00a8369d5c696e836661230110ea2ea44d Mon Sep 17 00:00:00 2001 From: Chirag Wadhwa <122860692+chirag-wadhwa5@users.noreply.github.com> Date: Mon, 11 Nov 2024 14:36:11 +0530 Subject: [PATCH] KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (#17573) This PR adds another dynamic config share.auto.offset.reset fir share groups. Reviewers: Andrew Schofield , Apoorv Mittal , Abhinav Dixit , Manikumar Reddy --- .../kafka/server/share/ShareFetchUtils.java | 24 +- .../kafka/server/share/SharePartition.java | 31 +- .../share/SharePartitionManagerTest.java | 15 + .../server/share/SharePartitionTest.java | 267 ++++++++++++- .../kafka/test/api/ShareConsumerTest.java | 244 +++++++++++- .../unit/kafka/server/KafkaApisTest.scala | 5 +- .../ShareFetchAcknowledgeRequestTest.scala | 354 +++++++++++------- .../kafka/coordinator/group/GroupConfig.java | 42 ++- .../coordinator/group/GroupConfigTest.java | 57 ++- .../share/ShareCoordinatorShard.java | 2 +- .../persister/NoOpShareStatePersister.java | 4 +- .../share/persister/PartitionFactory.java | 8 +- 12 files changed, 889 insertions(+), 164 deletions(-) diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java index bd504deac49e8..3515362152b02 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -19,9 +19,11 @@ import kafka.cluster.Partition; import kafka.server.ReplicaManager; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.OffsetNotAvailableException; import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.FileRecords; @@ -40,6 +42,7 @@ import java.util.Optional; import scala.Option; +import scala.Some; /** * Utility class for post-processing of share fetch operations. @@ -125,7 +128,26 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic Option timestampAndOffset = replicaManager.fetchOffsetForTimestamp( topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(), Optional.empty(), true).timestampAndOffsetOpt(); - return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset; + if (timestampAndOffset.isEmpty()) { + throw new OffsetNotAvailableException("offset for Earliest timestamp not found for topic partition: " + topicIdPartition); + } + return timestampAndOffset.get().offset; + } + + /** + * The method is used to get the offset for the latest timestamp for the topic-partition. + * + * @return The offset for the latest timestamp. + */ + static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) { + // Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests + Option timestampAndOffset = replicaManager.fetchOffsetForTimestamp( + topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED), + Optional.empty(), true).timestampAndOffsetOpt(); + if (timestampAndOffset.isEmpty()) { + throw new OffsetNotAvailableException("offset for Latest timestamp not found for topic partition: " + topicIdPartition); + } + return timestampAndOffset.get().offset; } static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) { diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 740bf697de405..71baea1017441 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; @@ -72,6 +73,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp; +import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp; + /** * The SharePartition is used to track the state of a partition that is shared between multiple * consumers. The class maintains the state of the records that have been fetched from the leader @@ -421,8 +425,12 @@ public CompletableFuture maybeInitialize() { return; } - // Set the state epoch and end offset from the persisted state. - startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0; + try { + startOffset = startOffsetDuringInitialization(partitionData.startOffset()); + } catch (Exception e) { + completeInitializationWithException(future, e); + return; + } stateEpoch = partitionData.stateEpoch(); List stateBatches = partitionData.stateBatches(); @@ -448,7 +456,7 @@ public CompletableFuture maybeInitialize() { // and start/end offsets. maybeUpdateCachedStateAndOffsets(); } else { - updateEndOffsetAndResetFetchOffsetMetadata(partitionData.startOffset()); + updateEndOffsetAndResetFetchOffsetMetadata(startOffset); } // Set the partition state to Active and complete the future. partitionState = SharePartitionState.ACTIVE; @@ -2058,6 +2066,23 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl } } + private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception { + // Set the state epoch and end offset from the persisted state. + if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) { + return partitionDataStartOffset; + } + GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy; + if (groupConfigManager.groupConfig(groupId).isPresent()) { + offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset(); + } else { + offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset(); + } + + if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST) + return offsetForEarliestTimestamp(topicIdPartition, replicaManager); + return offsetForLatestTimestamp(topicIdPartition, replicaManager); + } + // Visible for testing. Should only be used for testing purposes. NavigableMap cachedState() { return new ConcurrentSkipListMap<>(cachedState); diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 67c2a6cce778c..46abf04b0a643 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -17,6 +17,7 @@ package kafka.server.share; import kafka.cluster.Partition; +import kafka.log.OffsetResultHolder; import kafka.server.LogReadResult; import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; @@ -42,6 +43,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ShareFetchRequest; @@ -1040,6 +1042,8 @@ public void testMultipleSequentialShareFetches() { partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES); partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES); + mockFetchOffsetForTimestamp(mockReplicaManager); + Time time = mock(Time.class); when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L); Metrics metrics = new Metrics(); @@ -1109,6 +1113,9 @@ public void testMultipleConcurrentShareFetches() throws InterruptedException { partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES); final Time time = new MockTime(0, System.currentTimeMillis(), 0); + + mockFetchOffsetForTimestamp(mockReplicaManager); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); @@ -1233,6 +1240,8 @@ public void testReplicaManagerFetchShouldProceed() { TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); Map partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES); + mockFetchOffsetForTimestamp(mockReplicaManager); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); @@ -2482,6 +2491,12 @@ private void validateShareFetchFutureException(CompletableFuture> buildLogReadResult(Set topicIdPartitions) { List> logReadResults = new ArrayList<>(); topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 6538a1f40126f..3e90005902e10 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -16,12 +16,14 @@ */ package kafka.server.share; +import kafka.log.OffsetResultHolder; import kafka.server.ReplicaManager; import kafka.server.share.SharePartition.InFlightState; import kafka.server.share.SharePartition.RecordState; import kafka.server.share.SharePartition.SharePartitionState; import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; @@ -34,9 +36,11 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupConfig; @@ -77,6 +81,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import scala.Option; + import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID; import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -191,6 +197,244 @@ public void testMaybeInitialize() { assertNull(sharePartition.cachedState().get(11L).offsetState()); } + @Test + public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData( + 0, PartitionFactory.DEFAULT_STATE_EPOCH, + PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_ERROR_CODE, + PartitionFactory.DEFAULT_ERR_MESSAGE, + Collections.emptyList()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); + GroupConfig groupConfig = Mockito.mock(GroupConfig.class); + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST); + + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + + FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); + Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withGroupConfigManager(groupConfigManager) + .withReplicaManager(replicaManager) + .build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + // replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.EARLIEST_TIMESTAMP" + Mockito.verify(replicaManager).fetchOffsetForTimestamp( + Mockito.any(TopicPartition.class), + Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean() + ); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(0, sharePartition.startOffset()); + assertEquals(0, sharePartition.endOffset()); + assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); + } + + @Test + public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData( + 0, PartitionFactory.DEFAULT_STATE_EPOCH, + PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_ERROR_CODE, + PartitionFactory.DEFAULT_ERR_MESSAGE, + Collections.emptyList()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); + GroupConfig groupConfig = Mockito.mock(GroupConfig.class); + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST); + + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + + FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty()); + Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withGroupConfigManager(groupConfigManager) + .withReplicaManager(replicaManager) + .build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + // replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP" + Mockito.verify(replicaManager).fetchOffsetForTimestamp( + Mockito.any(TopicPartition.class), + Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean() + ); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(15, sharePartition.startOffset()); + assertEquals(15, sharePartition.endOffset()); + assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); + } + + @Test + public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData( + 0, PartitionFactory.DEFAULT_STATE_EPOCH, + PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_ERROR_CODE, + PartitionFactory.DEFAULT_ERR_MESSAGE, + Collections.emptyList()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty()); + + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + + FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty()); + Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withGroupConfigManager(groupConfigManager) + .withReplicaManager(replicaManager) + .build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + // replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP" + Mockito.verify(replicaManager).fetchOffsetForTimestamp( + Mockito.any(TopicPartition.class), + Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean() + ); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(15, sharePartition.startOffset()); + assertEquals(15, sharePartition.endOffset()); + assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); + } + + @Test + public void testMaybeInitializeFetchOffsetForLatestTimestampThrowsError() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData( + 0, PartitionFactory.DEFAULT_STATE_EPOCH, + PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_ERROR_CODE, + PartitionFactory.DEFAULT_ERR_MESSAGE, + Collections.emptyList()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty()); + + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + + Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())) + .thenThrow(new RuntimeException("fetch offsets exception")); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withGroupConfigManager(groupConfigManager) + .withReplicaManager(replicaManager) + .build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + + // replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP" + Mockito.verify(replicaManager).fetchOffsetForTimestamp( + Mockito.any(TopicPartition.class), + Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean() + ); + + assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); + } + + @Test + public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData( + 0, PartitionFactory.DEFAULT_STATE_EPOCH, + PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_ERROR_CODE, + PartitionFactory.DEFAULT_ERR_MESSAGE, + Collections.emptyList()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); + GroupConfig groupConfig = Mockito.mock(GroupConfig.class); + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST); + + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + + Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())) + .thenThrow(new RuntimeException("fetch offsets exception")); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withGroupConfigManager(groupConfigManager) + .withReplicaManager(replicaManager) + .build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + + // replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.EARLIEST_TIMESTAMP" + Mockito.verify(replicaManager).fetchOffsetForTimestamp( + Mockito.any(TopicPartition.class), + Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean() + ); + + assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); + } + @Test public void testMaybeInitializeSharePartitionAgain() { Persister persister = Mockito.mock(Persister.class); @@ -460,7 +704,13 @@ public void testMaybeInitializeWithInvalidPartitionResponse() { @Test public void testMaybeInitializeWithNoOpShareStatePersister() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + + FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); + Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); + + SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertFalse(result.isCompletedExceptionally()); @@ -825,7 +1075,13 @@ public void testCanAcquireRecordsWithCachedDataAndLimitReached() { @Test public void testMaybeAcquireAndReleaseFetchLock() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + + FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); + Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); + + SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); sharePartition.maybeInitialize(); assertTrue(sharePartition.maybeAcquireFetchLock()); // Lock cannot be acquired again, as already acquired. @@ -5233,7 +5489,7 @@ private static class SharePartitionBuilder { private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES; private Persister persister = new NoOpShareStatePersister(); - private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); private SharePartitionState state = SharePartitionState.EMPTY; @@ -5257,6 +5513,11 @@ private SharePartitionBuilder withMaxDeliveryCount(int maxDeliveryCount) { return this; } + private SharePartitionBuilder withReplicaManager(ReplicaManager replicaManager) { + this.replicaManager = replicaManager; + return this; + } + private SharePartitionBuilder withGroupConfigManager(GroupConfigManager groupConfigManager) { this.groupConfigManager = groupConfigManager; return this; diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index ade43e899b450..b7d127eb429a3 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -19,6 +19,9 @@ import kafka.api.BaseConsumerTest; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.AcknowledgeType; @@ -34,6 +37,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidTopicException; @@ -47,6 +51,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.test.KafkaClusterTestKit; import org.apache.kafka.common.test.TestKitNodes; +import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -60,6 +65,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -101,6 +107,8 @@ public class ShareConsumerTest { private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister"; private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister"; + private Admin adminClient; + @BeforeEach public void createCluster(TestInfo testInfo) throws Exception { String persisterClassName = NO_OP_PERSISTER; @@ -131,11 +139,13 @@ public void createCluster(TestInfo testInfo) throws Exception { cluster.waitForReadyBrokers(); createTopic("topic"); createTopic("topic2"); + adminClient = createAdminClient(); warmup(); } @AfterEach public void destroyCluster() throws Exception { + adminClient.close(); cluster.close(); } @@ -156,6 +166,7 @@ public void testSubscribeAndPollNoRecords(String persister) { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.close(); assertEquals(0, records.count()); @@ -168,6 +179,7 @@ public void testSubscribePollUnsubscribe(String persister) { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); @@ -182,6 +194,7 @@ public void testSubscribePollSubscribe(String persister) { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); shareConsumer.subscribe(subscription); @@ -198,6 +211,7 @@ public void testSubscribeUnsubscribePollFails(String persister) { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); @@ -214,6 +228,7 @@ public void testSubscribeSubscribeEmptyPollFails(String persister) { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.subscribe(Collections.emptySet()); assertEquals(Collections.emptySet(), shareConsumer.subscription()); @@ -231,6 +246,7 @@ public void testSubscriptionAndPoll(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); shareConsumer.close(); @@ -245,6 +261,7 @@ public void testSubscriptionAndPollMultiple(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); producer.send(record); @@ -273,6 +290,8 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); @@ -307,6 +326,8 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); // Now in the second poll, we implicitly acknowledge the record received in the first poll. @@ -334,6 +355,8 @@ public void testAcknowledgementCommitCallbackOnClose(String persister) { shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); @@ -361,6 +384,8 @@ public void testAcknowledgementCommitCallbackInvalidRecordStateException(String shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap)); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); @@ -420,6 +445,7 @@ public void testHeaders(String persister) { KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); List> records = consumeRecords(shareConsumer, numRecords); assertEquals(numRecords, records.size()); @@ -442,6 +468,7 @@ private void testHeadersSerializeDeserialize(Serializer serializer, Dese KafkaShareConsumer shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); List> records = consumeRecords(shareConsumer, numRecords); assertEquals(numRecords, records.size()); @@ -468,6 +495,8 @@ public void testMaxPollRecords(String persister) { KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords))); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + List> records = consumeRecords(shareConsumer, numRecords); long i = 0L; for (ConsumerRecord record : records) { @@ -513,6 +542,8 @@ public void testControlRecordsSkipped(String persister) throws Exception { KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(4, records.count()); assertEquals(transactional1.offset(), records.records(tp).get(0).offset()); @@ -538,6 +569,7 @@ public void testExplicitAcknowledgeSuccess(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(shareConsumer::acknowledge); @@ -556,6 +588,7 @@ public void testExplicitAcknowledgeCommitSuccess(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(shareConsumer::acknowledge); @@ -583,6 +616,7 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer1.subscribe(Collections.singleton(tp.topic())); shareConsumer2.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); Map> partitionOffsetsMap1 = new HashMap<>(); Map partitionExceptionMap1 = new HashMap<>(); @@ -634,6 +668,7 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer1.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); Map> partitionOffsetsMap = new HashMap<>(); Map partitionExceptionMap = new HashMap<>(); @@ -689,6 +724,7 @@ public void testExplicitAcknowledgeReleasePollAccept(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); @@ -709,6 +745,7 @@ public void testExplicitAcknowledgeReleaseAccept(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); @@ -727,6 +764,7 @@ public void testExplicitAcknowledgeReleaseClose(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); @@ -734,7 +772,6 @@ public void testExplicitAcknowledgeReleaseClose(String persister) { producer.close(); } - @ParameterizedTest(name = "{displayName}.persister={0}") @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { @@ -743,6 +780,7 @@ public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); ConsumerRecord consumedRecord = records.records(tp).get(0); @@ -762,6 +800,7 @@ public void testImplicitAcknowledgeFailsExplicit(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); ConsumerRecord consumedRecord = records.records(tp).get(0); @@ -780,6 +819,7 @@ public void testImplicitAcknowledgeCommitSync(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); Map> result = shareConsumer.commitSync(); @@ -805,6 +845,7 @@ public void testImplicitAcknowledgementCommitAsync(String persister) throws Inte KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); Map> partitionOffsetsMap1 = new HashMap<>(); Map partitionExceptionMap1 = new HashMap<>(); @@ -843,6 +884,8 @@ public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1", Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxPartitionFetchBytes))); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); shareConsumer.close(); @@ -857,9 +900,11 @@ public void testMultipleConsumersWithDifferentGroupIds(String persister) throws KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer1.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2"); shareConsumer2.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group2", "earliest"); // producing 3 records to the topic producer.send(record); @@ -907,6 +952,7 @@ public void testMultipleConsumersInGroupSequentialConsumption(String persister) shareConsumer1.subscribe(Collections.singleton(tp.topic())); KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer2.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); int totalMessages = 2000; for (int i = 0; i < totalMessages; i++) { @@ -943,9 +989,16 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) int producerCount = 4; int messagesPerProducer = 5000; + String groupId = "group1"; + ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount); ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount); + // This consumer is created to register the share group id with the groupCoordinator + // so that the config share.auto.offset.reset can be altered for this group + createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId); + alterShareAutoOffsetReset(groupId, "earliest"); + for (int i = 0; i < producerCount; i++) { producerExecutorService.submit(() -> produceMessages(messagesPerProducer)); } @@ -957,7 +1010,7 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) consumerExecutorService.submit(() -> { CompletableFuture future = new CompletableFuture<>(); futures.add(future); - consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, "group1", consumerNumber, 30, true, future, Optional.of(maxBytes)); + consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 30, true, future, Optional.of(maxBytes)); }); } @@ -990,6 +1043,19 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe int messagesPerProducer = 2000; final int totalMessagesSent = producerCount * messagesPerProducer; + String groupId1 = "group1"; + String groupId2 = "group2"; + String groupId3 = "group3"; + + // These consumers are created to register the share group ids with the groupCoordinator + // so that the config share.auto.offset.reset can be altered for these groups + createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId1); + alterShareAutoOffsetReset(groupId1, "earliest"); + createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId2); + alterShareAutoOffsetReset(groupId2, "earliest"); + createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId3); + alterShareAutoOffsetReset(groupId3, "earliest"); + ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount); ExecutorService shareGroupExecutorService1 = Executors.newFixedThreadPool(consumerCount); ExecutorService shareGroupExecutorService2 = Executors.newFixedThreadPool(consumerCount); @@ -1095,6 +1161,7 @@ public void testConsumerCloseInGroupSequential(String persister) { shareConsumer1.subscribe(Collections.singleton(tp.topic())); KafkaShareConsumer shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer2.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); int totalMessages = 1500; for (int i = 0; i < totalMessages; i++) { @@ -1139,6 +1206,13 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers int producerCount = 4; int messagesPerProducer = 5000; + String groupId = "group1"; + + // This consumer is created to register the share group id with the groupCoordinator + // so that the config share.auto.offset.reset can be altered for this group + createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId); + alterShareAutoOffsetReset(groupId, "earliest"); + ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount); ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount); @@ -1157,7 +1231,7 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers // The "failing" consumer polls but immediately closes, which releases the records for the other consumers CompletableFuture future = new CompletableFuture<>(); AtomicInteger failedMessagesConsumed = new AtomicInteger(0); - consumeMessages(failedMessagesConsumed, producerCount * messagesPerProducer, "group1", 0, 1, false, future); + consumeMessages(failedMessagesConsumed, producerCount * messagesPerProducer, groupId, 0, 1, false, future); startSignal.countDown(); }); @@ -1174,7 +1248,7 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers consumerExecutorService.submit(() -> { CompletableFuture future = new CompletableFuture<>(); futuresSuccess.add(future); - consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, "group1", consumerNumber, 40, true, future, Optional.of(maxBytes)); + consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 40, true, future, Optional.of(maxBytes)); }); } producerExecutorService.shutdown(); @@ -1203,6 +1277,7 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer1.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); producer.send(producerRecord1); @@ -1251,6 +1326,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String per KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + alterShareAutoOffsetReset("group1", "earliest"); shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer)); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -1292,6 +1368,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persist KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + alterShareAutoOffsetReset("group1", "earliest"); // The acknowledgment commit callback will try to call a method of KafkaShareConsumer shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer)); @@ -1331,6 +1408,7 @@ public void testAcknowledgeCommitCallbackThrowsException(String persister) { KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + alterShareAutoOffsetReset("group1", "earliest"); shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackThrows<>()); shareConsumer.subscribe(Collections.singleton(tp.topic())); @@ -1363,6 +1441,7 @@ public void onComplete(Map> offsetsMap, Exception ex public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); // interrupt the thread and call poll try { @@ -1386,6 +1465,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persister) { KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton("topic abc")); + alterShareAutoOffsetReset("group1", "earliest"); // The exception depends upon a metadata response which arrives asynchronously. If the delay is // too short, the poll might return before the error is known. @@ -1405,6 +1485,7 @@ public void testWakeupWithFetchedRecordsAvailable(String persister) { producer.send(record); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); shareConsumer.subscribe(Collections.singleton(tp.topic())); + alterShareAutoOffsetReset("group1", "earliest"); shareConsumer.wakeup(); assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO)); @@ -1423,6 +1504,7 @@ public void testSubscriptionFollowedByTopicCreation(String persister) throws Int KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); String topic = "foo"; shareConsumer.subscribe(Collections.singleton(topic)); + alterShareAutoOffsetReset("group1", "earliest"); // Topic is created post creation of share consumer and subscription createTopic(topic); @@ -1458,6 +1540,7 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); // Consumer subscribes to the topics -> bar and baz. shareConsumer.subscribe(Arrays.asList(topic1, topic2)); + alterShareAutoOffsetReset("group1", "earliest"); producer.send(recordTopic1).get(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, @@ -1490,6 +1573,13 @@ public void testLsoMovementByRecordsDeletion(String persister) { KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); ProducerRecord record = new ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes()); + String groupId = "group1"; + + // This consumer is created to register the share group id with the groupCoordinator + // so that the config share.auto.offset.reset can be altered for this group + createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId); + alterShareAutoOffsetReset(groupId, "earliest"); + // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic. try { for (int i = 0; i < 10; i++) { @@ -1499,13 +1589,12 @@ public void testLsoMovementByRecordsDeletion(String persister) { fail("Failed to send records: " + e); } - Admin adminClient = createAdminClient(); // We delete records before offset 5, so the LSO should move to 5. adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L))); AtomicInteger totalMessagesConsumed = new AtomicInteger(0); CompletableFuture future = new CompletableFuture<>(); - consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future); + consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true, future); // The records returned belong to offsets 5-9. assertEquals(5, totalMessagesConsumed.get()); try { @@ -1528,7 +1617,7 @@ public void testLsoMovementByRecordsDeletion(String persister) { totalMessagesConsumed = new AtomicInteger(0); future = new CompletableFuture<>(); - consumeMessages(totalMessagesConsumed, 1, "group1", 1, 10, true, future); + consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true, future); // The record returned belong to offset 14. assertEquals(1, totalMessagesConsumed.get()); try { @@ -1542,14 +1631,135 @@ public void testLsoMovementByRecordsDeletion(String persister) { totalMessagesConsumed = new AtomicInteger(0); future = new CompletableFuture<>(); - consumeMessages(totalMessagesConsumed, 0, "group1", 1, 5, true, future); + consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future); assertEquals(0, totalMessagesConsumed.get()); try { assertEquals(0, future.get()); } catch (Exception e) { fail("Exception occurred : " + e.getMessage()); } - adminClient.close(); + producer.close(); + } + + @ParameterizedTest(name = "{displayName}.persister={0}") + @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) + public void testShareAutoOffsetResetDefaultValue(String persister) { + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + // Producing a record. + producer.send(record); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + // No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record + // was produced before share partition was initialized (which happens after the first share fetch request + // in the poll method), the start offset would be the latest offset, i.e. 1 (the next offset after the already + // present 0th record) + assertEquals(0, records.count()); + // Producing another record. + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + // Now the next record should be consumed successfully + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @ParameterizedTest(name = "{displayName}.persister={0}") + @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) + public void testShareAutoOffsetResetEarliest(String persister) { + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + // Changing the value of share.auto.offset.reset value to "earliest" + alterShareAutoOffsetReset("group1", "earliest"); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + // Producing a record. + producer.send(record); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume + // all messages present on the partition + assertEquals(1, records.count()); + // Producing another record. + producer.send(record); + records = shareConsumer.poll(Duration.ofMillis(5000)); + // The next records should also be consumed successfully + assertEquals(1, records.count()); + shareConsumer.close(); + producer.close(); + } + + @ParameterizedTest(name = "{displayName}.persister={0}") + @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) + public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) throws Exception { + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumer.subscribe(Collections.singleton(tp.topic())); + // Changing the value of share.auto.offset.reset value to "earliest" + alterShareAutoOffsetReset("group1", "earliest"); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic. + try { + for (int i = 0; i < 10; i++) { + producer.send(record).get(); + } + } catch (Exception e) { + fail("Failed to send records: " + e); + } + + // We delete records before offset 5, so the LSO should move to 5. + adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L))); + + AtomicInteger totalMessagesConsumed = new AtomicInteger(0); + CompletableFuture future = new CompletableFuture<>(); + consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future); + // The records returned belong to offsets 5-9. + assertEquals(5, totalMessagesConsumed.get()); + assertEquals(5, future.get()); + + shareConsumer.close(); + producer.close(); + } + + @ParameterizedTest(name = "{displayName}.persister={0}") + @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) + public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) { + KafkaShareConsumer shareConsumerEarliest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + shareConsumerEarliest.subscribe(Collections.singleton(tp.topic())); + // Changing the value of share.auto.offset.reset value to "earliest" for group1 + alterShareAutoOffsetReset("group1", "earliest"); + + KafkaShareConsumer shareConsumerLatest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2"); + shareConsumerLatest.subscribe(Collections.singleton(tp.topic())); + // Changing the value of share.auto.offset.reset value to "latest" for group2 + alterShareAutoOffsetReset("group2", "latest"); + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); + // Producing a record. + producer.send(record); + ConsumerRecords records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000)); + // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume + // all messages present on the partition + assertEquals(1, records1.count()); + + ConsumerRecords records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); + // Since the value for share.auto.offset.reset has been altered to "latest", the consumer should not consume + // any message + assertEquals(0, records2.count()); + + // Producing another record. + producer.send(record); + + records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000)); + // The next record should also be consumed successfully by group1 + assertEquals(1, records1.count()); + + records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); + // The next record should also be consumed successfully by group2 + assertEquals(1, records2.count()); + + shareConsumerEarliest.close(); + shareConsumerLatest.close(); producer.close(); } @@ -1729,6 +1939,7 @@ private void warmup() throws InterruptedException, ExecutionException, TimeoutEx try { producer.send(record).get(15000, TimeUnit.MILLISECONDS); shareConsumer.subscribe(subscription); + alterShareAutoOffsetReset("warmupgroup1", "earliest"); TestUtils.waitForCondition( () -> shareConsumer.poll(Duration.ofMillis(5000)).count() == 1, 30000, 200L, () -> "warmup record not received"); } finally { @@ -1736,4 +1947,19 @@ private void warmup() throws InterruptedException, ExecutionException, TimeoutEx shareConsumer.close(); } } + + private void alterShareAutoOffsetReset(String groupId, String newValue) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); + Map> alterEntries = new HashMap<>(); + alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), AlterConfigOp.OpType.SET))); + AlterConfigsOptions alterOptions = new AlterConfigsOptions(); + try { + adminClient.incrementalAlterConfigs(alterEntries, alterOptions) + .all() + .get(60, TimeUnit.SECONDS); + } catch (Exception e) { + fail("Exception was thrown: ", e); + } + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 7b78793373bb2..2e6ffdf400b81 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -74,9 +74,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils} -import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG} +import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG} import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig -import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} +import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.LeaderAndIsr @@ -585,6 +585,7 @@ class KafkaApisTest extends Logging { cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString) cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString) cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString) + cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.defaultShareAutoOffsetReset.toString) when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs) val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala index cdc8cf5dd758a..8097021e4cb52 100644 --- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala @@ -170,14 +170,17 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap) val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -235,16 +238,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicIdPartition2 = new TopicIdPartition(topicId, new TopicPartition(topic, 1)) val topicIdPartition3 = new TopicIdPartition(topicId, new TopicPartition(topic, 2)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3) + + // Send the first share fetch request to initialize the share partitions + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic partitions created above produceData(topicIdPartition1, 10) produceData(topicIdPartition2, 10) produceData(topicIdPartition3, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3) - - // Send the share fetch request to fetch the records produced above - val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap) val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -325,12 +331,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val leader2 = partitionToLeaders(topicIdPartition2) val leader3 = partitionToLeaders(topicIdPartition3) - initProducer() - // Producing 10 records to the topic partitions created above - produceData(topicIdPartition1, 10) - produceData(topicIdPartition2, 10) - produceData(topicIdPartition3, 10) - val send1: Seq[TopicIdPartition] = Seq(topicIdPartition1) val send2: Seq[TopicIdPartition] = Seq(topicIdPartition2) val send3: Seq[TopicIdPartition] = Seq(topicIdPartition3) @@ -338,14 +338,31 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty - // Crete different share fetch requests for different partitions as they may have leaders on separate brokers - val shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap) - val shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap) - val shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap) + // Send the first share fetch request to initialize the share partitions + // Create different share fetch requests for different partitions as they may have leaders on separate brokers + var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap) + var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap) + var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap) + + var shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1) + var shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2) + var shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3) + + initProducer() + // Producing 10 records to the topic partitions created above + produceData(topicIdPartition1, 10) + produceData(topicIdPartition2, 10) + produceData(topicIdPartition3, 10) + + // Send the second share fetch request to fetch the records produced above + // Create different share fetch requests for different partitions as they may have leaders on separate brokers + shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap) + shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap) + shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap) - val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1) - val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2) - val shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3) + shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1) + shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2) + shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3) val shareFetchResponseData1 = shareFetchResponse1.data() assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode) @@ -427,15 +444,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize share partitions + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -482,7 +502,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic produceData(topicIdPartition, 10) - // Sending a second share fetch request to check if acknowledgements were done successfully + // Sending a third share fetch request to check if acknowledgements were done successfully shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) @@ -540,15 +560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch: Int = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -571,7 +594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic created above produceData(topicIdPartition, 10) - // Send a Share Fetch request with piggybacked acknowledgements + // Send the third Share Fetch request with piggybacked acknowledgements shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() @@ -599,7 +622,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic produceData(topicIdPartition, 10) - // Sending a third share fetch request to confirm if acknowledgements were done successfully + // Sending a fourth share fetch request to confirm if acknowledgements were done successfully shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) @@ -657,15 +680,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partiion + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -709,7 +735,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0) compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData) - // Sending a second share fetch request to check if acknowledgements were done successfully + // Sending a third share fetch request to check if acknowledgements were done successfully shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) @@ -767,15 +793,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -798,7 +827,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic created above produceData(topicIdPartition, 10) - // Send a Share Fetch request with piggybacked acknowledgements + // Send a third Share Fetch request with piggybacked acknowledgements shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() @@ -862,15 +891,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -917,7 +949,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic produceData(topicIdPartition, 10) - // Sending a second share fetch request to check if acknowledgements were done successfully + // Sending a third share fetch request to check if acknowledgements were done successfully shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) @@ -975,15 +1007,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1006,7 +1041,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic created above produceData(topicIdPartition, 10) - // Send a Share Fetch request with piggybacked acknowledgements + // Send a third Share Fetch request with piggybacked acknowledgements shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() @@ -1034,7 +1069,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic produceData(topicIdPartition, 10) - // Sending a third share fetch request to confirm if acknowledgements were done successfully + // Sending a fourth share fetch request to confirm if acknowledgements were done successfully shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) @@ -1094,15 +1129,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the shar partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1146,7 +1184,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0) compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData) - // Sending a second share fetch request to check if acknowledgements were done successfully + // Sending a third share fetch request to check if acknowledgements were done successfully shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) @@ -1193,7 +1231,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 new records to the topic produceData(topicIdPartition, 10) - // Sending a third share fetch request to check if acknowledgements were done successfully + // Sending a fourth share fetch request to check if acknowledgements were done successfully shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) @@ -1251,6 +1289,11 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 3 large messages to the topic created above produceData(topicIdPartition, 10) @@ -1258,10 +1301,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo produceData(topicIdPartition, "large message 2", new String(new Array[Byte](MAX_PARTITION_BYTES/3))) produceData(topicIdPartition, "large message 3", new String(new Array[Byte](MAX_PARTITION_BYTES/3))) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap) val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1311,6 +1352,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo ) def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = { val groupId: String = "group" + + val memberId = Uuid.randomUuid() val memberId1 = Uuid.randomUuid() val memberId2 = Uuid.randomUuid() val memberId3 = Uuid.randomUuid() @@ -1323,12 +1366,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Sending a dummy share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10000 records to the topic created above produceData(topicIdPartition, 10000) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - // Sending 3 share Fetch Requests with same groupId to the same topicPartition but with different memberIds, // mocking the behaviour of multiple share consumers from the same share group val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) @@ -1418,23 +1464,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Sending 3 dummy share Fetch Requests with to inititlaize the share partitions for each share group\ + sendFirstShareFetchRequest(memberId1, groupId1, send) + sendFirstShareFetchRequest(memberId2, groupId2, send) + sendFirstShareFetchRequest(memberId3, groupId3, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Sending 3 share Fetch Requests with same groupId to the same topicPartition but with different memberIds, - // mocking the behaviour of multiple share consumers from the same share group - val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) + // Sending 3 share Fetch Requests with different groupId and different memberIds to the same topicPartition, + // mocking the behaviour of 3 different share groups + val metadata1 = new ShareRequestMetadata(memberId1, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest1 = createShareFetchRequest(groupId1, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1) - val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) + val metadata2 = new ShareRequestMetadata(memberId2, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest2 = createShareFetchRequest(groupId2, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2) - val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) + val metadata3 = new ShareRequestMetadata(memberId3, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3) @@ -1509,15 +1560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1540,7 +1594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic created above produceData(topicIdPartition, 10) - // Send a Share Fetch request with piggybacked acknowledgements + // Send a third Share Fetch request with piggybacked acknowledgements shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() @@ -1616,15 +1670,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1647,7 +1704,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // Producing 10 more records to the topic created above produceData(topicIdPartition, 10) - // Send a Share Fetch request with piggybacked acknowledgements + // Send a third Share Fetch request with piggybacked acknowledgements shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() @@ -1804,12 +1861,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode) } - @ClusterTest( - serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), - new ClusterConfigProperty(key = "group.share.enable", value = "true"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + @ClusterTests( + Array( + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ) + ), + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true") + ) + ), ) ) def testShareFetchRequestInvalidShareSessionEpoch(): Unit = { @@ -1824,14 +1897,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1850,8 +1927,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0) compareFetchResponsePartitions(expectedPartitionData, partitionData) - // Sending Share Fetch request with invalid share session epoch - metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))) + // Sending a thord Share Fetch request with invalid share session epoch + shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch)) + metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1895,14 +1973,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1922,7 +2004,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo compareFetchResponsePartitions(expectedPartitionData, partitionData) // Sending Share Acknowledge request with invalid share session epoch - metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))) + shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch)) + metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] = Map(topicIdPartition -> List(new ShareAcknowledgeRequestData.AcknowledgementBatch() .setFirstOffset(0) @@ -1972,14 +2055,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -1998,8 +2085,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0) compareFetchResponsePartitions(expectedPartitionData, partitionData) - // Sending a Share Fetch request with wrong member Id - metadata = new ShareRequestMetadata(wrongMemberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) + // Sending a third Share Fetch request with wrong member Id + shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) + metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -2044,14 +2132,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic created above produceData(topicIdPartition, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition) - - // Send the share fetch request to fetch the records produced above - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -2071,7 +2163,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo compareFetchResponsePartitions(expectedPartitionData, partitionData) // Sending a Share Acknowledge request with wrong member Id - metadata = new ShareRequestMetadata(wrongMemberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) + shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) + metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] = Map(topicIdPartition -> List(new ShareAcknowledgeRequestData.AcknowledgementBatch() .setFirstOffset(0) @@ -2122,15 +2215,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicIdPartition1 = new TopicIdPartition(topicId, new TopicPartition(topic, partition1)) val topicIdPartition2 = new TopicIdPartition(topicId, new TopicPartition(topic, partition2)) + val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2) + + // Send the first share fetch request to initialize the share partition + sendFirstShareFetchRequest(memberId, groupId, send) + initProducer() // Producing 10 records to the topic partitions created above produceData(topicIdPartition1, 10) produceData(topicIdPartition2, 10) - val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2) - - // Send the share fetch request to fetch the records produced above - var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + // Send the second share fetch request to fetch the records produced above + var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) + var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap) var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -2145,8 +2242,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo produceData(topicIdPartition1, 10) produceData(topicIdPartition2, 10) - // Send the share fetch request to with forget list populated with topicIdPartition2 - metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) + // Send another share fetch request with forget list populated with topicIdPartition2 + shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) + metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap) shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) @@ -2167,6 +2265,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo compareFetchResponsePartitions(expectedPartitionData, partitionData) } + private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition]): Unit = { + val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty) + connectAndReceive[ShareFetchResponse](shareFetchRequest) + } + private def expectedAcquiredRecords(firstOffsets: util.List[Long], lastOffsets: util.List[Long], deliveryCounts: util.List[Int]): util.List[AcquiredRecords] = { val acquiredRecordsList: util.List[AcquiredRecords] = new util.ArrayList() for (i <- firstOffsets.indices) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index b65e297bb047e..934055d9d5bf2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -21,8 +21,10 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -31,6 +33,8 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; +import static org.apache.kafka.common.config.ConfigDef.ValidString.in; /** * Group configuration related parameters and supporting methods like validation, etc. are @@ -48,6 +52,10 @@ public final class GroupConfig extends AbstractConfig { public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = "share.record.lock.duration.ms"; + public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = "share.auto.offset.reset"; + public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString(); + public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset."; + public final int consumerSessionTimeoutMs; public final int consumerHeartbeatIntervalMs; @@ -58,6 +66,8 @@ public final class GroupConfig extends AbstractConfig { public final int shareRecordLockDurationMs; + public final String shareAutoOffsetReset; + private static final ConfigDef CONFIG = new ConfigDef() .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, @@ -88,7 +98,13 @@ public final class GroupConfig extends AbstractConfig { ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1000), MEDIUM, - ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC); + ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC) + .define(SHARE_AUTO_OFFSET_RESET_CONFIG, + STRING, + SHARE_AUTO_OFFSET_RESET_DEFAULT, + in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)), + MEDIUM, + SHARE_AUTO_OFFSET_RESET_DOC); public GroupConfig(Map props) { super(CONFIG, props, false); @@ -97,6 +113,7 @@ public GroupConfig(Map props) { this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG); this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG); + this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG); } public static ConfigDef configDef() { @@ -203,6 +220,13 @@ public static GroupConfig fromProps(Map defaults, Properties overrides) { return new GroupConfig(props); } + /** + * The default share group auto offset reset strategy. + */ + public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() { + return ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT)); + } + /** * The consumer group session timeout in milliseconds. */ @@ -237,4 +261,20 @@ public int shareHeartbeatIntervalMs() { public int shareRecordLockDurationMs() { return shareRecordLockDurationMs; } + + /** + * The share group auto offset reset strategy. + */ + public ShareGroupAutoOffsetReset shareAutoOffsetReset() { + return ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT)); + } + + public enum ShareGroupAutoOffsetReset { + LATEST, EARLIEST; + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 9a817a0a40d21..fe11f50d2ff43 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigTest; @@ -27,6 +28,7 @@ import java.util.Map; import java.util.Properties; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -57,8 +59,10 @@ public void testFromPropsInvalid() { assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2"); } else if (GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG.equals(name)) { assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2"); + } else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) { + assertPropertyInvalid(name, "hello", "1.0"); } else { - assertPropertyInvalid(name, "not_a_number", "-1"); + assertPropertyInvalid(name, "not_a_number", "-0.1"); } }); } @@ -71,6 +75,21 @@ private void assertPropertyInvalid(String name, Object... values) { } } + @Test + public void testValidShareAutoOffsetResetValues() { + + Properties props = createValidGroupConfig(); + + // Check for value "latest" + props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest"); + doTestValidProps(props); + props = createValidGroupConfig(); + + // Check for value "earliest" + props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest"); + doTestValidProps(props); + } + @Test public void testInvalidProps() { @@ -78,56 +97,65 @@ public void testInvalidProps() { // Check for invalid consumerSessionTimeoutMs, < MIN props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid consumerSessionTimeoutMs, > MAX props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "70000"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid consumerHeartbeatIntervalMs, < MIN props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "1"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid consumerHeartbeatIntervalMs, > MAX props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "70000"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid shareSessionTimeoutMs, < MIN props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid shareSessionTimeoutMs, > MAX props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "70000"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid shareHeartbeatIntervalMs, < MIN props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "1"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid shareHeartbeatIntervalMs, > MAX props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "70000"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid shareRecordLockDurationMs, < MIN props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "10000"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); props = createValidGroupConfig(); // Check for invalid shareRecordLockDurationMs, > MAX props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000"); - doTestInvalidProps(props); + doTestInvalidProps(props, InvalidConfigurationException.class); + props = createValidGroupConfig(); + + // Check for invalid shareAutoOffsetReset + props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello"); + doTestInvalidProps(props, ConfigException.class); } - private void doTestInvalidProps(Properties props) { - assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig())); + private void doTestInvalidProps(Properties props, Class exceptionClassName) { + assertThrows(exceptionClassName, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig())); + } + + private void doTestValidProps(Properties props) { + assertDoesNotThrow(() -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig())); } @Test @@ -138,6 +166,7 @@ public void testFromPropsWithDefaultValue() { defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10"); defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10"); defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000"); + defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest"); Properties props = new Properties(); props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20"); @@ -148,6 +177,7 @@ public void testFromPropsWithDefaultValue() { assertEquals(10, config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG)); assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG)); assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG)); + assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG)); } @Test @@ -165,6 +195,7 @@ private Properties createValidGroupConfig() { props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000"); props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000"); props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000"); + props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest"); return props; } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index 2fa0d2162d3eb..a08f85a8108f4 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -401,7 +401,7 @@ public ReadShareGroupStateResponseData readState(ReadShareGroupStateRequestData return ReadShareGroupStateResponse.toResponseData( topicId, partition, - PartitionFactory.DEFAULT_START_OFFSET, + PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_STATE_EPOCH, Collections.emptyList() ); diff --git a/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java b/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java index 9d37b114b2730..83d3d7d74a89b 100644 --- a/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java +++ b/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java @@ -53,7 +53,7 @@ public CompletableFuture readState(ReadShareGroupStat for (TopicData topicData : reqData.topicsData()) { resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream(). map(partitionIdData -> PartitionFactory.newPartitionAllData( - partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList())) + partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList())) .collect(Collectors.toList()))); } return CompletableFuture.completedFuture(new ReadShareGroupStateResult.Builder().setTopicsData(resultArgs).build()); @@ -93,7 +93,7 @@ public CompletableFuture readSummary(ReadShare for (TopicData topicData : reqData.topicsData()) { resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream(). map(partitionIdData -> PartitionFactory.newPartitionStateErrorData( - partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) + partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) .collect(Collectors.toList()))); } return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build()); diff --git a/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java index 7336547e8d672..abd44a854ee70 100644 --- a/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java +++ b/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java @@ -26,17 +26,17 @@ */ public class PartitionFactory { public static final int DEFAULT_STATE_EPOCH = 0; - public static final int DEFAULT_START_OFFSET = 0; + public static final int UNINITIALIZED_START_OFFSET = -1; public static final short DEFAULT_ERROR_CODE = Errors.NONE.code(); public static final int DEFAULT_LEADER_EPOCH = 0; public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message(); public static PartitionIdData newPartitionIdData(int partition) { - return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null); + return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null); } public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int partition, int leaderEpoch) { - return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null); + return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null); } public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) { @@ -44,7 +44,7 @@ public static PartitionStateData newPartitionStateData(int partition, int stateE } public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) { - return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); + return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); } public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) {