Skip to content

Commit

Permalink
fixes #1268: apoc.periodic.repeat doesn't provide any feedback when q…
Browse files Browse the repository at this point in the history
…uery is bad (#1320)
  • Loading branch information
conker84 authored and sarmbruster committed Nov 13, 2019
1 parent 9c1b596 commit 5fb075e
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 7 deletions.
36 changes: 35 additions & 1 deletion src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public Stream<JobInfo> list() {
@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.commit(statement,params) - runs the given statement in separate transactions until it returns 0")
public Stream<RundownResult> commit(@Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map<String,Object> parameters) throws ExecutionException, InterruptedException {
validateQuery(statement);
Map<String,Object> params = parameters == null ? Collections.emptyMap() : parameters;
long total = 0, executions = 0, updates = 0;
long start = System.nanoTime();
Expand Down Expand Up @@ -134,6 +135,7 @@ public Stream<JobInfo> cancel(@Name("name") String name) {
@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.submit('name',statement) - submit a one-off background statement")
public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") String statement) {
validateQuery(statement);
JobInfo info = submit(name, () -> {
try {
db.executeTransactionally(statement);
Expand All @@ -148,16 +150,22 @@ public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") Stri
@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.")
public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate, @Name(value = "config", defaultValue = "{}") Map<String,Object> config ) {
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = schedule(name, () -> {
db.executeTransactionally(statement, params);
},0,rate);
return Stream.of(info);
}

private void validateQuery(String statement) {
db.executeTransactionally("EXPLAIN " + statement);
}

@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0")
public Stream<JobInfo> countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) {
validateQuery(statement);
JobInfo info = submit(name, new Countdown(name, statement, rate));
info.rate = rate;
return Stream.of(info);
Expand Down Expand Up @@ -207,7 +215,10 @@ public Stream<LoopingBatchAndTotalResult> rock_n_roll_while(
@Name("cypherIterate") String cypherIterate,
@Name("cypherAction") String cypherAction,
@Name("batchSize") long batchSize) {

Map<String, String> fieldStatement = Util.map(
"cypherLoop", cypherLoop,
"cypherIterate", cypherIterate);
validateQueries(fieldStatement);
Stream<LoopingBatchAndTotalResult> allResults = Stream.empty();

Map<String,Object> loopParams = new HashMap<>(1);
Expand All @@ -231,6 +242,24 @@ public Stream<LoopingBatchAndTotalResult> rock_n_roll_while(
}
}

private void validateQueries(Map<String, String> fieldStatement) {
String error = fieldStatement.entrySet()
.stream()
.map(e -> {
try {
validateQuery(e.getValue());
return null;
} catch (Exception exception) {
return String.format("Exception for field `%s`, message: %s", e.getKey(), exception.getMessage());
}
})
.filter(e -> e != null)
.collect(Collectors.joining("\n"));
if (!error.isEmpty()) {
throw new RuntimeException(error);
}
}

/**
* invoke cypherAction in batched transactions being feeded from cypherIteration running in main thread
* @param cypherIterate
Expand All @@ -242,6 +271,7 @@ public Stream<BatchAndTotalResult> iterate(
@Name("cypherIterate") String cypherIterate,
@Name("cypherAction") String cypherAction,
@Name("config") Map<String,Object> config) {
validateQuery(cypherIterate);

long batchSize = Util.toLong(config.getOrDefault("batchSize", 10000));
int concurrency = Util.toInteger(config.getOrDefault("concurrency", 50));
Expand Down Expand Up @@ -291,6 +321,10 @@ public Stream<BatchAndTotalResult> rock_n_roll(
@Name("cypherIterate") String cypherIterate,
@Name("cypherAction") String cypherAction,
@Name("batchSize") long batchSize) {
Map<String, String> fieldStatement = Util.map(
"cypherIterate", cypherIterate,
"cypherAction", cypherAction);
validateQueries(fieldStatement);

log.info("starting batched operation using iteration `%s` in separate thread", cypherIterate);
try (Result result = tx.execute(cypherIterate)) {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/apoc/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -493,21 +493,21 @@ public static Map<String, Object> merge(Map<String, Object> first, Map<String, O
return combined;
}

public static Map<String,Object> map(Object ... values) {
Map<String, Object> map = new LinkedHashMap<>();
public static <T> Map<String, T> map(T ... values) {
Map<String, T> map = new LinkedHashMap<>();
for (int i = 0; i < values.length; i+=2) {
if (values[i] == null) continue;
map.put(values[i].toString(),values[i+1]);
}
return map;
}

public static Map<String, Object> map(List<Object> pairs) {
Map<String, Object> res = new LinkedHashMap<>(pairs.size() / 2);
Iterator<Object> it = pairs.iterator();
public static <T> Map<String, T> map(List<T> pairs) {
Map<String, T> res = new LinkedHashMap<>(pairs.size() / 2);
Iterator<T> it = pairs.iterator();
while (it.hasNext()) {
Object key = it.next();
Object value = it.next();
T value = it.next();
if (key != null) res.put(key.toString(), value);
}
return res;
Expand Down
89 changes: 89 additions & 0 deletions src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -482,4 +482,93 @@ private long tryReadCount(int maxAttempts, String statement, long expected) thro
return count;
}

@Test(expected = QueryExecutionException.class)
public void testCommitFail() {
final String query = "CALL apoc.periodic.commit('UNWIND range(0,1000) as id WITH id CREATE (::Foo {id: id}) limit 1000', {})";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testSubmitFail() {
final String query = "CALL apoc.periodic.submit('foo','create (::Foo)')";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRepeatFail() {
final String query = "CALL apoc.periodic.repeat('repeat-params', 'MERGE (person:Person {name: $nameValue})', 2, {params: {nameValue: 'John Doe'}}) YIELD name RETURN nam";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testCountdownFail() {
final String query = "CALL apoc.periodic.countdown('decrement', 'MATCH (counter:Counter) SET counter.c == counter.c - 1 RETURN counter.c as count', 1)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollWhileLoopFail() {
final String query = "CALL apoc.periodic.rock_n_roll_while('return coalescence($previous, 3) - 1 as loop', " +
"'match (p:Person) return p', " +
"'MATCH (p) where p = {p} SET p.lastname = p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollWhileIterateFail() {
final String query = "CALL apoc.periodic.rock_n_roll_while('return coalesce($previous, 3) - 1 as loop', " +
"'match (p:Person) return pp', " +
"'MATCH (p) where p = {p} SET p.lastname = p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testIterateQueryFail() {
final String query = "CALL apoc.periodic.iterate('UNWIND range(0, 1000) as id RETURN ids', " +
"'WITH $id as id CREATE (:Foo {id: $id})', " +
"{batchSize:1,parallel:true})";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollIterateFail() {
final String query = "CALL apoc.periodic.rock_n_roll('match (pp:Person) return p', " +
"'WITH $p as p SET p.lastname = p.name REMOVE p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollActionFail() {
final String query = "CALL apoc.periodic.rock_n_roll('match (p:Person) return p', " +
"'WITH $p as p SET pp.lastname = p.name REMOVE p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollWhileFail() {
final String query = "CALL apoc.periodic.rock_n_roll_while('return coalescence($previous, 3) - 1 as loop', " +
"'match (p:Person) return pp', " +
"'MATCH (p) where p = $p SET p.lastname = p.name', " +
"10)";
try {
testFail(query);
} catch (QueryExecutionException e) {
String expected = "Failed to invoke procedure `apoc.periodic.rock_n_roll_while`: Caused by: java.lang.RuntimeException: Exception for field `cypherLoop`, message: Unknown function 'coalescence' (line 1, column 16 (offset: 15))\n" +
"\"return coalescence($previous, 3) - 1 as loop\"\n" +
" ^\n" +
"Exception for field `cypherIterate`, message: Variable `pp` not defined (line 1, column 33 (offset: 32))\n" +
"\"EXPLAIN match (p:Person) return pp\"\n" +
" ^";
assertEquals(expected, e.getMessage());
throw e;
}
}

private void testFail(String query) {
testCall(db, query, row -> fail("The test should fail but it didn't"));
}
}

0 comments on commit 5fb075e

Please sign in to comment.