From a04181a73c8bdb4015212a37763ec91c1cb2ad80 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Feb 2025 14:33:19 +0100 Subject: [PATCH 1/4] Speed up InternalEngine#resolveDocVersion(...) method Use reference manager to get index reader instead of acquire a searcher. The latter involved creating an index searcher, which is not used and expensive as part og resolveDocVersion() method. --- .../index/engine/InternalEngine.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7f6fe40dbaaf0..6e11299615798 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1020,23 +1020,18 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th VersionValue versionValue = getVersionFromMap(op.uid()); if (versionValue == null) { assert incrementIndexVersionLookup(); // used for asserting in tests + var referenceManager = getReferenceManager(SearcherScope.INTERNAL); final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion; - try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { + var indexReader = referenceManager.acquire(); + try { if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion( - searcher.getIndexReader(), - op.uid(), - op.id(), - loadSeqNo - ); + docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), op.id(), loadSeqNo); } else { - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion( - searcher.getIndexReader(), - op.uid(), - loadSeqNo - ); + docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), loadSeqNo); } + } finally { + referenceManager.release(indexReader); } if (docIdAndVersion != null) { versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm); From 80729076585f7832534cb8e475edd8d4247dc057 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 13 Feb 2025 14:20:24 +0100 Subject: [PATCH 2/4] introduce Engine#acquireDirectoryReaderSupplier(...) --- .../index/engine/InternalEngine.java | 75 +++++++++++++++++-- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6e11299615798..3887d5f3d2b03 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -70,6 +70,7 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; @@ -106,6 +107,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -1020,18 +1022,15 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th VersionValue versionValue = getVersionFromMap(op.uid()); if (versionValue == null) { assert incrementIndexVersionLookup(); // used for asserting in tests - var referenceManager = getReferenceManager(SearcherScope.INTERNAL); final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion; - var indexReader = referenceManager.acquire(); - try { + try (var directoryReaderSupplier = acquireDirectoryReaderSupplier(SearcherScope.INTERNAL)) { + var indexReader = directoryReaderSupplier.getDirectoryReader(); if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), op.id(), loadSeqNo); } else { docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), loadSeqNo); } - } finally { - referenceManager.release(indexReader); } if (docIdAndVersion != null) { versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm); @@ -3465,4 +3464,70 @@ public LiveVersionMap getLiveVersionMap() { protected long getPreCommitSegmentGeneration() { return preCommitSegmentGeneration.get(); } + + DirectoryReaderSupplier acquireDirectoryReaderSupplier(SearcherScope scope) throws EngineException { + assert scope == SearcherScope.INTERNAL : "acquireDirectoryReaderSupplier(...) isn't prepared for external usage"; + + /* Acquire order here is store -> manager since we need + * to make sure that the store is not closed before + * the searcher is acquired. */ + if (store.tryIncRef() == false) { + throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get()); + } + Releasable releasable = store::decRef; + try { + ReferenceManager referenceManager = getReferenceManager(scope); + ElasticsearchDirectoryReader acquire = referenceManager.acquire(); + releasable = null; // success - hand over the reference to the engine reader + return new DirectoryReaderSupplier(acquire) { + + @Override + void doClose() { + try { + referenceManager.release(acquire); + } catch (IOException e) { + throw new UncheckedIOException("failed to close", e); + } catch (AlreadyClosedException e) { + // This means there's a bug somewhere: don't suppress it + throw new AssertionError(e); + } finally { + store.decRef(); + } + } + }; + } catch (AlreadyClosedException ex) { + throw ex; + } catch (Exception ex) { + maybeFailEngine("acquire_reader", ex); + ensureOpen(ex); // throw EngineCloseException here if we are already closed + logger.error("failed to acquire reader", ex); + throw new EngineException(shardId, "failed to acquire reader", ex); + } finally { + Releasables.close(releasable); + } + } + + abstract static class DirectoryReaderSupplier implements Releasable { + private final DirectoryReader directoryReader; + private final AtomicBoolean released = new AtomicBoolean(false); + + DirectoryReaderSupplier(DirectoryReader directoryReader) { + this.directoryReader = directoryReader; + } + + public DirectoryReader getDirectoryReader() { + return directoryReader; + } + + @Override + public final void close() { + if (released.compareAndSet(false, true)) { + doClose(); + } else { + assert false : "DirectoryReaderSupplier was released twice"; + } + } + + abstract void doClose(); + } } From b81fa738b37df2452430ee34c6288d811ccf64da Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 13 Feb 2025 15:26:09 +0100 Subject: [PATCH 3/4] assert that we have a reference --- .../index/engine/InternalEngine.java | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3887d5f3d2b03..8701a1269c27e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -70,7 +70,6 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; @@ -3467,18 +3466,10 @@ protected long getPreCommitSegmentGeneration() { DirectoryReaderSupplier acquireDirectoryReaderSupplier(SearcherScope scope) throws EngineException { assert scope == SearcherScope.INTERNAL : "acquireDirectoryReaderSupplier(...) isn't prepared for external usage"; - - /* Acquire order here is store -> manager since we need - * to make sure that the store is not closed before - * the searcher is acquired. */ - if (store.tryIncRef() == false) { - throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get()); - } - Releasable releasable = store::decRef; + assert store.hasReferences(); try { ReferenceManager referenceManager = getReferenceManager(scope); ElasticsearchDirectoryReader acquire = referenceManager.acquire(); - releasable = null; // success - hand over the reference to the engine reader return new DirectoryReaderSupplier(acquire) { @Override @@ -3490,20 +3481,16 @@ void doClose() { } catch (AlreadyClosedException e) { // This means there's a bug somewhere: don't suppress it throw new AssertionError(e); - } finally { - store.decRef(); } } }; } catch (AlreadyClosedException ex) { throw ex; } catch (Exception ex) { - maybeFailEngine("acquire_reader", ex); + maybeFailEngine("acquire_directory_reader", ex); ensureOpen(ex); // throw EngineCloseException here if we are already closed - logger.error("failed to acquire reader", ex); - throw new EngineException(shardId, "failed to acquire reader", ex); - } finally { - Releasables.close(releasable); + logger.error("failed to acquire directory reader", ex); + throw new EngineException(shardId, "failed to acquire directory reader", ex); } } From 1ee2699832c2b31ea0d1fea1f9720bc4223ff374 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 14 Feb 2025 14:58:02 +0100 Subject: [PATCH 4/4] iter --- .../index/engine/InternalEngine.java | 75 ++++++------------- 1 file changed, 22 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8701a1269c27e..f463dce2ec70e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Booleans; +import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -106,7 +107,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -1021,16 +1021,17 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th VersionValue versionValue = getVersionFromMap(op.uid()); if (versionValue == null) { assert incrementIndexVersionLookup(); // used for asserting in tests - final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion; - try (var directoryReaderSupplier = acquireDirectoryReaderSupplier(SearcherScope.INTERNAL)) { - var indexReader = directoryReaderSupplier.getDirectoryReader(); - if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { - assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), op.id(), loadSeqNo); - } else { - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), loadSeqNo); + final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = performActionWithDirectoryReader( + SearcherScope.INTERNAL, + directoryReader -> { + if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { + assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; + return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), op.id(), loadSeqNo); + } else { + return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), loadSeqNo); + } } - } + ); if (docIdAndVersion != null) { versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm); } @@ -3464,57 +3465,25 @@ protected long getPreCommitSegmentGeneration() { return preCommitSegmentGeneration.get(); } - DirectoryReaderSupplier acquireDirectoryReaderSupplier(SearcherScope scope) throws EngineException { - assert scope == SearcherScope.INTERNAL : "acquireDirectoryReaderSupplier(...) isn't prepared for external usage"; + T performActionWithDirectoryReader(SearcherScope scope, CheckedFunction action) + throws EngineException { + assert scope == SearcherScope.INTERNAL : "performActionWithDirectoryReader(...) isn't prepared for external usage"; assert store.hasReferences(); try { ReferenceManager referenceManager = getReferenceManager(scope); ElasticsearchDirectoryReader acquire = referenceManager.acquire(); - return new DirectoryReaderSupplier(acquire) { - - @Override - void doClose() { - try { - referenceManager.release(acquire); - } catch (IOException e) { - throw new UncheckedIOException("failed to close", e); - } catch (AlreadyClosedException e) { - // This means there's a bug somewhere: don't suppress it - throw new AssertionError(e); - } - } - }; + try { + return action.apply(acquire); + } finally { + referenceManager.release(acquire); + } } catch (AlreadyClosedException ex) { throw ex; } catch (Exception ex) { - maybeFailEngine("acquire_directory_reader", ex); + maybeFailEngine("perform_action_directory_reader", ex); ensureOpen(ex); // throw EngineCloseException here if we are already closed - logger.error("failed to acquire directory reader", ex); - throw new EngineException(shardId, "failed to acquire directory reader", ex); - } - } - - abstract static class DirectoryReaderSupplier implements Releasable { - private final DirectoryReader directoryReader; - private final AtomicBoolean released = new AtomicBoolean(false); - - DirectoryReaderSupplier(DirectoryReader directoryReader) { - this.directoryReader = directoryReader; - } - - public DirectoryReader getDirectoryReader() { - return directoryReader; + logger.error("failed to perform action with directory reader", ex); + throw new EngineException(shardId, "failed to perform action with directory reader", ex); } - - @Override - public final void close() { - if (released.compareAndSet(false, true)) { - doClose(); - } else { - assert false : "DirectoryReaderSupplier was released twice"; - } - } - - abstract void doClose(); } }