diff --git a/core/src/main/java/apoc/periodic/Periodic.java b/core/src/main/java/apoc/periodic/Periodic.java index 484e84b440..151548f142 100644 --- a/core/src/main/java/apoc/periodic/Periodic.java +++ b/core/src/main/java/apoc/periodic/Periodic.java @@ -3,6 +3,7 @@ import apoc.Pools; import apoc.util.Util; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.time.DateUtils; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.QueryStatistics; import org.neo4j.graphdb.Result; @@ -16,6 +17,12 @@ import org.neo4j.logging.Log; import org.neo4j.procedure.*; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.Temporal; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +37,8 @@ public class Periodic { + public static final String ERROR_DATE_BEFORE = "The provided date is before current date"; + enum Planner {DEFAULT, COST, IDP, DP } public static final Pattern PLANNER_PATTERN = Pattern.compile("\\bplanner\\s*=\\s*[^\\s]*", Pattern.CASE_INSENSITIVE); @@ -167,25 +176,64 @@ public Stream cancel(@Name("name") String name) { public Stream submit(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map config) { validateQuery(statement); Map params = (Map)config.getOrDefault("params", Collections.emptyMap()); - JobInfo info = submit(name, () -> { + + final Temporal atTime = (Temporal) (config.get("atTime")); + + final Runnable task = () -> { try { db.executeTransactionally(statement, params); - } catch(Exception e) { + } catch (Exception e) { log.warn("in background task via submit", e); throw new RuntimeException(e); } - }, log); + }; + + JobInfo info = atTime != null + ? getJobInfo(name, atTime, task, ScheduleType.DEFAULT) + : submit(name, task); + return Stream.of(info); } + private JobInfo getJobInfo(String name, Temporal atTime, Runnable task, ScheduleType scheduleType) { + if (atTime instanceof LocalDate) { + atTime = ((LocalDate) atTime).atStartOfDay(); + } + final boolean isTime = atTime instanceof OffsetTime || atTime instanceof LocalTime; + Temporal now = isTime + ? LocalTime.now() + : LocalDateTime.now(); + + final long secPerDay = DateUtils.MILLIS_PER_DAY / 1000L; + long delay = now.until(atTime, ChronoUnit.SECONDS); + if (isTime && delay < 0) { + // we consider the day after + delay = delay + secPerDay; + } + if (delay < 0) { + throw new RuntimeException(ERROR_DATE_BEFORE); + } + return schedule(name, task, delay, secPerDay, scheduleType); + } + @Procedure(mode = Mode.WRITE) - @Description("apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.") - public Stream repeat(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate, @Name(value = "config", defaultValue = "{}") Map config ) { + @Description("apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.") + public Stream repeat(@Name("name") String name, @Name("statement") String statement, @Name("rateOrTime") Object rateOrTime, @Name(value = "config", defaultValue = "{}") Map config ) { + validateQuery(statement); Map params = (Map)config.getOrDefault("params", Collections.emptyMap()); - JobInfo info = schedule(name, () -> { + final Runnable runnable = () -> { db.executeTransactionally(statement, params); - },0,rate); + }; + final JobInfo info; + if (rateOrTime instanceof Long) { + info = schedule(name, runnable,0, (long) rateOrTime); + } else if(rateOrTime instanceof Temporal) { + info = getJobInfo(name, (Temporal) rateOrTime, runnable, ScheduleType.FIXED_RATE); + } else { + throw new RuntimeException("invalid type of rateOrTime parameter"); + } + return Stream.of(info); } @@ -197,7 +245,7 @@ private void validateQuery(String statement) { @Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0") public Stream countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) { validateQuery(statement); - JobInfo info = submit(name, new Countdown(name, statement, rate, log), log); + JobInfo info = submit(name, new Countdown(name, statement, rate, log)); info.rate = rate; return Stream.of(info); } @@ -205,7 +253,7 @@ public Stream countdown(@Name("name") String name, @Name("statement") S /** * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. */ - public JobInfo submit(String name, Runnable task, Log log) { + public JobInfo submit(String name, Runnable task) { JobInfo info = new JobInfo(name); Future future = pools.getJobList().remove(info); if (future != null && !future.isDone()) future.cancel(false); @@ -216,20 +264,39 @@ public JobInfo submit(String name, Runnable task, Log log) { return info; } + private enum ScheduleType { DEFAULT, FIXED_DELAY, FIXED_RATE } + + public JobInfo schedule(String name, Runnable task, long delay, long repeat) { + return schedule(name, task, delay, repeat, ScheduleType.FIXED_DELAY); + } + /** * Call from a procedure that gets a @Context GraphDatbaseAPI db; injected and provide that db to the runnable. */ - public JobInfo schedule(String name, Runnable task, long delay, long repeat) { - JobInfo info = new JobInfo(name,delay,repeat); + public JobInfo schedule(String name, Runnable task, long delay, long repeat, ScheduleType isFixedDelay) { + JobInfo info = new JobInfo(name, delay, isFixedDelay.equals(ScheduleType.DEFAULT) ? 0 : repeat); Future future = pools.getJobList().remove(info); if (future != null && !future.isDone()) future.cancel(false); Runnable wrappingTask = wrapTask(name, task, log); - ScheduledFuture newFuture = pools.getScheduledExecutorService().scheduleWithFixedDelay(wrappingTask, delay, repeat, TimeUnit.SECONDS); + ScheduledFuture newFuture = getScheduledFuture(wrappingTask, delay, repeat, isFixedDelay); pools.getJobList().put(info,newFuture); return info; } + private ScheduledFuture getScheduledFuture(Runnable wrappingTask, long delay, long repeat, ScheduleType isFixedDelay) { + final ScheduledExecutorService service = pools.getScheduledExecutorService(); + final TimeUnit timeUnit = TimeUnit.SECONDS; + switch (isFixedDelay) { + case FIXED_DELAY: + return service.scheduleWithFixedDelay(wrappingTask, delay, repeat, timeUnit); + case FIXED_RATE: + return service.scheduleAtFixedRate(wrappingTask, delay, repeat, timeUnit); + default: + return service.schedule(wrappingTask, delay, timeUnit); + } + } + private static Runnable wrapTask(String name, Runnable task, Log log) { return () -> { log.debug("Executing task " + name); @@ -453,7 +520,7 @@ public Countdown(String name, String statement, long rate, Log log) { @Override public void run() { if (Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap()) > 0) { - pools.getScheduledExecutorService().schedule(() -> submit(name, this, log), rate, TimeUnit.SECONDS); + pools.getScheduledExecutorService().schedule(() -> submit(name, this), rate, TimeUnit.SECONDS); } } } diff --git a/core/src/test/java/apoc/periodic/PeriodicTest.java b/core/src/test/java/apoc/periodic/PeriodicTest.java index df9732340d..88aa7875c7 100644 --- a/core/src/test/java/apoc/periodic/PeriodicTest.java +++ b/core/src/test/java/apoc/periodic/PeriodicTest.java @@ -20,18 +20,23 @@ import org.neo4j.test.rule.DbmsRule; import org.neo4j.test.rule.ImpermanentDbmsRule; +import java.time.LocalTime; +import java.time.ZonedDateTime; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.LongStream; import java.util.stream.Stream; +import static apoc.periodic.Periodic.ERROR_DATE_BEFORE; import static apoc.periodic.Periodic.applyPlanner; import static apoc.util.TestUtil.testCall; import static apoc.util.TestUtil.testResult; import static apoc.util.Util.map; +import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; import static java.util.stream.StreamSupport.stream; import static org.hamcrest.CoreMatchers.equalTo; @@ -42,6 +47,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.util.Iterables.count; +import static org.neo4j.test.assertion.Assert.assertEventually; public class PeriodicTest { @@ -79,6 +85,67 @@ public void testSubmitStatement() throws Exception { testCall(db, callList, (r) -> assertEquals(true, r.get("done"))); } + @Test + public void testSubmitStatementAtTime() { + String callList = "CALL apoc.periodic.list()"; + assertFalse(db.executeTransactionally(callList, Collections.emptyMap(), Result::hasNext)); + + TestUtil.testFail(db, "CALL apoc.periodic.submit('submitAtTime','create (:Ajeje)', {atTime: datetime('2019')})", RuntimeException.class); + + try { + testCall(db, "CALL apoc.periodic.submit('errorDate','create (:ToFail)', {atTime: $time})", + map("time", ZonedDateTime.now().minusDays(1)), + (row) -> fail() ); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains(ERROR_DATE_BEFORE)); + } + + testCall(db, "CALL apoc.periodic.submit('submitAtTime','create (:Ajeje)', {atTime: $time})", + map("time", ZonedDateTime.now().plusSeconds(10)), + (row) -> { + assertEquals("submitAtTime", row.get("name")); + assertEquals(false, row.get("done")); + assertEquals(false, row.get("cancelled")); + assertTrue((long) row.get("delay") <= 10); + assertEquals(0L, row.get("rate")); + }); + + final String queryCount = "MATCH (:Ajeje) RETURN COUNT(*) AS count"; + long countBefore = TestUtil.singleResultFirstColumn(db, queryCount); + assertEquals(0L, countBefore); + assertEventually(() -> db.executeTransactionally(queryCount, + emptyMap(), (r) -> r.columnAs("count").next()), + value -> value == 1L, 20L, TimeUnit.SECONDS); + + assertEventually(() -> db.executeTransactionally(callList, emptyMap(), Result::hasNext), + value -> !value, 10L, TimeUnit.SECONDS); + } + + @Test + public void testRepeatStatementAtTime() { + String callList = "CALL apoc.periodic.list()"; + assertFalse(db.executeTransactionally(callList, Collections.emptyMap(), Result::hasNext)); + + testCall(db, "CALL apoc.periodic.repeat('repeatAtTime','create (:Brazorf)', time($timeAsString))", + map("timeAsString", LocalTime.now().plusSeconds(10).toString()), + (row) -> { + assertEquals("repeatAtTime", row.get("name")); + assertEquals(false, row.get("done")); + assertEquals(false, row.get("cancelled")); + assertTrue((long) row.get("delay") <= 10); + assertEquals(86400L, row.get("rate")); + }); + + final String queryCount = "MATCH (:Brazorf) RETURN COUNT(*) AS count"; + long countBefore = TestUtil.singleResultFirstColumn(db, queryCount); + assertEquals(0L, countBefore); + assertEventually(() -> db.executeTransactionally(queryCount, + emptyMap(), (r) -> r.columnAs("count").next()), + value -> value == 1L, 20L, TimeUnit.SECONDS); + + testCall(db, callList, (r) -> assertEquals(false, r.get("done"))); + } + @Test public void testSubmitStatementWithParams() throws Exception { String callList = "CALL apoc.periodic.list()"; diff --git a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic-lite.csv b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic-lite.csv index 711e32abaf..87fd055875 100644 --- a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic-lite.csv +++ b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic-lite.csv @@ -4,7 +4,7 @@ ¦apoc.periodic.countdown(name :: STRING?, statement :: STRING?, rate :: INTEGER?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) ¦apoc.periodic.iterate(cypherIterate :: STRING?, cypherAction :: STRING?, config :: MAP?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?, updateStatistics :: MAP?) ¦apoc.periodic.list() :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) -¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) +¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) ¦apoc.periodic.rock_n_roll(cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?, updateStatistics :: MAP?) ¦apoc.periodic.rock_n_roll_while(cypherLoop :: STRING?, cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (loop :: ANY?, batches :: INTEGER?, total :: INTEGER?) ¦apoc.periodic.submit(name :: STRING?, statement :: STRING?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) diff --git a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.csv b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.csv index 73ddee727a..3c6fdce297 100644 --- a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.csv +++ b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.csv @@ -26,7 +26,7 @@ apoc.periodic.list - list all jobs |label:apoc-core[] |xref::overview/apoc.periodic/apoc.periodic.adoc[apoc.periodic.repeat icon:book[]] -apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. +apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. |label:procedure[] |label:apoc-core[] |xref::overview/apoc.periodic/apoc.periodic.adoc[apoc.periodic.rock_n_roll icon:book[]] diff --git a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat-lite.csv b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat-lite.csv index 7ce8d164ae..1ba49dcc93 100644 --- a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat-lite.csv +++ b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat-lite.csv @@ -1,2 +1,2 @@ ¦signature -¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) +¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) diff --git a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat.csv b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat.csv index d46d984308..1504e6c06d 100644 --- a/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat.csv +++ b/docs/asciidoc/modules/ROOT/examples/generated-documentation/apoc.periodic.repeat.csv @@ -1,2 +1,2 @@ ¦type¦qualified name¦signature¦description -¦procedure¦apoc.periodic.repeat¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. +¦procedure¦apoc.periodic.repeat¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. diff --git a/docs/asciidoc/modules/ROOT/examples/generated-documentation/documentation.csv b/docs/asciidoc/modules/ROOT/examples/generated-documentation/documentation.csv index 5cfb8b5f21..a2d0224085 100644 --- a/docs/asciidoc/modules/ROOT/examples/generated-documentation/documentation.csv +++ b/docs/asciidoc/modules/ROOT/examples/generated-documentation/documentation.csv @@ -242,7 +242,7 @@ ¦procedure¦apoc.periodic.countdown¦apoc.periodic.countdown(name :: STRING?, statement :: STRING?, rate :: INTEGER?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0¦true¦ ¦procedure¦apoc.periodic.iterate¦apoc.periodic.iterate(cypherIterate :: STRING?, cypherAction :: STRING?, config :: MAP?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?, updateStatistics :: MAP?)¦apoc.periodic.iterate('statement returning items', 'statement per item', {batchSize:1000,iterateList:true,parallel:false,params:{},concurrency:50,retries:0}) YIELD batches, total - run the second statement for each item returned by the first statement. Returns number of batches and total processed rows¦true¦ ¦procedure¦apoc.periodic.list¦apoc.periodic.list() :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.list - list all jobs¦true¦ -¦procedure¦apoc.periodic.repeat¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.¦true¦ +¦procedure¦apoc.periodic.repeat¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.¦true¦ ¦procedure¦apoc.periodic.rock_n_roll¦apoc.periodic.rock_n_roll(cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?, updateStatistics :: MAP?)¦apoc.periodic.rock_n_roll('some cypher for iteration', 'some cypher as action on each iteration', 10000) YIELD batches, total - run the action statement in batches over the iterator statement's results in a separate thread. Returns number of batches and total processed rows¦false¦xref::graph-updates/periodic-execution.adoc#periodic-rock-n-roll ¦procedure¦apoc.periodic.rock_n_roll_while¦apoc.periodic.rock_n_roll_while(cypherLoop :: STRING?, cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (loop :: ANY?, batches :: INTEGER?, total :: INTEGER?)¦apoc.periodic.rock_n_roll_while('some cypher for knowing when to stop', 'some cypher for iteration', 'some cypher as action on each iteration', 10000) YIELD batches, total - run the action statement in batches over the iterator statement's results in a separate thread. Returns number of batches and total processed rows¦false¦ ¦procedure¦apoc.periodic.submit¦apoc.periodic.submit(name :: STRING?, statement :: STRING?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.submit('name',statement) - submit a one-off background statement¦true¦ diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/apoc.periodic.repeat.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/apoc.periodic.repeat.adoc index 14a075333c..bae3382840 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/apoc.periodic.repeat.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/apoc.periodic.repeat.adoc @@ -8,13 +8,13 @@ This file is generated by DocsTest, so don't change it! label:procedure[] label:apoc-core[] [.emphasis] -apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. +apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. == Signature [source] ---- -apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) +apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?) ---- == Input parameters diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/index.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/index.adoc index 342810f4c0..9c18c9ab15 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/index.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.periodic/index.adoc @@ -35,7 +35,7 @@ apoc.periodic.list - list all jobs |label:apoc-core[] |xref::overview/apoc.periodic/apoc.periodic.repeat.adoc[apoc.periodic.repeat icon:book[]] -apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. +apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. |label:procedure[] |label:apoc-core[] |xref::overview/apoc.periodic/apoc.periodic.rock_n_roll.adoc[apoc.periodic.rock_n_roll icon:book[]] diff --git a/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc b/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc index 774bcd3d4b..1f50b4d706 100644 --- a/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc +++ b/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc @@ -2554,7 +2554,7 @@ apoc.periodic.list - list all jobs |label:apoc-core[] |xref::overview/apoc.periodic/apoc.periodic.repeat.adoc[apoc.periodic.repeat icon:book[]] -apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. +apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement. |label:procedure[] |label:apoc-core[] |xref::overview/apoc.periodic/apoc.periodic.rock_n_roll.adoc[apoc.periodic.rock_n_roll icon:book[]] diff --git a/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.cancel.adoc b/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.cancel.adoc index bbbdd080e3..17c2ffab4c 100644 --- a/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.cancel.adoc +++ b/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.cancel.adoc @@ -10,4 +10,4 @@ CALL apoc.periodic.cancel("create-people"); |=== | name | delay | rate | done | cancelled | "create-people" | 0 | 0 | TRUE | TRUE -|=== \ No newline at end of file +|=== diff --git a/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.repeat.adoc b/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.repeat.adoc index 220a501d74..425d706ed3 100644 --- a/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.repeat.adoc +++ b/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.repeat.adoc @@ -31,4 +31,16 @@ RETURN count(*) AS count; | 110 |=== + +We can also schedule the task daily starting at a specific date or time, with the 3rd parameter as a `java.time.temporal.Temporal`, for example: + +[source,cypher] +---- +CALL apoc.periodic.repeat( + "create-people", + "UNWIND range(1,10) AS id CREATE (:Person {uuid: apoc.create.uuid()})", + time("11:30") +); +---- + If we want to cancel this job, we can use the xref::overview/apoc.periodic/apoc.periodic.cancel.adoc[] procedure. \ No newline at end of file diff --git a/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.submit.adoc b/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.submit.adoc index 56eec330cc..688475ea6c 100644 --- a/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.submit.adoc +++ b/docs/asciidoc/modules/ROOT/partials/usage/apoc.periodic.submit.adoc @@ -11,4 +11,15 @@ CALL apoc.periodic.submit( |=== | name | delay | rate | done | cancelled | "create-person" | 0 | 0 | FALSE | FALSE -|=== \ No newline at end of file +|=== + +We can also schedule the task at a specific date or time, with the config `{atTime: java.time.temporal.Temporal}`, for example: + +[source,cypher] +---- +CALL apoc.periodic.submit( + "create-person", + "CREATE (:Person {name: 'Michael Hunger'})", + {atTime: time("11:30")} +); +---- \ No newline at end of file diff --git a/full/src/test/resources/signatures.csv b/full/src/test/resources/signatures.csv index c36cf0ae07..94ee471606 100644 --- a/full/src/test/resources/signatures.csv +++ b/full/src/test/resources/signatures.csv @@ -317,7 +317,7 @@ apoc.periodic.commit,"apoc.periodic.commit(statement :: STRING?, params = {} :: apoc.periodic.countdown,"apoc.periodic.countdown(name :: STRING?, statement :: STRING?, rate :: INTEGER?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)" apoc.periodic.iterate,"apoc.periodic.iterate(cypherIterate :: STRING?, cypherAction :: STRING?, config :: MAP?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?)" apoc.periodic.list,"apoc.periodic.list() :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)" -apoc.periodic.repeat,"apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)" +apoc.periodic.repeat,"apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)" apoc.periodic.rock_n_roll,"apoc.periodic.rock_n_roll(cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?)" apoc.periodic.rock_n_roll_while,"apoc.periodic.rock_n_roll_while(cypherLoop :: STRING?, cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (loop :: ANY?, batches :: INTEGER?, total :: INTEGER?)" apoc.periodic.submit,"apoc.periodic.submit(name :: STRING?, statement :: STRING?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)"