Skip to content

Commit

Permalink
iter
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Feb 14, 2025
1 parent 552ad61 commit 1ee2699
Showing 1 changed file with 22 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -106,7 +107,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1021,16 +1021,17 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th
VersionValue versionValue = getVersionFromMap(op.uid());
if (versionValue == null) {
assert incrementIndexVersionLookup(); // used for asserting in tests
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion;
try (var directoryReaderSupplier = acquireDirectoryReaderSupplier(SearcherScope.INTERNAL)) {
var indexReader = directoryReaderSupplier.getDirectoryReader();
if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER;
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), op.id(), loadSeqNo);
} else {
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), loadSeqNo);
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = performActionWithDirectoryReader(
SearcherScope.INTERNAL,
directoryReader -> {
if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER;
return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), op.id(), loadSeqNo);
} else {
return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), loadSeqNo);
}
}
}
);
if (docIdAndVersion != null) {
versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
}
Expand Down Expand Up @@ -3464,57 +3465,25 @@ protected long getPreCommitSegmentGeneration() {
return preCommitSegmentGeneration.get();
}

DirectoryReaderSupplier acquireDirectoryReaderSupplier(SearcherScope scope) throws EngineException {
assert scope == SearcherScope.INTERNAL : "acquireDirectoryReaderSupplier(...) isn't prepared for external usage";
<T> T performActionWithDirectoryReader(SearcherScope scope, CheckedFunction<DirectoryReader, T, IOException> action)
throws EngineException {
assert scope == SearcherScope.INTERNAL : "performActionWithDirectoryReader(...) isn't prepared for external usage";
assert store.hasReferences();
try {
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
return new DirectoryReaderSupplier(acquire) {

@Override
void doClose() {
try {
referenceManager.release(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
}
}
};
try {
return action.apply(acquire);
} finally {
referenceManager.release(acquire);
}
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
maybeFailEngine("acquire_directory_reader", ex);
maybeFailEngine("perform_action_directory_reader", ex);
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error("failed to acquire directory reader", ex);
throw new EngineException(shardId, "failed to acquire directory reader", ex);
}
}

abstract static class DirectoryReaderSupplier implements Releasable {
private final DirectoryReader directoryReader;
private final AtomicBoolean released = new AtomicBoolean(false);

DirectoryReaderSupplier(DirectoryReader directoryReader) {
this.directoryReader = directoryReader;
}

public DirectoryReader getDirectoryReader() {
return directoryReader;
logger.error("failed to perform action with directory reader", ex);
throw new EngineException(shardId, "failed to perform action with directory reader", ex);
}

@Override
public final void close() {
if (released.compareAndSet(false, true)) {
doClose();
} else {
assert false : "DirectoryReaderSupplier was released twice";
}
}

abstract void doClose();
}
}

0 comments on commit 1ee2699

Please sign in to comment.