Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the replication in segment assignment strategy #9816

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,7 +51,11 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
_tableNameWithType = tableConfig.getTableName();
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
_replication = validationAndRetentionConfig.getReplicationNumber();
// Number of replicas per partition of low-level consumers check is for the real time tables only
// TODO: Cleanup required once we fetch the replication number from table config depending on table type
_replication = tableConfig.getTableType() == TableType.REALTIME
? validationAndRetentionConfig.getReplicasPerPartitionNumber()
: validationAndRetentionConfig.getReplicationNumber();
LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " + "{} with replication: {}",
_tableNameWithType, _replication);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
_tableName = tableConfig.getTableName();
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
_replication = validationAndRetentionConfig.getReplicationNumber();
// Number of replicas per partition of low-level consumers check is for the real time tables only
// TODO: Cleanup required once we fetch the replication number from table config depending on table type
_replication = tableConfig.getTableType() == TableType.REALTIME
? validationAndRetentionConfig.getReplicasPerPartitionNumber()
: validationAndRetentionConfig.getReplicationNumber();
ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
validationAndRetentionConfig.getReplicaGroupStrategyConfig();
_partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

public class RealtimeNonReplicaGroupSegmentAssignmentTest {
private static final int NUM_REPLICAS = 3;
private static final String NUM_REPLICAS_PER_PARTITION = "4";
private static final int NUM_PARTITIONS = 4;
private static final int NUM_SEGMENTS = 100;
private static final String CONSUMING_INSTANCE_NAME_PREFIX = "consumingInstance_";
Expand Down Expand Up @@ -110,6 +111,38 @@ public void testFactory() {
assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment);
}

@Test
public void testReplicationForSegmentAssignment() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
.setLLC(true).build();
// Update the replication by changing the NUM_REPLICAS_PER_PARTITION
tableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_PER_PARTITION);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig);

Map<InstancePartitionsType, InstancePartitions> onlyCompletedInstancePartitionMap =
ImmutableMap.of(InstancePartitionsType.COMPLETED, _instancePartitionsMap.get(InstancePartitionsType.COMPLETED));
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();

Map<String, List<String>> expectedUploadedSegmentToInstances = ImmutableMap.of("uploadedSegment_0",
ImmutableList.of("completedInstance_0", "completedInstance_1", "completedInstance_2", "completedInstance_3"),
"uploadedSegment_1",
ImmutableList.of("completedInstance_4", "completedInstance_5", "completedInstance_6", "completedInstance_7"),
"uploadedSegment_2",
ImmutableList.of("completedInstance_8", "completedInstance_9", "completedInstance_0", "completedInstance_1"),
"uploadedSegment_3",
ImmutableList.of("completedInstance_2", "completedInstance_3", "completedInstance_4", "completedInstance_5"));

expectedUploadedSegmentToInstances.forEach((segmentName, expectedInstances) -> {
List<String> actualInstances =
segmentAssignment.assignSegment(segmentName, currentAssignment, onlyCompletedInstancePartitionMap);
assertEquals(actualInstances, expectedInstances);
currentAssignment
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(actualInstances, SegmentStateModel.ONLINE));
});
}

@Test
public void testAssignSegment() {
Map<InstancePartitionsType, InstancePartitions> onlyConsumingInstancePartitionMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,17 @@ public void testBalancedNumSegmentAssignmentStrategyforOfflineTables() {
}

@Test
public void testBalancedNumSegmentAssignmentStrategyforRealtimeTables() {
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();

public void testBalancedNumSegmentAssignmentStrategyForRealtimeTables() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true).build();
InstancePartitions instancePartitions = new InstancePartitions(INSTANCE_PARTITIONS_NAME);
instancePartitions.setInstances(0, 0, INSTANCES);

SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
.getSegmentAssignmentStrategy(null, tableConfig, InstancePartitionsType.COMPLETED.toString(),
instancePartitions);
Assert.assertNotNull(segmentAssignmentStrategy);

Assert.assertTrue(segmentAssignmentStrategy instanceof BalancedNumSegmentAssignmentStrategy);
}

Expand Down