KAFKA-16784 Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to use ClusterTestExtensions (#15992)

Reviewers: Chia-Ping Tsai <[email protected]>
FrankYang0529 authored May 21, 2024
1 parent 8908352 commit 9fe3932
Showing 1 changed file with 100 additions and 118 deletions.
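
For context, the ClusterTest style this change migrates to drives the embedded cluster through annotations and constructor injection instead of a harness managed by @BeforeEach/@AfterEach. Below is a minimal sketch assuming only the kafka.test API that appears in this diff; the class name, topic name, and counts are illustrative, not part of the commit.

import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Collections;

@Tag("integration")
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(brokers = 3)            // default cluster size for every @ClusterTest method
public class ExampleClusterTest {            // hypothetical class, for illustration only
    private final ClusterInstance clusterInstance;

    // The extension injects the running cluster through the constructor,
    // replacing the harness setUp()/close() lifecycle.
    ExampleClusterTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    @ClusterTest
    public void testCreateTopic() throws Exception {
        String topic = "example-topic";
        try (Admin admin = Admin.create(Collections.singletonMap(
                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
            admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get();
        }
        Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 1));
    }
}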
@@ -17,7 +17,13 @@
package org.apache.kafka.server.log.remote.metadata.storage;


import kafka.utils.EmptyTestInfo;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -26,18 +32,14 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
@@ -48,67 +50,34 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
@Tag("integration")
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(brokers = 3)
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
private final ClusterInstance clusterInstance;

private static final int SEG_SIZE = 1024 * 1024;

private final Time time = new MockTime(1);
private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();

private TopicBasedRemoteLogMetadataManager rlmm() {
return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;
}

@BeforeEach
public void setup() {
// Start the cluster only.
remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
}

@AfterEach
public void teardown() throws IOException {
remoteLogMetadataManagerHarness.close();
}

@Test
@ClusterTest
public void testMultiplePartitionSubscriptions() throws Exception {
// Create topics.
String leaderTopic = "leader";
HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>();
List<Object> leaderTopicReplicas = new ArrayList<>();
// Set broker id 0 as the first entry which is taken as the leader.
leaderTopicReplicas.add(0);
leaderTopicReplicas.add(1);
leaderTopicReplicas.add(2);
assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
createTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));

String followerTopic = "follower";
HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>();
List<Object> followerTopicReplicas = new ArrayList<>();
// Set broker id 1 as the first entry which is taken as the leader.
followerTopicReplicas.add(1);
followerTopicReplicas.add(2);
followerTopicReplicas.add(0);
assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(
followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
createTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));

String topicWithNoMessages = "no-messages-topic";
HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
List<Object> noMessagesTopicReplicas = new ArrayList<>();
// Set broker id 1 as the first entry which is taken as the leader.
noMessagesTopicReplicas.add(1);
noMessagesTopicReplicas.add(2);
noMessagesTopicReplicas.add(0);
assignedTopicReplicas.put(0, JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(
topicWithNoMessages, JavaConverters.mapAsScalaMap(assignedTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
createTopic(topicWithNoMessages, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));

final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
@@ -134,71 +103,84 @@ public void testMultiplePartitionSubscriptions() throws Exception {
return result;
}).when(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(any());

remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
@Override
public int metadataPartition(TopicIdPartition topicIdPartition) {
// Always return partition 0 except for noMessagesTopicIdPartition, so that any new user
// partition (other than noMessagesTopicIdPartition) added to RLMM uses the same metadata partition.
// That forces the secondary consumer assignment.
if (emptyTopicIdPartition.equals(topicIdPartition)) {
return 1;
} else {
return 0;
}
}
}, () -> spyRemotePartitionMetadataStore);

// Add segments for these partitions, but an exception is received as they have not yet been subscribed.
// These messages would have been published to the respective metadata topic partitions, but the ConsumerManager
// has not yet subscribed to them as they are not yet registered.
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage());

RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage());

// `listRemoteLogSegments` throws an exception as these topic partitions are not yet registered.
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(leaderTopicIdPartition));
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition));

rlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
Collections.emptySet());

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS); // similar to CountdownLatch::await
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
verify(spyRemotePartitionMetadataStore).markInitialized(leaderTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
clearInvocations(spyRemotePartitionMetadataStore);

// The leader partition's metadata would have been received as it is registered, but the follower
// partition is not yet registered, hence listing its segments throws an exception.
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext());
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition));

// Register follower partition
// Phaser::bulkRegister and Phaser::register provide the "countUp" feature
initializationPhaser.bulkRegister(2); // 1 for emptyTopicIdPartition and 1 for followerTopicIdPartition
handleRemoteLogSegmentMetadataPhaser.register(); // 1 for followerTopicIdPartition, emptyTopicIdPartition doesn't have a RemoteLogSegmentMetadata event
rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
Collections.singleton(followerTopicIdPartition));

initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);

verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
// In this state, all the metadata should be available in RLMM for both leader and follower partitions.
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
Assertions.assertTrue(rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers())
.startConsumerThread(true)
.remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
@Override
public int metadataPartition(TopicIdPartition topicIdPartition) {
// Always return partition 0 except for noMessagesTopicIdPartition, so that any new user
// partition (other than noMessagesTopicIdPartition) added to RLMM uses the same metadata partition.
// That forces the secondary consumer assignment.
if (emptyTopicIdPartition.equals(topicIdPartition)) {
return 1;
} else {
return 0;
}
}
})
.remotePartitionMetadataStore(() -> spyRemotePartitionMetadataStore)
.build()) {

// Add segments for these partitions, but an exception is received as they have not yet been subscribed.
// These messages would have been published to the respective metadata topic partitions, but the ConsumerManager
// has not yet subscribed to them as they are not yet registered.
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage());

RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage());

// `listRemoteLogSegments` throws an exception as these topic partitions are not yet registered.
Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition));
Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));


remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
Collections.emptySet());
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS); // similar to CountdownLatch::await
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
verify(spyRemotePartitionMetadataStore).markInitialized(leaderTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
clearInvocations(spyRemotePartitionMetadataStore);

// The leader partition's metadata would have been received as it is registered, but the follower
// partition is not yet registered, hence listing its segments throws an exception.
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext());
Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));

// Register follower partition
// Phaser::bulkRegister and Phaser::register provide the "countUp" feature
initializationPhaser.bulkRegister(2); // 1 for emptyTopicIdPartition and 1 for followerTopicIdPartition
handleRemoteLogSegmentMetadataPhaser.register(); // 1 for followerTopicIdPartition, emptyTopicIdPartition doesn't have a RemoteLogSegmentMetadata event
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
Collections.singleton(followerTopicIdPartition));

initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);

verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
// In this state, all the metadata should be available in RLMM for both leader and follower partitions.
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
}
}

private void createTopic(String topic, Map<Integer, List<Integer>> replicasAssignments) {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
admin.createTopics(Collections.singletonList(new NewTopic(topic, replicasAssignments)));
Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size()));
}
}
}
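
The test above synchronizes on Phaser instances rather than a CountDownLatch, using register()/bulkRegister() to "count up" for later events. Below is a standalone sketch of that pattern with the same timeout; the class, threads, and party counts are hypothetical and not part of this commit.

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PhaserLatchSketch {            // hypothetical, for illustration only

    public static void main(String[] args) throws InterruptedException, TimeoutException {
        // Two parties: this thread and one asynchronous event we wait for.
        Phaser phaser = new Phaser(2);

        // The asynchronous event arrives from another thread and deregisters its
        // party so it owes no arrivals in later phases.
        new Thread(phaser::arriveAndDeregister).start();

        // Equivalent to CountDownLatch::await with a timeout: arrive, then wait
        // until every registered party has arrived and the phase advances.
        phaser.awaitAdvanceInterruptibly(phaser.arrive(), 30_000, TimeUnit.MILLISECONDS);

        // Unlike a CountDownLatch, a Phaser can "count up": register one more party
        // for a later event and wait again on the next phase.
        phaser.register();
        new Thread(phaser::arriveAndDeregister).start();
        phaser.awaitAdvanceInterruptibly(phaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
    }
}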
