Added ServiceDisruptionScheme(s) and testAckedIndexing
This commit adds the notion of a ServiceDisruptionScheme, allowing disruptions to be introduced into our test cluster. This
abstraction is used in a couple of wrappers around the functionality offered by MockTransportService to simulate various
network partitions. There is also one implementation for causing a node to be slow in processing cluster state updates.

This new mechanism is integrated into the existing DiscoveryWithNetworkFailuresTests.

A new test called testAckedIndexing is added to verify retrieval of documents whose indexing was acked during various disruptions.

Closes #6505
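
For context, the call sites added to InternalTestCluster below (applyToCluster, removeFromCluster, applyToNode, removeFromNode and testClusterClosed) suggest the new contract looks roughly like the following sketch. The ServiceDisruptionScheme source file itself is not rendered on this page, so the exact signatures are an assumption:

package org.elasticsearch.test.disruption;

import org.elasticsearch.test.InternalTestCluster;

// Sketch only, inferred from the InternalTestCluster call sites in this commit;
// the committed interface may declare additional methods or different signatures.
public interface ServiceDisruptionScheme {

    // install the scheme on the whole test cluster
    void applyToCluster(InternalTestCluster cluster);

    // undo all disruption when the scheme is cleared
    void removeFromCluster(InternalTestCluster cluster);

    // hook into a single node, e.g. when a node is published or restarted
    void applyToNode(String node, InternalTestCluster cluster);

    // detach from a single node before it is stopped or restarted
    void removeFromNode(String node, InternalTestCluster cluster);

    // notification that the whole test cluster is being closed
    void testClusterClosed();
}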
bleskes committed Jul 8, 2014
1 parent 59fe6a8 commit a9cf336
Showing 17 changed files with 1,145 additions and 156 deletions.
17 changes: 6 additions & 11 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -340,7 +340,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
logger.error("unexpected failure during [{}]", t, source);
}

@Override
@@ -406,8 +406,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing {} leave request as we are no longer master", node);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -446,8 +445,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -484,8 +482,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -594,7 +591,7 @@ void handleNewClusterStateFromMaster(ClusterState newClusterState, final Publish
return;
}
if (master) {
logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
final ClusterState newState = newClusterState;
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
@@ -638,7 +635,6 @@ public void onFailure(String source, Throwable t) {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
processNewClusterStates.add(processClusterState);


assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";

@@ -1014,8 +1010,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -257,6 +257,10 @@ public void removeHandler(String action) {
}
}

protected TransportRequestHandler getHandler(String action) {
return serverHandlers.get(action);
}

class Adapter implements TransportServiceAdapter {

final MeanMetric rxMetric = new MeanMetric();

Large diffs are not rendered by default.

@@ -43,7 +43,6 @@
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo;

public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
5 changes: 2 additions & 3 deletions src/test/java/org/elasticsearch/test/BackgroundIndexer.java
@@ -25,7 +25,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.recovery.RecoveryWhileUnderLoadTests;
import org.junit.Assert;

import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,7 +39,7 @@

public class BackgroundIndexer implements AutoCloseable {

private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
private final ESLogger logger = Loggers.getLogger(getClass());

final Thread[] writers;
final CountDownLatch stopLatch;
@@ -218,7 +217,7 @@ public void continueIndexing(int numOfDocs) {
setBudget(numOfDocs);
}

/** Stop all background threads **/
/** Stop all background threads * */
public void stop() throws InterruptedException {
if (stop.get()) {
return;
@@ -91,6 +91,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.junit.*;

import java.io.IOException;
@@ -518,6 +519,7 @@ protected final void afterInternal() throws IOException {
boolean success = false;
try {
logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
clearDisruptionScheme();
final Scope currentClusterScope = getCurrentClusterScope();
try {
if (currentClusterScope != Scope.TEST) {
@@ -631,6 +633,15 @@ protected int numberOfReplicas() {
return between(minimumNumberOfReplicas(), maximumNumberOfReplicas());
}


public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
internalCluster().setDisruptionScheme(scheme);
}

public void clearDisruptionScheme() {
internalCluster().clearDisruptionScheme();
}

/**
* Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
* This method can be overwritten by subclasses to set defaults for the indices that are created by the test.
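
With the setDisruptionScheme/clearDisruptionScheme hooks added to ElasticsearchIntegrationTest above, a test can install a scheme for the disrupted phase and rely on afterInternal() to clear it. A minimal usage sketch follows; HypotheticalNetworkPartition stands in for one of the MockTransportService-based wrappers added by this commit, whose concrete class names are not shown on this page:

import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;

public class MyDisruptionTests extends ElasticsearchIntegrationTest {

    public void testIndexingWhilePartitioned() throws Exception {
        // HypotheticalNetworkPartition is a placeholder name for one of the
        // MockTransportService wrappers introduced by this commit.
        ServiceDisruptionScheme scheme = new HypotheticalNetworkPartition("node_1", "node_2");

        setDisruptionScheme(scheme);   // applies the scheme to the internal test cluster
        try {
            // index documents and verify acked writes while the disruption is active
        } finally {
            clearDisruptionScheme();   // afterInternal() would also clear it as a safety net
        }
    }
}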
63 changes: 63 additions & 0 deletions src/test/java/org/elasticsearch/test/InternalTestCluster.java
@@ -72,6 +72,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
@@ -175,6 +176,8 @@ public final class InternalTestCluster extends TestCluster {

private final boolean hasFilterCache;

private ServiceDisruptionScheme activeDisruptionScheme;

public InternalTestCluster(long clusterSeed, String clusterName) {
this(clusterSeed, DEFAULT_MIN_NUM_DATA_NODES, DEFAULT_MAX_NUM_DATA_NODES, clusterName, NodeSettingsSource.EMPTY, DEFAULT_NUM_CLIENT_NODES, DEFAULT_ENABLE_RANDOM_BENCH_NODES);
}
@@ -277,6 +280,10 @@ public String getClusterName() {
return clusterName;
}

public String[] getNodeNames() {
return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
}

private static boolean isLocalTransportConfigured() {
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
return true;
@@ -476,6 +483,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException {
while (limit.hasNext()) {
NodeAndClient next = limit.next();
nodesToRemove.add(next);
removeDistruptionSchemeFromNode(next);
next.close();
}
for (NodeAndClient toRemove : nodesToRemove) {
@@ -639,6 +647,10 @@ public boolean apply(NodeAndClient nodeAndClient) {
@Override
public void close() {
if (this.open.compareAndSet(true, false)) {
if (activeDisruptionScheme != null) {
activeDisruptionScheme.testClusterClosed();
activeDisruptionScheme = null;
}
IOUtils.closeWhileHandlingException(nodes.values());
nodes.clear();
executor.shutdownNow();
@@ -811,6 +823,7 @@ public synchronized void beforeTest(Random random, double transportClientRatio)
}

private synchronized void reset(boolean wipeData) throws IOException {
clearDisruptionScheme();
resetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */
if (wipeData) {
wipeDataDirectories();
@@ -1007,6 +1020,7 @@ public synchronized void stopRandomDataNode() throws IOException {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@@ -1026,6 +1040,7 @@ public boolean apply(NodeAndClient nodeAndClient) {
});
if (nodeAndClient != null) {
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@@ -1040,6 +1055,7 @@ public synchronized void stopCurrentMasterNode() throws IOException {
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
removeDistruptionSchemeFromNode(nodes.get(masterNodeName));
NodeAndClient remove = nodes.remove(masterNodeName);
remove.close();
}
@@ -1051,6 +1067,7 @@ public void stopRandomNonMasterNode() throws IOException {
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (nodeAndClient != null) {
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@@ -1104,6 +1121,9 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
if (!callback.doRestart(nodeAndClient.name)) {
logger.info("Closing node [{}] during restart", nodeAndClient.name);
toRemove.add(nodeAndClient);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.close();
}
}
@@ -1118,18 +1138,33 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Restarting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
} else {
int numNodesRestarted = 0;
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Stopping node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.node.close();
}
for (NodeAndClient nodeAndClient : nodes.values()) {
logger.info("Starting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
}
}
@@ -1337,6 +1372,7 @@ private synchronized void publishNode(NodeAndClient nodeAndClient) {
dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations()));
}
nodes.put(nodeAndClient.name, nodeAndClient);
applyDisruptionSchemeToNode(nodeAndClient);
}

public void closeNonSharedNodes(boolean wipeData) throws IOException {
@@ -1358,6 +1394,33 @@ public boolean hasFilterCache() {
return hasFilterCache;
}

public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
clearDisruptionScheme();
scheme.applyToCluster(this);
activeDisruptionScheme = scheme;
}

public void clearDisruptionScheme() {
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromCluster(this);
}
activeDisruptionScheme = null;
}

private void applyDisruptionSchemeToNode(NodeAndClient nodeAndClient) {
if (activeDisruptionScheme != null) {
assert nodes.containsKey(nodeAndClient.name);
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}

private void removeDistruptionSchemeFromNode(NodeAndClient nodeAndClient) {
if (activeDisruptionScheme != null) {
assert nodes.containsKey(nodeAndClient.name);
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
}

private synchronized Collection<NodeAndClient> dataNodeAndClients() {
return Collections2.filter(nodes.values(), new DataNodePredicate());
}
1 change: 1 addition & 0 deletions src/test/java/org/elasticsearch/test/TestCluster.java
@@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.IndexMissingException;
