diff --git a/core/src/main/java/apoc/periodic/Periodic.java b/core/src/main/java/apoc/periodic/Periodic.java index f19fa9cd49..4b3fa75337 100644 --- a/core/src/main/java/apoc/periodic/Periodic.java +++ b/core/src/main/java/apoc/periodic/Periodic.java @@ -26,6 +26,12 @@ import java.util.regex.Pattern; import java.util.stream.Stream; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_ONLY; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_WRITE; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.WRITE; +import static apoc.periodic.PeriodicUtils.submitJob; +import static apoc.periodic.PeriodicUtils.submitProc; +import static apoc.periodic.PeriodicUtils.wrapTask; import static apoc.util.Util.merge; public class Periodic { @@ -166,16 +172,7 @@ public Stream cancel(@Name("name") String name) { @Description("apoc.periodic.submit('name',statement,params) - creates a background job which executes a Cypher statement once. The parameter 'params' is optional and can contain query parameters for the Cypher statement") public Stream submit(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map config) { validateQuery(statement); - Map params = (Map)config.getOrDefault("params", Collections.emptyMap()); - JobInfo info = submit(name, () -> { - try { - db.executeTransactionally(statement, params); - } catch(Exception e) { - log.warn("in background task via submit", e); - throw new RuntimeException(e); - } - }, log); - return Stream.of(info); + return submitProc(name, statement, config, db, log, pools); } @Procedure(mode = Mode.WRITE) @@ -190,31 +187,19 @@ public Stream repeat(@Name("name") String name, @Name("statement") Stri } private void validateQuery(String statement) { - Util.validateQuery(db, statement); + Util.validateQuery(db, statement, + READ_ONLY, WRITE, READ_WRITE); } @Procedure(mode = Mode.WRITE) @Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) creates a background job that will repeatedly execute the given Cypher statement until it returns 0.") public Stream countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) { validateQuery(statement); - JobInfo info = submit(name, new Countdown(name, statement, rate, log), log); + JobInfo info = submitJob(name, new Countdown(name, statement, rate, log), log, pools); info.rate = rate; return Stream.of(info); } - /** - * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. - */ - public JobInfo submit(String name, Runnable task, Log log) { - JobInfo info = new JobInfo(name); - Future future = pools.getJobList().remove(info); - if (future != null && !future.isDone()) future.cancel(false); - - Runnable wrappingTask = wrapTask(name, task, log); - Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask); - pools.getJobList().put(info,newFuture); - return info; - } /** * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. @@ -230,19 +215,6 @@ public JobInfo schedule(String name, Runnable task, long delay, long repeat) { return info; } - private static Runnable wrapTask(String name, Runnable task, Log log) { - return () -> { - log.debug("Executing task " + name); - try { - task.run(); - } catch (Exception e) { - log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e); - throw e; - } - log.debug("Executed task " + name); - }; - } - /** * Invoke cypherAction in batched transactions being fed from cypherIteration running in main thread * @param cypherIterate @@ -453,7 +425,7 @@ public Countdown(String name, String statement, long rate, Log log) { @Override public void run() { if (Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap()) > 0) { - pools.getScheduledExecutorService().schedule(() -> submit(name, this, log), rate, TimeUnit.SECONDS); + pools.getScheduledExecutorService().schedule(() -> submitJob(name, this, log, pools), rate, TimeUnit.SECONDS); } } } diff --git a/core/src/main/java/apoc/periodic/PeriodicUtils.java b/core/src/main/java/apoc/periodic/PeriodicUtils.java index c3abfeb0f5..7aa237281a 100644 --- a/core/src/main/java/apoc/periodic/PeriodicUtils.java +++ b/core/src/main/java/apoc/periodic/PeriodicUtils.java @@ -10,6 +10,7 @@ import org.neo4j.procedure.TerminationGuard; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -23,6 +24,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static apoc.periodic.Periodic.JobInfo; + public class PeriodicUtils { private PeriodicUtils() { @@ -116,6 +119,46 @@ public static Stream iterateAndExecuteBatchedInSeparateThre } return Stream.of(collector.getResult()); } + + public static Stream submitProc(String name, String statement, Map config, GraphDatabaseService db, Log log, Pools pools) { + Map params = (Map) config.getOrDefault("params", Collections.emptyMap()); + JobInfo info = submitJob(name, () -> { + try { + db.executeTransactionally(statement, params); + } catch(Exception e) { + log.warn("in background task via submit", e); + throw new RuntimeException(e); + } + }, log, pools); + return Stream.of(info); + } + + /** + * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. + */ + public static JobInfo submitJob(String name, Runnable task, Log log, Pools pools) { + JobInfo info = new JobInfo(name); + Future future = pools.getJobList().remove(info); + if (future != null && !future.isDone()) future.cancel(false); + + Runnable wrappingTask = wrapTask(name, task, log); + Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask); + pools.getJobList().put(info,newFuture); + return info; + } + + public static Runnable wrapTask(String name, Runnable task, Log log) { + return () -> { + log.debug("Executing task " + name); + try { + task.run(); + } catch (Exception e) { + log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e); + throw e; + } + log.debug("Executed task " + name); + }; + } } /* diff --git a/core/src/test/java/apoc/periodic/PeriodicTest.java b/core/src/test/java/apoc/periodic/PeriodicTest.java index df9732340d..7df2f63be7 100644 --- a/core/src/test/java/apoc/periodic/PeriodicTest.java +++ b/core/src/test/java/apoc/periodic/PeriodicTest.java @@ -1,5 +1,6 @@ package apoc.periodic; +import apoc.schema.Schemas; import apoc.util.MapUtil; import apoc.util.TestUtil; import org.junit.Before; @@ -53,7 +54,7 @@ public class PeriodicTest { @Before public void initDb() throws Exception { - TestUtil.registerProcedure(db, Periodic.class); + TestUtil.registerProcedure(db, Periodic.class, Schemas.class); db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)"); } @@ -79,6 +80,33 @@ public void testSubmitStatement() throws Exception { testCall(db, callList, (r) -> assertEquals(true, r.get("done"))); } + @Test + public void testSubmitWithCreateIndexSchemaOperation() { + try { + testCall(db, "CALL apoc.periodic.submit('subSchema','CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)')", + (row) -> fail("Should fail because of unsupported schema operation")); + } catch (RuntimeException e) { + final String expected = "Failed to invoke procedure `apoc.periodic.submit`: " + + "Caused by: java.lang.RuntimeException: Supported query types for the operation are [READ_ONLY, WRITE, READ_WRITE]"; + assertEquals(expected, e.getMessage()); + } + } + + @Test + public void testSubmitWithCreateUniqueConstraintSchemaOperation() { + try { + db.executeTransactionally("CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)"); + final String createConstraint = "CALL db.createUniquePropertyConstraint('uniqueConsName', ['Alpha', 'Beta'], ['foo', 'bar'], 'lucene-1.0')"; + testCall(db, "CALL apoc.periodic.submit('subSchema', $createConstraint)", + Map.of("createConstraint", createConstraint), + (row) -> fail("Should fail because of unsupported schema operation")); + } catch (RuntimeException e) { + final String expected = "Failed to invoke procedure `apoc.periodic.submit`: " + + "Caused by: java.lang.RuntimeException: Supported query types for the operation are [READ_ONLY, WRITE, READ_WRITE]"; + assertEquals(expected, e.getMessage()); + } + } + @Test public void testSubmitStatementWithParams() throws Exception { String callList = "CALL apoc.periodic.list()";