From 291c4e7a0ce63f25a238efc34bbef8b224ac1ec9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 31 Jan 2019 18:02:24 -0700 Subject: [PATCH] Fix file reading in ccr restore service (#38117) Currently we use the raw byte array length when calling the IndexInput read call to determine how many bytes we want to read. However, due to how BigArrays works, the array length might be longer than the reference length. This commit fixes the issue and uses the BytesRef length when calling read. Additionally, it expands the index follow test to index many more documents. These documents should potentially lead to large enough segment files to trigger scenarios where this fix matters. --- .../repository/CcrRestoreSourceService.java | 3 +- .../elasticsearch/xpack/CcrIntegTestCase.java | 22 ------------ .../xpack/ccr/IndexFollowingIT.java | 35 +++++++++++++------ 3 files changed, 26 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index a72b2f21d71df..f093143112d3d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -235,8 +235,7 @@ private long readFileBytes(String fileName, BytesReference reference) throws IOE BytesRefIterator refIterator = reference.iterator(); BytesRef ref; while ((ref = refIterator.next()) != null) { - byte[] refBytes = ref.bytes; - indexInput.readBytes(refBytes, 0, refBytes.length); + indexInput.readBytes(ref.bytes, ref.offset, ref.length); } long offsetAfterRead = indexInput.getFilePointer(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index c4fdeb116ae86..2dccc0e96b7a2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -70,7 +70,6 @@ import org.elasticsearch.transport.nio.MockNioTransportPlugin; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; -import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -551,27 +550,6 @@ protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index fol }); } - protected void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception { - assertBusy(() -> { - long[] numOfOptimizedOps = new long[numberOfShards]; - for (int shardId = 0; shardId < numberOfShards; shardId++) { - for (String node : getFollowerCluster().nodesInclude(followerIndex.getName())) { - IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, node); - IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId)); - if (shard != null && shard.routingEntry().primary()) { - try { - FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard)); - numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing(); - } catch (AlreadyClosedException e) { - throw new AssertionError(e); // causes assertBusy to retry - } - } - } - } - assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal)); - }); - } - static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 55fcb6ace89fd..74c44704e2e1c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -101,9 +102,30 @@ public void testFollowIndex() throws Exception { assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderYellow("index1"); - final int firstBatchNumDocs = randomIntBetween(2, 64); + final int firstBatchNumDocs; + // Sometimes we want to index a lot of documents to ensure that the recovery works with larger files + if (rarely()) { + firstBatchNumDocs = randomIntBetween(1800, 2000); + } else { + firstBatchNumDocs = randomIntBetween(10, 64); + } + final int flushPoint = (int) (firstBatchNumDocs * 0.75); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); - for (int i = 0; i < firstBatchNumDocs; i++) { + BulkRequestBuilder bulkRequestBuilder = leaderClient().prepareBulk(); + for (int i = 0; i < flushPoint; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + IndexRequest indexRequest = new IndexRequest("index1", "doc", Integer.toString(i)) + .source(source, XContentType.JSON) + .timeout(TimeValue.timeValueSeconds(1)); + bulkRequestBuilder.add(indexRequest); + } + bulkRequestBuilder.get(); + + leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); + + // Index some docs after the flush that might be recovered in the normal index following operations + for (int i = flushPoint; i < firstBatchNumDocs; i++) { final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } @@ -147,7 +169,7 @@ public void testFollowIndex() throws Exception { for (int i = 0; i < firstBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } - assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs); + pauseFollow("index2"); followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); final int secondBatchNumDocs = randomIntBetween(2, 64); @@ -172,8 +194,6 @@ public void testFollowIndex() throws Exception { for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } - assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, - firstBatchNumDocs + secondBatchNumDocs); pauseFollow("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } @@ -287,7 +307,6 @@ public void testFollowIndexWithoutWaitForComplete() throws Exception { for (int i = 0; i < firstBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } - assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs); pauseFollow("index2"); } @@ -432,8 +451,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertIndexFullyReplicatedToFollower("index1", "index2"); pauseFollow("index2"); leaderClient().admin().indices().prepareRefresh("index1").get(); - assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfShards, - leaderClient().prepareSearch("index1").get().getHits().getTotalHits().value); assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards); } @@ -475,7 +492,6 @@ public void testFollowIndexWithNestedField() throws Exception { } pauseFollow("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1); - assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs); } public void testUnfollowNonExistingIndex() { @@ -538,7 +554,6 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception { } pauseFollow("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1); - assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs); } public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {