Skip to content

Commit

Permalink
Added ServiceDisruptionScheme(s) and testAckedIndexing
Browse files Browse the repository at this point in the history
This commit adds the notion of a ServiceDisruptionScheme, allowing disruptions to be introduced into our test cluster. This
abstraction is used in a couple of wrappers around the functionality offered by MockTransportService to simulate various
network partitions. There is also one implementation for causing a node to be slow in processing cluster state updates.

This new mechanism is integrated into the existing tests in DiscoveryWithNetworkFailuresTests.

A new test called testAckedIndexing is added to verify retrieval of documents whose indexing was acked during various disruptions.

Closes #6505
  • Loading branch information
bleskes authored and martijnvg committed Aug 5, 2014
1 parent 773e87d commit 02e34cb
Show file tree
Hide file tree
Showing 17 changed files with 1,144 additions and 152 deletions.
11 changes: 4 additions & 7 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
logger.error("unexpected failure during [{}]", t, source);
}

@Override
Expand Down Expand Up @@ -408,8 +408,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing {} leave request as we are no longer master", node);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
Expand Down Expand Up @@ -448,8 +447,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
Expand Down Expand Up @@ -486,8 +484,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ public void removeHandler(String action) {
}
}

/**
 * Returns the request handler registered for the given action, or {@code null}
 * if no handler is registered. Protected visibility — presumably so test
 * subclasses (e.g. disruption wrappers) can look up and delegate to the
 * original handler; TODO confirm against subclass usage.
 */
protected TransportRequestHandler getHandler(String action) {
    return serverHandlers.get(action);
}

