From 92671f66b32c9069c1dcef9e27a78aaf92ae969f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 Feb 2018 10:11:34 -0500 Subject: [PATCH 1/9] Add deleteOperation to replication test case --- .../ESIndexLevelReplicationTestCase.java | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index f74ffdc4b4dc4..7a67b04c51356 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -22,6 +22,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemRequest; @@ -30,11 +31,13 @@ import org.elasticsearch.action.bulk.BulkShardResponse; import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.resync.ResyncReplicationRequest; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.resync.TransportResyncReplicationAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -609,6 +612,13 @@ private TransportWriteAction.WritePrimaryResult + BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception { + final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), + new BulkItemRequest[]{new BulkItemRequest(0, request)}); + return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest(); + } + private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception { final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica); TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger); @@ -618,13 +628,14 @@ private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest requ * indexes the given requests on the supplied primary, modifying it for replicas */ BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { - final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request); - BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1]; - bulkItemRequests[0] = bulkItemRequest; - final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests); - final TransportWriteAction.WritePrimaryResult result = - executeShardBulkOnPrimary(primary, bulkShardRequest); - return result.replicaRequest(); + return executeReplicationRequestOnPrimary(primary, request); + } + + /** + * Executes the delete request on the primary, and modifies it for replicas. + */ + BulkShardRequest deleteOnPrimary(DeleteRequest request, IndexShard primary) throws Exception { + return executeReplicationRequestOnPrimary(primary, request); } /** @@ -634,6 +645,13 @@ void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Excepti executeShardBulkOnReplica(replica, request); } + /** + * Executes the delete request on the given replica shard. + */ + void deleteOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + executeShardBulkOnReplica(replica, request); + } + class GlobalCheckpointSync extends ReplicationAction< GlobalCheckpointSyncAction.Request, GlobalCheckpointSyncAction.Request, From ff814c87bf5355f973fdaf6d7062c91ce5f9c64a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Feb 2018 15:05:11 -0500 Subject: [PATCH 2/9] Add out of order delivery for append only request --- .../IndexLevelReplicationTests.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 8c15a2a84ddb8..8fe54e7491985 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -26,8 +26,10 @@ import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; @@ -58,7 +60,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -368,6 +369,27 @@ public void testSeqNoCollision() throws Exception { } } + /** + * This test ensures the consistency between primary and replica when non-append-only (eg. index request with id or delete) operation + * of the same document is processed before the original append-only request on replicas. The append-only document can be exposed and + * deleted on the primary before it is added to replica. Replicas should treat a late append-only request as a regular index request. + */ + public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { + try (ReplicationGroup shards = createGroup(1)) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + // Append-only request - without id + final BulkShardRequest indexRequest = indexOnPrimary( + new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary); + final String docId = Iterables.get(getShardDocUIDs(primary), 0); + final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary); + deleteOnReplica(deleteRequest, replica); + indexOnReplica(indexRequest, replica); + shards.assertAllEqual(0); + } + } + /** Throws documentFailure on every indexing operation */ static class ThrowingDocumentFailureEngineFactory implements EngineFactory { final String documentFailureMessage; From 467045af6907143bd9640f18612199058c16cd8a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 22 Feb 2018 09:59:07 -0500 Subject: [PATCH 3/9] track max_seqno of non-append-only requests --- .../index/engine/InternalEngine.java | 52 +++++++++-- .../index/engine/InternalEngineTests.java | 88 +++++++++++++++++++ 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 100a133042d74..5c3352cc72ffb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -137,6 +137,7 @@ public class InternalEngine extends Engine { private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); /** @@ -189,7 +190,7 @@ public InternalEngine(EngineConfig engineConfig) { this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint, startingCommit); writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit); - updateMaxUnsafeAutoIdTimestampFromWriter(writer); + bootstrapAppendOnlyInfoFromWriter(writer); assert engineConfig.getForceNewHistoryUUID() == false || openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG @@ -364,15 +365,16 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { } } - private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { - long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE; + private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { - if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { - commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue()); - break; + final String key = entry.getKey(); + if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { + maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), Long.parseLong(entry.getValue()))); + } + if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { + maxSeqNoOfNonAppendOnlyOperations.set(Math.max(maxSeqNoOfNonAppendOnlyOperations.get(), Long.parseLong(entry.getValue()))); } } - maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); } @Override @@ -893,11 +895,15 @@ public IndexResult index(Index index) throws IOException { private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { final IndexingStrategy plan; - if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { + final boolean appendOnlyRequest = canOptimizeAddDocument(index); + if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && mayHaveBeenExposedBefore(index) == false) { // no need to deal with out of order delivery - we never saw this one assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); } else { + if (appendOnlyRequest == false) { + updateMaxSeqNoOfNonAppendOnlyOperations(index); + } versionMap.enforceSafeAccess(); // drop out of order operations assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : @@ -1032,6 +1038,35 @@ private boolean mayHaveBeenIndexedBefore(Index index) { return mayHaveBeenIndexBefore; } + /** + * Checks if the document of an append-only request may have been exposed or seen before on replica. + * It's ok to return `true` even if this is not the case but returning `false` must always be correct. + */ + private boolean mayHaveBeenExposedBefore(Index index) { + /* + * As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue + * a follow up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before + * the original append-only. In this case we can't simply proceed with the append only without consulting the version map. + * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen + * the document of that append-only request. However, if the seqno of an append-only is higher than seqno of any non-append-only + * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. + */ + assert canOptimizeAddDocument(index); + assert index.origin() != Operation.Origin.PRIMARY; + return index.seqNo() <= maxSeqNoOfNonAppendOnlyOperations.get(); + } + + private void updateMaxSeqNoOfNonAppendOnlyOperations(Operation op) { + assert op.origin() != Operation.Origin.PRIMARY; + maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(op.seqNo(), curr)); + assert maxSeqNoOfNonAppendOnlyOperations.get() >= op.seqNo(); + } + + // for testing + long getMaxSeqNoOfNonAppendOnlyOperations() { + return maxSeqNoOfNonAppendOnlyOperations.get(); + } + private static void index(final List docs, final IndexWriter indexWriter) throws IOException { if (docs.size() > 1) { indexWriter.addDocuments(docs); @@ -1187,6 +1222,7 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + delete.versionType() + "]"; + updateMaxSeqNoOfNonAppendOnlyOperations(delete); // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7d74946b60421..7c76f8db8f181 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4572,4 +4572,92 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup } } } + + public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { + IOUtils.close(engine, store); + store = createStore(); + final Path translogPath = createTempDir(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { + final CountDownLatch latch = new CountDownLatch(1); + final Thread appendOnlyIndexer = new Thread(() -> { + try { + latch.countDown(); + final int numDocs = scaledRandomIntBetween(100, 1000); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null); + if (randomBoolean()) { + engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo())); + } else { + engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong())); + } + } + } catch (Exception ex) { + throw new RuntimeException("Failed to index", ex); + } + }); + appendOnlyIndexer.setName("append-only indexer"); + appendOnlyIndexer.start(); + latch.await(); + long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED; + final int numOps = scaledRandomIntBetween(100, 1000); + for (int i = 0; i < numOps; i++) { + ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); + if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations + final long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); + final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean()); + if (randomBoolean()) { + engine.index(doc); + } else { + engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(), + doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis())); + } + maxSeqNoOfNonAppendOnly = seqno; + } else { // On primary - do not update max_seqno for non-append-only operations + if (randomBoolean()) { + engine.index(indexForDoc(parsedDocument)); + } else { + engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id()))); + } + } + } + appendOnlyIndexer.join(120_000); + assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly)); + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + engine.syncTranslog(); + engine.flush(); + } + try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { + assertThat("max_seqno from non-append-only was not bootstrap from the safe commit", + engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get())); + } + } + + public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { + long lookupTimes = 0L; + final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker(); + final int initDocs = between(0, 10); + for (int i = 0; i < initDocs; i++) { + index(engine, i); + lookupTimes++; + } + // doc1 is delayed and arrived after a non-append-only op. + final long seqNoDoc1 = localCheckpointTracker.generateSeqNo(); + Engine.IndexResult regularDoc = engine.index(replicaIndexForDoc( + testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, localCheckpointTracker.generateSeqNo(), false)); + lookupTimes++; + assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); // + assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(regularDoc.getSeqNo())); + + // should not optimize for doc1 and process as a regular doc (eg. look up in version map) + engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null), + false, randomNonNegativeLong(), seqNoDoc1)); + lookupTimes++; + assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); + + // optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map. + engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null), + false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo())); + assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); + } } From 198c504173095dbb1299fd128ef3bdaa05ac5194 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 22 Feb 2018 12:10:33 -0500 Subject: [PATCH 4/9] inline max_seqno methods --- .../index/engine/InternalEngine.java | 39 ++++++------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5c3352cc72ffb..928210500fdcf 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -896,13 +896,20 @@ public IndexResult index(Index index) throws IOException { private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { final IndexingStrategy plan; final boolean appendOnlyRequest = canOptimizeAddDocument(index); - if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && mayHaveBeenExposedBefore(index) == false) { - // no need to deal with out of order delivery - we never saw this one + if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) { + /* + * As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue + * a follow-up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before + * the original append-only. In this case we can't simply proceed with the append only without consulting the version map. + * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen + * the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only + * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. + */ assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); } else { if (appendOnlyRequest == false) { - updateMaxSeqNoOfNonAppendOnlyOperations(index); + maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); } versionMap.enforceSafeAccess(); // drop out of order operations @@ -1038,30 +1045,6 @@ private boolean mayHaveBeenIndexedBefore(Index index) { return mayHaveBeenIndexBefore; } - /** - * Checks if the document of an append-only request may have been exposed or seen before on replica. - * It's ok to return `true` even if this is not the case but returning `false` must always be correct. - */ - private boolean mayHaveBeenExposedBefore(Index index) { - /* - * As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue - * a follow up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before - * the original append-only. In this case we can't simply proceed with the append only without consulting the version map. - * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen - * the document of that append-only request. However, if the seqno of an append-only is higher than seqno of any non-append-only - * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. - */ - assert canOptimizeAddDocument(index); - assert index.origin() != Operation.Origin.PRIMARY; - return index.seqNo() <= maxSeqNoOfNonAppendOnlyOperations.get(); - } - - private void updateMaxSeqNoOfNonAppendOnlyOperations(Operation op) { - assert op.origin() != Operation.Origin.PRIMARY; - maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(op.seqNo(), curr)); - assert maxSeqNoOfNonAppendOnlyOperations.get() >= op.seqNo(); - } - // for testing long getMaxSeqNoOfNonAppendOnlyOperations() { return maxSeqNoOfNonAppendOnlyOperations.get(); @@ -1222,7 +1205,7 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + delete.versionType() + "]"; - updateMaxSeqNoOfNonAppendOnlyOperations(delete); + maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity From d1ef1854b1314795244af706fbf78582732bc415 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 23 Feb 2018 10:37:44 -0500 Subject: [PATCH 5/9] Add assertions --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 928210500fdcf..9a0408b3f3283 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -910,6 +910,8 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio } else { if (appendOnlyRequest == false) { maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); + assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" + + "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; } versionMap.enforceSafeAccess(); // drop out of order operations @@ -1206,6 +1208,8 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + delete.versionType() + "]"; maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); + assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" + + "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]"; // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity From 341063053895919b5aa87a5234b0501ae3074b39 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 3 Mar 2018 21:28:28 -0800 Subject: [PATCH 6/9] Just assign max_seqno from commit --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9a0408b3f3283..c63b9d7a94a18 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -372,7 +372,7 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), Long.parseLong(entry.getValue()))); } if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { - maxSeqNoOfNonAppendOnlyOperations.set(Math.max(maxSeqNoOfNonAppendOnlyOperations.get(), Long.parseLong(entry.getValue()))); + maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue())); } } } From c2b83f0955640478c1cbbfc16eeb507b19633a8b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 3 Mar 2018 21:42:48 -0800 Subject: [PATCH 7/9] Test with delete --- .../index/engine/InternalEngineTests.java | 17 +++++++++++------ .../index/engine/EngineTestCase.java | 4 ++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7c76f8db8f181..a1c0f14a4cb7d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4642,16 +4642,21 @@ public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { lookupTimes++; } // doc1 is delayed and arrived after a non-append-only op. - final long seqNoDoc1 = localCheckpointTracker.generateSeqNo(); - Engine.IndexResult regularDoc = engine.index(replicaIndexForDoc( - testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, localCheckpointTracker.generateSeqNo(), false)); + final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo(); + final long seqnoNormalOp = localCheckpointTracker.generateSeqNo(); + if (randomBoolean()) { + engine.index(replicaIndexForDoc( + testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false)); + } else { + engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong())); + } lookupTimes++; - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); // - assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(regularDoc.getSeqNo())); + assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); + assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp)); // should not optimize for doc1 and process as a regular doc (eg. look up in version map) engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), seqNoDoc1)); + false, randomNonNegativeLong(), seqNoAppendOnly1)); lookupTimes++; assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 62be0d48bf31c..2d6237de107b4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -464,4 +464,8 @@ protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } + protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) { + return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, startTime); + } } From 440f874a853c5904f0fac3ac6c648ef34171308d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 26 Mar 2018 14:35:54 -0400 Subject: [PATCH 8/9] Minor fix in internal engine test --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2c0ddd5b41d91..e022330b664c3 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4530,12 +4530,11 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } } - public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { IOUtils.close(engine, store); store = createStore(); final Path translogPath = createTempDir(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { final CountDownLatch latch = new CountDownLatch(1); final Thread appendOnlyIndexer = new Thread(() -> { From 05050a526619487302b005d7b3a8763f1ae1935b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 26 Mar 2018 14:41:25 -0400 Subject: [PATCH 9/9] assert when bootstrapAppendOnlyInfoFromWriter --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 94444f5ddac86..0fda2f04ac5a4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -350,9 +350,13 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { final String key = entry.getKey(); if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { - maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), Long.parseLong(entry.getValue()))); + assert maxUnsafeAutoIdTimestamp.get() == -1 : + "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; + maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue())); } if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { + assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : + "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]"; maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue())); } }