Skip to content

Commit

Permalink
Tuned UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Dec 7, 2022
1 parent 2081f00 commit 7b0ce5c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ public void testReplicationWithRemoteTranslogEnabled() throws Exception {
ReplicationGroup replicationGroup = new ReplicationGroup(routingTable, inSyncAllocationIds, inSyncAllocationIds, 0);
List<ShardRouting> replicationTargets = replicationGroup.getReplicationTargets();
assertEquals(inSyncAllocationIds.size(), replicationTargets.size());
// assertEquals(1, replicationTargets.stream().filter(s -> s.getReplicationMode() == ReplicationMode.FULL_REPLICATION).count());
assertTrue(
replicationTargets.stream().map(sh -> sh.allocationId().getId()).collect(Collectors.toSet()).containsAll(inSyncAllocationIds)
);

Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
Expand Down Expand Up @@ -288,7 +290,9 @@ public void testPrimaryToPrimaryReplicationWithRemoteTranslogEnabled() throws Ex
ReplicationGroup replicationGroup = new ReplicationGroup(routingTable, inSyncAllocationIds, inSyncAllocationIds, 0);
List<ShardRouting> replicationTargets = replicationGroup.getReplicationTargets();
assertEquals(inSyncAllocationIds.size(), replicationTargets.size());
// assertEquals(2, replicationTargets.stream().filter(s -> s.getReplicationMode() == ReplicationMode.FULL_REPLICATION).count());
assertTrue(
replicationTargets.stream().map(sh -> sh.allocationId().getId()).collect(Collectors.toSet()).containsAll(inSyncAllocationIds)
);

Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
Expand Down Expand Up @@ -348,7 +352,9 @@ public void testForceReplicationWithRemoteTranslogEnabled() throws Exception {
ReplicationGroup replicationGroup = new ReplicationGroup(routingTable, inSyncAllocationIds, inSyncAllocationIds, 0);
List<ShardRouting> replicationTargets = replicationGroup.getReplicationTargets();
assertEquals(inSyncAllocationIds.size(), replicationTargets.size());
// assertEquals(1, replicationTargets.stream().filter(s -> s.getReplicationMode() == ReplicationMode.FULL_REPLICATION).count());
assertTrue(
replicationTargets.stream().map(sh -> sh.allocationId().getId()).collect(Collectors.toSet()).containsAll(inSyncAllocationIds)
);

Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1311,14 +1311,12 @@ public void testGlobalCheckpointUpdateWithRemoteTranslogEnabled() {
assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1));
initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED, false));
assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1 + initializing.size()));
// assertEquals(
// tracker.getReplicationGroup()
// .getReplicationTargets()
// .stream()
// .filter(s -> s.getReplicationMode() == ReplicationTracker.ReplicationMode.FULL_REPLICATION)
// .count(),
// 1
// );
Set<AllocationId> replicationTargets = tracker.getReplicationGroup()
.getReplicationTargets()
.stream()
.map(ShardRouting::allocationId)
.collect(Collectors.toSet());
assertTrue(replicationTargets.containsAll(initializing));
allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId)));

assertEquals(tracker.getGlobalCheckpoint(), primaryLocalCheckpoint);
Expand All @@ -1328,11 +1326,7 @@ public void testGlobalCheckpointUpdateWithRemoteTranslogEnabled() {
initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId)));

final long minLocalCheckpointAfterUpdates = allocations.entrySet()
.stream()
.map(Map.Entry::getValue)
.min(Long::compareTo)
.orElse(UNASSIGNED_SEQ_NO);
final long minLocalCheckpointAfterUpdates = allocations.values().stream().min(Long::compareTo).orElse(UNASSIGNED_SEQ_NO);

// now insert an unknown active/insync id , the checkpoint shouldn't change but a refresh should be requested.
final AllocationId extraId = AllocationId.newInitializing();
Expand Down Expand Up @@ -1392,14 +1386,13 @@ public void testUpdateFromClusterManagerWithRemoteTranslogEnabled() {
assertEquals(tracker.getReplicationGroup().getReplicationTargets().size(), active.size());
initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED, false));
assertEquals(tracker.getReplicationGroup().getReplicationTargets().size(), active.size() + initializing.size());
// assertEquals(
// tracker.getReplicationGroup()
// .getReplicationTargets()
// .stream()
// .filter(s -> s.getReplicationMode() == ReplicationTracker.ReplicationMode.FULL_REPLICATION)
// .count(),
// 1
// );
Set<AllocationId> replicationTargets = tracker.getReplicationGroup()
.getReplicationTargets()
.stream()
.map(ShardRouting::allocationId)
.collect(Collectors.toSet());
assertTrue(replicationTargets.containsAll(initializing));
assertTrue(replicationTargets.containsAll(active));
allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId)));

assertEquals(tracker.getGlobalCheckpoint(), primaryLocalCheckpoint);
Expand Down

0 comments on commit 7b0ce5c

Please sign in to comment.