diff --git a/common/src/main/java/apoc/periodic/PeriodicUtils.java b/common/src/main/java/apoc/periodic/PeriodicUtils.java index ee13c5c01..0390cf261 100644 --- a/common/src/main/java/apoc/periodic/PeriodicUtils.java +++ b/common/src/main/java/apoc/periodic/PeriodicUtils.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.QueryStatistics; +import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Transaction; import org.neo4j.logging.Log; import org.neo4j.procedure.TerminationGuard; @@ -269,7 +270,8 @@ public static Stream submitProc(String name, String statement, Map params = (Map) config.getOrDefault("params", Collections.emptyMap()); JobInfo info = submitJob(name, () -> { try { - db.executeTransactionally(statement, params); + // `resultAsString` in order to consume result + db.executeTransactionally(statement, params, Result::resultAsString); } catch(Exception e) { log.warn("in background task via submit", e); throw new RuntimeException(e); diff --git a/core/src/main/java/apoc/periodic/Periodic.java b/core/src/main/java/apoc/periodic/Periodic.java index 366cfc804..510f7a734 100644 --- a/core/src/main/java/apoc/periodic/Periodic.java +++ b/core/src/main/java/apoc/periodic/Periodic.java @@ -188,7 +188,8 @@ public Stream repeat(@Name("name") String name, @Name("statement") Stri validateQuery(statement); Map params = (Map)config.getOrDefault("params", Collections.emptyMap()); JobInfo info = schedule(name, () -> { - db.executeTransactionally(statement, params); + // `resultAsString` in order to consume result + db.executeTransactionally(statement, params, Result::resultAsString); },0,rate); return Stream.of(info); } diff --git a/core/src/test/java/apoc/periodic/PeriodicTest.java b/core/src/test/java/apoc/periodic/PeriodicTest.java index 3b3e7f5ce..cd1b9db10 100644 --- a/core/src/test/java/apoc/periodic/PeriodicTest.java +++ b/core/src/test/java/apoc/periodic/PeriodicTest.java @@ -40,6 +40,11 @@ import org.neo4j.kernel.api.KernelTransactionHandle; import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.internal.GraphDatabaseAPI; +import org.neo4j.logging.AssertableLogProvider; +import org.neo4j.logging.Log; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; import org.neo4j.test.rule.DbmsRule; import org.neo4j.test.rule.ImpermanentDbmsRule; @@ -48,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -69,21 +75,72 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.util.Iterables.count; +import static org.neo4j.test.assertion.Assert.assertEventually; public class PeriodicTest { + public static class MockLogger { + @Context public Log log; + + @Procedure("apoc.mockLog") + public void mockLog(@Name("value") String value) { + log.info(value); + } + } public static final long RUNDOWN_COUNT = 1000; public static final int BATCH_SIZE = 399; + public static AssertableLogProvider logProvider = new AssertableLogProvider(); + @Rule - public DbmsRule db = new ImpermanentDbmsRule(); + public DbmsRule db = new ImpermanentDbmsRule(logProvider); @Before public void initDb() { - TestUtil.registerProcedure(db, Periodic.class, Schemas.class, Cypher.class, Utils.class); + TestUtil.registerProcedure(db, Periodic.class, Schemas.class, Cypher.class, Utils.class, MockLogger.class); db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)"); } + @Test + public void testRepeatWithVoidProcedure() { + String logVal = "repeatVoid"; + String query = "CALL apoc.periodic.repeat('repeat-1', 'CALL apoc.mockLog($logVal)', 1, {params: {logVal: $logVal}})"; + testLogIncrease(query, logVal); + } + + @Test + public void testRepeatWithVoidProcedureAndReturn() { + String logVal = "repeatVoidWithReturn"; + String query = "CALL apoc.periodic.repeat('repeat-2', 'CALL apoc.mockLog($logVal) RETURN 1', 1, {params: {logVal: $logVal}})"; + testLogIncrease(query, logVal); + } + + @Test + public void testSubmitWithVoidProcedure() { + String logVal = "submitVoid"; + String query = "CALL apoc.periodic.submit('submit-1', 'CALL apoc.mockLog($logVal) RETURN 1', {params: {logVal: $logVal}})"; + testLogIncrease(query, logVal); + } + + @Test + public void testSubmitWithVoidProcedureAndReturn() { + String logVal = "submitVoidWithReturn"; + String query = "CALL apoc.periodic.submit('submit-1', 'CALL apoc.mockLog($logVal)', {params: {logVal: $logVal}})"; + testLogIncrease(query, logVal); + } + + private void testLogIncrease(String query, String logVal) { + // execute a periodic procedure with `CALL apoc.mockLog(...)` as an inner procedure + db.executeTransactionally(query, Map.of("logVal", logVal)); + + // check custom log in logProvider + assertEventually(() -> { + String serialize = logProvider.serialize(); + return serialize.contains(logVal); + }, (val) -> val, 5L, TimeUnit.SECONDS); + + } + @Test public void testSubmitStatement() throws Exception { String callList = "CALL apoc.periodic.list()";