diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 4a614d8874aff..43e909afc93ee 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -92,6 +93,31 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ volatile boolean primaryMode; + /** + * The current operation primary term. Management of this value is done through {@link IndexShard} and must only be done when safe. See + * {@link #setOperationPrimaryTerm(long)}. + */ + private volatile long operationPrimaryTerm; + + /** + * Returns the current operation primary term. + * + * @return the primary term + */ + public long getOperationPrimaryTerm() { + return operationPrimaryTerm; + } + + /** + * Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That + * is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance. + * + * @param operationPrimaryTerm the new operation primary term + */ + public void setOperationPrimaryTerm(final long operationPrimaryTerm) { + this.operationPrimaryTerm = operationPrimaryTerm; + } + /** * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff} * and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 772607903e61d..031499153fdc3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -199,7 +199,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private volatile long operationPrimaryTerm; protected final AtomicReference currentEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; @@ -308,7 +307,7 @@ public IndexShard( final String aId = shardRouting.allocationId().getId(); this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger); - this.replicationTracker = + final ReplicationTracker replicationTracker = new ReplicationTracker( shardId, aId, @@ -317,6 +316,7 @@ public IndexShard( globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, retentionLeaseSyncer); + this.replicationTracker = replicationTracker; // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -336,8 +336,9 @@ public boolean shouldCache(Query query) { } indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool); searcherWrapper = indexSearcherWrapper; - pendingPrimaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); - operationPrimaryTerm = pendingPrimaryTerm; + final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); + this.pendingPrimaryTerm = primaryTerm; + replicationTracker.setOperationPrimaryTerm(primaryTerm); refreshListeners = buildRefreshListeners(); lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); @@ -399,7 +400,7 @@ public long getPendingPrimaryTerm() { /** Returns the primary term that is currently being used to assign to operations */ public long getOperationPrimaryTerm() { - return this.operationPrimaryTerm; + return replicationTracker.getOperationPrimaryTerm(); } /** @@ -508,7 +509,7 @@ public void updateShardState(final ShardRouting newRouting, assert pendingPrimaryTerm == newPrimaryTerm : "shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" + ", current routing: " + currentRouting + ", new routing: " + newRouting; - assert operationPrimaryTerm == newPrimaryTerm; + assert getOperationPrimaryTerm() == newPrimaryTerm; try { replicationTracker.activatePrimaryMode(getLocalCheckpoint()); /* @@ -704,14 +705,14 @@ public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType boolean isRetry) throws IOException { assert versionType.validateVersionForWrites(version); - return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNo, + return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, versionType, ifSeqNo, ifPrimaryTerm, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, UNASSIGNED_SEQ_NO, 0, + return applyIndexOperation(getEngine(), seqNo, getOperationPrimaryTerm(), version, null, UNASSIGNED_SEQ_NO, 0, autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } @@ -719,8 +720,8 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm, long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { - assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm - + "]"; + assert opPrimaryTerm <= getOperationPrimaryTerm() + : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; ensureWriteAllowed(origin); Engine.Index operation; try { @@ -783,13 +784,13 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc } public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException { - return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA); + return markSeqNoAsNoop(getEngine(), seqNo, getOperationPrimaryTerm(), reason, Engine.Operation.Origin.REPLICA); } private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason, Engine.Operation.Origin origin) throws IOException { - assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm - + "]"; + assert opPrimaryTerm <= getOperationPrimaryTerm() + : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; long startTime = System.nanoTime(); ensureWriteAllowed(origin); final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason); @@ -805,31 +806,31 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { } public Engine.IndexResult getFailedIndexResult(Exception e, long version) { - return new Engine.IndexResult(e, version, operationPrimaryTerm); + return new Engine.IndexResult(e, version, getOperationPrimaryTerm()); } public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) { - return new Engine.DeleteResult(e, version, operationPrimaryTerm); + return new Engine.DeleteResult(e, version, getOperationPrimaryTerm()); } public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType, long ifSeqNo, long ifPrimaryTerm) throws IOException { assert versionType.validateVersionForWrites(version); - return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, + return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, type, id, versionType, ifSeqNo, ifPrimaryTerm, Engine.Operation.Origin.PRIMARY); } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { return applyDeleteOperation( - getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA); + getEngine(), seqNo, getOperationPrimaryTerm(), version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA); } private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id, @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm, Engine.Operation.Origin origin) throws IOException { - assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm - + "]"; + assert opPrimaryTerm <= getOperationPrimaryTerm() + : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; ensureWriteAllowed(origin); // When there is a single type, the unique identifier is only composed of the _id, // so there is no way to differentiate foo#1 from bar#1. This is especially an issue @@ -845,7 +846,7 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long return new Engine.DeleteResult(update); } } catch (MapperParsingException | IllegalArgumentException | TypeMissingException e) { - return new Engine.DeleteResult(e, version, operationPrimaryTerm, seqNo, false); + return new Engine.DeleteResult(e, version, getOperationPrimaryTerm(), seqNo, false); } if (mapperService.resolveDocumentType(type).equals(mapperService.documentMapper().type()) == false) { // We should never get there due to the fact that we generate mapping updates on deletes, @@ -1272,7 +1273,7 @@ public void prepareForIndexRecovery() { } public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { - getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo); + getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } /** @@ -2382,7 +2383,7 @@ private EngineConfig newEngineConfig() { Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases, - () -> operationPrimaryTerm, tombstoneDocSupplier()); + () -> getOperationPrimaryTerm(), tombstoneDocSupplier()); } /** @@ -2462,7 +2463,7 @@ private void bumpPrimaryTerm(final long newPrimaryTerm, @Nullable ActionListener combineWithAction) { assert Thread.holdsLock(mutex); assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null); - assert operationPrimaryTerm <= pendingPrimaryTerm; + assert getOperationPrimaryTerm() <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); asyncBlockOperations(new ActionListener() { @Override @@ -2488,12 +2489,12 @@ private void innerFail(final Exception e) { public void onResponse(final Releasable releasable) { final RunOnce releaseOnce = new RunOnce(releasable::close); try { - assert operationPrimaryTerm <= pendingPrimaryTerm; + assert getOperationPrimaryTerm() <= pendingPrimaryTerm; termUpdated.await(); // indexShardOperationPermits doesn't guarantee that async submissions are executed // in the order submitted. We need to guard against another term bump - if (operationPrimaryTerm < newPrimaryTerm) { - operationPrimaryTerm = newPrimaryTerm; + if (getOperationPrimaryTerm() < newPrimaryTerm) { + replicationTracker.setOperationPrimaryTerm(newPrimaryTerm); onBlocked.run(); } } catch (final Exception e) { @@ -2579,14 +2580,14 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, final ActionListener operationListener = new ActionListener() { @Override public void onResponse(final Releasable releasable) { - if (opPrimaryTerm < operationPrimaryTerm) { + if (opPrimaryTerm < getOperationPrimaryTerm()) { releasable.close(); final String message = String.format( Locale.ROOT, "%s operation primary term [%d] is too old (current [%d])", shardId, opPrimaryTerm, - operationPrimaryTerm); + getOperationPrimaryTerm()); onPermitAcquired.onFailure(new IllegalStateException(message)); } else { assert assertReplicationTarget(); @@ -2647,7 +2648,7 @@ public void onFailure(final Exception e) { } private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) { - return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm); + return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm()); } public int getActiveOperationsCount() {