Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune only gc deletes below the local checkpoint #28790

Merged
merged 15 commits into from
Mar 26, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -1617,15 +1617,41 @@ public void trimTranslog() throws EngineException {
}

private void pruneDeletedTombstones() {
/*
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
* are remembered for at least one GC delete cycle and trimmed periodically. This is, at the moment, the best we can do on
* primary for user facing APIs but this arbitrary time limit is problematic for replicas. On replicas however we should
* trim only deletes whose seqno at most the local checkpoint. This requirement is explained as follows.
*
* Suppose o1 and o2 are two operations on the same document with seq#(o1) < seq#(o2), and o2 arrives before o1 on the replica.
* o2 is processed normally since it arrives first; when o1 arrives it should be discarded:
* - If seq#(o1) <= LCP, then it will be not be added to Lucene, as it was already previously added.
* - If seq#(o1) > LCP, then it depends on the nature of o2:
* *) If o2 is a delete then its seq# is recorded in the VersionMap, since seq#(o2) > seq#(o1) > LCP,
* so a lookup can find it and determine that o1 is stale.
* *) If o2 is an indexing then its seq# is either in Lucene (if refreshed) or the VersionMap (if not refreshed yet),
* so a real-time lookup can find it and determine that o1 is stale.
*
* Here we prefer to deploy a single trimming strategy, which satisfies two constraints, on both primary and replicas because:
* - It's simpler - no need to distinguish if an engine is running at primary mode or replica mode or being promoted.
* - If a replica subsequently is promoted, user experience is maintained as that replica remembers deletes for the last GC cycle.
*
* However, the version map may consume less memory if we deploy two different trimming strategies for primary and replicas.
*/
final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis());
final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
versionMap.pruneTombstones(maxTimestampToPrune, localCheckpointTracker.getCheckpoint());
lastDeleteVersionPruneTimeMSec = timeMSec;
}

// testing
void clearDeletedTombstones() {
// clean with current time Long.MAX_VALUE and interval 0 since we use a greater than relationship here.
versionMap.pruneTombstones(Long.MAX_VALUE, 0);
versionMap.pruneTombstones(Long.MAX_VALUE, localCheckpointTracker.getMaxSeqNo());
}

// for testing
final Map<BytesRef, DeleteVersionValue> getDeletedTombstones() {
return versionMap.getAllTombstones();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,10 @@ void removeTombstoneUnderLock(BytesRef uid) {
}
}

void pruneTombstones(long currentTime, long pruneInterval) {
/**
* Try to prune tombstones whose timestamp is less than maxTimestampToPrune and seqno at most the maxSeqNoToPrune.
*/
void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) {
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
final BytesRef uid = entry.getKey();
try (Releasable lock = keyedLock.tryAcquire(uid)) {
Expand All @@ -387,9 +390,8 @@ void pruneTombstones(long currentTime, long pruneInterval) {
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
final DeleteVersionValue versionValue = tombstones.get(uid);
if (versionValue != null) {
// check if the value is old enough to be removed
final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
if (isTooOld) {
// check if the value is old enough and safe to be removed
if (versionValue.time < maxTimestampToPrune && versionValue.seqNo <= maxSeqNoToPrune) {
// version value can't be removed it's
// not yet flushed to lucene ie. it's part of this current maps object
final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand All @@ -91,6 +92,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -165,6 +167,7 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -4572,4 +4575,45 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup
}
}
}

public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception {
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "-1")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
engine.engineConfig.setEnableGcDeletes(true);
int addedDocs = scaledRandomIntBetween(0, 10);
for (int i = 0; i < addedDocs; i++) {
index(engine, i);
}
final AtomicLong clock = new AtomicLong();
final Set<Long> trimmedDeletes = new HashSet<>();
final int trimmedBatch = between(10, 20);
for (int i = 0; i < trimmedBatch; i++) {
final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, clock.incrementAndGet()));
trimmedDeletes.add(seqno);
}
final long gapSeqNo = engine.getLocalCheckpointTracker().generateSeqNo(); // Gap here.
final Set<Long> rememberedDeletes = new HashSet<>();
final int rememberedBatch = between(10, 20);
for (int i = 0; i < rememberedBatch; i++) {
final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, clock.incrementAndGet()));
rememberedDeletes.add(seqno);
}
assertThat(engine.getDeletedTombstones().values().stream().map(deleteVersion -> deleteVersion.seqNo).collect(Collectors.toSet()),
equalTo(Sets.union(trimmedDeletes, rememberedDeletes)));
engine.refresh("test");
// Only prune deletes below the local checkpoint.
engine.maybePruneDeletes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this relate to clock? also refresh already maybe prunes deletes

