Skip to content

Commit

Permalink
fixes #1268: apoc.periodic.repeat doesn't provide any feedback when q…
Browse files Browse the repository at this point in the history
…uery is bad (#1320)
  • Loading branch information
conker84 authored and jexp committed Nov 5, 2019
1 parent c928a79 commit d8697ff
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 17 deletions.
36 changes: 35 additions & 1 deletion src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public Stream<JobInfo> list() {
@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.commit(statement,params) - runs the given statement in separate transactions until it returns 0")
public Stream<RundownResult> commit(@Name("statement") String statement, @Name(value = "params", defaultValue = "") Map<String,Object> parameters) throws ExecutionException, InterruptedException {
validateQuery(statement);
Map<String,Object> params = parameters == null ? Collections.emptyMap() : parameters;
long total = 0, executions = 0, updates = 0;
long start = nanoTime();
Expand Down Expand Up @@ -157,6 +158,7 @@ public Stream<JobInfo> cancel(@Name("name") String name) {
@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.submit('name',statement) - submit a one-off background statement")
public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") String statement) {
validateQuery(statement);
JobInfo info = submit(name, () -> {
try {
Iterators.count(db.execute(statement));
Expand All @@ -170,14 +172,20 @@ public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") Stri
@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.")
public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate, @Name(value = "config", defaultValue = "{}") Map<String,Object> config ) {
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = schedule(name, () -> Iterators.count(db.execute(statement, params)),0,rate);
return Stream.of(info);
}

private void validateQuery(String statement) {
db.execute("EXPLAIN " + statement).close();
}

@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0")
public Stream<JobInfo> countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) {
validateQuery(statement);
JobInfo info = submit(name, new Countdown(name, statement, rate));
info.rate = rate;
return Stream.of(info);
Expand Down Expand Up @@ -227,7 +235,10 @@ public Stream<LoopingBatchAndTotalResult> rock_n_roll_while(
@Name("cypherIterate") String cypherIterate,
@Name("cypherAction") String cypherAction,
@Name("batchSize") long batchSize) {

Map<String, String> fieldStatement = Util.map(
"cypherLoop", cypherLoop,
"cypherIterate", cypherIterate);
validateQueries(fieldStatement);
Stream<LoopingBatchAndTotalResult> allResults = Stream.empty();

Map<String,Object> loopParams = new HashMap<>(1);
Expand All @@ -251,6 +262,24 @@ public Stream<LoopingBatchAndTotalResult> rock_n_roll_while(
}
}

private void validateQueries(Map<String, String> fieldStatement) {
String error = fieldStatement.entrySet()
.stream()
.map(e -> {
try {
validateQuery(e.getValue());
return null;
} catch (Exception exception) {
return String.format("Exception for field `%s`, message: %s", e.getKey(), exception.getMessage());
}
})
.filter(e -> e != null)
.collect(Collectors.joining("\n"));
if (!error.isEmpty()) {
throw new RuntimeException(error);
}
}

/**
* invoke cypherAction in batched transactions being feeded from cypherIteration running in main thread
* @param cypherIterate
Expand All @@ -262,6 +291,7 @@ public Stream<BatchAndTotalResult> iterate(
@Name("cypherIterate") String cypherIterate,
@Name("cypherAction") String cypherAction,
@Name("config") Map<String,Object> config) {
validateQuery(cypherIterate);

long batchSize = Util.toLong(config.getOrDefault("batchSize", 10000));
int concurrency = Util.toInteger(config.getOrDefault("concurrency", 50));
Expand Down Expand Up @@ -323,6 +353,10 @@ public Stream<BatchAndTotalResult> rock_n_roll(
@Name("cypherIterate") String cypherIterate,
@Name("cypherAction") String cypherAction,
@Name("batchSize") long batchSize) {
Map<String, String> fieldStatement = Util.map(
"cypherIterate", cypherIterate,
"cypherAction", cypherAction);
validateQueries(fieldStatement);

log.info("starting batched operation using iteration `%s` in separate thread", cypherIterate);
try (Result result = db.execute(cypherIterate)) {
Expand Down
72 changes: 60 additions & 12 deletions src/main/java/apoc/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,19 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.PropertyContainer;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionGuardException;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.internal.kernel.api.security.SecurityContext;
Expand All @@ -17,17 +28,54 @@
import org.neo4j.procedure.TerminationGuard;

import javax.lang.model.SourceVersion;
import java.io.*;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.*;
import java.util.concurrent.*;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Scanner;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.DeflaterInputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
Expand Down Expand Up @@ -486,21 +534,21 @@ public static Map<String, Object> merge(Map<String, Object> first, Map<String, O
return combined;
}

public static Map<String,Object> map(Object ... values) {
Map<String, Object> map = new LinkedHashMap<>();
public static <T> Map<String, T> map(T ... values) {
Map<String, T> map = new LinkedHashMap<>();
for (int i = 0; i < values.length; i+=2) {
if (values[i] == null) continue;
map.put(values[i].toString(),values[i+1]);
}
return map;
}

public static Map<String, Object> map(List<Object> pairs) {
Map<String, Object> res = new LinkedHashMap<>(pairs.size() / 2);
Iterator<Object> it = pairs.iterator();
public static <T> Map<String, T> map(List<T> pairs) {
Map<String, T> res = new LinkedHashMap<>(pairs.size() / 2);
Iterator<T> it = pairs.iterator();
while (it.hasNext()) {
Object key = it.next();
Object value = it.next();
T value = it.next();
if (key != null) res.put(key.toString(), value);
}
return res;
Expand Down
12 changes: 10 additions & 2 deletions src/test/java/apoc/coll/CollTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
import org.neo4j.graphdb.Node;
import org.neo4j.test.TestGraphDatabaseFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static apoc.util.Util.map;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.neo4j.helpers.collection.Iterables.asSet;

public class CollTest {
Expand Down
102 changes: 100 additions & 2 deletions src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.query.ExecutingQuery;
Expand All @@ -26,7 +30,11 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.graphdb.DependencyResolver.SelectionStrategy.FIRST;

public class PeriodicTest {
Expand Down Expand Up @@ -494,4 +502,94 @@ private long readCount(String statement) {
return Iterators.single(it);
}
}

@Test(expected = QueryExecutionException.class)
public void testCommitFail() {
final String query = "CALL apoc.periodic.commit('UNWIND range(0,1000) as id WITH id CREATE (::Foo {id: id}) limit 1000', {})";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testSubmitFail() {
final String query = "CALL apoc.periodic.submit('foo','create (::Foo)')";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRepeatFail() {
final String query = "CALL apoc.periodic.repeat('repeat-params', 'MERGE (person:Person {name: {nameValue}})', 2, {params: {nameValue: 'John Doe'}}) YIELD name RETURN nam";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testCountdownFail() {
final String query = "CALL apoc.periodic.countdown('decrement', 'MATCH (counter:Counter) SET counter.c == counter.c - 1 RETURN counter.c as count', 1)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollWhileLoopFail() {
final String query = "CALL apoc.periodic.rock_n_roll_while('return coalescence({previous}, 3) - 1 as loop', " +
"'match (p:Person) return p', " +
"'MATCH (p) where p = {p} SET p.lastname = p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollWhileIterateFail() {
final String query = "CALL apoc.periodic.rock_n_roll_while('return coalesce({previous}, 3) - 1 as loop', " +
"'match (p:Person) return pp', " +
"'MATCH (p) where p = {p} SET p.lastname = p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testIterateQueryFail() {
final String query = "CALL apoc.periodic.iterate('UNWIND range(0, 1000) as id RETURN ids', " +
"'WITH $id as id CREATE (:Foo {id: $id})', " +
"{batchSize:1,parallel:true})";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollIterateFail() {
final String query = "CALL apoc.periodic.rock_n_roll('match (pp:Person) return p', " +
"'WITH {p} as p SET p.lastname = p.name REMOVE p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollActionFail() {
final String query = "CALL apoc.periodic.rock_n_roll('match (p:Person) return p', " +
"'WITH {p} as p SET pp.lastname = p.name REMOVE p.name', " +
"10)";
testFail(query);
}

@Test(expected = QueryExecutionException.class)
public void testRockNRollWhileFail() {
final String query = "CALL apoc.periodic.rock_n_roll_while('return coalescence({previous}, 3) - 1 as loop', " +
"'match (p:Person) return pp', " +
"'MATCH (p) where p = {p} SET p.lastname = p.name', " +
"10)";
try {
testFail(query);
} catch (QueryExecutionException e) {
String expected = "Failed to invoke procedure `apoc.periodic.rock_n_roll_while`: Caused by: java.lang.RuntimeException: Exception for field `cypherLoop`, message: Unknown function 'coalescence' (line 1, column 16 (offset: 15))\n" +
"\"return coalescence({previous}, 3) - 1 as loop\"\n" +
" ^\n" +
"Exception for field `cypherIterate`, message: Variable `pp` not defined (line 1, column 33 (offset: 32))\n" +
"\"EXPLAIN match (p:Person) return pp\"\n" +
" ^";
assertEquals(expected, e.getMessage());
throw e;
}
}

private void testFail(String query) {
testCall(db, query, row -> fail("The test should fail but it didn't"));
}
}

0 comments on commit d8697ff

Please sign in to comment.