From 8b7715c6604ca59a8cafbd5ecfc038f518bfb30d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 13 Feb 2019 14:04:11 -0500 Subject: [PATCH] Fix excessive increments in soft delete policy (#38813) In this case, we were incrementing the policy too much. This means on every iteration we actually keep increasing the minimum retained sequence number, even with leases in place. It was a bug from when the soft deletes policy had retention leases incorporated into it. This commit fixes this bug by ensuring we only increment in the proper places, and adds careful tests for the various situations. --- .../index/engine/SoftDeletesPolicy.java | 8 +- .../index/engine/SoftDeletesPolicyTests.java | 90 ++++++++++++++++++- 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index 49b8f9d3483f2..9a9c7bd0ee869 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -137,11 +137,13 @@ public synchronized Tuple getRetentionPolicy() { .orElse(Long.MAX_VALUE); /* * The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations - * below the global checkpoint to retain (index.soft_deletes.retention.operations). + * below the global checkpoint to retain (index.soft_deletes.retention.operations). The additional increments on the global + * checkpoint and the local checkpoint of the safe commit are due to the fact that we want to retain all operations above + * those checkpoints. */ final long minSeqNoForQueryingChanges = - Math.min(globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber); - final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1; + Math.min(1 + globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber); + final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, 1 + localCheckpointOfSafeCommit); /* * We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index 8257aa99d0486..e4da636deaf6d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -32,6 +32,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -98,7 +99,9 @@ public void testSoftDeletesRetentionLock() { .min() .orElse(Long.MAX_VALUE); long retainedSeqNo = - Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1; + Math.min( + 1 + safeCommitCheckpoint, + Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps)); minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); } assertThat(retentionQuery.getNumDims(), equalTo(1)); @@ -113,7 +116,7 @@ public void testSoftDeletesRetentionLock() { .min() .orElse(Long.MAX_VALUE); long retainedSeqNo = - Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1; + Math.min(1 + safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps)); minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); } @@ -141,4 +144,87 @@ public void testAlwaysFetchLatestRetentionLeases() { assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0]))); } } + + public void testWhenGlobalCheckpointDictatesThePolicy() { + final int retentionOperations = randomIntBetween(0, 1024); + final AtomicLong globalCheckpoint = new AtomicLong(randomLongBetween(0, Long.MAX_VALUE - 2)); + final Collection leases = new ArrayList<>(); + final int numberOfLeases = randomIntBetween(0, 16); + for (int i = 0; i < numberOfLeases; i++) { + // setup leases where the minimum retained sequence number is more than the policy dictated by the global checkpoint + leases.add(new RetentionLease( + Integer.toString(i), + randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE), + randomNonNegativeLong(), "test")); + } + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); + final Supplier leasesSupplier = + () -> new RetentionLeases( + primaryTerm, + version, + Collections.unmodifiableCollection(new ArrayList<>(leases))); + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier); + // set the local checkpoint of the safe commit to more than the policy dicated by the global checkpoint + final long localCheckpointOfSafeCommit = randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE); + policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + globalCheckpoint.get() - retentionOperations)); + } + + public void testWhenLocalCheckpointOfSafeCommitDictatesThePolicy() { + final int retentionOperations = randomIntBetween(0, 1024); + final long localCheckpointOfSafeCommit = randomLongBetween(-1, Long.MAX_VALUE - retentionOperations - 1); + final AtomicLong globalCheckpoint = + new AtomicLong(randomLongBetween(Math.max(0, localCheckpointOfSafeCommit + retentionOperations), Long.MAX_VALUE - 1)); + final Collection leases = new ArrayList<>(); + final int numberOfLeases = randomIntBetween(0, 16); + for (int i = 0; i < numberOfLeases; i++) { + leases.add(new RetentionLease( + Integer.toString(i), + randomLongBetween(1 + localCheckpointOfSafeCommit + 1, Long.MAX_VALUE), // leases are for more than the local checkpoint + randomNonNegativeLong(), "test")); + } + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); + final Supplier leasesSupplier = + () -> new RetentionLeases( + primaryTerm, + version, + Collections.unmodifiableCollection(new ArrayList<>(leases))); + + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier); + policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + localCheckpointOfSafeCommit)); + } + + public void testWhenRetentionLeasesDictateThePolicy() { + final int retentionOperations = randomIntBetween(0, 1024); + final Collection leases = new ArrayList<>(); + final int numberOfLeases = randomIntBetween(1, 16); + for (int i = 0; i < numberOfLeases; i++) { + leases.add(new RetentionLease( + Integer.toString(i), + randomLongBetween(0, Long.MAX_VALUE - retentionOperations - 1), + randomNonNegativeLong(), "test")); + } + final OptionalLong minimumRetainingSequenceNumber = leases.stream().mapToLong(RetentionLease::retainingSequenceNumber).min(); + assert minimumRetainingSequenceNumber.isPresent() : leases; + final long localCheckpointOfSafeCommit = randomLongBetween(minimumRetainingSequenceNumber.getAsLong(), Long.MAX_VALUE - 1); + final AtomicLong globalCheckpoint = + new AtomicLong(randomLongBetween(minimumRetainingSequenceNumber.getAsLong() + retentionOperations, Long.MAX_VALUE - 1)); + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); + final Supplier leasesSupplier = + () -> new RetentionLeases( + primaryTerm, + version, + Collections.unmodifiableCollection(new ArrayList<>(leases))); + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier); + policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + assertThat(policy.getMinRetainedSeqNo(), equalTo(minimumRetainingSequenceNumber.getAsLong())); + } + }