From 5e0649fedfe452d24e8c4dc23ee32ddcac2c1e4f Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 6 Nov 2018 16:19:01 +0100 Subject: [PATCH 01/10] [RCI] Check blocks while having indexing permit in TransportReplicationAction --- .../TransportReplicationAction.java | 66 +++++++++++-------- .../TransportReplicationActionTests.java | 59 ++++++++++++++--- 2 files changed, 88 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index e030fa4f15190..e387ccb64eea7 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -235,6 +236,32 @@ protected TransportRequestOptions transportOptions(Settings settings) { return TransportRequestOptions.EMPTY; } + private String concreteIndex(final ClusterState state, final ReplicationRequest request) { + return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index(); + } + + private boolean handleBlockExceptions(final ClusterState state, + final ReplicationRequest request, + final CheckedConsumer consumer) { + ClusterBlockLevel globalBlockLevel = globalBlockLevel(); + if (globalBlockLevel != null) { + ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel); + if (blockException != null) { + consumer.accept(blockException); + return true; + } + } + ClusterBlockLevel indexBlockLevel = indexBlockLevel(); + if (indexBlockLevel != null) { + ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state, request)); + if (blockException != null) { + consumer.accept(blockException); + return true; + } + } + return false; + } + protected boolean retryPrimaryException(final Throwable e) { return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class || TransportActions.isShardNotAvailableException(e); @@ -310,6 +337,10 @@ protected void doRun() throws Exception { @Override public void onResponse(PrimaryShardReference primaryShardReference) { try { + final ClusterState clusterState = clusterService.state(); + if (handleBlockExceptions(clusterState, request, this::handleBlockException)) { + return; + } if (primaryShardReference.isRelocated()) { primaryShardReference.close(); // release shard operation lock as soon as possible setPhase(replicationTask, "primary_delegation"); @@ -323,7 +354,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) { response.readFrom(in); return response; }; - DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId()); + DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), transportOptions, @@ -356,6 +387,11 @@ public void handleException(TransportException exp) { } } + private void handleBlockException(final ClusterBlockException blockException) { + logger.trace("cluster is blocked, action failed on primary", blockException); + throw blockException; + } + @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); @@ -696,12 +732,12 @@ public void onFailure(Exception e) { protected void doRun() { setPhase(task, "routing"); final ClusterState state = observer.setAndGetObservedState(); - if (handleBlockExceptions(state)) { + if (handleBlockExceptions(state, request, this::handleBlockException)) { return; } // request does not have a shardId yet, we need to pass the concrete index to resolve shardId - final String concreteIndex = concreteIndex(state); + final String concreteIndex = concreteIndex(state, request); final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); if (indexMetaData == null) { retry(new IndexNotFoundException(concreteIndex)); @@ -776,35 +812,11 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) { return false; } - private String concreteIndex(ClusterState state) { - return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index(); - } - private ShardRouting primary(ClusterState state) { IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId()); return indexShard.primaryShard(); } - private boolean handleBlockExceptions(ClusterState state) { - ClusterBlockLevel globalBlockLevel = globalBlockLevel(); - if (globalBlockLevel != null) { - ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel); - if (blockException != null) { - handleBlockException(blockException); - return true; - } - } - ClusterBlockLevel indexBlockLevel = indexBlockLevel(); - if (indexBlockLevel != null) { - ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state)); - if (blockException != null) { - handleBlockException(blockException); - return true; - } - } - return false; - } - private void handleBlockException(ClusterBlockException blockException) { if (blockException.retryable()) { logger.trace("cluster is blocked, scheduling a retry", blockException); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index aeda5f1c3fa80..718bf4e28ad51 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -108,6 +108,7 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -191,7 +192,7 @@ void assertListenerThrows(String msg, PlainActionFuture listener, Class listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -248,6 +249,53 @@ protected ClusterBlockLevel globalBlockLevel() { assertListenerThrows("should fail with an IndexNotFoundException when no blocks checked", listener, IndexNotFoundException.class); } + public void testBlocksInPrimaryAction() { + final boolean globalBlock = randomBoolean(); + + final TestAction actionWithBlocks = + new TestAction(Settings.EMPTY, "internal:actionWithBlocks", transportService, clusterService, shardStateAction, threadPool) { + @Override + protected ClusterBlockLevel globalBlockLevel() { + return globalBlock ? ClusterBlockLevel.WRITE : null; + } + + @Override + protected ClusterBlockLevel indexBlockLevel() { + return globalBlock == false ? ClusterBlockLevel.WRITE : null; + } + }; + + final String index = "index"; + final ShardId shardId = new ShardId(index, "_na_", 0); + setState(clusterService, stateWithActivePrimary(index, true, randomInt(5))); + + final ClusterBlocks.Builder block = ClusterBlocks.builder(); + if (globalBlock) { + block.addGlobalBlock(new ClusterBlock(randomIntBetween(1, 16), "test global block", randomBoolean(), randomBoolean(), + randomBoolean(), RestStatus.BAD_REQUEST, ClusterBlockLevel.ALL)); + } else { + block.addIndexBlock(index, new ClusterBlock(randomIntBetween(1, 16), "test index block", randomBoolean(), randomBoolean(), + randomBoolean(), RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE)); + } + setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); + + final ClusterState clusterState = clusterService.state(); + final String targetAllocationID = clusterState.getRoutingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId(); + final long primaryTerm = clusterState.metaData().index(index).primaryTerm(shardId.id()); + final Request request = new Request(shardId); + final ReplicationTask task = maybeTask(); + final PlainActionFuture listener = new PlainActionFuture<>(); + + final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks = + actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task); + asyncPrimaryActionWithBlocks.run(); + + final ExecutionException exception = expectThrows(ExecutionException.class, listener::get); + assertThat(exception.getCause(), instanceOf(ClusterBlockException.class)); + assertThat(exception.getCause(), hasToString(containsString("test " + (globalBlock ? "global" : "index") + " block"))); + assertPhase(task, "finished"); + } + public void assertIndexShardUninitialized() { assertEquals(0, count.get()); } @@ -1115,15 +1163,6 @@ private class TestAction extends TransportReplicationAction()), new IndexNameExpressionResolver(), - Request::new, Request::new, ThreadPool.Names.SAME); - } - @Override protected TestResponse newResponseInstance() { return new TestResponse(); From 8dc7f234f2e21bfe8ef17da467ac52b21c371658 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 8 Nov 2018 12:37:54 +0100 Subject: [PATCH 02/10] Use Optional --- .../TransportReplicationAction.java | 77 +++++++++---------- .../TransportReplicationActionTests.java | 3 +- 2 files changed, 39 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index e387ccb64eea7..3ccbc19ecd2e5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -45,7 +45,6 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -84,6 +83,7 @@ import java.io.IOException; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; @@ -240,26 +240,22 @@ private String concreteIndex(final ClusterState state, final ReplicationRequest return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index(); } - private boolean handleBlockExceptions(final ClusterState state, - final ReplicationRequest request, - final CheckedConsumer consumer) { + private Optional handleBlockExceptions(final ClusterState state, final String indexName) { ClusterBlockLevel globalBlockLevel = globalBlockLevel(); if (globalBlockLevel != null) { ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel); if (blockException != null) { - consumer.accept(blockException); - return true; + return Optional.of(blockException); } } ClusterBlockLevel indexBlockLevel = indexBlockLevel(); if (indexBlockLevel != null) { - ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state, request)); + ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName); if (blockException != null) { - consumer.accept(blockException); - return true; + return Optional.of(blockException); } } - return false; + return Optional.empty(); } protected boolean retryPrimaryException(final Throwable e) { @@ -338,12 +334,11 @@ protected void doRun() throws Exception { public void onResponse(PrimaryShardReference primaryShardReference) { try { final ClusterState clusterState = clusterService.state(); - if (handleBlockExceptions(clusterState, request, this::handleBlockException)) { - return; - } + final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index()); + handleBlockExceptions(clusterState, indexMetaData.getIndex().getName()).ifPresent(this::handleBlockException); if (primaryShardReference.isRelocated()) { primaryShardReference.close(); // release shard operation lock as soon as possible - setPhase(replicationTask, "primary_delegation"); + setPhase(replicationTask, "primary_delegation"); // delegate primary phase to relocation target // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. @@ -732,35 +727,37 @@ public void onFailure(Exception e) { protected void doRun() { setPhase(task, "routing"); final ClusterState state = observer.setAndGetObservedState(); - if (handleBlockExceptions(state, request, this::handleBlockException)) { - return; - } - - // request does not have a shardId yet, we need to pass the concrete index to resolve shardId final String concreteIndex = concreteIndex(state, request); - final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); - if (indexMetaData == null) { - retry(new IndexNotFoundException(concreteIndex)); - return; - } - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - throw new IndexClosedException(indexMetaData.getIndex()); - } - - // resolve all derived request fields, so we can route and apply it - resolveRequest(indexMetaData, request); - assert request.shardId() != null : "request shardId must be set in resolveRequest"; - assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest"; - final ShardRouting primary = primary(state); - if (retryIfUnavailable(state, primary)) { - return; - } - final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); - if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { - performLocalAction(state, primary, node, indexMetaData); + Optional blockException = handleBlockExceptions(state, concreteIndex); + if (blockException.isPresent()) { + handleBlockException(blockException.get()); } else { - performRemoteAction(state, primary, node); + // request does not have a shardId yet, we need to pass the concrete index to resolve shardId + final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); + if (indexMetaData == null) { + retry(new IndexNotFoundException(concreteIndex)); + return; + } + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + throw new IndexClosedException(indexMetaData.getIndex()); + } + + // resolve all derived request fields, so we can route and apply it + resolveRequest(indexMetaData, request); + assert request.shardId() != null : "request shardId must be set in resolveRequest"; + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest"; + + final ShardRouting primary = primary(state); + if (retryIfUnavailable(state, primary)) { + return; + } + final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); + if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { + performLocalAction(state, primary, node, indexMetaData); + } else { + performRemoteAction(state, primary, node); + } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 718bf4e28ad51..ac1688afe8f76 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -730,7 +730,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - final IndexShard shard = mock(IndexShard.class); + final IndexShard shard = mockIndexShard(shardId, clusterService); when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm); when(shard.routingEntry()).thenReturn(routingEntry); when(shard.isRelocatedPrimary()).thenReturn(false); @@ -1222,6 +1222,7 @@ final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterSe private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.shardId()).thenReturn(shardId); doAnswer(invocation -> { ActionListener callback = (ActionListener) invocation.getArguments()[0]; count.incrementAndGet(); From 85299c2738c178725755bea5c83d7cc3863d74b5 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 8 Nov 2018 14:30:55 +0100 Subject: [PATCH 03/10] Fix checkstyle --- .../action/support/replication/TransportReplicationAction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 3ccbc19ecd2e5..331d1675d25e0 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -746,7 +746,8 @@ protected void doRun() { // resolve all derived request fields, so we can route and apply it resolveRequest(indexMetaData, request); assert request.shardId() != null : "request shardId must be set in resolveRequest"; - assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest"; + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : + "request waitForActiveShards must be set in resolveRequest"; final ShardRouting primary = primary(state); if (retryIfUnavailable(state, primary)) { From ab42a14e67710ced4d93e3db765fe1fbd9b42522 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 8 Nov 2018 15:58:47 +0100 Subject: [PATCH 04/10] Simplify handleBlockException --- .../TransportReplicationAction.java | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 331d1675d25e0..2439b86795aa9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -83,7 +83,6 @@ import java.io.IOException; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; @@ -240,22 +239,22 @@ private String concreteIndex(final ClusterState state, final ReplicationRequest return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index(); } - private Optional handleBlockExceptions(final ClusterState state, final String indexName) { + private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) { ClusterBlockLevel globalBlockLevel = globalBlockLevel(); if (globalBlockLevel != null) { ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel); if (blockException != null) { - return Optional.of(blockException); + return blockException; } } ClusterBlockLevel indexBlockLevel = indexBlockLevel(); if (indexBlockLevel != null) { ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName); if (blockException != null) { - return Optional.of(blockException); + return blockException; } } - return Optional.empty(); + return null; } protected boolean retryPrimaryException(final Throwable e) { @@ -335,7 +334,13 @@ public void onResponse(PrimaryShardReference primaryShardReference) { try { final ClusterState clusterState = clusterService.state(); final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index()); - handleBlockExceptions(clusterState, indexMetaData.getIndex().getName()).ifPresent(this::handleBlockException); + + final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName()); + if (blockException != null) { + logger.trace("cluster is blocked, action failed on primary", blockException); + throw blockException; + } + if (primaryShardReference.isRelocated()) { primaryShardReference.close(); // release shard operation lock as soon as possible setPhase(replicationTask, "primary_delegation"); @@ -382,11 +387,6 @@ public void handleException(TransportException exp) { } } - private void handleBlockException(final ClusterBlockException blockException) { - logger.trace("cluster is blocked, action failed on primary", blockException); - throw blockException; - } - @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); @@ -729,9 +729,14 @@ protected void doRun() { final ClusterState state = observer.setAndGetObservedState(); final String concreteIndex = concreteIndex(state, request); - Optional blockException = handleBlockExceptions(state, concreteIndex); - if (blockException.isPresent()) { - handleBlockException(blockException.get()); + final ClusterBlockException blockException = blockExceptions(state, concreteIndex); + if (blockException != null) { + if (blockException.retryable()) { + logger.trace("cluster is blocked, scheduling a retry", blockException); + retry(blockException); + } else { + finishAsFailed(blockException); + } } else { // request does not have a shardId yet, we need to pass the concrete index to resolve shardId final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); @@ -815,15 +820,6 @@ private ShardRouting primary(ClusterState state) { return indexShard.primaryShard(); } - private void handleBlockException(ClusterBlockException blockException) { - if (blockException.retryable()) { - logger.trace("cluster is blocked, scheduling a retry", blockException); - retry(blockException); - } else { - finishAsFailed(blockException); - } - } - private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction, final TransportRequest requestToPerform) { transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler() { From b8ff35f60ce2951e124ca460ddb70418ba8c42b2 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Sat, 10 Nov 2018 09:01:13 +0100 Subject: [PATCH 05/10] Apply feedback --- .../TransportReplicationAction.java | 68 +++++++-------- .../TransportReplicationActionTests.java | 86 ++++--------------- 2 files changed, 49 insertions(+), 105 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2439b86795aa9..8e754f2eb1eae 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -259,7 +259,15 @@ private ClusterBlockException blockExceptions(final ClusterState state, final St protected boolean retryPrimaryException(final Throwable e) { return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class - || TransportActions.isShardNotAvailableException(e); + || TransportActions.isShardNotAvailableException(e) + || isRetryableClusterBlockException(e); + } + + boolean isRetryableClusterBlockException(final Throwable e) { + if (e instanceof ClusterBlockException) { + return ((ClusterBlockException) e).retryable(); + } + return false; } protected class OperationTransportHandler implements TransportRequestHandler { @@ -343,7 +351,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) { if (primaryShardReference.isRelocated()) { primaryShardReference.close(); // release shard operation lock as soon as possible - setPhase(replicationTask, "primary_delegation"); + setPhase(replicationTask, "primary_delegation"); // delegate primary phase to relocation target // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. @@ -729,41 +737,31 @@ protected void doRun() { final ClusterState state = observer.setAndGetObservedState(); final String concreteIndex = concreteIndex(state, request); - final ClusterBlockException blockException = blockExceptions(state, concreteIndex); - if (blockException != null) { - if (blockException.retryable()) { - logger.trace("cluster is blocked, scheduling a retry", blockException); - retry(blockException); - } else { - finishAsFailed(blockException); - } - } else { - // request does not have a shardId yet, we need to pass the concrete index to resolve shardId - final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); - if (indexMetaData == null) { - retry(new IndexNotFoundException(concreteIndex)); - return; - } - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - throw new IndexClosedException(indexMetaData.getIndex()); - } + // request does not have a shardId yet, we need to pass the concrete index to resolve shardId + final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); + if (indexMetaData == null) { + retry(new IndexNotFoundException(concreteIndex)); + return; + } + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + throw new IndexClosedException(indexMetaData.getIndex()); + } - // resolve all derived request fields, so we can route and apply it - resolveRequest(indexMetaData, request); - assert request.shardId() != null : "request shardId must be set in resolveRequest"; - assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : - "request waitForActiveShards must be set in resolveRequest"; + // resolve all derived request fields, so we can route and apply it + resolveRequest(indexMetaData, request); + assert request.shardId() != null : "request shardId must be set in resolveRequest"; + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : + "request waitForActiveShards must be set in resolveRequest"; - final ShardRouting primary = primary(state); - if (retryIfUnavailable(state, primary)) { - return; - } - final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); - if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { - performLocalAction(state, primary, node, indexMetaData); - } else { - performRemoteAction(state, primary, node); - } + final ShardRouting primary = primary(state); + if (retryIfUnavailable(state, primary)) { + return; + } + final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); + if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { + performLocalAction(state, primary, node, indexMetaData); + } else { + performRemoteAction(state, primary, node); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ac1688afe8f76..3a75ee52673bf 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -89,6 +89,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -183,7 +184,7 @@ public static void afterClass() { threadPool = null; } - void assertListenerThrows(String msg, PlainActionFuture listener, Class klass) throws InterruptedException { + private void assertListenerThrows(String msg, PlainActionFuture listener, Class klass) throws InterruptedException { try { listener.get(); fail(msg); @@ -192,63 +193,6 @@ void assertListenerThrows(String msg, PlainActionFuture listener, Class listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", - transportService, clusterService, shardStateAction, threadPool) { - @Override - protected ClusterBlockLevel globalBlockLevel() { - return ClusterBlockLevel.WRITE; - } - }; - - ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, - false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); - setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); - reroutePhase.run(); - assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class); - assertPhase(task, "failed"); - - block = ClusterBlocks.builder() - .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); - setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - listener = new PlainActionFuture<>(); - reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener); - reroutePhase.run(); - assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class); - assertPhase(task, "failed"); - assertFalse(request.isRetrySet.get()); - - listener = new PlainActionFuture<>(); - reroutePhase = action.new ReroutePhase(task, request = new Request(), listener); - reroutePhase.run(); - assertFalse("primary phase should wait on retryable block", listener.isDone()); - assertPhase(task, "waiting_for_retry"); - assertTrue(request.isRetrySet.get()); - - block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, false, - RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); - setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, - ClusterBlockException.class); - assertIndexShardUninitialized(); - - action = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, clusterService, shardStateAction, - threadPool) { - @Override - protected ClusterBlockLevel globalBlockLevel() { - return null; - } - }; - listener = new PlainActionFuture<>(); - reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener); - reroutePhase.run(); - assertListenerThrows("should fail with an IndexNotFoundException when no blocks checked", listener, IndexNotFoundException.class); - } - public void testBlocksInPrimaryAction() { final boolean globalBlock = randomBoolean(); @@ -425,21 +369,12 @@ public void testClosedIndexOnReroute() throws InterruptedException { PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - ClusterBlockLevel indexBlockLevel = randomBoolean() ? ClusterBlockLevel.WRITE : null; TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", transportService, - clusterService, shardStateAction, threadPool) { - @Override - protected ClusterBlockLevel indexBlockLevel() { - return indexBlockLevel; - } - }; + clusterService, shardStateAction, threadPool); TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); - if (indexBlockLevel == ClusterBlockLevel.WRITE) { - assertListenerThrows("must throw block exception", listener, ClusterBlockException.class); - } else { - assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class); - } + assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class); + assertPhase(task, "failed"); assertFalse(request.isRetrySet.get()); } @@ -1070,6 +1005,17 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl transportService.stop(); } + public void testIsRetryableClusterBlockException() { + final TestAction action = new TestAction(Settings.EMPTY, "internal:testIsRetryableClusterBlockException", transportService, + clusterService, shardStateAction, threadPool); + assertFalse(action.isRetryableClusterBlockException(randomRetryPrimaryException(new ShardId("index", "_na_", 0)))); + + final boolean retryable = randomBoolean(); + ClusterBlock randomBlock = new ClusterBlock(randomIntBetween(1, 16), "test", retryable, randomBoolean(), + randomBoolean(), randomFrom(RestStatus.values()), EnumSet.of(randomFrom(ClusterBlockLevel.values()))); + assertEquals(retryable, action.isRetryableClusterBlockException(new ClusterBlockException(Collections.singleton(randomBlock)))); + } + private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) { final TransportReplicationAction.ConcreteShardRequest concreteShardRequest = (TransportReplicationAction.ConcreteShardRequest) capturedRequest; From 8f097fae0edbfaef035aea8333de44fc81d04fac Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 12 Nov 2018 12:28:48 +0100 Subject: [PATCH 06/10] Adapt ReroutePhase tests --- .../TransportReplicationActionTests.java | 145 +++++++++++++++++- 1 file changed, 137 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 3a75ee52673bf..021b47e620097 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -75,6 +75,8 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; @@ -101,6 +103,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static java.util.Collections.singleton; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS; @@ -113,6 +116,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; @@ -184,12 +188,137 @@ public static void afterClass() { threadPool = null; } - private void assertListenerThrows(String msg, PlainActionFuture listener, Class klass) throws InterruptedException { - try { - listener.get(); - fail(msg); - } catch (ExecutionException ex) { - assertThat(ex.getCause(), instanceOf(klass)); + private T assertListenerThrows(String msg, PlainActionFuture listener, Class klass) { + ExecutionException exception = expectThrows(ExecutionException.class, msg, listener::get); + assertThat(exception.getCause(), instanceOf(klass)); + @SuppressWarnings("unchecked") + final T cause = (T) exception.getCause(); + return cause; + } + + public void testBlocksInReroutePhase() throws Exception { + final ClusterBlock nonRetryableBlock = + new ClusterBlock(1, "non retryable", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); + final ClusterBlock retryableBlock = + new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); + + final TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", + transportService, clusterService, shardStateAction, threadPool) { + @Override + protected ClusterBlockLevel globalBlockLevel() { + return ClusterBlockLevel.WRITE; + } + }; + + setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0)); + + { + setState(clusterService, + ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(nonRetryableBlock))); + + Request request = new Request().index("index").setShardId(new ShardId("index", "_na_", 0)); + PlainActionFuture listener = new PlainActionFuture<>(); + ReplicationTask task = maybeTask(); + + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + reroutePhase.run(); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests, arrayWithSize(1)); + assertThat(capturedRequests[0].action, equalTo("internal:testActionWithBlocks[p]")); + transport.handleRemoteError(capturedRequests[0].requestId, new ClusterBlockException(singleton(nonRetryableBlock))); + RemoteTransportException exception = + assertListenerThrows("primary action should fail operation", listener, RemoteTransportException.class); + assertThat(exception.unwrapCause(), instanceOf(ClusterBlockException.class)); + assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock)); + assertPhase(task, "failed"); + } + { + setState(clusterService, + ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(retryableBlock))); + + Request requestWithTimeout = new Request().index("index").setShardId(new ShardId("index", "_na_", 0)).timeout("5ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + ReplicationTask task = maybeTask(); + + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, requestWithTimeout, listener); + reroutePhase.run(); + + assertFalse(listener.isDone()); + assertPhase(task, "waiting_on_primary"); + assertFalse(requestWithTimeout.isRetrySet.get()); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests, arrayWithSize(1)); + assertThat(capturedRequests[0].action, equalTo("internal:testActionWithBlocks[p]")); + transport.handleRemoteError(capturedRequests[0].requestId, new ClusterBlockException(singleton(retryableBlock))); + + assertFalse("reroute phase is retrying", listener.isDone()); + assertBusy(() -> { + assertPhase(task, "waiting_for_retry"); + assertTrue(requestWithTimeout.isRetrySet.get()); + }); + + assertBusy(() -> { + // Cluster state update triggers a retry in the reroute phase + setState(clusterService, ClusterState.builder(clusterService.state())); + CapturingTransport.CapturedRequest[] capturedRequestsAndClear = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequestsAndClear, arrayWithSize(1)); + transport.handleRemoteError(capturedRequestsAndClear[0].requestId, new ClusterBlockException(singleton(retryableBlock))); + assertPhase(task, "failed"); + assertTrue(listener.isDone()); + }); + } + { + setState(clusterService, + ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(retryableBlock))); + + Request requestWithTimeout = new Request().index("index").setShardId(new ShardId("index", "_na_", 0)).timeout("5ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + ReplicationTask task = maybeTask(); + + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, requestWithTimeout, listener); + reroutePhase.run(); + + assertFalse(listener.isDone()); + assertPhase(task, "waiting_on_primary"); + assertFalse(requestWithTimeout.isRetrySet.get()); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests, arrayWithSize(1)); + assertThat(capturedRequests[0].action, equalTo("internal:testActionWithBlocks[p]")); + transport.handleRemoteError(capturedRequests[0].requestId, new ClusterBlockException(singleton(retryableBlock))); + + assertFalse("reroute phase is retrying", listener.isDone()); + assertBusy(() -> { + assertPhase(task, "waiting_for_retry"); + assertTrue(requestWithTimeout.isRetrySet.get()); + }); + + setState(clusterService, + ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(nonRetryableBlock))); + + CapturingTransport.CapturedRequest[] capturedRequestsAndClear = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequestsAndClear, arrayWithSize(1)); + transport.handleRemoteError(capturedRequestsAndClear[0].requestId, new ClusterBlockException(singleton(nonRetryableBlock))); + RemoteTransportException exception = assertListenerThrows("primary phase should fail operation when moving from a retryable " + + "block to a non-retryable one", listener, RemoteTransportException.class); + assertThat(exception.unwrapCause(), instanceOf(ClusterBlockException.class)); + assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock)); + assertPhase(task, "failed"); + assertIndexShardUninitialized(); + } + { + Request requestWithTimeout = new Request().index("unknown").setShardId(new ShardId("unknown", "_na_", 0)).timeout("5ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + ReplicationTask task = maybeTask(); + + TestAction testActionWithNoBlocks = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, + clusterService, shardStateAction, threadPool); + listener = new PlainActionFuture<>(); + TestAction.ReroutePhase reroutePhase = testActionWithNoBlocks.new ReroutePhase(task, requestWithTimeout, listener); + reroutePhase.run(); + assertListenerThrows("should fail with an IndexNotFoundException when no blocks", listener, IndexNotFoundException.class); } } @@ -670,7 +799,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { when(shard.routingEntry()).thenReturn(routingEntry); when(shard.isRelocatedPrimary()).thenReturn(false); IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId); - Set inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) : + Set inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) : clusterService.state().metaData().index(index).inSyncAllocationIds(0); when(shard.getReplicationGroup()).thenReturn( new ReplicationGroup(shardRoutingTable, @@ -1013,7 +1142,7 @@ public void testIsRetryableClusterBlockException() { final boolean retryable = randomBoolean(); ClusterBlock randomBlock = new ClusterBlock(randomIntBetween(1, 16), "test", retryable, randomBoolean(), randomBoolean(), randomFrom(RestStatus.values()), EnumSet.of(randomFrom(ClusterBlockLevel.values()))); - assertEquals(retryable, action.isRetryableClusterBlockException(new ClusterBlockException(Collections.singleton(randomBlock)))); + assertEquals(retryable, action.isRetryableClusterBlockException(new ClusterBlockException(singleton(randomBlock)))); } private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) { From 39f5a3fa20fdcc51b38cbe1f9abaa313633e7cbe Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 12 Nov 2018 12:32:52 +0100 Subject: [PATCH 07/10] checkstyle --- .../support/replication/TransportReplicationActionTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 021b47e620097..3772206266a49 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -75,7 +75,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; -import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; From 09a9068c2680375d2bfad55f6982e6c4b6907104 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 12 Nov 2018 17:07:03 +0100 Subject: [PATCH 08/10] More feedback --- .../TransportReplicationAction.java | 57 +++++++++------ .../TransportReplicationActionTests.java | 73 ++++--------------- 2 files changed, 49 insertions(+), 81 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 8e754f2eb1eae..4a68ae08133d2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -736,32 +736,41 @@ protected void doRun() { setPhase(task, "routing"); final ClusterState state = observer.setAndGetObservedState(); final String concreteIndex = concreteIndex(state, request); + final ClusterBlockException blockException = blockExceptions(state, concreteIndex); + if (blockException != null) { + if (isRetryableClusterBlockException(blockException)) { + logger.trace("cluster is blocked, scheduling a retry", blockException); + retry(blockException); + } else { + finishAsFailed(blockException); + } + } else { + // request does not have a shardId yet, we need to pass the concrete index to resolve shardId + final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); + if (indexMetaData == null) { + retry(new IndexNotFoundException(concreteIndex)); + return; + } + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + throw new IndexClosedException(indexMetaData.getIndex()); + } - // request does not have a shardId yet, we need to pass the concrete index to resolve shardId - final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); - if (indexMetaData == null) { - retry(new IndexNotFoundException(concreteIndex)); - return; - } - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - throw new IndexClosedException(indexMetaData.getIndex()); - } - - // resolve all derived request fields, so we can route and apply it - resolveRequest(indexMetaData, request); - assert request.shardId() != null : "request shardId must be set in resolveRequest"; - assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : - "request waitForActiveShards must be set in resolveRequest"; + // resolve all derived request fields, so we can route and apply it + resolveRequest(indexMetaData, request); + assert request.shardId() != null : "request shardId must be set in resolveRequest"; + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : + "request waitForActiveShards must be set in resolveRequest"; - final ShardRouting primary = primary(state); - if (retryIfUnavailable(state, primary)) { - return; - } - final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); - if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { - performLocalAction(state, primary, node, indexMetaData); - } else { - performRemoteAction(state, primary, node); + final ShardRouting primary = primary(state); + if (retryIfUnavailable(state, primary)) { + return; + } + final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); + if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { + performLocalAction(state, primary, node, indexMetaData); + } else { + performRemoteAction(state, primary, node); + } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 3772206266a49..1fbb307900a26 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -75,7 +75,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; -import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; @@ -215,20 +214,15 @@ protected ClusterBlockLevel globalBlockLevel() { setState(clusterService, ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(nonRetryableBlock))); - Request request = new Request().index("index").setShardId(new ShardId("index", "_na_", 0)); + Request request = new Request(); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); - CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests, arrayWithSize(1)); - assertThat(capturedRequests[0].action, equalTo("internal:testActionWithBlocks[p]")); - transport.handleRemoteError(capturedRequests[0].requestId, new ClusterBlockException(singleton(nonRetryableBlock))); - RemoteTransportException exception = - assertListenerThrows("primary action should fail operation", listener, RemoteTransportException.class); - assertThat(exception.unwrapCause(), instanceOf(ClusterBlockException.class)); + ClusterBlockException exception = + assertListenerThrows("primary action should fail operation", listener, ClusterBlockException.class); assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock)); assertPhase(task, "failed"); } @@ -236,75 +230,40 @@ protected ClusterBlockLevel globalBlockLevel() { setState(clusterService, ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(retryableBlock))); - Request requestWithTimeout = new Request().index("index").setShardId(new ShardId("index", "_na_", 0)).timeout("5ms"); + Request requestWithTimeout = new Request().timeout("5ms"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, requestWithTimeout, listener); reroutePhase.run(); - assertFalse(listener.isDone()); - assertPhase(task, "waiting_on_primary"); - assertFalse(requestWithTimeout.isRetrySet.get()); - - CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests, arrayWithSize(1)); - assertThat(capturedRequests[0].action, equalTo("internal:testActionWithBlocks[p]")); - transport.handleRemoteError(capturedRequests[0].requestId, new ClusterBlockException(singleton(retryableBlock))); - - assertFalse("reroute phase is retrying", listener.isDone()); - assertBusy(() -> { - assertPhase(task, "waiting_for_retry"); - assertTrue(requestWithTimeout.isRetrySet.get()); - }); - - assertBusy(() -> { - // Cluster state update triggers a retry in the reroute phase - setState(clusterService, ClusterState.builder(clusterService.state())); - CapturingTransport.CapturedRequest[] capturedRequestsAndClear = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequestsAndClear, arrayWithSize(1)); - transport.handleRemoteError(capturedRequestsAndClear[0].requestId, new ClusterBlockException(singleton(retryableBlock))); - assertPhase(task, "failed"); - assertTrue(listener.isDone()); - }); + ClusterBlockException exception = + assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class); + assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(retryableBlock)); + assertPhase(task, "failed"); + assertTrue(requestWithTimeout.isRetrySet.get()); } { setState(clusterService, ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(retryableBlock))); - Request requestWithTimeout = new Request().index("index").setShardId(new ShardId("index", "_na_", 0)).timeout("5ms"); + Request request = new Request(); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, requestWithTimeout, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); - assertFalse(listener.isDone()); - assertPhase(task, "waiting_on_primary"); - assertFalse(requestWithTimeout.isRetrySet.get()); - - CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests, arrayWithSize(1)); - assertThat(capturedRequests[0].action, equalTo("internal:testActionWithBlocks[p]")); - transport.handleRemoteError(capturedRequests[0].requestId, new ClusterBlockException(singleton(retryableBlock))); - - assertFalse("reroute phase is retrying", listener.isDone()); - assertBusy(() -> { - assertPhase(task, "waiting_for_retry"); - assertTrue(requestWithTimeout.isRetrySet.get()); - }); + assertFalse("primary phase should wait on retryable block", listener.isDone()); + assertPhase(task, "waiting_for_retry"); + assertTrue(request.isRetrySet.get()); setState(clusterService, ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(nonRetryableBlock))); - CapturingTransport.CapturedRequest[] capturedRequestsAndClear = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequestsAndClear, arrayWithSize(1)); - transport.handleRemoteError(capturedRequestsAndClear[0].requestId, new ClusterBlockException(singleton(nonRetryableBlock))); - RemoteTransportException exception = assertListenerThrows("primary phase should fail operation when moving from a retryable " + - "block to a non-retryable one", listener, RemoteTransportException.class); - assertThat(exception.unwrapCause(), instanceOf(ClusterBlockException.class)); + ClusterBlockException exception = assertListenerThrows("primary phase should fail operation when moving from a retryable " + + "block to a non-retryable one", listener, ClusterBlockException.class); assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock)); - assertPhase(task, "failed"); assertIndexShardUninitialized(); } { From 93d94fd5a990c501a648a03c25c0899d0815d5d3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 12 Nov 2018 17:17:24 +0100 Subject: [PATCH 09/10] Restore blockException.retryable() --- .../action/support/replication/TransportReplicationAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 4a68ae08133d2..38972a7f77462 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -738,7 +738,7 @@ protected void doRun() { final String concreteIndex = concreteIndex(state, request); final ClusterBlockException blockException = blockExceptions(state, concreteIndex); if (blockException != null) { - if (isRetryableClusterBlockException(blockException)) { + if (blockException.retryable()) { logger.trace("cluster is blocked, scheduling a retry", blockException); retry(blockException); } else { From ed7591bcf9f6f476c81b7b3be54853a6674a7b2f Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 13 Nov 2018 15:39:33 +0100 Subject: [PATCH 10/10] Also check index blocks --- .../TransportReplicationActionTests.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 1fbb307900a26..c8c40a7f5841a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -194,27 +194,42 @@ private T assertListenerThrows(String msg, PlainActionFuture listener, Cl return cause; } + private void setStateWithBlock(final ClusterService clusterService, final ClusterBlock block, final boolean globalBlock) { + final ClusterBlocks.Builder blocks = ClusterBlocks.builder(); + if (globalBlock) { + blocks.addGlobalBlock(block); + } else { + blocks.addIndexBlock("index", block); + } + setState(clusterService, ClusterState.builder(clusterService.state()).blocks(blocks).build()); + } + public void testBlocksInReroutePhase() throws Exception { final ClusterBlock nonRetryableBlock = new ClusterBlock(1, "non retryable", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); final ClusterBlock retryableBlock = new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); + final boolean globalBlock = randomBoolean(); final TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel globalBlockLevel() { - return ClusterBlockLevel.WRITE; + return globalBlock ? ClusterBlockLevel.WRITE : null; + } + + @Override + protected ClusterBlockLevel indexBlockLevel() { + return globalBlock == false ? ClusterBlockLevel.WRITE : null; } }; setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0)); { - setState(clusterService, - ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(nonRetryableBlock))); + setStateWithBlock(clusterService, nonRetryableBlock, globalBlock); - Request request = new Request(); + Request request = globalBlock ? new Request() : new Request().index("index"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -227,10 +242,9 @@ protected ClusterBlockLevel globalBlockLevel() { assertPhase(task, "failed"); } { - setState(clusterService, - ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(retryableBlock))); + setStateWithBlock(clusterService, retryableBlock, globalBlock); - Request requestWithTimeout = new Request().timeout("5ms"); + Request requestWithTimeout = (globalBlock ? new Request() : new Request().index("index")).timeout("5ms"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -244,10 +258,9 @@ protected ClusterBlockLevel globalBlockLevel() { assertTrue(requestWithTimeout.isRetrySet.get()); } { - setState(clusterService, - ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(retryableBlock))); + setStateWithBlock(clusterService, retryableBlock, globalBlock); - Request request = new Request(); + Request request = globalBlock ? new Request() : new Request().index("index"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -258,8 +271,7 @@ protected ClusterBlockLevel globalBlockLevel() { assertPhase(task, "waiting_for_retry"); assertTrue(request.isRetrySet.get()); - setState(clusterService, - ClusterState.builder(clusterService.state()).blocks(ClusterBlocks.builder().addGlobalBlock(nonRetryableBlock))); + setStateWithBlock(clusterService, nonRetryableBlock, globalBlock); ClusterBlockException exception = assertListenerThrows("primary phase should fail operation when moving from a retryable " + "block to a non-retryable one", listener, ClusterBlockException.class);