Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not create engine under IndexShard#mutex #45263

Merged
merged 15 commits into from
Aug 26, 2019
Merged
128 changes: 88 additions & 40 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
// ensure happens-before relation between addRefreshListener() and postRecovery()
private final Object postRecoveryMutex = new Object();
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;

private final IndexingOperationListener indexingOperationListeners;
Expand Down Expand Up @@ -1192,20 +1195,23 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex";
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
Engine engine;
synchronized (mutex) {
synchronized (engineMutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
// That can be done out of mutex, since the engine can be closed half way.
engine = getEngineOrNull();
if (engine == null) {
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
synchronized (mutex) {
final Engine engine = getEngineOrNull();
if (engine != null) {
indexCommit = engine.acquireLastIndexCommit(false);
}
}
if (indexCommit == null) {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireLastIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
Expand Down Expand Up @@ -1334,23 +1340,24 @@ public void close(String reason, boolean flushEngine) throws IOException {
}
}

public IndexShard postRecovery(String reason)
throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (postRecoveryMutex) {
// we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener.
// responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener
// and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard).
getEngine().refresh("post_recovery");
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
}
}
return this;
}

/**
Expand Down Expand Up @@ -1583,6 +1590,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
}

private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Expand All @@ -1595,16 +1603,24 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
synchronized (mutex) {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
synchronized (engineMutex) {
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
synchronized (mutex) {
try {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
} finally {
if (currentEngineReference.get() != newEngine) {
newEngine.close();
}
}
}
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
Expand All @@ -1627,6 +1643,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
}

Expand Down Expand Up @@ -2673,7 +2690,13 @@ private DocumentMapperForType docMapper(String type) {
}

private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
Sort indexSort = indexSortSupplier.get();
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
if (this.warmer != null) {
this.warmer.warm(reader);
}
};
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService != null ? mapperService.indexAnalyzer() : null,
Expand Down Expand Up @@ -3235,10 +3258,10 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
if (isReadAllowed()) {
readAllowed = true;
} else {
// check again under mutex. this is important to create a happens before relationship
// check again under postRecoveryMutex. this is important to create a happens before relationship
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
// to a listener before a refresh actually happened that contained that operation.
synchronized (mutex) {
synchronized (postRecoveryMutex) {
readAllowed = isReadAllowed();
}
}
Expand Down Expand Up @@ -3303,6 +3326,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
sync(); // persist the global checkpoint to disk
Expand All @@ -3314,23 +3338,28 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
SetOnce<Engine> newEngineReference = new SetOnce<>();
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
synchronized (mutex) {
verifyNotClosed();
// we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
synchronized (engineMutex) {
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
final Engine readOnlyEngine =
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (mutex) {
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
return newEngineReference.get().acquireLastIndexCommit(false);
}
}

@Override
public IndexCommitRef acquireSafeIndexCommit() {
synchronized (mutex) {
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
return newEngineReference.get().acquireSafeIndexCommit();
}
}
Expand All @@ -3347,9 +3376,28 @@ public void close() throws IOException {
IOUtils.close(super::close, newEngine);
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
synchronized (mutex) {
try {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
} finally {
if (currentEngineReference.get() != readOnlyEngine) {
readOnlyEngine.close();
}
}
}
final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
synchronized (mutex) {
try {
verifyNotClosed();
newEngineReference.set(newReadWriteEngine);
onNewEngine(newReadWriteEngine);
} finally {
if (newEngineReference.get() != newReadWriteEngine) {
newReadWriteEngine.close(); // shard was closed
}
}
}
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.DeleteResult;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
Expand Down Expand Up @@ -4123,4 +4125,39 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs));
closeShards(readonlyShard);
}

public void testCloseShardWhileEngineIsWarming() throws Exception {
CountDownLatch warmerStarted = new CountDownLatch(1);
CountDownLatch warmerBlocking = new CountDownLatch(1);
IndexShard shard = newShard(true, Settings.EMPTY, config -> {
Engine.Warmer warmer = reader -> {
try {
warmerStarted.countDown();
warmerBlocking.await();
config.getWarmer().warm(reader);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
};
EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
config.getIndexSettings(), warmer, config.getStore(), config.getMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
return new InternalEngine(configWithWarmer);
});
Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard)));
recoveryThread.start();
try {
warmerStarted.await();
shard.close("testing", false);
assertThat(shard.state, equalTo(IndexShardState.CLOSED));
} finally {
warmerBlocking.countDown();
}
recoveryThread.join();
shard.store().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1130,34 +1130,38 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio
}

public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException {
if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) {
return;
if (engine instanceof InternalEngine) {
try {
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
assertAtMostOneLuceneDocumentPerSequenceNumber(engine.config().getIndexSettings(), searcher.getDirectoryReader());
}
} catch (AlreadyClosedException ignored) {
// engine was closed
}
}
try {
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader());
Set<Long> seqNos = new HashSet<>();
for (LeafReaderContext leaf : reader.leaves()) {
NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
assertTrue(seqNoDocValues.advanceExact(docId));
long seqNo = seqNoDocValues.longValue();
assertThat(seqNo, greaterThanOrEqualTo(0L));
if (primaryTermDocValues.advanceExact(docId)) {
if (seqNos.add(seqNo) == false) {
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
leaf.reader().document(docId, idFieldVisitor);
throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId());
}
}
}

public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexSettings indexSettings,
DirectoryReader reader) throws IOException {
Set<Long> seqNos = new HashSet<>();
final DirectoryReader wrappedReader = indexSettings.isSoftDeleteEnabled() ? Lucene.wrapAllDocsLive(reader) : reader;
for (LeafReaderContext leaf : wrappedReader.leaves()) {
NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
assertTrue(seqNoDocValues.advanceExact(docId));
long seqNo = seqNoDocValues.longValue();
assertThat(seqNo, greaterThanOrEqualTo(0L));
if (primaryTermDocValues.advanceExact(docId)) {
if (seqNos.add(seqNo) == false) {
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
leaf.reader().document(docId, idFieldVisitor);
throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId());
}
}
}
} catch (AlreadyClosedException ignored) {

}
}

Expand Down
Loading