Add a frozen engine implementation
This change adds a `frozen` engine that allows lazily opening a directory reader
on a read-only shard. The engine wraps general-purpose searchers in a LazyDirectoryReader
that also allows the underlying index readers to be released after a search phase
completes and reset before secondary search phases run.

Relates to elastic#34352
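
To make the lazy open/release cycle concrete, here is a minimal hypothetical sketch of the idea (illustrative only, not the LazyDirectoryReader added by this commit): a holder that opens the underlying reader on demand and can drop it between search phases.

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.Directory;

// Hypothetical sketch: open the reader lazily, release it between phases.
final class LazyReaderHolder {
    private final Directory directory;
    private DirectoryReader reader; // null while released

    LazyReaderHolder(Directory directory) {
        this.directory = directory;
    }

    /** Opens the underlying reader if it is currently released. */
    synchronized DirectoryReader getOrOpen() throws IOException {
        if (reader == null) {
            reader = DirectoryReader.open(directory);
        }
        return reader;
    }

    /** Releases the underlying reader, e.g. once a search phase completes. */
    synchronized void release() throws IOException {
        if (reader != null) {
            try {
                reader.close();
            } finally {
                reader = null;
            }
        }
    }
}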

s1monw committed Oct 8, 2018
1 parent 6f32f71 commit 99aa8ad
Showing 10 changed files with 890 additions and 16 deletions.
@@ -278,4 +278,16 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx
}
}

/**
* An IO operation with a single input.
* @see java.util.function.Consumer
*/
@FunctionalInterface
public interface IOConsumer<T> {
/**
* Performs this operation on the given argument.
*/
void accept(T input) throws IOException;
}

}
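
A hypothetical usage sketch of the new interface (assuming it is nested in the IOUtils class shown above; the path is illustrative): an IO operation written as a lambda that may throw IOException, which a plain java.util.function.Consumer cannot express.

import java.nio.file.Path;
import java.nio.file.Paths;
import org.elasticsearch.core.internal.io.IOUtils;

// fsync a directory via a throwing lambda; accept() propagates IOException
IOUtils.IOConsumer<Path> fsyncDir = dir -> IOUtils.fsync(dir, true);
fsyncDir.accept(Paths.get("/tmp/my-index"));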
99 changes: 99 additions & 0 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -28,8 +28,12 @@
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterCodecReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
@@ -39,12 +43,20 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafMetaData;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
@@ -217,6 +229,10 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc
return si;
}

public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException {
return new CommitPoint(si, directory);
}
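
A hypothetical caller, pairing the new helper with the readSegmentInfos method this class already provides:

// capture a commit point for the segments currently on disk
SegmentInfos infos = Lucene.readSegmentInfos(directory);
IndexCommit commit = Lucene.getIndexCommit(infos, directory);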

/**
* This method removes all lucene files from the given directory. It will first try to delete all commit points / segments
* files to ensure broken commits or corrupted indices will not be opened in the future. If any of the segment files can't be deleted
@@ -967,4 +983,87 @@ public CacheHelper getReaderCacheHelper() {
public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
}

/**
* Returns an empty leaf reader with the given max docs. The reader will be fully deleted.
*/
public static LeafReader emptyReader(final int maxDoc) {
return new LeafReader() {
final Bits liveDocs = new Bits.MatchNoBits(maxDoc);

public Terms terms(String field) {
return null;
}

public NumericDocValues getNumericDocValues(String field) {
return null;
}

public BinaryDocValues getBinaryDocValues(String field) {
return null;
}

public SortedDocValues getSortedDocValues(String field) {
return null;
}

public SortedNumericDocValues getSortedNumericDocValues(String field) {
return null;
}

public SortedSetDocValues getSortedSetDocValues(String field) {
return null;
}

public NumericDocValues getNormValues(String field) {
return null;
}

public FieldInfos getFieldInfos() {
return new FieldInfos(new FieldInfo[0]);
}

public Bits getLiveDocs() {
return this.liveDocs;
}

public PointValues getPointValues(String fieldName) {
return null;
}

public void checkIntegrity() {
}

public Fields getTermVectors(int docID) {
return null;
}

public int numDocs() {
return 0;
}

public int maxDoc() {
return maxDoc;
}

public void document(int docID, StoredFieldVisitor visitor) {
}

protected void doClose() {
}

public LeafMetaData getMetaData() {
return new LeafMetaData(Version.LATEST.major, Version.LATEST, (Sort)null);
}

public CacheHelper getCoreCacheHelper() {
return null;
}

public CacheHelper getReaderCacheHelper() {
return null;
}
};
}

}
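
A quick hypothetical check of the contract described in the javadoc above: maxDoc is preserved, but every document reads as deleted.

LeafReader empty = Lucene.emptyReader(5);
assert empty.maxDoc() == 5;
assert empty.numDocs() == 0;
assert empty.getLiveDocs().get(3) == false; // MatchNoBits: no doc is live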
@@ -43,7 +43,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
@@ -66,6 +65,7 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
@@ -784,7 +784,7 @@ public final CommitStats commitStats() {
/**
* Global stats on segments.
*/
public final SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
public SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
ensureOpen();
Set<String> segmentName = new HashSet<>();
SegmentsStats stats = new SegmentsStats();
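Dropping `final` on segmentsStats presumably exists so an engine subclass can intercept stats collection; a minimal hypothetical override that pins a searcher (and therefore the underlying reader) open for the duration of the call:

@Override
public SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
    // keep a searcher open so a lazily managed reader cannot be released
    // while the superclass walks the segment readers
    try (Searcher searcher = acquireSearcher("segments_stats", SearcherScope.INTERNAL)) {
        return super.segmentsStats(includeSegmentFileSizes);
    }
}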
@@ -48,6 +48,11 @@ final class RamAccountingSearcherFactory extends SearcherFactory {

@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
processReaders(reader, previousReader);
return super.newSearcher(reader, previousReader);
}

public void processReaders(IndexReader reader, IndexReader previousReader) {
final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);

// Construct a list of the previous segment readers, we only want to track memory used
@@ -79,6 +84,5 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed));
}
}
return super.newSearcher(reader, previousReader);
}
}
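
Splitting processReaders out of newSearcher presumably lets an engine charge the accounting breaker for a reader it reopened itself, without constructing an IndexSearcher; a hypothetical call-site fragment (the surrounding fields are illustrative):

// after lazily reopening, account the new segments against the breaker;
// previousReader is whatever was accounted in the prior round, if any
DirectoryReader reopened = DirectoryReader.open(directory);
searcherFactory.processReaders(reopened, previousReader);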
@@ -57,7 +57,7 @@
*
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
*/
public final class ReadOnlyEngine extends Engine {
public class ReadOnlyEngine extends Engine {

private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
@@ -66,6 +66,7 @@ public final class ReadOnlyEngine extends Engine {
private final IndexCommit indexCommit;
private final Lock indexWriterLock;
private final DocsStats docsStats;
protected final RamAccountingSearcherFactory searcherFactory;

/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -82,6 +83,7 @@ public final class ReadOnlyEngine extends Engine {
public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) {
super(config);
this.searcherFactory = new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService());
try {
Store store = config.getStore();
store.incRef();
@@ -96,14 +98,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId());
if (config.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
reader = readerWrapperFunction.apply(reader);
this.indexCommit = reader.getIndexCommit();
this.searcherManager = new SearcherManager(reader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
reader = open(directory);
reader = wrapReader(reader, readerWrapperFunction);
searcherManager = new SearcherManager(reader, searcherFactory);
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
this.docsStats = docsStats(lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
success = true;
@@ -117,6 +115,15 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}
}

protected final DirectoryReader wrapReader(DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
return readerWrapperFunction.apply(reader);
}

protected DirectoryReader open(final Directory directory) throws IOException {
return DirectoryReader.open(directory);
}
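
With the constructor now routed through open() and wrapReader(), and the class no longer final, a subclass can control how the reader is produced; a minimal hypothetical sketch (class name illustrative; null stats are permitted by the constructor shown above):

// hypothetical subclass: customize reader creation while reusing the standard
// wrapping (shard context, soft-deletes) performed by the superclass
public class InstrumentedReadOnlyEngine extends ReadOnlyEngine {
    public InstrumentedReadOnlyEngine(EngineConfig config) {
        super(config, null, null, true, Function.identity());
    }

    @Override
    protected DirectoryReader open(Directory directory) throws IOException {
        return DirectoryReader.open(directory); // e.g. defer, wrap, or instrument
    }
}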
@@ -154,7 +154,7 @@ final class DefaultSearchContext extends SearchContext {
private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
private final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
private final QueryShardContext queryShardContext;
private FetchPhase fetchPhase;
private final FetchPhase fetchPhase;

DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
@@ -186,7 +186,7 @@ final class DefaultSearchContext extends SearchContext {

@Override
public void doClose() {
// clear and scope phase we have
// clear and scope phase we have
Releasables.close(searcher, engineSearcher);
}

@@ -217,4 +217,8 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio
public DirectoryReader getDirectoryReader() {
return engineSearcher.getDirectoryReader();
}

public Engine.Searcher getEngineSearcher() {
return engineSearcher;
}
}
@@ -81,6 +81,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
@@ -545,6 +546,13 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) {
return config(indexSettings, store, translogPath, mergePolicy, refreshListener, indexSort, globalCheckpointSupplier,
new NoneCircuitBreakerService());
}

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier,
CircuitBreakerService breakerService) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
Engine.EventListener listener = new Engine.EventListener() {
@@ -559,8 +567,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort,
new NoneCircuitBreakerService(),
globalCheckpointSupplier == null ?
breakerService, globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());
return config;
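
The new overload lets a test wire in a real breaker service instead of the no-op default; a hypothetical use (names other than the overload itself are illustrative):

// exercise segment RAM accounting with a real hierarchy breaker service
CircuitBreakerService breakers = new HierarchyCircuitBreakerService(Settings.EMPTY,
    new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(),
    null, null, globalCheckpointSupplier, breakers);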