From f95ac084618997fe34063d2760ff22198c6a4f2c Mon Sep 17 00:00:00 2001 From: Sharayu Gandhi Date: Wed, 16 Nov 2022 12:32:06 -0800 Subject: [PATCH] Fix the replication in segment assignment --- .../BalancedNumSegmentAssignmentStrategy.java | 7 +++- ...ReplicaGroupSegmentAssignmentStrategy.java | 6 +++- ...eNonReplicaGroupSegmentAssignmentTest.java | 33 +++++++++++++++++++ .../SegmentAssignmentStrategyFactoryTest.java | 7 ++-- 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java index 58f1552f40ad..1d26abc296ff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java @@ -26,6 +26,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,11 @@ public void init(HelixManager helixManager, TableConfig tableConfig) { _tableNameWithType = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null"); - _replication = validationAndRetentionConfig.getReplicationNumber(); + // Number of replicas per partition of low-level consumers check is for the real time tables only + // TODO: Cleanup required once we fetch the replication number from table config depending on table type + _replication = tableConfig.getTableType() == TableType.REALTIME + ? validationAndRetentionConfig.getReplicasPerPartitionNumber() + : validationAndRetentionConfig.getReplicationNumber(); LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " + "{} with replication: {}", _tableNameWithType, _replication); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java index 1db91cb9a4bb..94069dc8c240 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java @@ -53,7 +53,11 @@ public void init(HelixManager helixManager, TableConfig tableConfig) { _tableName = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null"); - _replication = validationAndRetentionConfig.getReplicationNumber(); + // Number of replicas per partition of low-level consumers check is for the real time tables only + // TODO: Cleanup required once we fetch the replication number from table config depending on table type + _replication = tableConfig.getTableType() == TableType.REALTIME + ? validationAndRetentionConfig.getReplicasPerPartitionNumber() + : validationAndRetentionConfig.getReplicationNumber(); ReplicaGroupStrategyConfig replicaGroupStrategyConfig = validationAndRetentionConfig.getReplicaGroupStrategyConfig(); _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index b52c5f87e470..4e2096794947 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -50,6 +50,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { private static final int NUM_REPLICAS = 3; + private static final String NUM_REPLICAS_PER_PARTITION = "4"; private static final int NUM_PARTITIONS = 4; private static final int NUM_SEGMENTS = 100; private static final String CONSUMING_INSTANCE_NAME_PREFIX = "consumingInstance_"; @@ -110,6 +111,38 @@ public void testFactory() { assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment); } + @Test + public void testReplicationForSegmentAssignment() { + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) + .setLLC(true).build(); + // Update the replication by changing the NUM_REPLICAS_PER_PARTITION + tableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_PER_PARTITION); + SegmentAssignment segmentAssignment = + SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig); + + Map onlyCompletedInstancePartitionMap = + ImmutableMap.of(InstancePartitionsType.COMPLETED, _instancePartitionsMap.get(InstancePartitionsType.COMPLETED)); + Map> currentAssignment = new TreeMap<>(); + + Map> expectedUploadedSegmentToInstances = ImmutableMap.of("uploadedSegment_0", + ImmutableList.of("completedInstance_0", "completedInstance_1", "completedInstance_2", "completedInstance_3"), + "uploadedSegment_1", + ImmutableList.of("completedInstance_4", "completedInstance_5", "completedInstance_6", "completedInstance_7"), + "uploadedSegment_2", + ImmutableList.of("completedInstance_8", "completedInstance_9", "completedInstance_0", "completedInstance_1"), + "uploadedSegment_3", + ImmutableList.of("completedInstance_2", "completedInstance_3", "completedInstance_4", "completedInstance_5")); + + expectedUploadedSegmentToInstances.forEach((segmentName, expectedInstances) -> { + List actualInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, onlyCompletedInstancePartitionMap); + assertEquals(actualInstances, expectedInstances); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(actualInstances, SegmentStateModel.ONLINE)); + }); + } + @Test public void testAssignSegment() { Map onlyConsumingInstancePartitionMap = diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java index 7d0fd9567c45..b0f12240f51f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java @@ -97,9 +97,9 @@ public void testBalancedNumSegmentAssignmentStrategyforOfflineTables() { } @Test - public void testBalancedNumSegmentAssignmentStrategyforRealtimeTables() { - TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build(); - + public void testBalancedNumSegmentAssignmentStrategyForRealtimeTables() { + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true).build(); InstancePartitions instancePartitions = new InstancePartitions(INSTANCE_PARTITIONS_NAME); instancePartitions.setInstances(0, 0, INSTANCES); @@ -107,6 +107,7 @@ public void testBalancedNumSegmentAssignmentStrategyforRealtimeTables() { .getSegmentAssignmentStrategy(null, tableConfig, InstancePartitionsType.COMPLETED.toString(), instancePartitions); Assert.assertNotNull(segmentAssignmentStrategy); + Assert.assertTrue(segmentAssignmentStrategy instanceof BalancedNumSegmentAssignmentStrategy); }