assertThat(engine.getDeletedTombstones().values().stream().map(deleteVersion -> deleteVersion.seqNo).collect(Collectors.toSet()),
equalTo(rememberedDeletes));
// Fill the gap - should be able to prune all deletes.
engine.index(replicaIndexForDoc(testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, gapSeqNo, false));
engine.maybePruneDeletes();
assertThat(engine.getDeletedTombstones().entrySet(), empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.RamUsageTester;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
Expand All @@ -38,7 +38,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.empty;

public class LiveVersionMapTests extends ESTestCase {

Expand Down Expand Up @@ -93,14 +94,15 @@ public void testBasics() throws IOException {
map.afterRefresh(randomBoolean());
assertNull(map.getUnderLock(uid("test")));


map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1));
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
map.beforeRefresh();
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
map.afterRefresh(randomBoolean());
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
map.pruneTombstones(2, 0);
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
map.pruneTombstones(2, 1);
assertNull(map.getUnderLock(uid("test")));
}
}
Expand All @@ -121,8 +123,10 @@ public void testConcurrently() throws IOException, InterruptedException {
CountDownLatch startGun = new CountDownLatch(numThreads);
CountDownLatch done = new CountDownLatch(numThreads);
int randomValuesPerThread = randomIntBetween(5000, 20000);
AtomicLong clock = new AtomicLong(0);
AtomicLong lastPrunedTimestamp = new AtomicLong(-1);
final AtomicLong clock = new AtomicLong(0);
final AtomicLong lastPrunedTimestamp = new AtomicLong(-1);
final AtomicLong maxSeqNo = new AtomicLong();
final AtomicLong lastPrunedSeqNo = new AtomicLong();
for (int j = 0; j < threads.length; j++) {
threads[j] = new Thread(() -> {
startGun.countDown();
Expand All @@ -135,29 +139,31 @@ public void testConcurrently() throws IOException, InterruptedException {
try {
for (int i = 0; i < randomValuesPerThread; ++i) {
BytesRef bytesRef = randomFrom(random(), keyList);
final long clockTick = clock.get();
try (Releasable r = map.acquireLock(bytesRef)) {
VersionValue versionValue = values.computeIfAbsent(bytesRef,
v -> new VersionValue(randomLong(), randomLong(), randomLong()));
v -> new VersionValue(randomLong(), maxSeqNo.incrementAndGet(), randomLong()));
boolean isDelete = versionValue instanceof DeleteVersionValue;
if (isDelete) {
map.removeTombstoneUnderLock(bytesRef);
deletes.remove(bytesRef);
}
if (isDelete == false && rarely()) {
versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1,
versionValue = new DeleteVersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(),
versionValue.term, clock.getAndIncrement());
deletes.put(bytesRef, (DeleteVersionValue) versionValue);
} else {
versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term);
versionValue = new VersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term);
}
values.put(bytesRef, versionValue);
map.putUnderLock(bytesRef, versionValue);
}
if (rarely()) {
map.pruneTombstones(clockTick, 0);
// timestamp we pruned the deletes
lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev)); // make sure we track the latest
final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get());
final long clockTick = randomLongBetween(0, clock.get());
map.pruneTombstones(clockTick, pruneSeqNo);
// make sure we track the latest timestamp and seqno we pruned the deletes
lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev));
lastPrunedSeqNo.updateAndGet(prev -> Math.max(pruneSeqNo, prev));
}
}
} finally {
Expand Down Expand Up @@ -221,15 +227,17 @@ public void testConcurrently() throws IOException, InterruptedException {
VersionValue value = map.getUnderLock(e.getKey());
// here we keep track of the deletes and ensure that all deletes that are not visible anymore ie. not in the map
// have a timestamp that is smaller or equal to the maximum timestamp that we pruned on
final DeleteVersionValue delete = e.getValue();
if (value == null) {
assertTrue(e.getValue().time + " > " + lastPrunedTimestamp.get(), e.getValue().time <= lastPrunedTimestamp.get());
assertTrue(delete.time + " > " + lastPrunedTimestamp.get() + "," + delete.seqNo + " > " + lastPrunedSeqNo.get(),
delete.time <= lastPrunedTimestamp.get() && delete.seqNo <= lastPrunedSeqNo.get());
} else {
assertEquals(value, e.getValue());
assertEquals(value, delete);
}
}
});
map.pruneTombstones(clock.incrementAndGet(), 0);
assertEquals(0, StreamSupport.stream(map.getAllTombstones().entrySet().spliterator(), false).count());
map.pruneTombstones(clock.incrementAndGet(), maxSeqNo.get());
assertThat(map.getAllTombstones().entrySet(), empty());
}

public void testCarryOnSafeAccess() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkItemRequest;
Expand All @@ -30,11 +31,13 @@
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.ResyncReplicationRequest;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -609,6 +612,13 @@ private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardRespo
return result;
}

private <Request extends ReplicatedWriteRequest & DocWriteRequest>
BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception {
final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(),
new BulkItemRequest[]{new BulkItemRequest(0, request)});
return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest();
}

private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception {
final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
Expand All @@ -618,13 +628,14 @@ private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest requ
* indexes the given requests on the supplied primary, modifying it for replicas
*/
BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request);
BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1];
bulkItemRequests[0] = bulkItemRequest;
final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests);
final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
executeShardBulkOnPrimary(primary, bulkShardRequest);
return result.replicaRequest();
return executeReplicationRequestOnPrimary(primary, request);
}

/**
* Executes the delete request on the primary, and modifies it for replicas.
*/
BulkShardRequest deleteOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
return executeReplicationRequestOnPrimary(primary, request);
}

/**
Expand All @@ -634,6 +645,13 @@ void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Excepti
executeShardBulkOnReplica(replica, request);
}

/**
* Executes the delete request on the given replica shard.
*/
void deleteOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
executeShardBulkOnReplica(replica, request);
}

class GlobalCheckpointSync extends ReplicationAction<
GlobalCheckpointSyncAction.Request,
GlobalCheckpointSyncAction.Request,
Expand Down
Loading