Skip to content

Commit

Permalink
[Clean up] Auto reformat controller tests (#9706)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Nov 1, 2022
1 parent 85ee94f commit 694999c
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 378 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@
import org.mockito.ArgumentMatchers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;


/**
Expand All @@ -76,7 +78,6 @@ public class ConsumingSegmentInfoReaderStatelessTest {
private PinotHelixResourceManager _helix;
private final Map<String, FakeConsumingInfoServer> _serverMap = new HashMap<>();


@BeforeClass
public void setUp()
throws IOException {
Expand All @@ -88,56 +89,54 @@ public void setUp()
partitionToOffset0.put("0", "150");
Map<String, String> partitionToOffset1 = new HashMap<>();
partitionToOffset1.put("1", "150");
FakeConsumingInfoServer s0 = new FakeConsumingInfoServer(Lists
.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
partitionToOffset1, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))));
FakeConsumingInfoServer s0 = new FakeConsumingInfoServer(Lists.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))));
s0.start(uriPath, createHandler(200, s0._consumerInfos, 0));
_serverMap.put("server0", s0);

// server1 - 1 consumer each for p0 and p1. CONSUMING.
FakeConsumingInfoServer s1 = new FakeConsumingInfoServer(Lists
.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
partitionToOffset1, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))));
FakeConsumingInfoServer s1 = new FakeConsumingInfoServer(Lists.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))));
s1.start(uriPath, createHandler(200, s1._consumerInfos, 0));
_serverMap.put("server1", s1);

// server2 - p1 consumer CONSUMING. p0 consumer NOT_CONSUMING
FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists
.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))));
FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0, partitionToOffset0,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))));
s2.start(uriPath, createHandler(200, s2._consumerInfos, 0));
_serverMap.put("server2", s2);

// server3 - 1 consumer for p1. No consumer for p0
FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(
Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(Lists.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))));
s3.start(uriPath, createHandler(200, s3._consumerInfos, 0));
_serverMap.put("server3", s3);

// server4 - unreachable/error/timeout
FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0,
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))));
FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap()))));
s4.start(uriPath, createHandler(200, s4._consumerInfos, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
_serverMap.put("server4", s4);
}
Expand Down Expand Up @@ -198,11 +197,11 @@ private BiMap<String, String> serverEndpoints(String... servers) {
private void mockSetup(final String[] servers, final Set<String> consumingSegments)
throws InvalidConfigException {
when(_helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock -> subsetOfServerSegments(servers));
when(_helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet()))
.thenAnswer(invocationOnMock -> serverEndpoints(servers));
when(_helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet())).thenAnswer(
invocationOnMock -> serverEndpoints(servers));
when(_helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock -> consumingSegments);
when(_helix.getServersForSegment(anyString(), anyString())).thenAnswer(invocationOnMock -> new HashSet<>(
Arrays.asList(servers)));
when(_helix.getServersForSegment(anyString(), anyString())).thenAnswer(
invocationOnMock -> new HashSet<>(Arrays.asList(servers)));
}

private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final String[] servers,
Expand All @@ -224,9 +223,8 @@ private TableStatus.IngestionStatus testRunnerIngestionStatus(final String[] ser
private void checkIngestionStatus(final String[] servers, final Set<String> consumingSegments,
TableStatus.IngestionState expectedState)
throws InvalidConfigException {
TableStatus.IngestionStatus ingestionStatus =
testRunnerIngestionStatus(servers, consumingSegments, TABLE_NAME);
Assert.assertEquals(ingestionStatus.getIngestionState(), expectedState);
TableStatus.IngestionStatus ingestionStatus = testRunnerIngestionStatus(servers, consumingSegments, TABLE_NAME);
assertEquals(ingestionStatus.getIngestionState(), expectedState);
}

@Test
Expand All @@ -235,7 +233,7 @@ public void testEmptyTable()
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
testRunner(new String[]{}, Collections.emptySet(), TABLE_NAME);
checkIngestionStatus(new String[]{}, Collections.emptySet(), TableStatus.IngestionState.HEALTHY);
Assert.assertTrue(consumingSegmentsInfoMap._segmentToConsumingInfoMap.isEmpty());
assertTrue(consumingSegmentsInfoMap._segmentToConsumingInfoMap.isEmpty());
}

/**
Expand All @@ -252,16 +250,16 @@ public void testHappyPath()

List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
Assert.assertEquals(consumingSegmentInfos.size(), 2);
assertEquals(consumingSegmentInfos.size(), 2);
for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"),
ConsumerState.CONSUMING.toString(), "0", "150");
checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), ConsumerState.CONSUMING.toString(), "0",
"150");
}
consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
Assert.assertEquals(consumingSegmentInfos.size(), 2);
assertEquals(consumingSegmentInfos.size(), 2);
for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"),
ConsumerState.CONSUMING.toString(), "1", "150");
checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server1"), ConsumerState.CONSUMING.toString(), "1",
"150");
}
}

Expand All @@ -279,21 +277,19 @@ public void testNotConsumingState()

List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
Assert.assertEquals(consumingSegmentInfos.size(), 2);
assertEquals(consumingSegmentInfos.size(), 2);
for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
if (info._serverName.equals("server0")) {
checkConsumingSegmentInfo(info, Sets.newHashSet("server0"), ConsumerState.CONSUMING.toString(),
"0", "150");
checkConsumingSegmentInfo(info, Sets.newHashSet("server0"), ConsumerState.CONSUMING.toString(), "0", "150");
} else {
checkConsumingSegmentInfo(info, Sets.newHashSet("server2"),
ConsumerState.NOT_CONSUMING.toString(), "0", "150");
checkConsumingSegmentInfo(info, Sets.newHashSet("server2"), ConsumerState.NOT_CONSUMING.toString(), "0", "150");
}
}
consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
Assert.assertEquals(consumingSegmentInfos.size(), 2);
assertEquals(consumingSegmentInfos.size(), 2);
for (ConsumingSegmentInfoReader.ConsumingSegmentInfo info : consumingSegmentInfos) {
checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server2"),
ConsumerState.CONSUMING.toString(), "1", "150");
checkConsumingSegmentInfo(info, Sets.newHashSet("server0", "server2"), ConsumerState.CONSUMING.toString(), "1",
"150");
}
}

Expand All @@ -311,10 +307,10 @@ public void testNoConsumerButConsumingInIdealState()

List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
Assert.assertTrue(consumingSegmentInfos.isEmpty());
assertTrue(consumingSegmentInfos.isEmpty());

consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
Assert.assertEquals(consumingSegmentInfos.size(), 1);
assertEquals(consumingSegmentInfos.size(), 1);
checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"),
ConsumerState.CONSUMING.toString(), "1", "150");
}
Expand All @@ -334,10 +330,10 @@ public void testNoConsumerOfflineInIdealState()

List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
Assert.assertNull(consumingSegmentInfos);
assertNull(consumingSegmentInfos);

consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
Assert.assertEquals(consumingSegmentInfos.size(), 1);
assertEquals(consumingSegmentInfos.size(), 1);
checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server3"),
ConsumerState.CONSUMING.toString(), "1", "150");
}
Expand All @@ -356,20 +352,20 @@ public void testErrorFromServer()

List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfos =
consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
Assert.assertEquals(consumingSegmentInfos.size(), 1);
assertEquals(consumingSegmentInfos.size(), 1);
checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"),
ConsumerState.CONSUMING.toString(), "0", "150");

consumingSegmentInfos = consumingSegmentsInfoMap._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
Assert.assertEquals(consumingSegmentInfos.size(), 1);
assertEquals(consumingSegmentInfos.size(), 1);
checkConsumingSegmentInfo(consumingSegmentInfos.get(0), Sets.newHashSet("server0"),
ConsumerState.CONSUMING.toString(), "1", "150");
}

private void checkConsumingSegmentInfo(ConsumingSegmentInfoReader.ConsumingSegmentInfo info, Set<String> serverNames,
String consumerState, String partition, String offset) {
Assert.assertTrue(serverNames.contains(info._serverName));
Assert.assertEquals(info._consumerState, consumerState);
Assert.assertEquals(info._partitionOffsetInfo._currentOffsetsMap.get(partition), offset);
assertTrue(serverNames.contains(info._serverName));
assertEquals(info._consumerState, consumerState);
assertEquals(info._partitionOffsetInfo._currentOffsetsMap.get(partition), offset);
}
}
Loading

0 comments on commit 694999c

Please sign in to comment.