From 054c2b042bb4483f969f6f259f76dd47aa373f6b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 14 Feb 2025 21:50:00 +0100 Subject: [PATCH] Speed up InternalEngine#resolveDocVersion(...) method (#122374) (#122642) Use reference manager to get index reader instead of acquiring a searcher. The latter involves creating an index searcher, which is not used and expensive as part of the `resolveDocVersion(...)` method, because this method is invoked for each document that gets indexed. --- .../index/engine/InternalEngine.java | 50 ++++++++++++------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7f6fe40dbaaf0..f463dce2ec70e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Booleans; +import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -1020,24 +1021,17 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th VersionValue versionValue = getVersionFromMap(op.uid()); if (versionValue == null) { assert incrementIndexVersionLookup(); // used for asserting in tests - final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion; - try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { - if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { - assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion( - searcher.getIndexReader(), - op.uid(), - op.id(), - loadSeqNo - ); - } else { - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion( - searcher.getIndexReader(), - op.uid(), - loadSeqNo - ); + final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = performActionWithDirectoryReader( + SearcherScope.INTERNAL, + directoryReader -> { + if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { + assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; + return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), op.id(), loadSeqNo); + } else { + return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), loadSeqNo); + } } - } + ); if (docIdAndVersion != null) { versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm); } @@ -3470,4 +3464,26 @@ public LiveVersionMap getLiveVersionMap() { protected long getPreCommitSegmentGeneration() { return preCommitSegmentGeneration.get(); } + + T performActionWithDirectoryReader(SearcherScope scope, CheckedFunction action) + throws EngineException { + assert scope == SearcherScope.INTERNAL : "performActionWithDirectoryReader(...) isn't prepared for external usage"; + assert store.hasReferences(); + try { + ReferenceManager referenceManager = getReferenceManager(scope); + ElasticsearchDirectoryReader acquire = referenceManager.acquire(); + try { + return action.apply(acquire); + } finally { + referenceManager.release(acquire); + } + } catch (AlreadyClosedException ex) { + throw ex; + } catch (Exception ex) { + maybeFailEngine("perform_action_directory_reader", ex); + ensureOpen(ex); // throw EngineCloseException here if we are already closed + logger.error("failed to perform action with directory reader", ex); + throw new EngineException(shardId, "failed to perform action with directory reader", ex); + } + } }