Introduce translog no-op
As the translog evolves towards a full operations log as part of the
sequence numbers push, there is a need for the translog to be able to
represent operations for which a sequence number was assigned, but the
operation did not mutate the index. Examples of how this can arise are
operations that fail after the sequence number is assigned, and gaps in
this history that arise when an operation is assigned a sequence number
but the operation never completed (e.g., a node crash). It is important
that these operations appear in the history so that they can be
replicated and replayed during recovery as otherwise the history will be
incomplete and local checkpoints will not be able to advance. This
commit introduces a no-op to the translog to set the stage for these
efforts.

Relates #22291
jasontedor authored Dec 22, 2016
1 parent 91cb563 commit 7946396
Showing 37 changed files with 501 additions and 190 deletions.
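
To make the local-checkpoint argument in the commit message concrete, here is a minimal, self-contained sketch. It is not the Elasticsearch implementation; it only illustrates how a sequence number that was assigned but never completed stalls the checkpoint, and how recording a no-op for that sequence number lets it advance again.

import java.util.BitSet;

final class LocalCheckpointSketch {
    private final BitSet completed = new BitSet();
    private long checkpoint = -1; // no operations have completed yet

    void markCompleted(long seqNo) {
        completed.set((int) seqNo);
        // advance the checkpoint over every contiguous completed sequence number
        while (completed.get((int) (checkpoint + 1))) {
            checkpoint++;
        }
    }

    long checkpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        LocalCheckpointSketch tracker = new LocalCheckpointSketch();
        tracker.markCompleted(0);
        tracker.markCompleted(2); // seq no 1 was assigned but never completed (e.g., a node crash)
        System.out.println(tracker.checkpoint()); // 0: the gap at seq no 1 stalls the checkpoint
        tracker.markCompleted(1); // replaying a no-op recorded for seq no 1 fills the gap
        System.out.println(tracker.checkpoint()); // 2: the checkpoint can now advance
    }
}
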
@@ -328,7 +328,7 @@ public void writeString(String str) throws IOException {
// make sure any possible char can fit into the buffer in any possible iteration
// we need at most 3 bytes so we flush the buffer once we have less than 3 bytes
// left before we start another iteration
if (offset > buffer.length-3) {
if (offset > buffer.length - 3) {
writeBytes(buffer, offset);
offset = 0;
}
65 changes: 63 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -47,7 +47,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -296,6 +295,8 @@ public Condition newCondition() {
*/
public abstract DeleteResult delete(final Delete delete);

public abstract NoOpResult noOp(final NoOp noOp);

/**
* Base class for index and delete operation results
* Holds result meta data (e.g. translog location, updated version)
@@ -382,6 +383,7 @@ void freeze() {
}

public static class IndexResult extends Result {

private final boolean created;

public IndexResult(long version, long seqNo, boolean created) {
@@ -397,9 +399,11 @@ public IndexResult(Exception failure, long version, long seqNo) {
public boolean isCreated() {
return created;
}

}

public static class DeleteResult extends Result {

private final boolean found;

public DeleteResult(long version, long seqNo, boolean found) {
@@ -415,6 +419,19 @@ public DeleteResult(Exception failure, long version, long seqNo) {
public boolean isFound() {
return found;
}

}

static class NoOpResult extends Result {

NoOpResult(long seqNo) {
super(Operation.TYPE.NO_OP, 0, seqNo);
}

NoOpResult(long seqNo, Exception failure) {
super(Operation.TYPE.NO_OP, failure, 0, seqNo);
}

}

/**
@@ -910,7 +927,7 @@ public abstract static class Operation {

/** type of operation (index, delete), subclasses use static types */
public enum TYPE {
INDEX, DELETE;
INDEX, DELETE, NO_OP;

private final String lowercase;

@@ -1114,6 +1131,50 @@ TYPE operationType() {
public int estimatedSizeInBytes() {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}

}

public static class NoOp extends Operation {

private final String reason;

public String reason() {
return reason;
}

public NoOp(
final Term uid,
final long seqNo,
final long primaryTerm,
final long version,
final VersionType versionType,
final Origin origin,
final long startTime,
final String reason) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
this.reason = reason;
}

@Override
public String type() {
throw new UnsupportedOperationException();
}

@Override
String id() {
throw new UnsupportedOperationException();
}

@Override
TYPE operationType() {
return TYPE.NO_OP;
}

@Override
public int estimatedSizeInBytes() {
return 2 * reason.length() + 2 * Long.BYTES;
}

}

public static class Get {
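
For orientation, here is a sketch of how the Engine.NoOp introduced above might be constructed and handed to the engine. The seqNo, primaryTerm, and engine references are placeholders assumed to be in scope, the values are illustrative, and the returned NoOpResult is package-private, so it can only be inspected from within the engine package.

// Illustrative only; seqNo, primaryTerm, and engine are placeholders assumed to be in scope.
final Engine.NoOp noOp = new Engine.NoOp(
        null,                                     // a no-op is not tied to a document uid
        seqNo,                                    // the sequence number to account for
        primaryTerm,
        0,                                        // version is not meaningful for a no-op
        VersionType.INTERNAL,
        Engine.Operation.Origin.PEER_RECOVERY,
        System.nanoTime(),
        "filling a gap in the operation history");
// records the no-op in the translog and accounts for seqNo; the package-private
// NoOpResult return value is ignored here
engine.noOp(noOp);
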
@@ -277,7 +277,7 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get(): "translogRecovery is not pending but should be";
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
@@ -375,7 +375,7 @@ private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
* specified global checkpoint.
*
* @param globalCheckpoint the global checkpoint to use
* @param indexWriter the index writer (for the Lucene commit point)
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
*/
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
@@ -434,7 +434,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
if (get.versionType().isVersionConflictForReads(versionValue.version(), get.version())) {
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
}
refresh("realtime_get");
}
@@ -532,7 +532,7 @@ public IndexResult index(Index index) {
*
* @return failure if the failure is a document specific failure (e.g. analysis chain failure)
* or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event)
*
* <p>
* Note: pkg-private for testing
*/
final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) {
@@ -577,7 +577,7 @@ private boolean canOptimizeAddDocument(Index index) {
case PEER_RECOVERY:
case REPLICA:
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
: "version: " + index.version() + " type: " + index.versionType();
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
assert index.isRetry();
@@ -596,10 +596,10 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ". seq no: " + seqNo;
} else if (origin == Operation.Origin.PRIMARY) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "primary ops should never an assigned seq no. got: " + seqNo;
assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no. got: " + seqNo;
} else {
// sequence number should be set when operation origin is not primary
assert seqNo >= 0 : "replica ops should an assigned seq no. origin: " + origin +
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no. origin: " + origin +
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated();
}
return true;
@@ -651,7 +651,7 @@ private IndexResult innerIndex(Index index) throws IOException {
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
break;
}
} while(maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
} while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
index.getAutoGeneratedIdTimestamp()) == false);
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
} else {
@@ -859,6 +859,34 @@ private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, Ve
return found;
}

@Override
public NoOpResult noOp(final NoOp noOp) {
NoOpResult noOpResult;
try (final ReleasableLock ignored = readLock.acquire()) {
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
noOpResult = new NoOpResult(noOp.seqNo(), e);
}
return noOpResult;
}

private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try {
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
return noOpResult;
} finally {
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}
}

@Override
public void refresh(String source) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
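
The commit message names operations that fail after a sequence number has been assigned as one motivation for this primitive. The following is only a sketch of how a primary could use the new engine method for that case; it is not wired up by this commit, and the Engine.Result accessors it relies on (hasFailure, getSeqNo, getFailure) are assumptions rather than something shown in this diff.

// Hypothetical wiring, not part of this commit. hasFailure()/getSeqNo()/getFailure() on
// Engine.Result are assumed accessors; index and engine are placeholders in scope.
final Engine.IndexResult result = engine.index(index);
if (result.hasFailure() && result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
    // the operation consumed a sequence number but did not mutate the index; record a
    // no-op so the history stays complete and the local checkpoint can still advance
    engine.noOp(new Engine.NoOp(
            index.uid(),
            result.getSeqNo(),
            index.primaryTerm(),
            index.version(),
            index.versionType(),
            index.origin(),
            System.nanoTime(),
            "indexing failed: " + result.getFailure()));
}
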
@@ -116,6 +116,11 @@ public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine");
}

@Override
public NoOpResult noOp(NoOp noOp) {
throw new UnsupportedOperationException(shardId + " no-op operation not allowed on shadow engine");
}

@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) {
throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine");
@@ -22,13 +22,15 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;

@@ -94,6 +96,7 @@ public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throw
}
}
}

return opsRecovered;
}

@@ -158,22 +161,28 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio
.routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());
}
logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id());
index(engine, engineIndex);
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
Uid uid = Uid.createUid(delete.uid().text());
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id());
}
logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), uid.type(), uid.id());
final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.seqNo(),
delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
origin, System.nanoTime());
delete(engine, engineDelete);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
final long seqNo = noOp.seqNo();
final long primaryTerm = noOp.primaryTerm();
final String reason = noOp.reason();
logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
final Engine.NoOp engineNoOp =
new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason);
noOp(engine, engineNoOp);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
@@ -206,6 +215,9 @@ protected void delete(Engine engine, Engine.Delete engineDelete) {
engine.delete(engineDelete);
}

protected void noOp(Engine engine, Engine.NoOp engineNoOp) {
engine.noOp(engineNoOp);
}

/**
* Called once for every processed operation by this recovery performer.