Skip to content

Commit

Permalink
introduce Engine#acquireDirectoryReaderSupplier(...)
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Feb 13, 2025
1 parent ce44c20 commit 8072907
Showing 1 changed file with 70 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -106,6 +107,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1020,18 +1022,15 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th
VersionValue versionValue = getVersionFromMap(op.uid());
if (versionValue == null) {
assert incrementIndexVersionLookup(); // used for asserting in tests
var referenceManager = getReferenceManager(SearcherScope.INTERNAL);
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion;
var indexReader = referenceManager.acquire();
try {
try (var directoryReaderSupplier = acquireDirectoryReaderSupplier(SearcherScope.INTERNAL)) {
var indexReader = directoryReaderSupplier.getDirectoryReader();
if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER;
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), op.id(), loadSeqNo);
} else {
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), loadSeqNo);
}
} finally {
referenceManager.release(indexReader);
}
if (docIdAndVersion != null) {
versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
Expand Down Expand Up @@ -3465,4 +3464,70 @@ public LiveVersionMap getLiveVersionMap() {
protected long getPreCommitSegmentGeneration() {
return preCommitSegmentGeneration.get();
}

DirectoryReaderSupplier acquireDirectoryReaderSupplier(SearcherScope scope) throws EngineException {
assert scope == SearcherScope.INTERNAL : "acquireDirectoryReaderSupplier(...) isn't prepared for external usage";

/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
if (store.tryIncRef() == false) {
throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get());
}
Releasable releasable = store::decRef;
try {
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
releasable = null; // success - hand over the reference to the engine reader
return new DirectoryReaderSupplier(acquire) {

@Override
void doClose() {
try {
referenceManager.release(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
} finally {
store.decRef();
}
}
};
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
maybeFailEngine("acquire_reader", ex);
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error("failed to acquire reader", ex);
throw new EngineException(shardId, "failed to acquire reader", ex);
} finally {
Releasables.close(releasable);
}
}

abstract static class DirectoryReaderSupplier implements Releasable {
private final DirectoryReader directoryReader;
private final AtomicBoolean released = new AtomicBoolean(false);

DirectoryReaderSupplier(DirectoryReader directoryReader) {
this.directoryReader = directoryReader;
}

public DirectoryReader getDirectoryReader() {
return directoryReader;
}

@Override
public final void close() {
if (released.compareAndSet(false, true)) {
doClose();
} else {
assert false : "DirectoryReaderSupplier was released twice";
}
}

abstract void doClose();
}
}

0 comments on commit 8072907

Please sign in to comment.