Skip to content

Commit

Permalink
Dedicated call for primary term validation
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Nov 3, 2022
1 parent 77d0d3d commit dc27870
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
Expand Down Expand Up @@ -99,7 +100,11 @@ public void accept(ReplicationGroup replicationGroup) {
if (isNewerVersion(replicationGroup)) {
synchronized (this) {
if (isNewerVersion(replicationGroup)) {
acceptNewTrackedAllocationIds(replicationGroup.getTrackedAllocationIds());
// TODO - Rename the below method to account for trackingPrimaryTermAllocationIds as well
Set<String> newTrackedAllocationIds = new HashSet<>();
newTrackedAllocationIds.addAll(replicationGroup.getTrackedAllocationIds());
newTrackedAllocationIds.addAll(replicationGroup.getTrackPrimaryTermAllocationIds());
acceptNewTrackedAllocationIds(newTrackedAllocationIds);
replicationGroupVersion = replicationGroup.getVersion();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.node.NodeClosedException;
import org.opensearch.rest.RestStatus;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -226,7 +227,12 @@ private void performOnReplicas(

final ShardRouting primaryRouting = primary.routingEntry();

for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
List<ShardRouting> replicationTargets = new ArrayList<>(replicationGroup.getReplicationTargets());
if (replicaRequest instanceof PublishCheckpointRequest) {
replicationTargets.addAll(replicationGroup.getPrimaryTermTargets());
}

for (final ShardRouting shard : replicationTargets) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ public synchronized IndexShard createShard(
retentionLeaseSyncer,
circuitBreakerService,
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
new InternalTranslogFactory(),
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,8 @@ public boolean isRemoteStoreEnabled() {
* Returns if remote translog store is enabled for this index.
*/
public boolean isRemoteTranslogStoreEnabled() {
return isRemoteTranslogStoreEnabled;
// return isRemoteTranslogStoreEnabled;
return isSegRepEnabled();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,18 +689,26 @@ public static class CheckpointState implements Writeable {
*/
boolean tracked;

public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) {
/**
* Whether this shard is used for primary term validation i.e. this shard is hit by the primary for validation
* of primary term.
*/
boolean trackPrimaryTerm;

public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean trackPrimaryTerm) {
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
this.inSync = inSync;
this.tracked = tracked;
this.trackPrimaryTerm = trackPrimaryTerm;
}

public CheckpointState(StreamInput in) throws IOException {
this.localCheckpoint = in.readZLong();
this.globalCheckpoint = in.readZLong();
this.inSync = in.readBoolean();
this.tracked = in.readBoolean();
this.trackPrimaryTerm = in.readBoolean();
}

@Override
Expand All @@ -709,13 +717,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(globalCheckpoint);
out.writeBoolean(inSync);
out.writeBoolean(tracked);
out.writeBoolean(trackPrimaryTerm);
}

/**
* Returns a full copy of this object
*/
public CheckpointState copy() {
return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked);
return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked, trackPrimaryTerm);
}

public long getLocalCheckpoint() {
Expand All @@ -737,6 +746,8 @@ public String toString() {
+ inSync
+ ", tracked="
+ tracked
+ ", trackPrimaryTerm="
+ trackPrimaryTerm
+ '}';
}

Expand All @@ -750,7 +761,8 @@ public boolean equals(Object o) {
if (localCheckpoint != that.localCheckpoint) return false;
if (globalCheckpoint != that.globalCheckpoint) return false;
if (inSync != that.inSync) return false;
return tracked == that.tracked;
if (tracked != that.tracked) return false;
return trackPrimaryTerm == that.trackPrimaryTerm;
}

@Override
Expand All @@ -759,6 +771,7 @@ public int hashCode() {
result = 31 * result + Long.hashCode(globalCheckpoint);
result = 31 * result + Boolean.hashCode(inSync);
result = 31 * result + Boolean.hashCode(tracked);
result = 31 * result + Boolean.hashCode(trackPrimaryTerm);
return result;
}
}
Expand Down Expand Up @@ -902,7 +915,7 @@ private boolean invariant() {
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked && !indexSettings().isRemoteTranslogStoreEnabled()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
: "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
Expand Down Expand Up @@ -1032,6 +1045,7 @@ private ReplicationGroup calculateReplicationGroup() {
routingTable,
checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()),
checkpoints.entrySet().stream().filter(e -> e.getValue().trackPrimaryTerm).map(Map.Entry::getKey).collect(Collectors.toSet()),
newVersion
);
}
Expand Down Expand Up @@ -1181,6 +1195,7 @@ public synchronized void updateFromClusterManager(
) {
assert invariant();
if (applyingClusterStateVersion > appliedClusterStateVersion) {
boolean remoteTranslogEnabled = indexSettings().isRemoteTranslogStoreEnabled();
// check that the cluster-manager does not fabricate new in-sync entries out of thin air once we are in primary mode
assert !primaryMode
|| inSyncAllocationIds.stream().allMatch(inSyncId -> checkpoints.containsKey(inSyncId) && checkpoints.get(inSyncId).inSync)
Expand All @@ -1201,13 +1216,16 @@ public synchronized void updateFromClusterManager(
// add new initializingIds that are missing locally. These are fresh shard copies - and not in-sync
for (String initializingId : initializingAllocationIds) {
if (checkpoints.containsKey(initializingId) == false) {
final boolean inSync = inSyncAllocationIds.contains(initializingId);
assert inSync == false : "update from cluster-manager in primary mode has "
final boolean inSync = !remoteTranslogEnabled && inSyncAllocationIds.contains(initializingId);
final boolean trackPrimaryTerm = remoteTranslogEnabled && inSyncAllocationIds.contains(initializingId);
assert inSync == false && trackPrimaryTerm == false : "update from cluster-manager in primary mode has "
+ initializingId
+ " as in-sync but it does not exist locally";
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
checkpoints.put(
initializingId,
new CheckpointState(localCheckpoint, localCheckpoint, inSync, inSync, trackPrimaryTerm)
);
}
}
if (removedEntries) {
Expand All @@ -1216,13 +1234,22 @@ public synchronized void updateFromClusterManager(
} else {
for (String initializingId : initializingAllocationIds) {
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, localCheckpoint, false, false, false));
}
for (String inSyncId : inSyncAllocationIds) {
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
boolean inSync = true;
// If Remote Translog is not enabled, then we default to inSync behavior.
if (remoteTranslogEnabled) {
ShardRouting primaryShard = routingTable.primaryShard();
// If Remote Translog is enabled, then the primary shard and it's target relocating shard (if present)
// should be marked as inSync. If inSync, then replication calls would work which would ensure
// primary term validation itself. If not inSync, then the primary term validation call has to happen.
inSync = inSyncId.equals(primaryShard.allocationId().getId())
|| (primaryShard.getTargetRelocatingShard() != null
&& inSyncId.equals(primaryShard.getTargetRelocatingShard().allocationId().getId()));
}
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, localCheckpoint, inSync, inSync, !inSync));
}
}
appliedClusterStateVersion = applyingClusterStateVersion;
Expand Down Expand Up @@ -1273,6 +1300,20 @@ public synchronized void initiateTracking(final String allocationId) {
assert invariant();
}

public synchronized void initiateTrackingPrimaryTerm(final String allocationId) {
assert invariant();
assert primaryMode;
assert handoffInProgress == false;
CheckpointState cps = checkpoints.get(allocationId);
if (cps == null) {
// can happen if replica was removed from cluster but recovery process is unaware of it yet
throw new IllegalStateException("no local checkpoint tracking information available");
}
cps.trackPrimaryTerm = true;
updateReplicationGroupAndNotify();
assert invariant();
}

/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint
* on the specified shard advances above the current global checkpoint.
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2760,6 +2760,16 @@ public void initiateTracking(final String allocationId) {
replicationTracker.initiateTracking(allocationId);
}

/**
* TODO - Add JavaDoc
*
* @param allocationId the allocation ID of the shard for which recovery was initiated
*/
public void initiateTrackingPrimaryTerm(final String allocationId) {
assert assertPrimaryMode();
replicationTracker.initiateTrackingPrimaryTerm(allocationId);
}

/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
* {@link ReplicationTracker#markAllocationIdAsInSync(String, long)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.common.util.set.Sets;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

Expand All @@ -49,25 +50,39 @@ public class ReplicationGroup {
private final IndexShardRoutingTable routingTable;
private final Set<String> inSyncAllocationIds;
private final Set<String> trackedAllocationIds;
private final Set<String> trackPrimaryTermAllocationIds;
private final long version;

private final Set<String> unavailableInSyncShards; // derived from the other fields
private final List<ShardRouting> replicationTargets; // derived from the other fields
private final List<ShardRouting> primaryTermTargets; // derived from the other fields
private final List<ShardRouting> skippedShards; // derived from the other fields

public ReplicationGroup(
IndexShardRoutingTable routingTable,
Set<String> inSyncAllocationIds,
Set<String> trackedAllocationIds,
long version
) {
this(routingTable, inSyncAllocationIds, trackedAllocationIds, Collections.emptySet(), version);
}

public ReplicationGroup(
IndexShardRoutingTable routingTable,
Set<String> inSyncAllocationIds,
Set<String> trackedAllocationIds,
Set<String> trackPrimaryTermAllocationIds,
long version
) {
this.routingTable = routingTable;
this.inSyncAllocationIds = inSyncAllocationIds;
this.trackedAllocationIds = trackedAllocationIds;
this.trackPrimaryTermAllocationIds = trackPrimaryTermAllocationIds;
this.version = version;

this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds());
this.replicationTargets = new ArrayList<>();
this.primaryTermTargets = new ArrayList<>();
this.skippedShards = new ArrayList<>();
for (final ShardRouting shard : routingTable) {
if (shard.unassigned()) {
Expand All @@ -76,6 +91,8 @@ public ReplicationGroup(
} else {
if (trackedAllocationIds.contains(shard.allocationId().getId())) {
replicationTargets.add(shard);
} else if (trackPrimaryTermAllocationIds.contains(shard.allocationId().getId())) {
primaryTermTargets.add(shard);
} else {
assert inSyncAllocationIds.contains(shard.allocationId().getId()) == false : "in-sync shard copy but not tracked: "
+ shard;
Expand All @@ -85,6 +102,8 @@ public ReplicationGroup(
ShardRouting relocationTarget = shard.getTargetRelocatingShard();
if (trackedAllocationIds.contains(relocationTarget.allocationId().getId())) {
replicationTargets.add(relocationTarget);
} else if (trackPrimaryTermAllocationIds.contains(relocationTarget.allocationId().getId())) {
primaryTermTargets.add(relocationTarget);
} else {
skippedShards.add(relocationTarget);
assert inSyncAllocationIds.contains(relocationTarget.allocationId().getId()) == false
Expand All @@ -111,6 +130,10 @@ public Set<String> getTrackedAllocationIds() {
return trackedAllocationIds;
}

public Set<String> getTrackPrimaryTermAllocationIds() {
return trackPrimaryTermAllocationIds;
}

/**
* Returns the set of shard allocation ids that are in the in-sync set but have no assigned routing entry
*/
Expand All @@ -125,6 +148,13 @@ public List<ShardRouting> getReplicationTargets() {
return replicationTargets;
}

/**
* Returns the subset of shards in the routing table where the primary term check must happen. Includes relocation targets.
*/
public List<ShardRouting> getPrimaryTermTargets() {
return primaryTermTargets;
}

/**
* Returns the subset of shards in the routing table that are unassigned or initializing and not ready yet to receive operations
* (i.e. engine not opened yet). Includes relocation targets.
Expand All @@ -142,14 +172,16 @@ public boolean equals(Object o) {

if (!routingTable.equals(that.routingTable)) return false;
if (!inSyncAllocationIds.equals(that.inSyncAllocationIds)) return false;
return trackedAllocationIds.equals(that.trackedAllocationIds);
if (!trackedAllocationIds.equals(that.trackedAllocationIds)) return false;
return trackPrimaryTermAllocationIds.equals(that.trackPrimaryTermAllocationIds);
}

@Override
public int hashCode() {
int result = routingTable.hashCode();
result = 31 * result + inSyncAllocationIds.hashCode();
result = 31 * result + trackedAllocationIds.hashCode();
result = 31 * result + trackPrimaryTermAllocationIds.hashCode();
return result;
}

Expand All @@ -162,6 +194,8 @@ public String toString() {
+ inSyncAllocationIds
+ ", trackedAllocationIds="
+ trackedAllocationIds
+ ", trackPrimaryTermAllocationIds="
+ trackPrimaryTermAllocationIds
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public abstract class RecoverySourceHandler {
// Request containing source and target node information
protected final StartRecoveryRequest request;
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
protected final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentOperations;
private final ThreadPool threadPool;
protected final CancellableThreads cancellableThreads = new CancellableThreads();
Expand Down
Loading

0 comments on commit dc27870

Please sign in to comment.