Commit 1ca0b5e

Introduce a History UUID as a requirement for ops based recovery (#26577)

The new ops-based recovery, introduced as part of #10708, is based on the assumption that all operations below the global checkpoint known to the replica do not need to be synced with the primary. This rests on the guarantee that all ops below that checkpoint are available on the primary and are identical on both copies. Under normal operations this guarantee holds. Sadly, it can be violated when a primary is restored from an old snapshot: at that point the restored primary can be missing operations below the replica's global checkpoint, or, even worse, can hold entirely different operations in the same slots. This PR introduces the notion of a history uuid so that such divergence from a restored primary can be detected (in a follow-up PR).

The history UUID is generated by a primary when it is first created and is synced to the replicas that are recovered via file-based recovery. The PR adds a requirement to ops-based recovery that the history uuid of the source and the target be equal. Under normal operations all shard copies keep the same history uuid for the rest of the index's lifetime, so this is a no-op. However, it gives us a place to guarantee that we fall back to file-based syncing in special events such as a restore from snapshot (to be done as a follow-up) and when someone runs the truncate-translog command, which can go wrong when combined with primary recovery (handled in this PR).

In the past we considered using the translog uuid for this purpose (i.e., syncing it across copies) and thereby avoiding an extra identifier. That idea was rejected because it would remove the ability to verify that a specific translog really belongs to a specific lucene index. We also feel that a dedicated history uuid will serve us well in the future.
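
To make the new precondition concrete, here is a minimal sketch in plain Java. The class, method, and parameter names are hypothetical and this is not the actual Elasticsearch recovery code; the real check is wired into the peer-recovery machinery touched by this change.

    // Minimal sketch with hypothetical names; not the actual Elasticsearch recovery code.
    public final class HistoryUuidCheck {

        /**
         * Ops-based recovery is only allowed when source and target share a history uuid;
         * otherwise the target must fall back to file-based recovery.
         */
        public static boolean canUseOpsBasedRecovery(String sourceHistoryUUID,
                                                     String targetHistoryUUID,
                                                     boolean sourceHasRequiredOperations) {
            if (sourceHistoryUUID == null || sourceHistoryUUID.equals(targetHistoryUUID) == false) {
                // histories may have diverged (e.g. restore from an old snapshot, truncated translog):
                // operations below the target's global checkpoint can no longer be trusted
                return false;
            }
            // same history: ops below the global checkpoint are identical on both copies,
            // so replaying operations from the source's translog is sufficient
            return sourceHasRequiredOperations;
        }
    }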
bleskes authored Sep 14, 2017
1 parent e69c39a commit 1ca0b5e
Showing 21 changed files with 385 additions and 156 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -186,7 +186,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
- ext.bwc_tests_enabled = true
+ ext.bwc_tests_enabled = false
}

task verifyBwcTestsEnabled {
4 changes: 4 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -95,6 +95,7 @@
public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";

protected final ShardId shardId;
protected final String allocationId;
@@ -183,6 +184,9 @@ public MergeStats getMergeStats() {
return new MergeStats();
}

/** returns the history uuid for the engine */
public abstract String getHistoryUUID();

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -48,6 +48,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
@@ -142,6 +143,8 @@ public class InternalEngine extends Engine {
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();

@Nullable
private final String historyUUID;

public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
@@ -174,15 +177,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
String existingHistoryUUID = loadHistoryUUIDFromCommit(writer);
if (existingHistoryUUID == null) {
historyUUID = UUIDs.randomBase64UUID();
} else {
historyUUID = existingHistoryUUID;
}
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
historyUUID = loadHistoryUUIDFromCommit(writer);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
historyUUID = UUIDs.randomBase64UUID();
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
@@ -342,6 +353,12 @@ private void recoverFromTranslogInternal() throws IOException {
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
assert historyUUID != null;
// put the history uuid into the index
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
}
// clean up what's not needed
translog.trimUnreferencedReaders();
@@ -382,6 +399,11 @@ public Translog getTranslog() {
return translog;
}

@Override
public String getHistoryUUID() {
return historyUUID;
}

/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
@@ -401,6 +423,19 @@ private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException
}
}

/**
* Reads the current stored history ID from the IW commit data. If the id is not found, returns null.
*/
@Nullable
private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException {
String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
"index was created after 6_0_0_rc1 but has no history uuid";
}
return uuid;
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
@@ -1312,30 +1347,8 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
- /*
- * we have to inc-ref the store here since if the engine is closed by a tragic event
- * we don't acquire the write lock and wait until we have exclusive access. This might also
- * dec the store reference which can essentially close the store and unless we can inc the reference
- * we can't use it.
- */
- store.incRef();
- try {
- // reread the last committed segment infos
- lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
- } catch (Exception e) {
- if (isClosed.get() == false) {
- try {
- logger.warn("failed to read latest segment infos on flush", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- if (Lucene.isCorruptionException(e)) {
- throw new FlushFailedEngineException(shardId, e);
- }
- }
- } finally {
- store.decRef();
- }
+ refreshLastCommittedSegmentInfos();

}
newCommitId = lastCommittedSegmentInfos.getId();
} catch (FlushFailedEngineException ex) {
@@ -1353,6 +1366,33 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
return new CommitId(newCommitId);
}

private void refreshLastCommittedSegmentInfos() {
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try {
// reread the last committed segment infos
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
if (isClosed.get() == false) {
try {
logger.warn("failed to read latest segment infos on flush", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
store.decRef();
}
}

@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
@@ -1874,7 +1914,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
- final Map<String, String> commitData = new HashMap<>(5);
+ final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
@@ -1883,6 +1923,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
if (historyUUID != null) {
commitData.put(HISTORY_UUID_KEY, historyUUID);
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
@@ -1992,7 +2035,7 @@ public boolean isRecovering() {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
- Map<String, String> commitData = new HashMap<>(5);
+ Map<String, String> commitData = new HashMap<>(6);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
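
For context, the history uuid that commitIndexWriter stores above lives in the Lucene commit user data, next to the translog uuid and the sequence-number markers. The following standalone sketch (not part of this change, assuming only a plain Lucene dependency) shows how that user data can be read back; the "history_uuid" literal mirrors Engine.HISTORY_UUID_KEY and the index path is a placeholder.

    import org.apache.lucene.index.SegmentInfos;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    import java.io.IOException;
    import java.nio.file.Paths;
    import java.util.Map;

    public final class ReadHistoryUuid {
        public static void main(String[] args) throws IOException {
            // args[0]: path to a shard's Lucene index directory (placeholder)
            try (Directory directory = FSDirectory.open(Paths.get(args[0]))) {
                // read the user data attached to the most recent commit point
                SegmentInfos infos = SegmentInfos.readLatestCommit(directory);
                Map<String, String> userData = infos.getUserData();
                System.out.println("history_uuid = " + userData.get("history_uuid"));
            }
        }
    }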
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1585,6 +1585,10 @@ public Translog getTranslog() {
return getEngine().getTranslog();
}

public String getHistoryUUID() {
return getEngine().getHistoryUUID();
}

public IndexEventListener getIndexEventListener() {
return indexEventListener;
}
core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -35,10 +35,12 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
@@ -162,10 +164,11 @@ void addIndices(
* document-level semantics.
*/
writer.setLiveCommitData(() -> {
- final HashMap<String, String> liveCommitData = new HashMap<>(2);
+ final HashMap<String, String> liveCommitData = new HashMap<>(4);
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
return liveCommitData.entrySet().iterator();
});
writer.commit();
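
The addIndices change above seeds a brand-new history uuid when an index is built from existing shard copies (for example during shrink), since the resulting index starts a fresh history. As a rough standalone illustration of the underlying Lucene mechanism (not Elasticsearch code), commit user data is supplied through IndexWriter.setLiveCommitData and persisted by the next commit; java.util.UUID stands in here for UUIDs.randomBase64UUID() and the index path is a placeholder.

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    import java.io.IOException;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public final class SeedHistoryUuid {
        public static void main(String[] args) throws IOException {
            // args[0]: path to the target Lucene index directory (placeholder)
            try (Directory dir = FSDirectory.open(Paths.get(args[0]));
                 IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                Map<String, String> commitData = new HashMap<>();
                // "history_uuid" mirrors Engine.HISTORY_UUID_KEY; UUID.randomUUID() stands in
                // for UUIDs.randomBase64UUID()
                commitData.put("history_uuid", UUID.randomUUID().toString());
                writer.setLiveCommitData(commitData.entrySet()); // written out by the next commit
                writer.commit();
            }
        }
    }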
15 changes: 15 additions & 0 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -79,6 +79,7 @@
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.Closeable;
import java.io.EOFException;
@@ -1027,6 +1028,20 @@ public Map<String, String> getCommitUserData() {
return commitUserData;
}

/**
* returns the history uuid the store points at, or null if none exists.
*/
public String getHistoryUUID() {
return commitUserData.get(Engine.HISTORY_UUID_KEY);
}

/**
* returns the translog uuid the store points at
*/
public String getTranslogUUID() {
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}

/**
* Returns true iff this metadata contains the given file.
*/
