From dc278705527c3c332c6d426ae3a7efe5ac5d0465 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 3 Nov 2022 23:24:51 +0530 Subject: [PATCH] Dedicated call for primary term validation Signed-off-by: Ashish Singh --- .../PendingReplicationActions.java | 7 +- .../replication/ReplicationOperation.java | 8 ++- .../org/opensearch/index/IndexService.java | 2 +- .../org/opensearch/index/IndexSettings.java | 3 +- .../index/seqno/ReplicationTracker.java | 65 +++++++++++++++---- .../opensearch/index/shard/IndexShard.java | 10 +++ .../index/shard/ReplicationGroup.java | 36 +++++++++- .../recovery/RecoverySourceHandler.java | 2 +- ...moteStoreReplicaRecoverySourceHandler.java | 32 ++++++++- 9 files changed, 145 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/replication/PendingReplicationActions.java b/server/src/main/java/org/opensearch/action/support/replication/PendingReplicationActions.java index 7087b64758888..6a11835ea4f1f 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/PendingReplicationActions.java +++ b/server/src/main/java/org/opensearch/action/support/replication/PendingReplicationActions.java @@ -43,6 +43,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.function.Consumer; @@ -99,7 +100,11 @@ public void accept(ReplicationGroup replicationGroup) { if (isNewerVersion(replicationGroup)) { synchronized (this) { if (isNewerVersion(replicationGroup)) { - acceptNewTrackedAllocationIds(replicationGroup.getTrackedAllocationIds()); + // TODO - Rename the below method to account for trackingPrimaryTermAllocationIds as well + Set newTrackedAllocationIds = new HashSet<>(); + newTrackedAllocationIds.addAll(replicationGroup.getTrackedAllocationIds()); + newTrackedAllocationIds.addAll(replicationGroup.getTrackPrimaryTermAllocationIds()); + acceptNewTrackedAllocationIds(newTrackedAllocationIds); replicationGroupVersion = replicationGroup.getVersion(); } } diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java index da37eee88a4e0..18a0b87f83390 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java @@ -53,6 +53,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.node.NodeClosedException; import org.opensearch.rest.RestStatus; import org.opensearch.threadpool.ThreadPool; @@ -226,7 +227,12 @@ private void performOnReplicas( final ShardRouting primaryRouting = primary.routingEntry(); - for (final ShardRouting shard : replicationGroup.getReplicationTargets()) { + List replicationTargets = new ArrayList<>(replicationGroup.getReplicationTargets()); + if (replicaRequest instanceof PublishCheckpointRequest) { + replicationTargets.addAll(replicationGroup.getPrimaryTermTargets()); + } + + for (final ShardRouting shard : replicationTargets) { if (shard.isSameAllocation(primaryRouting) == false) { performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 92f957633db84..153d78f1cafab 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -549,7 +549,7 @@ public synchronized IndexShard createShard( retentionLeaseSyncer, circuitBreakerService, // TODO Replace with remote translog factory in the follow up PR - this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(), + new InternalTranslogFactory(), this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore ); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 7648f0a192ce7..41235bc7fe063 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1001,7 +1001,8 @@ public boolean isRemoteStoreEnabled() { * Returns if remote translog store is enabled for this index. */ public boolean isRemoteTranslogStoreEnabled() { - return isRemoteTranslogStoreEnabled; + // return isRemoteTranslogStoreEnabled; + return isSegRepEnabled(); } /** diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 0da1687f20769..a4b4214a2289a 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -689,11 +689,18 @@ public static class CheckpointState implements Writeable { */ boolean tracked; - public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) { + /** + * Whether this shard is used for primary term validation i.e. this shard is hit by the primary for validation + * of primary term. + */ + boolean trackPrimaryTerm; + + public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean trackPrimaryTerm) { this.localCheckpoint = localCheckpoint; this.globalCheckpoint = globalCheckpoint; this.inSync = inSync; this.tracked = tracked; + this.trackPrimaryTerm = trackPrimaryTerm; } public CheckpointState(StreamInput in) throws IOException { @@ -701,6 +708,7 @@ public CheckpointState(StreamInput in) throws IOException { this.globalCheckpoint = in.readZLong(); this.inSync = in.readBoolean(); this.tracked = in.readBoolean(); + this.trackPrimaryTerm = in.readBoolean(); } @Override @@ -709,13 +717,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeZLong(globalCheckpoint); out.writeBoolean(inSync); out.writeBoolean(tracked); + out.writeBoolean(trackPrimaryTerm); } /** * Returns a full copy of this object */ public CheckpointState copy() { - return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked); + return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked, trackPrimaryTerm); } public long getLocalCheckpoint() { @@ -737,6 +746,8 @@ public String toString() { + inSync + ", tracked=" + tracked + + ", trackPrimaryTerm=" + + trackPrimaryTerm + '}'; } @@ -750,7 +761,8 @@ public boolean equals(Object o) { if (localCheckpoint != that.localCheckpoint) return false; if (globalCheckpoint != that.globalCheckpoint) return false; if (inSync != that.inSync) return false; - return tracked == that.tracked; + if (tracked != that.tracked) return false; + return trackPrimaryTerm == that.trackPrimaryTerm; } @Override @@ -759,6 +771,7 @@ public int hashCode() { result = 31 * result + Long.hashCode(globalCheckpoint); result = 31 * result + Boolean.hashCode(inSync); result = 31 * result + Boolean.hashCode(tracked); + result = 31 * result + Boolean.hashCode(trackPrimaryTerm); return result; } } @@ -902,7 +915,7 @@ private boolean invariant() { if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { - if (checkpoints.get(shardRouting.allocationId().getId()).tracked && !indexSettings().isRemoteTranslogStoreEnabled()) { + if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases; assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals( @@ -1032,6 +1045,7 @@ private ReplicationGroup calculateReplicationGroup() { routingTable, checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()), checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()), + checkpoints.entrySet().stream().filter(e -> e.getValue().trackPrimaryTerm).map(Map.Entry::getKey).collect(Collectors.toSet()), newVersion ); } @@ -1181,6 +1195,7 @@ public synchronized void updateFromClusterManager( ) { assert invariant(); if (applyingClusterStateVersion > appliedClusterStateVersion) { + boolean remoteTranslogEnabled = indexSettings().isRemoteTranslogStoreEnabled(); // check that the cluster-manager does not fabricate new in-sync entries out of thin air once we are in primary mode assert !primaryMode || inSyncAllocationIds.stream().allMatch(inSyncId -> checkpoints.containsKey(inSyncId) && checkpoints.get(inSyncId).inSync) @@ -1201,13 +1216,16 @@ public synchronized void updateFromClusterManager( // add new initializingIds that are missing locally. These are fresh shard copies - and not in-sync for (String initializingId : initializingAllocationIds) { if (checkpoints.containsKey(initializingId) == false) { - final boolean inSync = inSyncAllocationIds.contains(initializingId); - assert inSync == false : "update from cluster-manager in primary mode has " + final boolean inSync = !remoteTranslogEnabled && inSyncAllocationIds.contains(initializingId); + final boolean trackPrimaryTerm = remoteTranslogEnabled && inSyncAllocationIds.contains(initializingId); + assert inSync == false && trackPrimaryTerm == false : "update from cluster-manager in primary mode has " + initializingId + " as in-sync but it does not exist locally"; final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; - final long globalCheckpoint = localCheckpoint; - checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync)); + checkpoints.put( + initializingId, + new CheckpointState(localCheckpoint, localCheckpoint, inSync, inSync, trackPrimaryTerm) + ); } } if (removedEntries) { @@ -1216,13 +1234,22 @@ public synchronized void updateFromClusterManager( } else { for (String initializingId : initializingAllocationIds) { final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; - final long globalCheckpoint = localCheckpoint; - checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false)); + checkpoints.put(initializingId, new CheckpointState(localCheckpoint, localCheckpoint, false, false, false)); } for (String inSyncId : inSyncAllocationIds) { final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; - final long globalCheckpoint = localCheckpoint; - checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true)); + boolean inSync = true; + // If Remote Translog is not enabled, then we default to inSync behavior. + if (remoteTranslogEnabled) { + ShardRouting primaryShard = routingTable.primaryShard(); + // If Remote Translog is enabled, then the primary shard and it's target relocating shard (if present) + // should be marked as inSync. If inSync, then replication calls would work which would ensure + // primary term validation itself. If not inSync, then the primary term validation call has to happen. + inSync = inSyncId.equals(primaryShard.allocationId().getId()) + || (primaryShard.getTargetRelocatingShard() != null + && inSyncId.equals(primaryShard.getTargetRelocatingShard().allocationId().getId())); + } + checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, localCheckpoint, inSync, inSync, !inSync)); } } appliedClusterStateVersion = applyingClusterStateVersion; @@ -1273,6 +1300,20 @@ public synchronized void initiateTracking(final String allocationId) { assert invariant(); } + public synchronized void initiateTrackingPrimaryTerm(final String allocationId) { + assert invariant(); + assert primaryMode; + assert handoffInProgress == false; + CheckpointState cps = checkpoints.get(allocationId); + if (cps == null) { + // can happen if replica was removed from cluster but recovery process is unaware of it yet + throw new IllegalStateException("no local checkpoint tracking information available"); + } + cps.trackPrimaryTerm = true; + updateReplicationGroupAndNotify(); + assert invariant(); + } + /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint * on the specified shard advances above the current global checkpoint. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 3a3c4b19a02f6..6b956a60ae5ba 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2760,6 +2760,16 @@ public void initiateTracking(final String allocationId) { replicationTracker.initiateTracking(allocationId); } + /** + * TODO - Add JavaDoc + * + * @param allocationId the allocation ID of the shard for which recovery was initiated + */ + public void initiateTrackingPrimaryTerm(final String allocationId) { + assert assertPrimaryMode(); + replicationTracker.initiateTrackingPrimaryTerm(allocationId); + } + /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. See * {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} diff --git a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java index 6d19e9f500411..8e73386f6a544 100644 --- a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java @@ -37,6 +37,7 @@ import org.opensearch.common.util.set.Sets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -49,10 +50,12 @@ public class ReplicationGroup { private final IndexShardRoutingTable routingTable; private final Set inSyncAllocationIds; private final Set trackedAllocationIds; + private final Set trackPrimaryTermAllocationIds; private final long version; private final Set unavailableInSyncShards; // derived from the other fields private final List replicationTargets; // derived from the other fields + private final List primaryTermTargets; // derived from the other fields private final List skippedShards; // derived from the other fields public ReplicationGroup( @@ -60,14 +63,26 @@ public ReplicationGroup( Set inSyncAllocationIds, Set trackedAllocationIds, long version + ) { + this(routingTable, inSyncAllocationIds, trackedAllocationIds, Collections.emptySet(), version); + } + + public ReplicationGroup( + IndexShardRoutingTable routingTable, + Set inSyncAllocationIds, + Set trackedAllocationIds, + Set trackPrimaryTermAllocationIds, + long version ) { this.routingTable = routingTable; this.inSyncAllocationIds = inSyncAllocationIds; this.trackedAllocationIds = trackedAllocationIds; + this.trackPrimaryTermAllocationIds = trackPrimaryTermAllocationIds; this.version = version; this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds()); this.replicationTargets = new ArrayList<>(); + this.primaryTermTargets = new ArrayList<>(); this.skippedShards = new ArrayList<>(); for (final ShardRouting shard : routingTable) { if (shard.unassigned()) { @@ -76,6 +91,8 @@ public ReplicationGroup( } else { if (trackedAllocationIds.contains(shard.allocationId().getId())) { replicationTargets.add(shard); + } else if (trackPrimaryTermAllocationIds.contains(shard.allocationId().getId())) { + primaryTermTargets.add(shard); } else { assert inSyncAllocationIds.contains(shard.allocationId().getId()) == false : "in-sync shard copy but not tracked: " + shard; @@ -85,6 +102,8 @@ public ReplicationGroup( ShardRouting relocationTarget = shard.getTargetRelocatingShard(); if (trackedAllocationIds.contains(relocationTarget.allocationId().getId())) { replicationTargets.add(relocationTarget); + } else if (trackPrimaryTermAllocationIds.contains(relocationTarget.allocationId().getId())) { + primaryTermTargets.add(relocationTarget); } else { skippedShards.add(relocationTarget); assert inSyncAllocationIds.contains(relocationTarget.allocationId().getId()) == false @@ -111,6 +130,10 @@ public Set getTrackedAllocationIds() { return trackedAllocationIds; } + public Set getTrackPrimaryTermAllocationIds() { + return trackPrimaryTermAllocationIds; + } + /** * Returns the set of shard allocation ids that are in the in-sync set but have no assigned routing entry */ @@ -125,6 +148,13 @@ public List getReplicationTargets() { return replicationTargets; } + /** + * Returns the subset of shards in the routing table where the primary term check must happen. Includes relocation targets. + */ + public List getPrimaryTermTargets() { + return primaryTermTargets; + } + /** * Returns the subset of shards in the routing table that are unassigned or initializing and not ready yet to receive operations * (i.e. engine not opened yet). Includes relocation targets. @@ -142,7 +172,8 @@ public boolean equals(Object o) { if (!routingTable.equals(that.routingTable)) return false; if (!inSyncAllocationIds.equals(that.inSyncAllocationIds)) return false; - return trackedAllocationIds.equals(that.trackedAllocationIds); + if (!trackedAllocationIds.equals(that.trackedAllocationIds)) return false; + return trackPrimaryTermAllocationIds.equals(that.trackPrimaryTermAllocationIds); } @Override @@ -150,6 +181,7 @@ public int hashCode() { int result = routingTable.hashCode(); result = 31 * result + inSyncAllocationIds.hashCode(); result = 31 * result + trackedAllocationIds.hashCode(); + result = 31 * result + trackPrimaryTermAllocationIds.hashCode(); return result; } @@ -162,6 +194,8 @@ public String toString() { + inSyncAllocationIds + ", trackedAllocationIds=" + trackedAllocationIds + + ", trackPrimaryTermAllocationIds=" + + trackPrimaryTermAllocationIds + '}'; } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 7acb7cfe72060..a15a67b3c5c3c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -111,7 +111,7 @@ public abstract class RecoverySourceHandler { // Request containing source and target node information protected final StartRecoveryRequest request; private final int chunkSizeInBytes; - private final RecoveryTargetHandler recoveryTarget; + protected final RecoveryTargetHandler recoveryTarget; private final int maxConcurrentOperations; private final ThreadPool threadPool; protected final CancellableThreads cancellableThreads = new CancellableThreads(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java index 6477c16dd92e4..dc5f199e63413 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteStoreReplicaRecoverySourceHandler.java @@ -11,12 +11,15 @@ import org.apache.lucene.index.IndexCommit; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; +import org.opensearch.common.StopWatch; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; @@ -88,9 +91,15 @@ protected void innerRecoveryToTarget(ActionListener listener, prepareEngineStep.whenComplete(prepareEngineTime -> { assert Transports.assertNotTransportThread(this + "[phase2]"); + /* + * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. + * This means that any document indexed into the primary after this will be replicated to this replica as well + * make sure to do this before sampling the max sequence number in the next step, to ensure that we send + * all documents up to maxSeqNo in phase2. + */ RunUnderPrimaryPermit.run( - () -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), + () -> shard.initiateTrackingPrimaryTerm(request.targetAllocationId()), + shardId + " initiating tracking primary term of " + request.targetAllocationId(), shard, cancellableThreads, logger @@ -101,4 +110,23 @@ protected void innerRecoveryToTarget(ActionListener listener, finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure); } + + void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionListener listener) throws IOException { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + cancellableThreads.checkForCancel(); + StopWatch stopWatch = new StopWatch().start(); + logger.trace("finalizing recovery"); + + final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery + final StepListener finalizeListener = new StepListener<>(); + cancellableThreads.checkForCancel(); + recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener); + finalizeListener.whenComplete(r -> { + stopWatch.stop(); + logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); + listener.onResponse(null); + }, listener::onFailure); + } }