From 99aa8ad399efe5911453637812f07b821bb9431a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 10 Sep 2018 16:10:31 +0200 Subject: [PATCH 01/17] Add a frozen engine implementation This change adds a `frozen` engine that allows lazily open a directory reader on a read-only shard. The engine wraps general purpose searchers in a LazyDirectoryReader that also allows to release and reset the underlying index readers after any and before secondary search phases. Relates to #34352 F --- .../core/internal/io/IOUtils.java | 12 + .../elasticsearch/common/lucene/Lucene.java | 99 ++++ .../elasticsearch/index/engine/Engine.java | 4 +- .../engine/RamAccountingSearcherFactory.java | 6 +- .../index/engine/ReadOnlyEngine.java | 25 +- .../search/DefaultSearchContext.java | 4 +- .../search/internal/ContextIndexSearcher.java | 4 + .../index/engine/EngineTestCase.java | 11 +- .../index/engine/FrozenEngine.java | 490 ++++++++++++++++++ .../index/engine/FrozenEngineTests.java | 251 +++++++++ 10 files changed, 890 insertions(+), 16 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java index 493d809f9dc33..8796eff787dd0 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java +++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java @@ -278,4 +278,16 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx } } + /** + * An IO operation with a single input. + * @see java.util.function.Consumer + */ + @FunctionalInterface + public interface IOConsumer { + /** + * Performs this operation on the given argument. + */ + void accept(T input) throws IOException; + } + } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index ea363e884613b..0fe8ae6e95481 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -28,8 +28,12 @@ import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.document.LatLonDocValuesField; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; import org.apache.lucene.index.FilterCodecReader; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.FilterLeafReader; @@ -39,12 +43,20 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafMetaData; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; @@ -217,6 +229,10 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc return si; } + public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException { + return new CommitPoint(si, directory); + } + /** * This method removes all lucene files from the given directory. It will first try to delete all commit points / segments * files to ensure broken commits or corrupted indices will not be opened in the future. If any of the segment files can't be deleted @@ -967,4 +983,87 @@ public CacheHelper getReaderCacheHelper() { public static NumericDocValuesField newSoftDeletesField() { return new NumericDocValuesField(SOFT_DELETES_FIELD, 1); } + + /** + * Returns an empty leaf reader with the given max docs. The reader will be fully deleted. + */ + public static LeafReader emptyReader(final int maxDoc) { + return new LeafReader() { + final Bits liveDocs = new Bits.MatchNoBits(maxDoc); + + public Terms terms(String field) { + return null; + } + + public NumericDocValues getNumericDocValues(String field) { + return null; + } + + public BinaryDocValues getBinaryDocValues(String field) { + return null; + } + + public SortedDocValues getSortedDocValues(String field) { + return null; + } + + public SortedNumericDocValues getSortedNumericDocValues(String field) { + return null; + } + + public SortedSetDocValues getSortedSetDocValues(String field) { + return null; + } + + public NumericDocValues getNormValues(String field) { + return null; + } + + public FieldInfos getFieldInfos() { + return new FieldInfos(new FieldInfo[0]); + } + + public Bits getLiveDocs() { + return this.liveDocs; + } + + public PointValues getPointValues(String fieldName) { + return null; + } + + public void checkIntegrity() { + } + + public Fields getTermVectors(int docID) { + return null; + } + + public int numDocs() { + return 0; + } + + public int maxDoc() { + return maxDoc; + } + + public void document(int docID, StoredFieldVisitor visitor) { + } + + protected void doClose() { + } + + public LeafMetaData getMetaData() { + return new LeafMetaData(Version.LATEST.major, Version.LATEST, (Sort)null); + } + + public CacheHelper getCoreCacheHelper() { + return null; + } + + public CacheHelper getReaderCacheHelper() { + return null; + } + }; + } + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 460501c8b5238..18f6ec09344ca 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -43,7 +43,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; -import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; @@ -66,6 +65,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -784,7 +784,7 @@ public final CommitStats commitStats() { /** * Global stats on segments. */ - public final SegmentsStats segmentsStats(boolean includeSegmentFileSizes) { + public SegmentsStats segmentsStats(boolean includeSegmentFileSizes) { ensureOpen(); Set segmentName = new HashSet<>(); SegmentsStats stats = new SegmentsStats(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java b/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java index 7972d426fba02..9630485cbc105 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java +++ b/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java @@ -48,6 +48,11 @@ final class RamAccountingSearcherFactory extends SearcherFactory { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { + processReaders(reader, previousReader); + return super.newSearcher(reader, previousReader); + } + + public void processReaders(IndexReader reader, IndexReader previousReader) { final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING); // Construct a list of the previous segment readers, we only want to track memory used @@ -79,6 +84,5 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed)); } } - return super.newSearcher(reader, previousReader); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 26ef259a1e1c6..b6443187de902 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -57,7 +57,7 @@ * * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function) */ -public final class ReadOnlyEngine extends Engine { +public class ReadOnlyEngine extends Engine { private final SegmentInfos lastCommittedSegmentInfos; private final SeqNoStats seqNoStats; @@ -66,6 +66,7 @@ public final class ReadOnlyEngine extends Engine { private final IndexCommit indexCommit; private final Lock indexWriterLock; private final DocsStats docsStats; + protected final RamAccountingSearcherFactory searcherFactory; /** * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened @@ -82,6 +83,7 @@ public final class ReadOnlyEngine extends Engine { public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock, Function readerWrapperFunction) { super(config); + this.searcherFactory = new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()); try { Store store = config.getStore(); store.incRef(); @@ -96,14 +98,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; - reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId()); - if (config.getIndexSettings().isSoftDeleteEnabled()) { - reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); - } - reader = readerWrapperFunction.apply(reader); - this.indexCommit = reader.getIndexCommit(); - this.searcherManager = new SearcherManager(reader, - new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); + reader = open(directory); + reader = wrapReader(reader, readerWrapperFunction); + searcherManager = new SearcherManager(reader, searcherFactory); + this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); this.docsStats = docsStats(lastCommittedSegmentInfos); this.indexWriterLock = indexWriterLock; success = true; @@ -117,6 +115,15 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats } } + protected final DirectoryReader wrapReader(DirectoryReader reader, + Function readerWrapperFunction) throws IOException { + reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); + } + return readerWrapperFunction.apply(reader); + } + protected DirectoryReader open(final Directory directory) throws IOException { return DirectoryReader.open(directory); } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index d681a186892db..0b17ec72fbb17 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -154,7 +154,7 @@ final class DefaultSearchContext extends SearchContext { private final Map searchExtBuilders = new HashMap<>(); private final Map, Collector> queryCollectors = new HashMap<>(); private final QueryShardContext queryShardContext; - private FetchPhase fetchPhase; + private final FetchPhase fetchPhase; DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService, @@ -186,7 +186,7 @@ final class DefaultSearchContext extends SearchContext { @Override public void doClose() { - // clear and scope phase we have + // clear and scope phase we have Releasables.close(searcher, engineSearcher); } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 04a4629e9a875..8e7c2ef013a43 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -217,4 +217,8 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio public DirectoryReader getDirectoryReader() { return engineSearcher.getDirectoryReader(); } + + public Engine.Searcher getEngineSearcher() { + return engineSearcher; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 12f0d645d8a87..b78506fa8c938 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -81,6 +81,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -545,6 +546,13 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, indexSort, globalCheckpointSupplier, + new NoneCircuitBreakerService()); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier, + CircuitBreakerService breakerService) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); Engine.EventListener listener = new Engine.EventListener() { @@ -559,8 +567,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, - new NoneCircuitBreakerService(), - globalCheckpointSupplier == null ? + breakerService, globalCheckpointSupplier == null ? new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) : globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier()); return config; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java new file mode 100644 index 0000000000000..7572061485931 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -0,0 +1,490 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.engine; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafMetaData; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.function.Function; + +/** + * This is a stand-alone read-only engine that maintains a lazy loaded index reader that is opened on calls to + * {@link Engine#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore and then + * releases itself from the engine. The readers returned from this engine are lazy which allows release after and reset before a search + * phase starts. This allows releasing references as soon as possible on the search layer. + */ +public final class FrozenEngine extends ReadOnlyEngine { + private final CounterMetric openedReaders = new CounterMetric(); + private volatile DirectoryReader lastOpenedReader; + + public FrozenEngine(EngineConfig config) { + super(config, null, null, true, Function.identity()); + } + + @Override + protected DirectoryReader open(Directory directory) throws IOException { + // we fake an empty directly reader for the ReadOnlyEngine. this reader is only used + // to initialize the reference manager and to make the refresh call happy which is essentially + // a no-op now + IndexCommit indexCommit = Lucene.getIndexCommit(getLastCommittedSegmentInfos(), directory); + return new DirectoryReader(directory, new LeafReader[0]) { + @Override + protected DirectoryReader doOpenIfChanged() { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexCommit commit) { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) { + return null; + } + + @Override + public long getVersion() { + return 0; + } + + @Override + public boolean isCurrent() { + return true; // always current + } + + @Override + public IndexCommit getIndexCommit() { + return indexCommit; // TODO maybe we can return an empty commit? + } + + @Override + protected void doClose() { + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + }; + } + + @SuppressForbidden(reason = "we manage references explicitly here") + private synchronized void onReaderClosed(IndexReader.CacheKey key) { + if (lastOpenedReader != null && key == lastOpenedReader.getReaderCacheHelper().getKey()) { + assert lastOpenedReader.getRefCount() == 0; + lastOpenedReader = null; + } + } + + @SuppressForbidden(reason = "we manage references explicitly here") + private synchronized DirectoryReader getOrOpenReader(boolean doOpen) throws IOException { + DirectoryReader reader = null; + boolean success = false; + try { + if (lastOpenedReader == null || lastOpenedReader.tryIncRef() == false) { + if (doOpen) { + reader = DirectoryReader.open(engineConfig.getStore().directory()); + searcherFactory.processReaders(reader, null); + openedReaders.inc(); + reader = lastOpenedReader = wrapReader(reader, Function.identity()); + reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); + } + } else { + reader = lastOpenedReader; + } + success = true; + return reader; + } finally { + if (success == false) { + IOUtils.close(reader); + } + } + } + + @Override + @SuppressWarnings("fallthrough") + @SuppressForbidden( reason = "we manage references explicitly here") + public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { + store.incRef(); + boolean success = false; + try { + final boolean openReader; + switch (source) { + case "load_seq_no": + case "load_version": + assert false : "this is a read-only engine"; + case "doc_stats": + assert false : "doc_stats are overwritten"; + case "segments": + case "segments_stats": + case "completion_stats": + case "refresh_needed": + openReader = false; + break; + default: + openReader = true; + } + // special case we only want to report segment stats if we have a reader open. in that case we only get a reader if we still + // have one open at the time and can inc it's reference. + DirectoryReader reader = getOrOpenReader(openReader); + if (reader == null) { + store.decRef(); + success = true; + // we just hand out an empty searcher in this case + return super.acquireSearcher(source, scope); + } else { + try { + LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader); + FrozenEngineSearcher newSearcher = new FrozenEngineSearcher(source, lazyDirectoryReader, + new IndexSearcher(lazyDirectoryReader), + s -> { + try { + s.getIndexReader().close(); + } finally { + store.decRef(); + } + }, logger); + success = true; + return newSearcher; + } finally { + if (success == false) { + reader.decRef(); // don't call close here we manage reference ourselves + } + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (success == false) { + store.decRef(); + } + } + } + + /** + * A Searcher impl that makes it straight forward to release readers after a search phase + */ + final class FrozenEngineSearcher extends Searcher { + + private final LazyDirectoryReader lazyDirectoryReader; + + FrozenEngineSearcher(String source, LazyDirectoryReader lazyDirectoryReader, IndexSearcher searcher, + IOUtils.IOConsumer + onClose, Logger logger) { + super(source, searcher, onClose, logger); + this.lazyDirectoryReader = lazyDirectoryReader; + } + + void releaseReader() throws IOException { + lazyDirectoryReader.release(); + } + + void resetReader() throws IOException { + lazyDirectoryReader.reset(getOrOpenReader(true)); + } + } + + /** + * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings + * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. + * + * This reader and it's leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still + * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure + * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers + * on the next search phase. + */ + static final class LazyDirectoryReader extends FilterDirectoryReader { + + private volatile DirectoryReader delegate; // volatile since it might be closed concurrently + + private LazyDirectoryReader(DirectoryReader reader) throws IOException { + super(reader, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return new LazyLeafReader(reader); + }; + }); + this.delegate = reader; + } + + @SuppressForbidden(reason = "we manage references explicitly here") + synchronized void release() throws IOException { + if (delegate != null) { // we are lenient here it's ok to double close + delegate.decRef(); + delegate = null; + if (tryIncRef()) { // only do this if we are not closed already + try { + for (LeafReaderContext leaf : leaves()) { + LazyLeafReader reader = (LazyLeafReader) leaf.reader(); + reader.in = null; + } + } finally { + decRef(); + } + } + } + } + + synchronized void reset(DirectoryReader delegate) { + if (this.delegate != null) { + throw new IllegalStateException("lazy reader is not released"); + } + assert (delegate instanceof LazyDirectoryReader) == false : "must not be a LazyDirectoryReader"; + List leaves = delegate.leaves(); + int ord = 0; + for (LeafReaderContext leaf : leaves()) { + LazyLeafReader reader = (LazyLeafReader) leaf.reader(); + LeafReader newReader = leaves.get(ord++).reader(); + assert reader.in == null; + reader.in = newReader; + assert reader.info.info.equals(Lucene.segmentReader(newReader).getSegmentInfo().info); + } + this.delegate = delegate; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) { + throw new UnsupportedOperationException(); + } + + void ensureOpenOrReset() { + // ensure we fail early and with good exceptions + ensureOpen(); + if (delegate == null) { + throw new AlreadyClosedException("delegate is released"); + } + } + + @Override + public long getVersion() { + ensureOpenOrReset(); + return delegate.getVersion(); + } + + @Override + public boolean isCurrent() throws IOException { + ensureOpenOrReset(); + return delegate.isCurrent(); + } + + @Override + public IndexCommit getIndexCommit() throws IOException { + ensureOpenOrReset(); + return delegate.getIndexCommit(); + } + + @Override + protected void doClose() throws IOException { + release(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + ensureOpenOrReset(); + return delegate.getReaderCacheHelper(); + } + + @Override + public DirectoryReader getDelegate() { + ensureOpenOrReset(); + return delegate; + } + } + + /** + * We basically duplicate a FilterLeafReader here since we don't want the + * incoming reader to register with this reader as a parent reader. This would mean we barf if the incoming + * reader is closed and that is what we actually doing on purpose. + */ + static final class LazyLeafReader extends FilterLeafReader { + + private volatile LeafReader in; + private final SegmentCommitInfo info; + private final int numDocs; + private final int maxDocs; + + private LazyLeafReader(LeafReader in) { + super(Lucene.emptyReader(in.maxDoc())); // empty reader here to make FilterLeafReader happy + this.info = Lucene.segmentReader(in).getSegmentInfo(); + this.in = in; + numDocs = in.numDocs(); + maxDocs = in.maxDoc(); + // don't register in reader as a subreader here. + } + + private void ensureOpenOrReleased() { + ensureOpen(); + if (in == null) { + throw new AlreadyClosedException("leaf is already released"); + } + } + + @Override + public Bits getLiveDocs() { + ensureOpenOrReleased(); + return in.getLiveDocs(); + } + + @Override + public FieldInfos getFieldInfos() { + ensureOpenOrReleased(); + return in.getFieldInfos(); + } + + @Override + public PointValues getPointValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getPointValues(field); + } + + @Override + public Fields getTermVectors(int docID) + throws IOException { + ensureOpenOrReleased(); + return in.getTermVectors(docID); + } + + @Override + public int numDocs() { + return numDocs; + } + + @Override + public int maxDoc() { + return maxDocs; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + ensureOpenOrReleased(); + in.document(docID, visitor); + } + + @Override + protected void doClose() throws IOException { + in.close(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + ensureOpenOrReleased(); + return in.getReaderCacheHelper(); + } + + @Override + public CacheHelper getCoreCacheHelper() { + ensureOpenOrReleased(); + return in.getCoreCacheHelper(); + } + + @Override + public Terms terms(String field) throws IOException { + ensureOpenOrReleased(); + return in.terms(field); + } + + @Override + public String toString() { + final StringBuilder buffer = new StringBuilder("LazyLeafReader("); + buffer.append(in); + buffer.append(')'); + return buffer.toString(); + } + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getNumericDocValues(field); + } + + @Override + public BinaryDocValues getBinaryDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getBinaryDocValues(field); + } + + @Override + public SortedDocValues getSortedDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedDocValues(field); + } + + @Override + public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedNumericDocValues(field); + } + + @Override + public SortedSetDocValues getSortedSetDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedSetDocValues(field); + } + + @Override + public NumericDocValues getNormValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getNormValues(field); + } + + @Override + public LeafMetaData getMetaData() { + ensureOpenOrReleased(); + return in.getMetaData(); + } + + @Override + public void checkIntegrity() throws IOException { + ensureOpenOrReleased(); + in.checkIntegrity(); + } + + @Override + public LeafReader getDelegate() { + return in; + } + } + + // TODO expose this as stats on master + long getOpenedReaders() { + return openedReaders.count(); + } + + synchronized boolean isReaderOpen() { + return lastOpenedReader != null; + } // this is mainly for tests +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java new file mode 100644 index 0000000000000..6237677ed2fd7 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -0,0 +1,251 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicLong; + +public class FrozenEngineTests extends EngineTestCase { + + public void testAcquireReleaseReset() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + assertFalse(frozenEngine.isReaderOpen()); + FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher + .getDirectoryReader()).shardId()); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + assertEquals(1, frozenEngine.getOpenedReaders()); + searcher.releaseReader(); + assertFalse(frozenEngine.isReaderOpen()); + assertEquals(1, frozenEngine.getOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); + searcher.resetReader(); + assertEquals(2, frozenEngine.getOpenedReaders()); + search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + searcher.close(); + } + } + } + } + + public void testAcquireReleaseResetTwoSearchers() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + assertFalse(frozenEngine.isReaderOpen()); + FrozenEngine.FrozenEngineSearcher searcher1 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + assertEquals(1, frozenEngine.getOpenedReaders()); + searcher1.releaseReader(); + FrozenEngine.FrozenEngineSearcher searcher2 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + assertTrue(frozenEngine.isReaderOpen()); + assertEquals(2, frozenEngine.getOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); + searcher1.resetReader(); + assertEquals(2, frozenEngine.getOpenedReaders()); + search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + searcher1.close(); + searcher2.close(); + } + } + } + } + + public void testSegmentStats() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + searcher.releaseReader(); + assertEquals(1, frozenEngine.getOpenedReaders()); + segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(0, segmentsStats.getCount()); + assertEquals(1, frozenEngine.getOpenedReaders()); + assertFalse(frozenEngine.isReaderOpen()); + searcher.resetReader(); + segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + searcher.close(); + } + } + } + } + + public void testCircuitBreakerAccounting() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + int numDocs = scaledRandomIntBetween(10, 1000); + CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); + long expectedUse; + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.refresh("test"); // pull the reader + expectedUse = breaker.getUsed(); + engine.flushAndClose(); + } + assertTrue(expectedUse > 0); + assertEquals(0, breaker.getUsed()); + try (FrozenEngine frozenEngine = new FrozenEngine(config)) { + FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher)frozenEngine.acquireSearcher("test"); + assertEquals(expectedUse, breaker.getUsed()); + searcher.releaseReader(); + assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(0, breaker.getUsed()); + assertFalse(frozenEngine.isReaderOpen()); + searcher.resetReader(); + assertEquals(expectedUse, breaker.getUsed()); + searcher.close(); + assertEquals(0, breaker.getUsed()); + } + } + } + + private void addDocuments(AtomicLong globalCheckpoint, int numDocs, InternalEngine engine) throws IOException { + for (int i = 0; i < numDocs; i++) { + if (rarely()) { + continue; // gap in sequence number + } + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + } + engine.syncTranslog(); + } + + public void testSearchConcurrently() throws IOException, InterruptedException { + // even though we don't want this to be searched concurrently we better make sure we release all resources etc. + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + int numDocs = scaledRandomIntBetween(10, 1000); + CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + int numIters = randomIntBetween(100, 1000); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + int numThreads = randomIntBetween(2, 4); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch latch = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(() -> { + try (FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine + .acquireSearcher("test")) { + barrier.await(); + searcher.releaseReader(); + for (int j = 0; j < numIters; j++) { + searcher.resetReader(); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + searcher.releaseReader(); + } + if (randomBoolean()) { + searcher.resetReader(); + } + } catch (Exception e) { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + }); + threads[i].start(); + } + latch.await(); + for (Thread t : threads) { + t.join(); + } + assertFalse(frozenEngine.isReaderOpen()); + assertEquals(0, breaker.getUsed()); + } + } + } + } + + private static void checkOverrideMethods(Class clazz) throws NoSuchMethodException, SecurityException { + final Class superClazz = clazz.getSuperclass(); + for (Method m : superClazz.getMethods()) { + final int mods = m.getModifiers(); + if (Modifier.isStatic(mods) || Modifier.isAbstract(mods) || Modifier.isFinal(mods) || m.isSynthetic() + || m.getName().equals("attributes") || m.getName().equals("getStats")) { + continue; + } + // The point of these checks is to ensure that methods from the super class + // are overwritten to make sure we never miss a method from FilterLeafReader / FilterDirectoryReader + final Method subM = clazz.getMethod(m.getName(), m.getParameterTypes()); + if (subM.getDeclaringClass() == superClazz + && m.getDeclaringClass() != Object.class + && m.getDeclaringClass() == subM.getDeclaringClass()) { + fail(clazz + " doesn't override" + m + " although it has been declared by it's superclass"); + } + } + } + + // here we make sure we catch any change to their super classes FilterLeafReader / FilterDirectoryReader + public void testOverrideMethods() throws Exception { + checkOverrideMethods(FrozenEngine.LazyDirectoryReader.class); + checkOverrideMethods(FrozenEngine.LazyLeafReader.class); + } +} From 5ec4796170556eecd3f9efa7be37e3cd7d2a0543 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 27 Sep 2018 23:03:23 +0200 Subject: [PATCH 02/17] Add a java level freeze/unfreeze API This change adds a high level freeze API that allows to open an index frozen and vice versa. Indices must be closed in order to become frozen and an open but frozen index must be closed to be defrosted. This change also adds a `index.frozen` setting to mark frozen indices and integrates the frozen engine with the `SearchOperationListener` that resets and releases the directory reader after and before search phases. Relates to #34352 Depends on #34357 --- .../OpenIndexClusterStateUpdateRequest.java | 2 +- .../admin/indices/open/OpenIndexResponse.java | 4 +- .../index/engine/FrozenEngine.java | 43 ++++ .../elasticsearch/xpack/core/XPackClient.java | 7 + .../elasticsearch/xpack/core/XPackPlugin.java | 17 ++ .../TransportOpenIndexAndFreezeAction.java | 240 ++++++++++++++++++ .../index/engine/FrozenIndexTests.java | 128 ++++++++++ 7 files changed, 438 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportOpenIndexAndFreezeAction.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java index ea3abe5e21a54..4062393167b79 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java @@ -28,7 +28,7 @@ public class OpenIndexClusterStateUpdateRequest extends IndicesClusterStateUpdat private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; - OpenIndexClusterStateUpdateRequest() { + public OpenIndexClusterStateUpdateRequest() { } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java index 97db91f8973f4..6ff03cb5291ea 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java @@ -40,10 +40,10 @@ public class OpenIndexResponse extends ShardsAcknowledgedResponse { declareAcknowledgedAndShardsAcknowledgedFields(PARSER); } - OpenIndexResponse() { + public OpenIndexResponse() { } - OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) { + public OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) { super(acknowledged, shardsAcknowledged); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 7572061485931..433e592707b59 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -33,7 +33,11 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.transport.TransportRequest; import java.io.IOException; import java.io.UncheckedIOException; @@ -47,6 +51,8 @@ * phase starts. This allows releasing references as soon as possible on the search layer. */ public final class FrozenEngine extends ReadOnlyEngine { + public static final Setting INDEX_FROZEN = Setting.boolSetting("index.frozen", false, Setting.Property.IndexScope, + Setting.Property.PrivateIndex); private final CounterMetric openedReaders = new CounterMetric(); private volatile DirectoryReader lastOpenedReader; @@ -218,6 +224,43 @@ void resetReader() throws IOException { } } + /* + * We register this listener for a frozen index that will + * 1. reset the reader every time the search context is validated which happens when the context is looked up ie. on a fetch phase + * etc. + * 2. register a releasable resource that is cleaned after each phase that releases the reader for this searcher + */ + public static class ReacquireEngineSearcherListener implements SearchOperationListener { + + @Override + public void validateSearchContext(SearchContext context, TransportRequest transportRequest) { + Searcher engineSearcher = context.searcher().getEngineSearcher(); + if (engineSearcher instanceof FrozenEngineSearcher) { + try { + ((FrozenEngineSearcher) engineSearcher).resetReader(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + // also register a release resource in this case if we have multiple roundtrips like in DFS + onNewContext(context); + } + + @Override + public void onNewContext(SearchContext context) { + Searcher engineSearcher = context.searcher().getEngineSearcher(); + context.addReleasable(() -> { + if (engineSearcher instanceof FrozenEngineSearcher) { + try { + ((FrozenEngineSearcher) engineSearcher).releaseReader(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }, SearchContext.Lifetime.PHASE); + } + } + /** * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java index 3f27f66b27b77..7b86a4aeb811a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java @@ -6,11 +6,13 @@ package org.elasticsearch.xpack.core; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.license.LicensingClient; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.XPackInfoResponse; +import org.elasticsearch.xpack.core.action.TransportOpenIndexAndFreezeAction; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder; import org.elasticsearch.xpack.core.ccr.client.CcrClient; @@ -96,4 +98,9 @@ public XPackInfoRequestBuilder prepareInfo() { public void info(XPackInfoRequest request, ActionListener listener) { client.execute(XPackInfoAction.INSTANCE, request, listener); } + + public void openAndFreeze(TransportOpenIndexAndFreezeAction.OpenIndexAndFreezeRequest request, ActionListener + listener) { + client.execute(TransportOpenIndexAndFreezeAction.OpenIndexAndFreezeAction.INSTANCE, request, listener); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 6430513d9798d..1cd8ea6656548 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -37,8 +37,10 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.FrozenEngine; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.license.Licensing; @@ -55,6 +57,7 @@ import org.elasticsearch.snapshots.SourceOnlySnapshotRepository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.core.action.TransportOpenIndexAndFreezeAction; import org.elasticsearch.xpack.core.action.TransportXPackInfoAction; import org.elasticsearch.xpack.core.action.TransportXPackUsageAction; import org.elasticsearch.xpack.core.action.XPackInfoAction; @@ -266,6 +269,8 @@ public Collection createComponents(Client client, ClusterService cluster List> actions = new ArrayList<>(); actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class)); actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class)); + actions.add(new ActionHandler<>(TransportOpenIndexAndFreezeAction.OpenIndexAndFreezeAction.INSTANCE, + TransportOpenIndexAndFreezeAction.class)); actions.addAll(licensing.getActions()); return actions; } @@ -359,7 +364,10 @@ public Map getRepositories(Environment env, NamedXCo public Optional getEngineFactory(IndexSettings indexSettings) { if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) { return Optional.of(SourceOnlySnapshotRepository.getEngineFactory()); + } else if (indexSettings.getValue(FrozenEngine.INDEX_FROZEN)) { + return Optional.of(FrozenEngine::new); } + return Optional.empty(); } @@ -367,6 +375,15 @@ public Optional getEngineFactory(IndexSettings indexSettings) { public List> getSettings() { List> settings = super.getSettings(); settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY); + settings.add(FrozenEngine.INDEX_FROZEN); return settings; } + + @Override + public void onIndexModule(IndexModule indexModule) { + if (FrozenEngine.INDEX_FROZEN.get(indexModule.getSettings())) { + indexModule.addSearchOperationListener(new FrozenEngine.ReacquireEngineSearcherListener()); + } + super.onIndexModule(indexModule); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportOpenIndexAndFreezeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportOpenIndexAndFreezeAction.java new file mode 100644 index 0000000000000..35e0811438410 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportOpenIndexAndFreezeAction.java @@ -0,0 +1,240 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.action; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.FrozenEngine; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public final class TransportOpenIndexAndFreezeAction extends + TransportMasterNodeAction { + + private final MetaDataIndexStateService indexStateService; + private final DestructiveOperations destructiveOperations; + + @Inject + public TransportOpenIndexAndFreezeAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, MetaDataIndexStateService indexStateService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + DestructiveOperations destructiveOperations) { + super(settings, OpenIndexAndFreezeAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, + OpenIndexAndFreezeRequest::new); + this.indexStateService = indexStateService; + this.destructiveOperations = destructiveOperations; + } + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected void doExecute(Task task, OpenIndexAndFreezeRequest request, ActionListener listener) { + destructiveOperations.failDestructive(request.indices()); + super.doExecute(task, request, listener); + } + + @Override + protected OpenIndexResponse newResponse() { + return new OpenIndexResponse(); + } + + @Override + protected void masterOperation(OpenIndexAndFreezeRequest request, ClusterState state, ActionListener listener) { + final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + if (concreteIndices == null || concreteIndices.length == 0) { + listener.onResponse(new OpenIndexResponse(true, true)); + return; + } + + clusterService.submitStateUpdateTask("toggle-frozen-settings", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + final MetaData.Builder builder = MetaData.builder(currentState.metaData()); + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + for (Index index : concreteIndices) { + IndexMetaData meta = currentState.metaData().index(index); + if (meta.getState() != IndexMetaData.State.CLOSE) { + throw new IllegalStateException("index [" + index.getName() + "] is not closed"); + } + final IndexMetaData.Builder imdBuilder = IndexMetaData.builder(meta); + final Settings.Builder settingsBuilder = + Settings.builder() + .put(currentState.metaData().index(index).getSettings()) + .put("index.blocks.write", request.freeze()) + .put(FrozenEngine.INDEX_FROZEN.getKey(), request.freeze()) + .put(IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), request.freeze()); + if (request.freeze()) { + blocks.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK); + // we never remove this block when unfreeze for now. we don't know if it was read-only + } + imdBuilder.settings(settingsBuilder); + builder.put(imdBuilder.build(), true); + } + return ClusterState.builder(currentState).blocks(blocks).metaData(builder).build(); + } + + @Override + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest() + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) + .indices(concreteIndices).waitForActiveShards(request.waitForActiveShards()); + indexStateService.openIndex(updateRequest, new ActionListener() { + + @Override + public void onResponse(OpenIndexClusterStateUpdateResponse response) { + listener.onResponse(new OpenIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged())); + } + + @Override + public void onFailure(Exception t) { + logger.debug(() -> new ParameterizedMessage("failed to open indices [{}]", (Object) concreteIndices), t); + listener.onFailure(t); + } + }); + } + + @Override + public void onFailure(final String source, final Exception e) { + listener.onFailure(e); + } + + }); + } + + @Override + protected ClusterBlockException checkBlock(OpenIndexAndFreezeRequest request, ClusterState state) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(state, request)); + } + + public static class OpenIndexAndFreezeAction extends Action { + + public static final OpenIndexAndFreezeAction INSTANCE = new OpenIndexAndFreezeAction(); + public static final String NAME = "indices:admin/open_and_freeze"; + + private OpenIndexAndFreezeAction() { + super(NAME); + } + + @Override + public OpenIndexResponse newResponse() { + return new OpenIndexResponse(); + } + } + + public static class OpenIndexAndFreezeRequest extends AcknowledgedRequest + implements IndicesRequest.Replaceable { + private OpenIndexRequest openIndexRequest; + private boolean freeze = true; + + public OpenIndexAndFreezeRequest() { + openIndexRequest = new OpenIndexRequest(); + } + + public OpenIndexAndFreezeRequest(String... indices) { + openIndexRequest = new OpenIndexRequest(indices); + } + + @Override + public ActionRequestValidationException validate() { + return openIndexRequest.validate(); + } + + public void setFreeze(boolean freeze) { + this.freeze = freeze; + } + + public boolean freeze() { + return freeze; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + openIndexRequest = new OpenIndexRequest(); + openIndexRequest.readFrom(in); + freeze = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + openIndexRequest.writeTo(out); + out.writeBoolean(freeze); + } + + @Override + public String[] indices() { + return openIndexRequest.indices(); + } + + @Override + public IndicesOptions indicesOptions() { + return openIndexRequest.indicesOptions(); + } + + @Override + public IndicesRequest indices(String... indices) { + openIndexRequest.indices(indices); + return this; + } + + public OpenIndexAndFreezeRequest indicesOptions(IndicesOptions indicesOptions) { + openIndexRequest.indicesOptions(indicesOptions); + return this; + } + + public ActiveShardCount waitForActiveShards() { + return openIndexRequest.waitForActiveShards(); + } + + public OpenIndexAndFreezeRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + openIndexRequest.waitForActiveShards(waitForActiveShards); + return this; + } + + public OpenIndexAndFreezeRequest waitForActiveShards(int waitForActiveShards) { + openIndexRequest.waitForActiveShards(waitForActiveShards); + return this; + } + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java new file mode 100644 index 0000000000000..51647361fec87 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.engine; + +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.XPackClient; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.action.TransportOpenIndexAndFreezeAction; +import org.hamcrest.Matchers; + +import java.util.Collection; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; + +public class FrozenIndexTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return pluginList(XPackPlugin.class); + } + + public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); + client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "type", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "type", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + client().admin().indices().prepareFlush("index").get(); + client().admin().indices().prepareClose("index").get(); + XPackClient xPackClient = new XPackClient(client()); + PlainActionFuture future = new PlainActionFuture<>(); + xPackClient.openAndFreeze(new TransportOpenIndexAndFreezeAction.OpenIndexAndFreezeRequest("index"), future); + future.get(); + expectThrows(ClusterBlockException.class, () -> client().prepareIndex("index", "type", "4").setSource("field", "value") + .setRefreshPolicy(IMMEDIATE).get()); + IndicesService indexServices = getInstanceFromNode(IndicesService.class); + Index index = resolveIndex("index"); + IndexService indexService = indexServices.indexServiceSafe(index); + IndexShard shard = indexService.getShard(0); + Engine engine = IndexShardTestCase.getEngine(shard); + assertEquals(0, ((FrozenEngine)engine).getOpenedReaders()); + boolean useDFS = randomBoolean(); + assertHitCount(client().prepareSearch().setSearchType(useDFS ? SearchType.DFS_QUERY_THEN_FETCH + : SearchType.QUERY_THEN_FETCH).get(), 3); + assertThat(engine, Matchers.instanceOf(FrozenEngine.class)); + assertEquals(useDFS ? 3 : 2, ((FrozenEngine)engine).getOpenedReaders()); + assertFalse(((FrozenEngine)engine).isReaderOpen()); + assertTrue(indexService.getIndexSettings().isSearchThrottled()); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + assertThat(searcher, Matchers.instanceOf(FrozenEngine.FrozenEngineSearcher.class)); + } + // now scroll + SearchResponse searchResponse = client().prepareSearch().setScroll(TimeValue.timeValueMinutes(1)).setSize(1).get(); + do { + assertHitCount(searchResponse, 3); + assertEquals(1, searchResponse.getHits().getHits().length); + SearchService searchService = getInstanceFromNode(SearchService.class); + assertThat(searchService.getActiveContexts(), Matchers.greaterThanOrEqualTo(1)); + for (int i = 0; i < 2; i++) { + shard = indexService.getShard(i); + engine = IndexShardTestCase.getEngine(shard); + assertFalse(((FrozenEngine) engine).isReaderOpen()); + } + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get(); + } while (searchResponse.getHits().getHits().length > 0); + + } + + public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); + client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "type", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "type", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + client().admin().indices().prepareFlush("index").get(); + client().admin().indices().prepareClose("index").get(); + XPackClient xPackClient = new XPackClient(client()); + PlainActionFuture future = new PlainActionFuture<>(); + TransportOpenIndexAndFreezeAction.OpenIndexAndFreezeRequest request = + new TransportOpenIndexAndFreezeAction.OpenIndexAndFreezeRequest("index"); + xPackClient.openAndFreeze(request, future); + future.get(); + { + IndicesService indexServices = getInstanceFromNode(IndicesService.class); + Index index = resolveIndex("index"); + IndexService indexService = indexServices.indexServiceSafe(index); + assertTrue(indexService.getIndexSettings().isSearchThrottled()); + IndexShard shard = indexService.getShard(0); + Engine engine = IndexShardTestCase.getEngine(shard); + assertEquals(0, ((FrozenEngine) engine).getOpenedReaders()); + client().admin().indices().prepareClose("index").get(); + } + request.setFreeze(false); + PlainActionFuture future1= new PlainActionFuture<>(); + xPackClient.openAndFreeze(request, future1); + future1.get(); + client().admin().indices().prepareUpdateSettings("index").setSettings(Settings.builder().put("index.blocks.write", false)).get(); + { + IndicesService indexServices = getInstanceFromNode(IndicesService.class); + Index index = resolveIndex("index"); + IndexService indexService = indexServices.indexServiceSafe(index); + assertFalse(indexService.getIndexSettings().isSearchThrottled()); + IndexShard shard = indexService.getShard(0); + Engine engine = IndexShardTestCase.getEngine(shard); + assertThat(engine, Matchers.instanceOf(InternalEngine.class)); + } + client().prepareIndex("index", "type", "4").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + } +} From 900917c8edf07fe8d3e4b684a5392dd0a844569e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 16 Oct 2018 16:37:46 +0200 Subject: [PATCH 03/17] remove frozen index searcher since Engine.Searcher is now final --- .../index/engine/FrozenEngine.java | 46 +++++++------------ .../index/engine/FrozenEngineTests.java | 37 ++++++++------- 2 files changed, 35 insertions(+), 48 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 7572061485931..0f3698c965af2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -169,15 +169,8 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } else { try { LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader); - FrozenEngineSearcher newSearcher = new FrozenEngineSearcher(source, lazyDirectoryReader, - new IndexSearcher(lazyDirectoryReader), - s -> { - try { - s.getIndexReader().close(); - } finally { - store.decRef(); - } - }, logger); + Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader), + () -> IOUtils.close(lazyDirectoryReader, store::decRef)); success = true; return newSearcher; } finally { @@ -195,27 +188,22 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } } - /** - * A Searcher impl that makes it straight forward to release readers after a search phase - */ - final class FrozenEngineSearcher extends Searcher { - - private final LazyDirectoryReader lazyDirectoryReader; - - FrozenEngineSearcher(String source, LazyDirectoryReader lazyDirectoryReader, IndexSearcher searcher, - IOUtils.IOConsumer - onClose, Logger logger) { - super(source, searcher, onClose, logger); - this.lazyDirectoryReader = lazyDirectoryReader; - } + void release(LazyDirectoryReader reader) throws IOException { + reader.release(); + } - void releaseReader() throws IOException { - lazyDirectoryReader.release(); - } + void reset(LazyDirectoryReader reader) throws IOException { + reader.reset(getOrOpenReader(true)); + } - void resetReader() throws IOException { - lazyDirectoryReader.reset(getOrOpenReader(true)); + static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { + while (reader instanceof FilterDirectoryReader) { + if (reader instanceof LazyDirectoryReader) { + return (LazyDirectoryReader) reader; + } + reader = ((FilterDirectoryReader) reader).getDelegate(); } + return null; } /** @@ -242,7 +230,7 @@ public LeafReader wrap(LeafReader reader) { } @SuppressForbidden(reason = "we manage references explicitly here") - synchronized void release() throws IOException { + private synchronized void release() throws IOException { if (delegate != null) { // we are lenient here it's ok to double close delegate.decRef(); delegate = null; @@ -259,7 +247,7 @@ synchronized void release() throws IOException { } } - synchronized void reset(DirectoryReader delegate) { + private synchronized void reset(DirectoryReader delegate) { if (this.delegate != null) { throw new IllegalStateException("lazy reader is not released"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 6237677ed2fd7..32628e3005708 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -38,18 +38,18 @@ public void testAcquireReleaseReset() throws IOException { engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); - FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher .getDirectoryReader()).shardId()); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertFalse(frozenEngine.isReaderOpen()); assertEquals(1, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -70,19 +70,19 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); - FrozenEngine.FrozenEngineSearcher searcher1 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - searcher1.releaseReader(); - FrozenEngine.FrozenEngineSearcher searcher2 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); + Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertTrue(frozenEngine.isReaderOpen()); assertEquals(2, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); - searcher1.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -103,16 +103,16 @@ public void testSegmentStats() throws IOException { addDocuments(globalCheckpoint, numDocs, engine); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { - FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(1, frozenEngine.getOpenedReaders()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); assertEquals(1, frozenEngine.getOpenedReaders()); assertFalse(frozenEngine.isReaderOpen()); - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); searcher.close(); @@ -140,13 +140,13 @@ public void testCircuitBreakerAccounting() throws IOException { assertTrue(expectedUse > 0); assertEquals(0, breaker.getUsed()); try (FrozenEngine frozenEngine = new FrozenEngine(config)) { - FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher)frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(1, frozenEngine.getOpenedReaders()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(expectedUse, breaker.getUsed()); searcher.close(); assertEquals(0, breaker.getUsed()); @@ -191,19 +191,18 @@ public void testSearchConcurrently() throws IOException, InterruptedException { CountDownLatch latch = new CountDownLatch(numThreads); for (int i = 0; i < numThreads; i++) { threads[i] = new Thread(() -> { - try (FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine - .acquireSearcher("test")) { + try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { barrier.await(); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); for (int j = 0; j < numIters; j++) { - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); } if (randomBoolean()) { - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); } } catch (Exception e) { throw new AssertionError(e); From 8324b72a7bb49c62feb3e8275f969b802a162928 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 16 Oct 2018 16:49:08 +0200 Subject: [PATCH 04/17] simplify reset/release --- .../index/engine/FrozenEngine.java | 21 +++++++--------- .../index/engine/FrozenEngineTests.java | 24 +++++++++---------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 0f3698c965af2..d04c5d6ae9c46 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.index.engine; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FieldInfos; @@ -168,7 +167,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin return super.acquireSearcher(source, scope); } else { try { - LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader); + LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this); Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader), () -> IOUtils.close(lazyDirectoryReader, store::decRef)); success = true; @@ -188,14 +187,6 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } } - void release(LazyDirectoryReader reader) throws IOException { - reader.release(); - } - - void reset(LazyDirectoryReader reader) throws IOException { - reader.reset(getOrOpenReader(true)); - } - static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { while (reader instanceof FilterDirectoryReader) { if (reader instanceof LazyDirectoryReader) { @@ -217,9 +208,10 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { */ static final class LazyDirectoryReader extends FilterDirectoryReader { + private final FrozenEngine engine; private volatile DirectoryReader delegate; // volatile since it might be closed concurrently - private LazyDirectoryReader(DirectoryReader reader) throws IOException { + private LazyDirectoryReader(DirectoryReader reader, FrozenEngine engine) throws IOException { super(reader, new SubReaderWrapper() { @Override public LeafReader wrap(LeafReader reader) { @@ -227,10 +219,11 @@ public LeafReader wrap(LeafReader reader) { }; }); this.delegate = reader; + this.engine = engine; } @SuppressForbidden(reason = "we manage references explicitly here") - private synchronized void release() throws IOException { + synchronized void release() throws IOException { if (delegate != null) { // we are lenient here it's ok to double close delegate.decRef(); delegate = null; @@ -247,6 +240,10 @@ private synchronized void release() throws IOException { } } + void reset() throws IOException { + reset(engine.getOrOpenReader(true)); + } + private synchronized void reset(DirectoryReader delegate) { if (this.delegate != null) { throw new IllegalStateException("lazy reader is not released"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 32628e3005708..94ba1d83344dc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -45,11 +45,11 @@ public void testAcquireReleaseReset() throws IOException { TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertFalse(frozenEngine.isReaderOpen()); assertEquals(1, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -75,14 +75,14 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertTrue(frozenEngine.isReaderOpen()); assertEquals(2, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -106,13 +106,13 @@ public void testSegmentStats() throws IOException { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertEquals(1, frozenEngine.getOpenedReaders()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); assertEquals(1, frozenEngine.getOpenedReaders()); assertFalse(frozenEngine.isReaderOpen()); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); searcher.close(); @@ -142,11 +142,11 @@ public void testCircuitBreakerAccounting() throws IOException { try (FrozenEngine frozenEngine = new FrozenEngine(config)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertEquals(1, frozenEngine.getOpenedReaders()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertEquals(expectedUse, breaker.getUsed()); searcher.close(); assertEquals(0, breaker.getUsed()); @@ -193,16 +193,16 @@ public void testSearchConcurrently() throws IOException, InterruptedException { threads[i] = new Thread(() -> { try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { barrier.await(); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); for (int j = 0; j < numIters; j++) { - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); } if (randomBoolean()) { - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); } } catch (Exception e) { throw new AssertionError(e); From 85d99e6b3f7d741b1ef9565bc8baf74c883da08b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 16 Oct 2018 16:52:20 +0200 Subject: [PATCH 05/17] adopt to changed reset / release methods --- .../org/elasticsearch/index/engine/FrozenEngine.java | 10 ++++++---- .../elasticsearch/index/engine/FrozenIndexTests.java | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 18690d32ea152..1930eb5c54bdb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -214,9 +214,10 @@ public static class ReacquireEngineSearcherListener implements SearchOperationLi @Override public void validateSearchContext(SearchContext context, TransportRequest transportRequest) { Searcher engineSearcher = context.searcher().getEngineSearcher(); - if (engineSearcher instanceof FrozenEngineSearcher) { + LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader()); + if (lazyDirectoryReader != null) { try { - ((FrozenEngineSearcher) engineSearcher).resetReader(); + lazyDirectoryReader.reset(); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -229,9 +230,10 @@ public void validateSearchContext(SearchContext context, TransportRequest transp public void onNewContext(SearchContext context) { Searcher engineSearcher = context.searcher().getEngineSearcher(); context.addReleasable(() -> { - if (engineSearcher instanceof FrozenEngineSearcher) { + LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader()); + if (lazyDirectoryReader != null) { try { - ((FrozenEngineSearcher) engineSearcher).releaseReader(); + lazyDirectoryReader.release(); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 51647361fec87..ec64a15a08a67 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -66,7 +66,7 @@ public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedExcep assertFalse(((FrozenEngine)engine).isReaderOpen()); assertTrue(indexService.getIndexSettings().isSearchThrottled()); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { - assertThat(searcher, Matchers.instanceOf(FrozenEngine.FrozenEngineSearcher.class)); + assertNotNull(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); } // now scroll SearchResponse searchResponse = client().prepareSearch().setScroll(TimeValue.timeValueMinutes(1)).setSize(1).get(); From a35e79f635771fa49f0a4bac7251db5ef3822ef2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 16 Oct 2018 20:31:38 +0200 Subject: [PATCH 06/17] fix imports --- server/src/main/java/org/elasticsearch/index/engine/Engine.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e2056dbd5b297..89545af641c28 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -65,7 +65,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; From 5981a206d6f8b61ca55915d2945e23ccaa9c3114 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 17 Oct 2018 14:21:08 +0200 Subject: [PATCH 07/17] apply feedback --- .../core/internal/io/IOUtils.java | 13 ---- .../index/engine/FrozenEngine.java | 4 +- .../index/engine/FrozenEngineTests.java | 67 +++++++++---------- 3 files changed, 35 insertions(+), 49 deletions(-) diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java index 8796eff787dd0..46d19d2a814fe 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java +++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java @@ -277,17 +277,4 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx throw ioe; } } - - /** - * An IO operation with a single input. - * @see java.util.function.Consumer - */ - @FunctionalInterface - public interface IOConsumer { - /** - * Performs this operation on the given argument. - */ - void accept(T input) throws IOException; - } - } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index d04c5d6ae9c46..7714abc3e281a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -201,7 +201,7 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. * - * This reader and it's leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still + * This reader and its leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers * on the next search phase. @@ -465,7 +465,7 @@ public LeafReader getDelegate() { } // TODO expose this as stats on master - long getOpenedReaders() { + long getTotalOpenedReaders() { return openedReaders.count(); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 94ba1d83344dc..f65109ec450d8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -32,9 +32,8 @@ public void testAcquireReleaseReset() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - int numDocs = scaledRandomIntBetween(10, 1000); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); @@ -42,17 +41,17 @@ public void testAcquireReleaseReset() throws IOException { assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher .getDirectoryReader()).shardId()); assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); - assertEquals(1, frozenEngine.getOpenedReaders()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertFalse(frozenEngine.isReaderOpen()); - assertEquals(1, frozenEngine.getOpenedReaders()); - expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getOpenedReaders()); - search = searcher.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + assertEquals(2, frozenEngine.getTotalOpenedReaders()); + search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); searcher.close(); } } @@ -64,28 +63,27 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - int numDocs = scaledRandomIntBetween(10, 1000); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); - assertEquals(1, frozenEngine.getOpenedReaders()); + TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); - search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + search = searcher2.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); assertTrue(frozenEngine.isReaderOpen()); - assertEquals(2, frozenEngine.getOpenedReaders()); - expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); + assertEquals(2, frozenEngine.getTotalOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getOpenedReaders()); - search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + assertEquals(2, frozenEngine.getTotalOpenedReaders()); + search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); searcher1.close(); searcher2.close(); } @@ -98,19 +96,18 @@ public void testSegmentStats() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - int numDocs = scaledRandomIntBetween(10, 1000); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + addDocuments(globalCheckpoint, engine); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); - assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); @@ -128,11 +125,10 @@ public void testCircuitBreakerAccounting() throws IOException { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); - int numDocs = scaledRandomIntBetween(10, 1000); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); long expectedUse; try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + addDocuments(globalCheckpoint, engine); engine.refresh("test"); // pull the reader expectedUse = breaker.getUsed(); engine.flushAndClose(); @@ -143,7 +139,7 @@ public void testCircuitBreakerAccounting() throws IOException { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); @@ -154,11 +150,14 @@ public void testCircuitBreakerAccounting() throws IOException { } } - private void addDocuments(AtomicLong globalCheckpoint, int numDocs, InternalEngine engine) throws IOException { + private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) throws IOException { + int numDocs = scaledRandomIntBetween(10, 1000); + int numDocsAdded = 0; for (int i = 0; i < numDocs; i++) { if (rarely()) { continue; // gap in sequence number } + numDocsAdded++; ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false)); @@ -168,6 +167,7 @@ private void addDocuments(AtomicLong globalCheckpoint, int numDocs, InternalEngi globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); } engine.syncTranslog(); + return numDocsAdded; } public void testSearchConcurrently() throws IOException, InterruptedException { @@ -178,10 +178,9 @@ public void testSearchConcurrently() throws IOException, InterruptedException { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); - int numDocs = scaledRandomIntBetween(10, 1000); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + int numDocsAdded = addDocuments(globalCheckpoint, engine); engine.flushAndClose(); int numIters = randomIntBetween(100, 1000); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { @@ -197,8 +196,8 @@ public void testSearchConcurrently() throws IOException, InterruptedException { for (int j = 0; j < numIters; j++) { FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), Math.min(10, numDocsAdded)); + assertEquals(search.scoreDocs.length, Math.min(10, numDocsAdded)); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); } if (randomBoolean()) { From af08d6871a84c2bd1308da82f7a2be35e6ca80d8 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 5 Nov 2018 16:48:14 +0100 Subject: [PATCH 08/17] Fix compilation --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 379043fa93954..a0dd8191b6ed6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5057,15 +5057,15 @@ public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception { null, new ReferenceManager.RefreshListener() { @Override - public void beforeRefresh() throws IOException { + public void beforeRefresh() { refreshCounter.incrementAndGet(); } @Override - public void afterRefresh(boolean didRefresh) throws IOException { + public void afterRefresh(boolean didRefresh) { } - }, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) { + }, null, () -> SequenceNumbers.NO_OPS_PERFORMED, new NoneCircuitBreakerService()))) { for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) { final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); From 0a74147657b7a0d608485c5f56e3484bdefd772f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 20:44:18 +0100 Subject: [PATCH 09/17] fix compilation --- .../org/elasticsearch/index/engine/FrozenEngineTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index f65109ec450d8..a10fdf27df22d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -122,7 +122,7 @@ public void testCircuitBreakerAccounting() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); @@ -175,7 +175,7 @@ public void testSearchConcurrently() throws IOException, InterruptedException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); From 39412072578f89e4052e38b71010b82ce2a23893 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 21:15:58 +0100 Subject: [PATCH 10/17] apply review comments from @bleskes --- .../index/engine/ReadOnlyEngine.java | 8 +- .../index/engine/FrozenEngine.java | 77 +++++++++++++------ .../index/engine/FrozenEngineTests.java | 3 - 3 files changed, 59 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b6443187de902..fc4b0632c8076 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -98,10 +98,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; - reader = open(directory); + this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); + reader = open(indexCommit); reader = wrapReader(reader, readerWrapperFunction); searcherManager = new SearcherManager(reader, searcherFactory); - this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); this.docsStats = docsStats(lastCommittedSegmentInfos); this.indexWriterLock = indexWriterLock; success = true; @@ -124,8 +124,8 @@ protected final DirectoryReader wrapReader(DirectoryReader reader, return readerWrapperFunction.apply(reader); } - protected DirectoryReader open(final Directory directory) throws IOException { - return DirectoryReader.open(directory); + protected DirectoryReader open(IndexCommit commit) throws IOException { + return DirectoryReader.open(commit); } private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 7714abc3e281a..6bb9d735195bb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -54,12 +54,11 @@ public FrozenEngine(EngineConfig config) { } @Override - protected DirectoryReader open(Directory directory) throws IOException { - // we fake an empty directly reader for the ReadOnlyEngine. this reader is only used + protected DirectoryReader open(IndexCommit indexCommit) throws IOException { + // we fake an empty DirectoryReader for the ReadOnlyEngine. this reader is only used // to initialize the reference manager and to make the refresh call happy which is essentially // a no-op now - IndexCommit indexCommit = Lucene.getIndexCommit(getLastCommittedSegmentInfos(), directory); - return new DirectoryReader(directory, new LeafReader[0]) { + return new DirectoryReader(indexCommit.getDirectory(), new LeafReader[0]) { @Override protected DirectoryReader doOpenIfChanged() { return null; @@ -103,6 +102,12 @@ public CacheHelper getReaderCacheHelper() { @SuppressForbidden(reason = "we manage references explicitly here") private synchronized void onReaderClosed(IndexReader.CacheKey key) { + // it might look awkward that we have to check here if the keys match but if we concurrently + // access the lastOpenedReader there might be 2 threads competing for the cached reference in + // a way that thread 1 counts down the lastOpenedReader reference and before thread 1 can execute + // the close listener we already open and assign a new reader to lastOpenedReader. In this case + // the cache key doesn't match and we just ignore it since we use this method only to null out the + // lastOpenedReader member to ensure resources can be GCed if (lastOpenedReader != null && key == lastOpenedReader.getReaderCacheHelper().getKey()) { assert lastOpenedReader.getRefCount() == 0; lastOpenedReader = null; @@ -110,19 +115,32 @@ private synchronized void onReaderClosed(IndexReader.CacheKey key) { } @SuppressForbidden(reason = "we manage references explicitly here") - private synchronized DirectoryReader getOrOpenReader(boolean doOpen) throws IOException { + private synchronized DirectoryReader getOrOpenReader() throws IOException { DirectoryReader reader = null; boolean success = false; try { - if (lastOpenedReader == null || lastOpenedReader.tryIncRef() == false) { - if (doOpen) { - reader = DirectoryReader.open(engineConfig.getStore().directory()); - searcherFactory.processReaders(reader, null); - openedReaders.inc(); - reader = lastOpenedReader = wrapReader(reader, Function.identity()); - reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); - } - } else { + reader = getReader(); + if (reader == null) { + reader = DirectoryReader.open(engineConfig.getStore().directory()); + searcherFactory.processReaders(reader, null); + openedReaders.inc(); + reader = lastOpenedReader = wrapReader(reader, Function.identity()); + reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); + } + success = true; + return reader; + } finally { + if (success == false) { + IOUtils.close(reader); + } + } + } + + private synchronized DirectoryReader getReader() throws IOException { + DirectoryReader reader = null; + boolean success = false; + try { + if (lastOpenedReader != null && lastOpenedReader.tryIncRef()) { reader = lastOpenedReader; } success = true; @@ -141,7 +159,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin store.incRef(); boolean success = false; try { - final boolean openReader; + final boolean maybeOpenReader; switch (source) { case "load_seq_no": case "load_version": @@ -152,18 +170,20 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin case "segments_stats": case "completion_stats": case "refresh_needed": - openReader = false; + maybeOpenReader = false; break; default: - openReader = true; + maybeOpenReader = true; } // special case we only want to report segment stats if we have a reader open. in that case we only get a reader if we still // have one open at the time and can inc it's reference. - DirectoryReader reader = getOrOpenReader(openReader); + DirectoryReader reader = maybeOpenReader ? getOrOpenReader() : getReader(); if (reader == null) { store.decRef(); success = true; - // we just hand out an empty searcher in this case + // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit) + // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in + // the category that doesn't trigger a reopen return super.acquireSearcher(source, scope); } else { try { @@ -201,7 +221,7 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. * - * This reader and its leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still + * This reader and its leaf reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers * on the next search phase. @@ -228,6 +248,10 @@ synchronized void release() throws IOException { delegate.decRef(); delegate = null; if (tryIncRef()) { // only do this if we are not closed already + // we end up in this case when we are not closed but in an intermediate + // state were we want to release all or the real leaf readers ie. in between search phases + // but still want to keep this Lazy reference open. In oder to let the heavy real leaf + // readers to be GCed we need to null our the references. try { for (LeafReaderContext leaf : leaves()) { LazyLeafReader reader = (LazyLeafReader) leaf.reader(); @@ -241,12 +265,21 @@ synchronized void release() throws IOException { } void reset() throws IOException { - reset(engine.getOrOpenReader(true)); + boolean success = false; + DirectoryReader reader = engine.getOrOpenReader(); + try { + reset(reader); + success = true; + } finally { + if (success == false) { + IOUtils.close(reader); + } + } } private synchronized void reset(DirectoryReader delegate) { if (this.delegate != null) { - throw new IllegalStateException("lazy reader is not released"); + throw new AssertionError("lazy reader is not released"); } assert (delegate instanceof LazyDirectoryReader) == false : "must not be a LazyDirectoryReader"; List leaves = delegate.leaves(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index a10fdf27df22d..92cfc2c9a7a2f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -154,9 +154,6 @@ private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) thr int numDocs = scaledRandomIntBetween(10, 1000); int numDocsAdded = 0; for (int i = 0; i < numDocs; i++) { - if (rarely()) { - continue; // gap in sequence number - } numDocsAdded++; ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, From bee497a7c6bfb075c42dd053611e93c5a5f68356 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 21:31:05 +0100 Subject: [PATCH 11/17] add more comments --- .../org/elasticsearch/index/engine/FrozenEngine.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 6bb9d735195bb..55a83ef8f1ee9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -44,6 +44,17 @@ * {@link Engine#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore and then * releases itself from the engine. The readers returned from this engine are lazy which allows release after and reset before a search * phase starts. This allows releasing references as soon as possible on the search layer. + * + * Internally this class uses a set of wrapper abstractions to allow a reader that is used inside the {@link Engine.Searcher} returned from + * {@link #acquireSearcher(String, SearcherScope)} to release and reset it's internal resources. This is necessary to for instance release + * all SegmentReaders after a search phase finishes and reopen them before the next search phase starts. This together with a throttled + * threadpool (search_throttled) guarantees that at most N frozen shards have a low level index reader open at the same time. + * + * In particular we have LazyDirectoryReader that wraps its LeafReaders (the actual segment readers) inside LazyLeafReaders. Each of the + * LazyLeafReader delegates to segment LeafReader that can be reset (it's reference decremented and nulled out) on a search phase is + * finished. Before the next search phase starts we can reopen the corresponding reader and reset the reference to execute the search phase. + * This allows the SearchContext to hold on to the same LazyDirectoryReader across its lifecycle but under the hood resources (memory) is + * released while the SearchContext phases are not executing. */ public final class FrozenEngine extends ReadOnlyEngine { private final CounterMetric openedReaders = new CounterMetric(); From 2ff209e6a268c36df28dcdb33ce988d5d696eed7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 21:44:45 +0100 Subject: [PATCH 12/17] add javadocs --- .../main/java/org/elasticsearch/common/lucene/Lucene.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 7a8ccb8adc090..5be6bed0d577f 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -220,7 +220,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc throw new IllegalStateException("no commit found in the directory"); } } - final CommitPoint cp = new CommitPoint(si, directory); + final IndexCommit cp = getIndexCommit(si, directory); try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER) .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) .setIndexCommit(cp) @@ -232,6 +232,9 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc return si; } + /** + * Returns an index commit for the given {@link SegmentInfos} in the given directory. + */ public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException { return new CommitPoint(si, directory); } From 6566ce493527413760fae0b59d76c3fdf2cd1461 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 22:05:41 +0100 Subject: [PATCH 13/17] use refresh stats instead to track reopens --- .../index/engine/FrozenEngine.java | 21 +++--- .../index/engine/FrozenEngineTests.java | 64 +++++++++++++++---- 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 55a83ef8f1ee9..37c44ac6c31bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -26,12 +26,11 @@ import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.index.Terms; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.core.internal.io.IOUtils; import java.io.IOException; @@ -54,10 +53,12 @@ * LazyLeafReader delegates to segment LeafReader that can be reset (it's reference decremented and nulled out) on a search phase is * finished. Before the next search phase starts we can reopen the corresponding reader and reset the reference to execute the search phase. * This allows the SearchContext to hold on to the same LazyDirectoryReader across its lifecycle but under the hood resources (memory) is - * released while the SearchContext phases are not executing. + * released while the SearchContext phases are not executing. + * + * The internal reopen of readers is treated like a refresh and refresh listeners are called up-on reopen. This allows to consume refresh + * stats in order to obtain the number of reopens. */ public final class FrozenEngine extends ReadOnlyEngine { - private final CounterMetric openedReaders = new CounterMetric(); private volatile DirectoryReader lastOpenedReader; public FrozenEngine(EngineConfig config) { @@ -132,11 +133,16 @@ private synchronized DirectoryReader getOrOpenReader() throws IOException { try { reader = getReader(); if (reader == null) { + for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) { + listeners.beforeRefresh(); + } reader = DirectoryReader.open(engineConfig.getStore().directory()); searcherFactory.processReaders(reader, null); - openedReaders.inc(); reader = lastOpenedReader = wrapReader(reader, Function.identity()); reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); + for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) { + listeners.afterRefresh(true); + } } success = true; return reader; @@ -508,11 +514,6 @@ public LeafReader getDelegate() { } } - // TODO expose this as stats on master - long getTotalOpenedReaders() { - return openedReaders.count(); - } - synchronized boolean isReaderOpen() { return lastOpenedReader != null; } // this is mainly for tests diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 92cfc2c9a7a2f..57b9ce1537442 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.index.engine; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -17,12 +18,14 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class FrozenEngineTests extends EngineTestCase { @@ -31,10 +34,13 @@ public void testAcquireReleaseReset() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); try (InternalEngine engine = createEngine(config)) { int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); @@ -43,13 +49,13 @@ public void testAcquireReleaseReset() throws IOException { assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertFalse(frozenEngine.isReaderOpen()); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getTotalOpenedReaders()); + assertEquals(2, listener.afterRefresh.get()); search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); searcher.close(); @@ -62,26 +68,29 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); try (InternalEngine engine = createEngine(config)) { int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); search = searcher2.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); assertTrue(frozenEngine.isReaderOpen()); - assertEquals(2, frozenEngine.getTotalOpenedReaders()); + assertEquals(2, listener.afterRefresh.get()); expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getTotalOpenedReaders()); + assertEquals(2, listener.afterRefresh.get()); search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); searcher1.close(); @@ -95,19 +104,22 @@ public void testSegmentStats() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); try (InternalEngine engine = createEngine(config)) { addDocuments(globalCheckpoint, engine); engine.flushAndClose(); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); @@ -122,8 +134,9 @@ public void testCircuitBreakerAccounting() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, - new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); long expectedUse; @@ -135,11 +148,12 @@ public void testCircuitBreakerAccounting() throws IOException { } assertTrue(expectedUse > 0); assertEquals(0, breaker.getUsed()); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(config)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); @@ -243,4 +257,26 @@ public void testOverrideMethods() throws Exception { checkOverrideMethods(FrozenEngine.LazyDirectoryReader.class); checkOverrideMethods(FrozenEngine.LazyLeafReader.class); } + + private class CountingRefreshListener implements ReferenceManager.RefreshListener { + + final AtomicInteger afterRefresh = new AtomicInteger(0); + private final AtomicInteger beforeRefresh = new AtomicInteger(0); + + @Override + public void beforeRefresh() { + beforeRefresh.incrementAndGet(); + } + + @Override + public void afterRefresh(boolean didRefresh) { + afterRefresh.incrementAndGet(); + assertEquals(beforeRefresh.get(), afterRefresh.get()); + } + + void reset() { + afterRefresh.set(0); + beforeRefresh.set(0); + } + } } From ea8a665ddf27f0a8dfc40fad99fb17a9a209a14a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 08:13:37 +0100 Subject: [PATCH 14/17] fix forbidden API check --- .../main/java/org/elasticsearch/index/engine/FrozenEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 37c44ac6c31bd..1434fb0ec4a85 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -126,7 +126,6 @@ private synchronized void onReaderClosed(IndexReader.CacheKey key) { } } - @SuppressForbidden(reason = "we manage references explicitly here") private synchronized DirectoryReader getOrOpenReader() throws IOException { DirectoryReader reader = null; boolean success = false; @@ -153,6 +152,7 @@ private synchronized DirectoryReader getOrOpenReader() throws IOException { } } + @SuppressForbidden(reason = "we manage references explicitly here") private synchronized DirectoryReader getReader() throws IOException { DirectoryReader reader = null; boolean success = false; From be71799cbd67977b57de79fd3bdbeca967529b72 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 16:32:13 +0100 Subject: [PATCH 15/17] make sure GCP is allways == LCP --- .../java/org/elasticsearch/index/engine/FrozenEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 57b9ce1537442..2c16df79122eb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -175,7 +175,7 @@ private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) thr if (rarely()) { engine.flush(); } - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + globalCheckpoint.set(engine.getLocalCheckpoint()); } engine.syncTranslog(); return numDocsAdded; From 80b6827b7827dc1212624ee763225a3a585df0bc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 16:41:47 +0100 Subject: [PATCH 16/17] make store release more inituitive --- .../org/elasticsearch/index/engine/FrozenEngine.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 1434fb0ec4a85..4f8142a6d8ac7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -196,12 +196,15 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin // have one open at the time and can inc it's reference. DirectoryReader reader = maybeOpenReader ? getOrOpenReader() : getReader(); if (reader == null) { - store.decRef(); - success = true; // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit) // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in // the category that doesn't trigger a reopen - return super.acquireSearcher(source, scope); + try { + return super.acquireSearcher(source, scope); + } finally { + success = true; + store.decRef(); // this is the reference we acquired in the beginning of this method + } } else { try { LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this); From f24740827b22732a5cb097aae15c42241a63925a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 16:49:51 +0100 Subject: [PATCH 17/17] use a releaseReference pattern instead of success patter --- .../elasticsearch/index/engine/FrozenEngine.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 4f8142a6d8ac7..0cd67e5ebc505 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -174,7 +174,7 @@ private synchronized DirectoryReader getReader() throws IOException { @SuppressForbidden( reason = "we manage references explicitly here") public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { store.incRef(); - boolean success = false; + boolean releaseRefeference = true; try { final boolean maybeOpenReader; switch (source) { @@ -199,21 +199,16 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit) // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in // the category that doesn't trigger a reopen - try { - return super.acquireSearcher(source, scope); - } finally { - success = true; - store.decRef(); // this is the reference we acquired in the beginning of this method - } + return super.acquireSearcher(source, scope); } else { try { LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this); Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader), () -> IOUtils.close(lazyDirectoryReader, store::decRef)); - success = true; + releaseRefeference = false; return newSearcher; } finally { - if (success == false) { + if (releaseRefeference) { reader.decRef(); // don't call close here we manage reference ourselves } } @@ -221,7 +216,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } catch (IOException e) { throw new UncheckedIOException(e); } finally { - if (success == false) { + if (releaseRefeference) { store.decRef(); } }