Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the IndexDeletionPolicy to be engine internal #24930

Merged
merged 3 commits into from
May 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand All @@ -63,7 +64,6 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -100,7 +100,6 @@ public abstract class Engine implements Closeable {
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final EventListener eventListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
Expand All @@ -121,15 +120,13 @@ public abstract class Engine implements Closeable {

protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
Objects.requireNonNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");

this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId());
this.eventListener = engineConfig.getEventListener();
this.deletionPolicy = engineConfig.getDeletionPolicy();
}

/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
Expand Down Expand Up @@ -828,7 +825,7 @@ public void forceMerge(boolean flush) throws IOException {
*
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;

/**
* fail engine due to some error. the engine will also be closed.
Expand Down Expand Up @@ -1387,6 +1384,28 @@ public int hashCode() {
}
}

public static class IndexCommitRef implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean();
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;

IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
indexCommit = deletionPolicy.snapshot();
onClose = () -> deletionPolicy.release(indexCommit);
}

@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
onClose.run();
}
}

public IndexCommit getIndexCommit() {
return indexCommit;
}
}

public void onSettingsChanged() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -58,7 +57,6 @@ public final class EngineConfig {
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final MergePolicy mergePolicy;
private final Analyzer analyzer;
private final Similarity similarity;
Expand Down Expand Up @@ -109,7 +107,7 @@ public final class EngineConfig {
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
Expand All @@ -123,7 +121,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
this.threadPool = threadPool;
this.warmer = warmer == null ? (a) -> {} : warmer;
this.store = store;
this.deletionPolicy = deletionPolicy;
this.mergePolicy = mergePolicy;
this.analyzer = analyzer;
this.similarity = similarity;
Expand Down Expand Up @@ -214,14 +211,6 @@ public Store getStore() {
return store;
}

/**
* Returns a {@link SnapshotDeletionPolicy} used in the engines
* {@link org.apache.lucene.index.IndexWriter}.
*/
public SnapshotDeletionPolicy getDeletionPolicy() {
return deletionPolicy;
}

/**
* Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
Expand Down Expand Up @@ -126,6 +127,8 @@ public class InternalEngine extends Engine {

private final String uidField;

private final SnapshotDeletionPolicy deletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
Expand All @@ -137,12 +140,14 @@ public class InternalEngine extends Engine {
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();


public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
store.incRef();
Expand Down Expand Up @@ -1414,7 +1419,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
Expand All @@ -1425,7 +1430,7 @@ public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineExc
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
logger.trace("pulling snapshot");
return deletionPolicy.snapshot();
return new IndexCommitRef(deletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
}
Expand Down
31 changes: 9 additions & 22 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Sort;
Expand Down Expand Up @@ -161,7 +159,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final String checkIndexOnStartup;
private final CodecService codecService;
private final Engine.Warmer warmer;
private final SnapshotDeletionPolicy deletionPolicy;
private final SimilarityService similarityService;
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
Expand Down Expand Up @@ -230,7 +227,6 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP
final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger);
this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
Expand Down Expand Up @@ -878,11 +874,11 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {

/**
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}.
* commit won't be freed until the commit / snapshot is closed.
*
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
Expand All @@ -893,14 +889,6 @@ public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException
}


/**
* Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources
* referenced by the given snapshot {@link IndexCommit}.
*/
public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
deletionPolicy.release(snapshot);
}

/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
* without having to worry about the current state of the engine and concurrent flushes.
Expand All @@ -915,25 +903,24 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
IndexCommit indexCommit = null;
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
Engine engine;
synchronized (mutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
// That can be done out of mutex, since the engine can be closed half way.
Engine engine = getEngineOrNull();
engine = getEngineOrNull();
if (engine == null) {
return store.getMetadata(null, true);
}
}
indexCommit = deletionPolicy.snapshot();
return store.getMetadata(indexCommit);
indexCommit = engine.acquireIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
if (indexCommit != null) {
deletionPolicy.release(indexCommit);
}
IOUtils.close(indexCommit);
}
}

Expand Down Expand Up @@ -1838,7 +1825,7 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.shard;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand All @@ -28,6 +27,7 @@
import org.apache.lucene.store.NoLockFactory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.store.Store;

import java.io.Closeable;
Expand All @@ -38,7 +38,7 @@
final class LocalShardSnapshot implements Closeable {
private final IndexShard shard;
private final Store store;
private final IndexCommit indexCommit;
private final Engine.IndexCommitRef indexCommit;
private final AtomicBoolean closed = new AtomicBoolean(false);

LocalShardSnapshot(IndexShard shard) {
Expand Down Expand Up @@ -66,7 +66,7 @@ Directory getSnapshotDirectory() {
return new FilterDirectory(store.directory()) {
@Override
public String[] listAll() throws IOException {
Collection<String> fileNames = indexCommit.getFileNames();
Collection<String> fileNames = indexCommit.getIndexCommit().getFileNames();
final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]);
return fileNameArray;
}
Expand Down Expand Up @@ -115,7 +115,7 @@ public void close() throws IOException {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
shard.releaseIndexCommit(indexCommit);
indexCommit.close();
} finally {
store.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbersService;
Expand Down Expand Up @@ -135,20 +136,20 @@ public RecoveryResponse recoverToTarget() throws IOException {
if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
} else {
final IndexCommit phase1Snapshot;
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
try {
phase1(phase1Snapshot, translogView);
phase1(phase1Snapshot.getIndexCommit(), translogView);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
shard.releaseIndexCommit(phase1Snapshot);
IOUtils.close(phase1Snapshot);
} catch (final IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -45,6 +44,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -376,17 +376,14 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina

try {
// we flush first to make sure we get the latest writes snapshotted
IndexCommit snapshotIndexCommit = indexShard.acquireIndexCommit(true);
try {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository,
TimeValue.timeValueMillis(snapshotStatus.time()), sb);
}
} finally {
indexShard.releaseIndexCommit(snapshotIndexCommit);
}
} catch (SnapshotFailedEngineException e) {
throw e;
Expand Down
Loading