From 55dc72b76cae90b076a6a9db51cdfdd2f52e2401 Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Tue, 11 Oct 2022 09:58:28 +0200 Subject: [PATCH 1/2] Fixes neo4j/apoc#126: apoc.periodic.submit fails with schema operations (core) --- .../java/apoc/periodic/PeriodicUtils.java | 41 +++++++++++++++ .../src/main/java/apoc/periodic/Periodic.java | 50 ++++--------------- .../test/java/apoc/periodic/PeriodicTest.java | 14 ++++++ 3 files changed, 66 insertions(+), 39 deletions(-) diff --git a/common/src/main/java/apoc/periodic/PeriodicUtils.java b/common/src/main/java/apoc/periodic/PeriodicUtils.java index cebf23643..6dffb4f8f 100644 --- a/common/src/main/java/apoc/periodic/PeriodicUtils.java +++ b/common/src/main/java/apoc/periodic/PeriodicUtils.java @@ -11,6 +11,7 @@ import org.neo4j.procedure.TerminationGuard; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -245,6 +246,46 @@ public static Stream iterateAndExecuteBatchedInSeparateThre } return Stream.of(collector.getResult()); } + + public static Stream submitProc(String name, String statement, Map config, GraphDatabaseService db, Log log, Pools pools) { + Map params = (Map) config.getOrDefault("params", Collections.emptyMap()); + JobInfo info = submitJob(name, () -> { + try { + db.executeTransactionally(statement, params); + } catch(Exception e) { + log.warn("in background task via submit", e); + throw new RuntimeException(e); + } + }, log, pools); + return Stream.of(info); + } + + /** + * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. + */ + public static JobInfo submitJob(String name, Runnable task, Log log, Pools pools) { + JobInfo info = new JobInfo(name); + Future future = pools.getJobList().remove(info); + if (future != null && !future.isDone()) future.cancel(false); + + Runnable wrappingTask = wrapTask(name, task, log); + Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask); + pools.getJobList().put(info,newFuture); + return info; + } + + public static Runnable wrapTask(String name, Runnable task, Log log) { + return () -> { + log.debug("Executing task " + name); + try { + task.run(); + } catch (Exception e) { + log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e); + throw e; + } + log.debug("Executed task " + name); + }; + } } /* diff --git a/core/src/main/java/apoc/periodic/Periodic.java b/core/src/main/java/apoc/periodic/Periodic.java index 3984ce452..be7974368 100644 --- a/core/src/main/java/apoc/periodic/Periodic.java +++ b/core/src/main/java/apoc/periodic/Periodic.java @@ -22,7 +22,13 @@ import java.util.regex.Pattern; import java.util.stream.Stream; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_ONLY; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_WRITE; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.WRITE; import static apoc.periodic.PeriodicUtils.recordError; +import static apoc.periodic.PeriodicUtils.submitJob; +import static apoc.periodic.PeriodicUtils.submitProc; +import static apoc.periodic.PeriodicUtils.wrapTask; import static apoc.util.Util.merge; public class Periodic { @@ -156,16 +162,7 @@ public Stream cancel(@Name("name") String name) { @Description("apoc.periodic.submit('name',statement,params) - submit a one-off background statement; parameter 'params' is optional and can contain query parameters for Cypher statement") public Stream submit(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map config) { validateQuery(statement); - Map params = (Map)config.getOrDefault("params", Collections.emptyMap()); - JobInfo info = submit(name, () -> { - try { - db.executeTransactionally(statement, params); - } catch(Exception e) { - log.warn("in background task via submit", e); - throw new RuntimeException(e); - } - }, log); - return Stream.of(info); + return submitProc(name, statement, config, db, log, pools); } @Procedure(mode = Mode.WRITE) @@ -180,31 +177,19 @@ public Stream repeat(@Name("name") String name, @Name("statement") Stri } private void validateQuery(String statement) { - Util.validateQuery(db, statement); + Util.validateQuery(db, statement, + READ_ONLY, WRITE, READ_WRITE); } @Procedure(mode = Mode.WRITE) @Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0") public Stream countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) { validateQuery(statement); - JobInfo info = submit(name, new Countdown(name, statement, rate, log), log); + JobInfo info = submitJob(name, new Countdown(name, statement, rate, log), log, pools); info.rate = rate; return Stream.of(info); } - /** - * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. - */ - public JobInfo submit(String name, Runnable task, Log log) { - JobInfo info = new JobInfo(name); - Future future = pools.getJobList().remove(info); - if (future != null && !future.isDone()) future.cancel(false); - - Runnable wrappingTask = wrapTask(name, task, log); - Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask); - pools.getJobList().put(info,newFuture); - return info; - } /** * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. @@ -220,19 +205,6 @@ public JobInfo schedule(String name, Runnable task, long delay, long repeat) { return info; } - private static Runnable wrapTask(String name, Runnable task, Log log) { - return () -> { - log.debug("Executing task " + name); - try { - task.run(); - } catch (Exception e) { - log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e); - throw e; - } - log.debug("Executed task " + name); - }; - } - /** * Invoke cypherAction in batched transactions being fed from cypherIteration running in main thread * @param cypherIterate @@ -325,7 +297,7 @@ public Countdown(String name, String statement, long rate, Log log) { @Override public void run() { if (Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap()) > 0) { - pools.getScheduledExecutorService().schedule(() -> submit(name, this, log), rate, TimeUnit.SECONDS); + pools.getScheduledExecutorService().schedule(() -> submitJob(name, this, log, pools), rate, TimeUnit.SECONDS); } } } diff --git a/core/src/test/java/apoc/periodic/PeriodicTest.java b/core/src/test/java/apoc/periodic/PeriodicTest.java index 618149e8e..e9aa4caee 100644 --- a/core/src/test/java/apoc/periodic/PeriodicTest.java +++ b/core/src/test/java/apoc/periodic/PeriodicTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -42,6 +43,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.util.Iterables.count; +import static org.neo4j.test.assertion.Assert.assertEventually; public class PeriodicTest { @@ -79,6 +81,18 @@ public void testSubmitStatement() throws Exception { testCall(db, callList, (r) -> assertEquals(true, r.get("done"))); } + @Test + public void testSubmitWithSchemaOperation() { + try { + testCall(db, "CALL apoc.periodic.submit('subSchema','CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)')", + (row) -> fail("Should fail because of unsupported schema operation")); + } catch (RuntimeException e) { + final String expected = "Failed to invoke procedure `apoc.periodic.submit`: " + + "Caused by: java.lang.RuntimeException: Supported query types for the operation are [READ_ONLY, WRITE, READ_WRITE]"; + assertEquals(expected, e.getMessage()); + } + } + @Test public void testSubmitStatementWithParams() throws Exception { String callList = "CALL apoc.periodic.list()"; From ed7c6b09e52f885c5c1905856e8c183ff10e12af Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Tue, 11 Oct 2022 11:05:33 +0200 Subject: [PATCH 2/2] removed unused import --- core/src/test/java/apoc/periodic/PeriodicTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/test/java/apoc/periodic/PeriodicTest.java b/core/src/test/java/apoc/periodic/PeriodicTest.java index e9aa4caee..0f25f89b7 100644 --- a/core/src/test/java/apoc/periodic/PeriodicTest.java +++ b/core/src/test/java/apoc/periodic/PeriodicTest.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -43,7 +42,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.util.Iterables.count; -import static org.neo4j.test.assertion.Assert.assertEventually; public class PeriodicTest {