Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce translog no-op #22291

Merged
merged 4 commits into from
Dec 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public void writeString(String str) throws IOException {
// make sure any possible char can fit into the buffer in any possible iteration
// we need at most 3 bytes so we flush the buffer once we have less than 3 bytes
// left before we start another iteration
if (offset > buffer.length-3) {
if (offset > buffer.length - 3) {
writeBytes(buffer, offset);
offset = 0;
}
Expand Down
65 changes: 63 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -296,6 +295,8 @@ public Condition newCondition() {
*/
public abstract DeleteResult delete(final Delete delete);

public abstract NoOpResult noOp(final NoOp noOp);

/**
* Base class for index and delete operation results
* Holds result meta data (e.g. translog location, updated version)
Expand Down Expand Up @@ -382,6 +383,7 @@ void freeze() {
}

public static class IndexResult extends Result {

private final boolean created;

public IndexResult(long version, long seqNo, boolean created) {
Expand All @@ -397,9 +399,11 @@ public IndexResult(Exception failure, long version, long seqNo) {
public boolean isCreated() {
return created;
}

}

public static class DeleteResult extends Result {

private final boolean found;

public DeleteResult(long version, long seqNo, boolean found) {
Expand All @@ -415,6 +419,19 @@ public DeleteResult(Exception failure, long version, long seqNo) {
public boolean isFound() {
return found;
}

}

static class NoOpResult extends Result {

NoOpResult(long seqNo) {
super(Operation.TYPE.NO_OP, 0, seqNo);
}

NoOpResult(long seqNo, Exception failure) {
super(Operation.TYPE.NO_OP, failure, 0, seqNo);
}

}

/**
Expand Down Expand Up @@ -910,7 +927,7 @@ public abstract static class Operation {

/** type of operation (index, delete), subclasses use static types */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comment needs updating

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this comment.

public enum TYPE {
INDEX, DELETE;
INDEX, DELETE, NO_OP;

private final String lowercase;

Expand Down Expand Up @@ -1114,6 +1131,50 @@ TYPE operationType() {
public int estimatedSizeInBytes() {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}

}

public static class NoOp extends Operation {

private final String reason;

public String reason() {
return reason;
}

public NoOp(
final Term uid,
final long seqNo,
final long primaryTerm,
final long version,
final VersionType versionType,
final Origin origin,
final long startTime,
final String reason) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
this.reason = reason;
}

@Override
public String type() {
throw new UnsupportedOperationException();
}

@Override
String id() {
throw new UnsupportedOperationException();
}

@Override
TYPE operationType() {
return TYPE.NO_OP;
}

@Override
public int estimatedSizeInBytes() {
return 2 * reason.length() + 2 * Long.BYTES;
}

}

public static class Get {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get(): "translogRecovery is not pending but should be";
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
Expand Down Expand Up @@ -375,7 +375,7 @@ private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
* specified global checkpoint.
*
* @param globalCheckpoint the global checkpoint to use
* @param indexWriter the index writer (for the Lucene commit point)
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
*/
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
Expand Down Expand Up @@ -434,7 +434,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
if (get.versionType().isVersionConflictForReads(versionValue.version(), get.version())) {
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
}
refresh("realtime_get");
}
Expand Down Expand Up @@ -532,7 +532,7 @@ public IndexResult index(Index index) {
*
* @return failure if the failure is a document specific failure (e.g. analysis chain failure)
* or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event)
*
* <p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the IDE did that? I will remove.

* Note: pkg-private for testing
*/
final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) {
Expand Down Expand Up @@ -577,7 +577,7 @@ private boolean canOptimizeAddDocument(Index index) {
case PEER_RECOVERY:
case REPLICA:
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
: "version: " + index.version() + " type: " + index.versionType();
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
assert index.isRetry();
Expand All @@ -596,10 +596,10 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ". seq no: " + seqNo;
} else if (origin == Operation.Origin.PRIMARY) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "primary ops should never an assigned seq no. got: " + seqNo;
assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no. got: " + seqNo;
} else {
// sequence number should be set when operation origin is not primary
assert seqNo >= 0 : "replica ops should an assigned seq no. origin: " + origin +
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no. origin: " + origin +
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated();
}
return true;
Expand Down Expand Up @@ -651,7 +651,7 @@ private IndexResult innerIndex(Index index) throws IOException {
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
break;
}
} while(maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
} while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
index.getAutoGeneratedIdTimestamp()) == false);
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
} else {
Expand Down Expand Up @@ -859,6 +859,34 @@ private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, Ve
return found;
}

@Override
public NoOpResult noOp(final NoOp noOp) {
NoOpResult noOpResult;
try (final ReleasableLock ignored = readLock.acquire()) {
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
noOpResult = new NoOpResult(noOp.seqNo(), e);
}
return noOpResult;
}

private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try {
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
return noOpResult;
} finally {
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seq no must be assigned here, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I prefer the symmetry with innerIndex and innerDelete.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

seqNoService().markSeqNoAsCompleted(seqNo);
}
}
}

@Override
public void refresh(String source) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine");
}

@Override
public NoOpResult noOp(NoOp noOp) {
throw new UnsupportedOperationException(shardId + " no-op operation not allowed on shadow engine");
}

@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) {
throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -94,6 +96,7 @@ public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throw
}
}
}

return opsRecovered;
}

Expand Down Expand Up @@ -158,22 +161,28 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio
.routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());
}
logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id());
index(engine, engineIndex);
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
Uid uid = Uid.createUid(delete.uid().text());
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id());
}
logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), uid.type(), uid.id());
final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.seqNo(),
delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
origin, System.nanoTime());
delete(engine, engineDelete);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
final long seqNo = noOp.seqNo();
final long primaryTerm = noOp.primaryTerm();
final String reason = noOp.reason();
logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
final Engine.NoOp engineNoOp =
new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason);
noOp(engine, engineNoOp);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
Expand Down Expand Up @@ -206,6 +215,9 @@ protected void delete(Engine engine, Engine.Delete engineDelete) {
engine.delete(engineDelete);
}

protected void noOp(Engine engine, Engine.NoOp engineNoOp) {
engine.noOp(engineNoOp);
}

/**
* Called once for every processed operation by this recovery performer.
Expand Down
Loading