Skip to content

Commit

Permalink
Move the IndexDeletionPolicy to be engine internal (#24930)
Browse files Browse the repository at this point in the history
The `IndexDeletionPolicy` is currently instantiated by `IndexShard` and is then passed through to the engine as a parameter. That's a shame as it is really just an implementation detail and the engine already has a method to acquire a commit.

This is preparing for a follow up PR that will we connect the index deletion policy with a new translog deletion policy.

Relates to #10708
  • Loading branch information
bleskes authored May 29, 2017
1 parent 5741005 commit dfdf496
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 89 deletions.
29 changes: 24 additions & 5 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand All @@ -63,7 +64,6 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -100,7 +100,6 @@ public abstract class Engine implements Closeable {
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final EventListener eventListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
Expand All @@ -121,15 +120,13 @@ public abstract class Engine implements Closeable {

protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
Objects.requireNonNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");

this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId());
this.eventListener = engineConfig.getEventListener();
this.deletionPolicy = engineConfig.getDeletionPolicy();
}

/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
Expand Down Expand Up @@ -828,7 +825,7 @@ public void forceMerge(boolean flush) throws IOException {
*
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;

/**
* fail engine due to some error. the engine will also be closed.
Expand Down Expand Up @@ -1387,6 +1384,28 @@ public int hashCode() {
}
}

public static class IndexCommitRef implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean();
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;

IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
indexCommit = deletionPolicy.snapshot();
onClose = () -> deletionPolicy.release(indexCommit);
}

@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
onClose.run();
}
}

public IndexCommit getIndexCommit() {
return indexCommit;
}
}

public void onSettingsChanged() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -58,7 +57,6 @@ public final class EngineConfig {
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final MergePolicy mergePolicy;
private final Analyzer analyzer;
private final Similarity similarity;
Expand Down Expand Up @@ -109,7 +107,7 @@ public final class EngineConfig {
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
Expand All @@ -123,7 +121,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
this.threadPool = threadPool;
this.warmer = warmer == null ? (a) -> {} : warmer;
this.store = store;
this.deletionPolicy = deletionPolicy;
this.mergePolicy = mergePolicy;
this.analyzer = analyzer;
this.similarity = similarity;
Expand Down Expand Up @@ -214,14 +211,6 @@ public Store getStore() {
return store;
}

/**
* Returns a {@link SnapshotDeletionPolicy} used in the engines
* {@link org.apache.lucene.index.IndexWriter}.
*/
public SnapshotDeletionPolicy getDeletionPolicy() {
return deletionPolicy;
}

/**
* Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
Expand Down Expand Up @@ -126,6 +127,8 @@ public class InternalEngine extends Engine {

private final String uidField;

private final SnapshotDeletionPolicy deletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
Expand All @@ -137,12 +140,14 @@ public class InternalEngine extends Engine {
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();


public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
store.incRef();
Expand Down Expand Up @@ -1414,7 +1419,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
Expand All @@ -1425,7 +1430,7 @@ public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineExc
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
logger.trace("pulling snapshot");
return deletionPolicy.snapshot();
return new IndexCommitRef(deletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
}
Expand Down
31 changes: 9 additions & 22 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Sort;
Expand Down Expand Up @@ -161,7 +159,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final String checkIndexOnStartup;
private final CodecService codecService;
private final Engine.Warmer warmer;
private final SnapshotDeletionPolicy deletionPolicy;
private final SimilarityService similarityService;
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
Expand Down Expand Up @@ -230,7 +227,6 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP
final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger);
this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
Expand Down Expand Up @@ -878,11 +874,11 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {

/**
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}.
* commit won't be freed until the commit / snapshot is closed.
*
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
Expand All @@ -893,14 +889,6 @@ public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException
}


/**
* Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources
* referenced by the given snapshot {@link IndexCommit}.
*/
public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
deletionPolicy.release(snapshot);
}

/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
* without having to worry about the current state of the engine and concurrent flushes.
Expand All @@ -915,25 +903,24 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
IndexCommit indexCommit = null;
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
Engine engine;
synchronized (mutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
// That can be done out of mutex, since the engine can be closed half way.
Engine engine = getEngineOrNull();
engine = getEngineOrNull();
if (engine == null) {
return store.getMetadata(null, true);
}
}
indexCommit = deletionPolicy.snapshot();
return store.getMetadata(indexCommit);
indexCommit = engine.acquireIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
if (indexCommit != null) {
deletionPolicy.release(indexCommit);
}
IOUtils.close(indexCommit);
}
}

Expand Down Expand Up @@ -1838,7 +1825,7 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.shard;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand All @@ -28,6 +27,7 @@
import org.apache.lucene.store.NoLockFactory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.store.Store;

import java.io.Closeable;
Expand All @@ -38,7 +38,7 @@
final class LocalShardSnapshot implements Closeable {
private final IndexShard shard;
private final Store store;
private final IndexCommit indexCommit;
private final Engine.IndexCommitRef indexCommit;
private final AtomicBoolean closed = new AtomicBoolean(false);

LocalShardSnapshot(IndexShard shard) {
Expand Down Expand Up @@ -66,7 +66,7 @@ Directory getSnapshotDirectory() {
return new FilterDirectory(store.directory()) {
@Override
public String[] listAll() throws IOException {
Collection<String> fileNames = indexCommit.getFileNames();
Collection<String> fileNames = indexCommit.getIndexCommit().getFileNames();
final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]);
return fileNameArray;
}
Expand Down Expand Up @@ -115,7 +115,7 @@ public void close() throws IOException {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
shard.releaseIndexCommit(indexCommit);
indexCommit.close();
} finally {
store.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbersService;
Expand Down Expand Up @@ -135,20 +136,20 @@ public RecoveryResponse recoverToTarget() throws IOException {
if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
} else {
final IndexCommit phase1Snapshot;
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
try {
phase1(phase1Snapshot, translogView);
phase1(phase1Snapshot.getIndexCommit(), translogView);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
shard.releaseIndexCommit(phase1Snapshot);
IOUtils.close(phase1Snapshot);
} catch (final IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -45,6 +44,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -376,17 +376,14 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina

try {
// we flush first to make sure we get the latest writes snapshotted
IndexCommit snapshotIndexCommit = indexShard.acquireIndexCommit(true);
try {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository,
TimeValue.timeValueMillis(snapshotStatus.time()), sb);
}
} finally {
indexShard.releaseIndexCommit(snapshotIndexCommit);
}
} catch (SnapshotFailedEngineException e) {
throw e;
Expand Down
Loading

0 comments on commit dfdf496

Please sign in to comment.