Skip to content

Commit

Permalink
Add one protected method to InternalEngine as a hook point (#105113)
Browse files Browse the repository at this point in the history
This PR adds one new protected method to InternalEngine to allow further
customization for flush behaviour. 

Relates: ES-7759
  • Loading branch information
ywangd authored Feb 12, 2024
1 parent 8429157 commit 4eaff75
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -139,7 +138,7 @@ public class InternalEngine extends Engine {
private final ExternalReaderManager externalReaderManager;
private final ElasticsearchReaderManager internalReaderManager;

private final Lock flushLock = new ReentrantLock();
private final ReentrantLock flushLock = new ReentrantLock();
private final ReentrantLock optimizeLock = new ReentrantLock();

// A uid (in the form of BytesRef) to the version map
Expand Down Expand Up @@ -2177,10 +2176,9 @@ private boolean shouldPeriodicallyFlush(long flushThresholdSizeInBytes, long flu
protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
ensureOpen(); // best-effort, a concurrent failEngine() can still happen but that's ok
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
throw new IllegalArgumentException(
"wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing
);
final String message = "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
assert false : message;
throw new IllegalArgumentException(message);
}
final long generation;
if (flushLock.tryLock() == false) {
Expand Down Expand Up @@ -2252,6 +2250,8 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
logger.trace("released flush lock");
}

afterFlush(generation);

// We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
// (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
if (engineConfig.isEnableGcDeletes()) {
Expand All @@ -2261,6 +2261,10 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation)));
}

protected final boolean isFlushLockIsHeldByCurrentThread() {
return flushLock.isHeldByCurrentThread();
}

protected boolean hasUncommittedChanges() {
return indexWriter.hasUncommittedChanges();
}
Expand Down Expand Up @@ -2288,6 +2292,8 @@ private void refreshLastCommittedSegmentInfos() {
}
}

protected void afterFlush(long generation) {}

@Override
public void rollTranslogGeneration() throws EngineException {
try (var ignored = acquireEnsureOpenRef()) {
Expand Down Expand Up @@ -2865,6 +2871,7 @@ protected void doRun() throws Exception {
* @param translog the translog
*/
protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
assert isFlushLockIsHeldByCurrentThread();
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
Expand Down

0 comments on commit 4eaff75

Please sign in to comment.