class Adapter implements TransportServiceAdapter {

final MeanMetric rxMetric = new MeanMetric();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo;

public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void continueIndexing(int numOfDocs) {
setBudget(numOfDocs);
}

/** Stop all background threads **/
/** Stop all background threads * */
public void stop() throws InterruptedException {
if (stop.get()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient;
import org.hamcrest.Matchers;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.junit.*;

import java.io.IOException;
Expand Down Expand Up @@ -530,6 +531,7 @@ protected final void afterInternal() throws IOException {
boolean success = false;
try {
logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
clearDisruptionScheme();
final Scope currentClusterScope = getCurrentClusterScope();
try {
if (currentClusterScope != Scope.TEST) {
Expand Down Expand Up @@ -643,6 +645,15 @@ protected int numberOfReplicas() {
return between(minimumNumberOfReplicas(), maximumNumberOfReplicas());
}


/**
 * Installs the given disruption scheme on the underlying test cluster,
 * replacing any previously active scheme.
 */
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
    internalCluster().setDisruptionScheme(scheme);
}

/**
 * Removes the currently active disruption scheme, if any, from the test
 * cluster. Also invoked automatically during test cleanup in afterInternal().
 */
public void clearDisruptionScheme() {
    internalCluster().clearDisruptionScheme();
}

/**
* Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
* This method can be overwritten by subclasses to set defaults for the indices that are created by the test.
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/org/elasticsearch/test/InternalTestCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
Expand Down Expand Up @@ -185,6 +186,8 @@ public final class InternalTestCluster extends TestCluster {

private final boolean hasFilterCache;

private ServiceDisruptionScheme activeDisruptionScheme;

public InternalTestCluster(long clusterSeed, String clusterName) {
this(clusterSeed, DEFAULT_MIN_NUM_DATA_NODES, DEFAULT_MAX_NUM_DATA_NODES, clusterName, SettingsSource.EMPTY, DEFAULT_NUM_CLIENT_NODES, DEFAULT_ENABLE_RANDOM_BENCH_NODES);
}
Expand Down Expand Up @@ -288,6 +291,10 @@ public String getClusterName() {
return clusterName;
}

/** Returns the names of all nodes currently known to this cluster. */
public String[] getNodeNames() {
    return nodes.keySet().toArray(new String[0]);
}

private static boolean isLocalTransportConfigured() {
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
return true;
Expand Down Expand Up @@ -487,6 +494,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException {
while (limit.hasNext()) {
NodeAndClient next = limit.next();
nodesToRemove.add(next);
removeDistruptionSchemeFromNode(next);
next.close();
}
for (NodeAndClient toRemove : nodesToRemove) {
Expand Down Expand Up @@ -661,6 +669,10 @@ public boolean apply(NodeAndClient nodeAndClient) {
@Override
public void close() {
if (this.open.compareAndSet(true, false)) {
if (activeDisruptionScheme != null) {
activeDisruptionScheme.testClusterClosed();
activeDisruptionScheme = null;
}
IOUtils.closeWhileHandlingException(nodes.values());
nodes.clear();
executor.shutdownNow();
Expand Down Expand Up @@ -858,6 +870,7 @@ public synchronized void beforeTest(Random random, double transportClientRatio)
}

private synchronized void reset(boolean wipeData) throws IOException {
clearDisruptionScheme();
resetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */
if (wipeData) {
wipeDataDirectories();
Expand Down Expand Up @@ -1054,6 +1067,7 @@ public synchronized void stopRandomDataNode() throws IOException {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
Expand All @@ -1073,6 +1087,7 @@ public boolean apply(NodeAndClient nodeAndClient) {
});
if (nodeAndClient != null) {
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
Expand All @@ -1087,6 +1102,7 @@ public synchronized void stopCurrentMasterNode() throws IOException {
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
removeDistruptionSchemeFromNode(nodes.get(masterNodeName));
NodeAndClient remove = nodes.remove(masterNodeName);
remove.close();
}
Expand All @@ -1098,6 +1114,7 @@ public void stopRandomNonMasterNode() throws IOException {
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (nodeAndClient != null) {
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
Expand Down Expand Up @@ -1151,6 +1168,9 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
if (!callback.doRestart(nodeAndClient.name)) {
logger.info("Closing node [{}] during restart", nodeAndClient.name);
toRemove.add(nodeAndClient);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.close();
}
}
Expand All @@ -1165,18 +1185,33 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Restarting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
} else {
int numNodesRestarted = 0;
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Stopping node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.node.close();
}
for (NodeAndClient nodeAndClient : nodes.values()) {
logger.info("Starting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
}
}
Expand Down Expand Up @@ -1374,6 +1409,7 @@ private synchronized void publishNode(NodeAndClient nodeAndClient) {
dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations()));
}
nodes.put(nodeAndClient.name, nodeAndClient);
applyDisruptionSchemeToNode(nodeAndClient);
}

public void closeNonSharedNodes(boolean wipeData) throws IOException {
Expand All @@ -1395,6 +1431,33 @@ public boolean hasFilterCache() {
return hasFilterCache;
}

/**
 * Activates the given disruption scheme on this cluster. Any currently active
 * scheme is removed first; the new scheme is applied to the cluster before
 * being recorded as active, so a failure in applyToCluster leaves no scheme
 * registered.
 */
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
    clearDisruptionScheme();
    scheme.applyToCluster(this);
    activeDisruptionScheme = scheme;
}

/**
 * Deactivates and removes the currently active disruption scheme, if any.
 * Safe to call when no scheme is active. The scheme is removed from the
 * cluster before the reference is cleared.
 */
public void clearDisruptionScheme() {
    if (activeDisruptionScheme != null) {
        activeDisruptionScheme.removeFromCluster(this);
    }
    activeDisruptionScheme = null;
}

/**
 * Applies the active disruption scheme (if any) to a node that has just been
 * added to the cluster. The node must already be published in the nodes map.
 */
private void applyDisruptionSchemeToNode(NodeAndClient nodeAndClient) {
    if (activeDisruptionScheme != null) {
        assert nodes.containsKey(nodeAndClient.name);
        activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
    }
}

/**
 * Removes the active disruption scheme (if any) from a node that is about to
 * be stopped/closed, so the scheme does not keep references to a dead node.
 * NOTE(review): method name contains a typo ("Distruption"); it is referenced
 * from several call sites in this class, so renaming should be done in a
 * follow-up that touches all callers.
 */
private void removeDistruptionSchemeFromNode(NodeAndClient nodeAndClient) {
    if (activeDisruptionScheme != null) {
        assert nodes.containsKey(nodeAndClient.name);
        activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
    }
}

/**
 * Returns the subset of nodes that match {@code DataNodePredicate}. Note that
 * Guava's {@code Collections2.filter} returns a live view of the underlying
 * map values, not a snapshot.
 */
private synchronized Collection<NodeAndClient> dataNodeAndClients() {
    return Collections2.filter(nodes.values(), new DataNodePredicate());
}
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/elasticsearch/test/TestCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.IndexMissingException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.Random;
import java.util.Set;

/**
 * A {@link NetworkPartition} in which the two sides stay connected but all
 * traffic between them becomes unresponsive for a random duration, chosen
 * between {@code delayMin} and {@code delayMax} milliseconds each time the
 * disruption is started.
 */
public class NetworkDelaysPartition extends NetworkPartition {

    /** Default lower bound of the random delay, in milliseconds. */
    static long DEFAULT_DELAY_MIN = 10000;
    /** Default upper bound of the random delay, in milliseconds. */
    static long DEFAULT_DELAY_MAX = 90000;

    final long delayMin;
    final long delayMax;

    // chosen anew on every startDisrupting() call
    TimeValue duration;

    public NetworkDelaysPartition(Random random) {
        this(random, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX);
    }

    public NetworkDelaysPartition(Random random, long delayMin, long delayMax) {
        super(random);
        this.delayMin = delayMin;
        this.delayMax = delayMax;
    }

    public NetworkDelaysPartition(String node1, String node2, Random random) {
        this(node1, node2, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random);
    }

    public NetworkDelaysPartition(String node1, String node2, long delayMin, long delayMax, Random random) {
        super(node1, node2, random);
        this.delayMin = delayMin;
        this.delayMax = delayMax;
    }

    public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
        this(nodesSideOne, nodesSideTwo, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random);
    }

    public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, long delayMin, long delayMax, Random random) {
        super(nodesSideOne, nodesSideTwo, random);
        this.delayMin = delayMin;
        this.delayMax = delayMax;
    }

    @Override
    public synchronized void startDisrupting() {
        // Pick a delay in [delayMin, delayMax]. The "+ 1" makes the upper bound
        // inclusive and, critically, avoids Random#nextInt(0) throwing
        // IllegalArgumentException when delayMin == delayMax.
        duration = new TimeValue(delayMin + random.nextInt((int) (delayMax - delayMin + 1)));
        super.startDisrupting();
    }

    @Override
    void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
                         DiscoveryNode node2, MockTransportService transportService2) {
        // Delay traffic in both directions: each side's transport service gets an
        // unresponsive rule targeting the *other* side. (Previously both rules were
        // added to transportService1 — one of them for node1 itself — which left
        // transportService2 entirely undisrupted.)
        transportService1.addUnresponsiveRule(node2, duration);
        transportService2.addUnresponsiveRule(node1, duration);
    }

    @Override
    protected String getPartitionDescription() {
        return "network delays for [" + duration + "]";
    }

}
Loading

0 comments on commit 02e34cb

Please sign in to comment.