Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

H2ORunner for JUnit tests #4237

Merged
merged 1 commit into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions h2o-core/src/test/java/water/rapids/RapidsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import water.*;
import water.fvec.Frame;
import water.fvec.NFSFileVec;
Expand All @@ -18,6 +19,8 @@
import water.rapids.vals.ValFrame;
import water.rapids.vals.ValNums;
import water.rapids.vals.ValStrs;
import water.runner.CloudSize;
import water.runner.H2ORunner;
import water.util.ArrayUtils;
import water.util.FileUtils;
import water.util.Log;
Expand All @@ -28,24 +31,22 @@
import java.util.Random;

import static org.junit.Assert.*;
import static water.TestUtil.*;
import static water.rapids.Rapids.IllegalASTException;


public class RapidsTest extends TestUtil {
@RunWith(H2ORunner.class)
@CloudSize(1)
public class RapidsTest{
@Rule
public transient ExpectedException ee = ExpectedException.none();

@BeforeClass
public static void setup() {
stall_till_cloudsize(1);
}


@Test
public void testSpearmanIris() {
Session session = new Session();
Scope.enter();
try {
final Frame iris = TestUtil.parse_test_file(Key.make("iris_spearman"), "smalldata/junit/iris.csv");
final Frame iris = parse_test_file(Key.make("iris_spearman"), "smalldata/junit/iris.csv");
Scope.track_generic(iris);
final Val spearmanMatrix = Rapids.exec("(cor iris_spearman iris_spearman \"complete.obs\" \"Spearman\")", session);
assertTrue(spearmanMatrix instanceof ValFrame);
Expand Down
56 changes: 56 additions & 0 deletions h2o-core/src/test/java/water/runner/CheckKeysTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package water.runner;

import org.junit.Ignore;
import water.*;
import water.util.ArrayUtils;

import java.util.Arrays;
import java.util.Set;

@Ignore
public class CheckKeysTask extends MRTask<CheckKeysTask> {

Key[] leakedKeys;

/**
* Determines if a key leak is ignorable
*
* @param key A leaked key
* @param value An instance of {@link Value} associated with the key
* @return True if the leak is considered to be ignorable, otherwise false
*/
protected static boolean isIgnorableKeyLeak(final Key key, final Value value) {

return value == null || value.isVecGroup() || value.isESPCGroup() || key.equals(Job.LIST) ||
(value.isJob() && value.<Job>get().isStopped());
}

@Override
public void reduce(CheckKeysTask mrt) {
leakedKeys = ArrayUtils.append(leakedKeys, mrt.leakedKeys);
}

@Override
protected void setupLocal() {

final Set<Key> initKeys = LocalTestRuntime.initKeys;
final Set<Key> keysAfterTest = H2O.localKeySet();

final int numLeakedKeys = keysAfterTest.size() - initKeys.size();
leakedKeys = numLeakedKeys > 0 ? new Key[numLeakedKeys] : new Key[]{};
if (numLeakedKeys > 0) {
int leakedKeysPointer = 0;

for (Key key : keysAfterTest) {
if (initKeys.contains(key)) continue;

final Value keyValue = Value.STORE_get(key);
if (!isIgnorableKeyLeak(key, keyValue)) {
leakedKeys[leakedKeysPointer++] = key;
}
}
if (leakedKeysPointer < numLeakedKeys) leakedKeys = Arrays.copyOfRange(leakedKeys, 0, leakedKeysPointer);
}

}
}
16 changes: 16 additions & 0 deletions h2o-core/src/test/java/water/runner/CleanAllKeysTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package water.runner;

import org.junit.Ignore;
import water.H2O;
import water.MRTask;

@Ignore
public class CleanAllKeysTask extends MRTask<CleanAllKeysTask> {

@Override
protected void setupLocal() {
LocalTestRuntime.initKeys.clear();
H2O.raw_clear();
water.fvec.Vec.ESPC.clear();
}
}
19 changes: 19 additions & 0 deletions h2o-core/src/test/java/water/runner/CloudSize.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package water.runner;

import org.junit.Ignore;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Minimal required cloud size for a JUnit test to run on
*/
@Ignore
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface CloudSize {

int value();
}
14 changes: 14 additions & 0 deletions h2o-core/src/test/java/water/runner/CollectInitKeysTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package water.runner;

import org.junit.Ignore;
import water.H2O;
import water.MRTask;

@Ignore
public class CollectInitKeysTask extends MRTask<CollectInitKeysTask> {

@Override
protected void setupLocal() {
LocalTestRuntime.initKeys.addAll(H2O.localKeySet());
}
}
162 changes: 162 additions & 0 deletions h2o-core/src/test/java/water/runner/H2ORunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package water.runner;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.internal.AssumptionViolatedException;
import org.junit.internal.runners.model.EachTestNotifier;
import org.junit.runner.Description;
import org.junit.runner.notification.RunNotifier;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.InitializationError;
import org.junit.runners.model.Statement;
import org.junit.runners.model.TestClass;
import water.Key;
import water.TestUtil;
import water.Value;
import water.fvec.Frame;
import water.fvec.Vec;
import water.util.Log;

import java.util.HashSet;
import java.util.List;
import java.util.Set;


@Ignore
public class H2ORunner extends BlockJUnit4ClassRunner {
private final TestClass testClass;

/**
* Creates a BlockJUnit4ClassRunner to run {@code klass}
*
* @param klass
* @throws InitializationError if the test class is malformed.
*/
public H2ORunner(Class<?> klass) throws InitializationError {
super(klass);
testClass = getTestClass();
TestUtil.stall_till_cloudsize(fetchCloudSize());
}


/**
* Returns a {@link Statement}: run all non-overridden {@code @BeforeClass} methods on this class
* and superclasses before executing {@code statement}; if any throws an
* Exception, stop execution and pass the exception on.
*/
protected Statement withBeforeClasses(Statement statement) {
List<FrameworkMethod> befores = getTestClass()
.getAnnotatedMethods(BeforeClass.class);
return befores.isEmpty() ? statement :
new H2ORunnerBefores(statement, befores, null);
}

@Override
protected Statement withAfterClasses(Statement statement) {
final List<FrameworkMethod> afters = testClass
.getAnnotatedMethods(AfterClass.class);
return new H2ORunnerAfters(statement, afters, null);
}

@Override
protected void runChild(FrameworkMethod method, RunNotifier notifier) {
final Description description = describeChild(method);
if (isIgnored(method)) {
notifier.fireTestIgnored(description);
} else {
leaf(methodBlock(method), description, notifier);
}
}

/**
* Runs a {@link Statement} that represents a leaf (aka atomic) test.
*/
private void leaf(Statement statement, Description description,
RunNotifier notifier) {
final EachTestNotifier eachNotifier = new EachTestNotifier(notifier, description);
eachNotifier.fireTestStarted();
try {
statement.evaluate();
} catch (AssumptionViolatedException e) {
eachNotifier.addFailedAssumption(e);
} catch (Throwable e) {
eachNotifier.addFailure(e);
} finally {
try {
checkLeakedKeys(description);
} catch (Throwable t) {
eachNotifier.addFailure(t);
}
eachNotifier.fireTestFinished();
}
}

private void checkLeakedKeys(final Description description) {
final CheckKeysTask checkKeysTask = new CheckKeysTask().doAllNodes();
if (checkKeysTask.leakedKeys.length == 0) {
return;
}

printLeakedKeys(checkKeysTask.leakedKeys);
throw new IllegalStateException(String.format("Test method '%s.%s' leaked %d keys.", description.getTestClass().getName(), description.getMethodName(), checkKeysTask.leakedKeys.length));
}


private void printLeakedKeys(final Key[] leakedKeys) {
final Set<Key> leakedKeysSet = new HashSet<>(leakedKeys.length);

for (Key k : leakedKeys) {
leakedKeysSet.add(k);
}

for (Key key : leakedKeys) {

final Value keyValue = Value.STORE_get(key);
if (keyValue != null && keyValue.isFrame()) {
Frame frame = (Frame) key.get();
Log.err(String.format("Leaked frame with key '%s'. This frame contains the following vectors:", frame._key.toString()));

for (Key vecKey : frame.keys()) {
if (!leakedKeysSet.contains(vecKey)) continue;
Log.err(String.format(" Vector '%s'. This vector contains the following chunks:", vecKey.toString()));

final Vec vec = (Vec) vecKey.get();
for (int i = 0; i < vec.nChunks(); i++) {
final Key chunkKey = vec.chunkKey(i);
if (!leakedKeysSet.contains(chunkKey)) continue;
Log.err(String.format(" Chunk id %d, key '%s'", i, chunkKey));
leakedKeysSet.remove(chunkKey);
}

leakedKeysSet.remove(vecKey);
}
leakedKeysSet.remove(key);
}
}

if (!leakedKeysSet.isEmpty()) {
Log.err(String.format("%nThere are also %d more leaked keys:", leakedKeysSet.size()));
}

for (Key key : leakedKeysSet) {
Log.err(String.format("Key '%s'", key.toString()));
}
}



private int fetchCloudSize() {
final CloudSize annotation = testClass.getAnnotation(CloudSize.class);
if (annotation == null) throw new IllegalStateException("@CloudSize annotation is missing for test class: " + testClass.getName());

final int cloudSize = annotation.value();

if(cloudSize < 1) throw new IllegalStateException("@CloudSize annotation must specify sizes greater than zero. Given value: " + cloudSize);

return cloudSize;
}


}
49 changes: 49 additions & 0 deletions h2o-core/src/test/java/water/runner/H2ORunnerAfters.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package water.runner;

import org.junit.Ignore;
import org.junit.internal.runners.statements.RunAfters;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.MultipleFailureException;
import org.junit.runners.model.Statement;

import java.util.ArrayList;
import java.util.List;

@Ignore
public class H2ORunnerAfters extends RunAfters {

private final Statement next;

private final Object target;

private final List<FrameworkMethod> afters;

public H2ORunnerAfters(Statement next, List<FrameworkMethod> afters, Object target) {
super(next, afters, target);
this.next = next;
this.target = target;
this.afters = afters;
}

@Override
public void evaluate() throws Throwable {
List<Throwable> errors = new ArrayList<Throwable>();
try {
next.evaluate();
} catch (Throwable e) {
errors.add(e);
} finally {
// Clean all keys shared for the whole test class, created during @BeforeClass,
// but not cleaned in @AfterClass.
new CleanAllKeysTask().doAllNodes();
for (FrameworkMethod each : afters) {
try {
each.invokeExplosively(target);
} catch (Throwable e) {
errors.add(e);
}
}
}
MultipleFailureException.assertEmpty(errors);
}
}
Loading