Skip to content

Commit

Permalink
Fixes neo4j-contrib#2373 and #218: Schedule task for one-off executio…
Browse files Browse the repository at this point in the history
…n at specified time (neo4j-contrib#2520)

* Fixes neo4j-contrib#2373 and #218: Schedule task for one-off execution at specified time

* docs param changes
  • Loading branch information
vga91 authored Oct 24, 2022
1 parent 50d18f7 commit 212a4fc
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 25 deletions.
93 changes: 80 additions & 13 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import apoc.Pools;
import apoc.util.Util;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
Expand All @@ -16,6 +17,12 @@
import org.neo4j.logging.Log;
import org.neo4j.procedure.*;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -30,6 +37,8 @@

public class Periodic {

public static final String ERROR_DATE_BEFORE = "The provided date is before current date";

enum Planner {DEFAULT, COST, IDP, DP }

public static final Pattern PLANNER_PATTERN = Pattern.compile("\\bplanner\\s*=\\s*[^\\s]*", Pattern.CASE_INSENSITIVE);
Expand Down Expand Up @@ -167,25 +176,64 @@ public Stream<JobInfo> cancel(@Name("name") String name) {
public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map<String,Object> config) {
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = submit(name, () -> {

final Temporal atTime = (Temporal) (config.get("atTime"));

final Runnable task = () -> {
try {
db.executeTransactionally(statement, params);
} catch(Exception e) {
} catch (Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
}
}, log);
};

JobInfo info = atTime != null
? getJobInfo(name, atTime, task, ScheduleType.DEFAULT)
: submit(name, task);

return Stream.of(info);
}

private JobInfo getJobInfo(String name, Temporal atTime, Runnable task, ScheduleType scheduleType) {
if (atTime instanceof LocalDate) {
atTime = ((LocalDate) atTime).atStartOfDay();
}
final boolean isTime = atTime instanceof OffsetTime || atTime instanceof LocalTime;
Temporal now = isTime
? LocalTime.now()
: LocalDateTime.now();

final long secPerDay = DateUtils.MILLIS_PER_DAY / 1000L;
long delay = now.until(atTime, ChronoUnit.SECONDS);
if (isTime && delay < 0) {
// we consider the day after
delay = delay + secPerDay;
}
if (delay < 0) {
throw new RuntimeException(ERROR_DATE_BEFORE);
}
return schedule(name, task, delay, secPerDay, scheduleType);
}

@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.")
public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate, @Name(value = "config", defaultValue = "{}") Map<String,Object> config ) {
@Description("apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.")
public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") String statement, @Name("rateOrTime") Object rateOrTime, @Name(value = "config", defaultValue = "{}") Map<String,Object> config ) {

validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = schedule(name, () -> {
final Runnable runnable = () -> {
db.executeTransactionally(statement, params);
},0,rate);
};
final JobInfo info;
if (rateOrTime instanceof Long) {
info = schedule(name, runnable,0, (long) rateOrTime);
} else if(rateOrTime instanceof Temporal) {
info = getJobInfo(name, (Temporal) rateOrTime, runnable, ScheduleType.FIXED_RATE);
} else {
throw new RuntimeException("invalid type of rateOrTime parameter");
}

return Stream.of(info);
}

Expand All @@ -197,15 +245,15 @@ private void validateQuery(String statement) {
@Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0")
public Stream<JobInfo> countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) {
validateQuery(statement);
JobInfo info = submit(name, new Countdown(name, statement, rate, log), log);
JobInfo info = submit(name, new Countdown(name, statement, rate, log));
info.rate = rate;
return Stream.of(info);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public <T> JobInfo submit(String name, Runnable task, Log log) {
public <T> JobInfo submit(String name, Runnable task) {
JobInfo info = new JobInfo(name);
Future<T> future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);
Expand All @@ -216,20 +264,39 @@ public <T> JobInfo submit(String name, Runnable task, Log log) {
return info;
}

private enum ScheduleType { DEFAULT, FIXED_DELAY, FIXED_RATE }

public JobInfo schedule(String name, Runnable task, long delay, long repeat) {
return schedule(name, task, delay, repeat, ScheduleType.FIXED_DELAY);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public JobInfo schedule(String name, Runnable task, long delay, long repeat) {
JobInfo info = new JobInfo(name,delay,repeat);
public JobInfo schedule(String name, Runnable task, long delay, long repeat, ScheduleType isFixedDelay) {
JobInfo info = new JobInfo(name, delay, isFixedDelay.equals(ScheduleType.DEFAULT) ? 0 : repeat);
Future future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
ScheduledFuture<?> newFuture = pools.getScheduledExecutorService().scheduleWithFixedDelay(wrappingTask, delay, repeat, TimeUnit.SECONDS);
ScheduledFuture<?> newFuture = getScheduledFuture(wrappingTask, delay, repeat, isFixedDelay);
pools.getJobList().put(info,newFuture);
return info;
}

private ScheduledFuture<?> getScheduledFuture(Runnable wrappingTask, long delay, long repeat, ScheduleType isFixedDelay) {
final ScheduledExecutorService service = pools.getScheduledExecutorService();
final TimeUnit timeUnit = TimeUnit.SECONDS;
switch (isFixedDelay) {
case FIXED_DELAY:
return service.scheduleWithFixedDelay(wrappingTask, delay, repeat, timeUnit);
case FIXED_RATE:
return service.scheduleAtFixedRate(wrappingTask, delay, repeat, timeUnit);
default:
return service.schedule(wrappingTask, delay, timeUnit);
}
}

private static Runnable wrapTask(String name, Runnable task, Log log) {
return () -> {
log.debug("Executing task " + name);
Expand Down Expand Up @@ -453,7 +520,7 @@ public Countdown(String name, String statement, long rate, Log log) {
@Override
public void run() {
if (Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap()) > 0) {
pools.getScheduledExecutorService().schedule(() -> submit(name, this, log), rate, TimeUnit.SECONDS);
pools.getScheduledExecutorService().schedule(() -> submit(name, this), rate, TimeUnit.SECONDS);
}
}
}
Expand Down
67 changes: 67 additions & 0 deletions core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static apoc.periodic.Periodic.ERROR_DATE_BEFORE;
import static apoc.periodic.Periodic.applyPlanner;
import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static apoc.util.Util.map;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;
import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -42,6 +47,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.util.Iterables.count;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class PeriodicTest {

Expand Down Expand Up @@ -79,6 +85,67 @@ public void testSubmitStatement() throws Exception {
testCall(db, callList, (r) -> assertEquals(true, r.get("done")));
}

@Test
public void testSubmitStatementAtTime() {
String callList = "CALL apoc.periodic.list()";
assertFalse(db.executeTransactionally(callList, Collections.emptyMap(), Result::hasNext));

TestUtil.testFail(db, "CALL apoc.periodic.submit('submitAtTime','create (:Ajeje)', {atTime: datetime('2019')})", RuntimeException.class);

try {
testCall(db, "CALL apoc.periodic.submit('errorDate','create (:ToFail)', {atTime: $time})",
map("time", ZonedDateTime.now().minusDays(1)),
(row) -> fail() );
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains(ERROR_DATE_BEFORE));
}

testCall(db, "CALL apoc.periodic.submit('submitAtTime','create (:Ajeje)', {atTime: $time})",
map("time", ZonedDateTime.now().plusSeconds(10)),
(row) -> {
assertEquals("submitAtTime", row.get("name"));
assertEquals(false, row.get("done"));
assertEquals(false, row.get("cancelled"));
assertTrue((long) row.get("delay") <= 10);
assertEquals(0L, row.get("rate"));
});

final String queryCount = "MATCH (:Ajeje) RETURN COUNT(*) AS count";
long countBefore = TestUtil.singleResultFirstColumn(db, queryCount);
assertEquals(0L, countBefore);
assertEventually(() -> db.executeTransactionally(queryCount,
emptyMap(), (r) -> r.<Long>columnAs("count").next()),
value -> value == 1L, 20L, TimeUnit.SECONDS);

assertEventually(() -> db.executeTransactionally(callList, emptyMap(), Result::hasNext),
value -> !value, 10L, TimeUnit.SECONDS);
}

@Test
public void testRepeatStatementAtTime() {
String callList = "CALL apoc.periodic.list()";
assertFalse(db.executeTransactionally(callList, Collections.emptyMap(), Result::hasNext));

testCall(db, "CALL apoc.periodic.repeat('repeatAtTime','create (:Brazorf)', time($timeAsString))",
map("timeAsString", LocalTime.now().plusSeconds(10).toString()),
(row) -> {
assertEquals("repeatAtTime", row.get("name"));
assertEquals(false, row.get("done"));
assertEquals(false, row.get("cancelled"));
assertTrue((long) row.get("delay") <= 10);
assertEquals(86400L, row.get("rate"));
});

final String queryCount = "MATCH (:Brazorf) RETURN COUNT(*) AS count";
long countBefore = TestUtil.singleResultFirstColumn(db, queryCount);
assertEquals(0L, countBefore);
assertEventually(() -> db.executeTransactionally(queryCount,
emptyMap(), (r) -> r.<Long>columnAs("count").next()),
value -> value == 1L, 20L, TimeUnit.SECONDS);

testCall(db, callList, (r) -> assertEquals(false, r.get("done")));
}

@Test
public void testSubmitStatementWithParams() throws Exception {
String callList = "CALL apoc.periodic.list()";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
¦apoc.periodic.countdown(name :: STRING?, statement :: STRING?, rate :: INTEGER?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)
¦apoc.periodic.iterate(cypherIterate :: STRING?, cypherAction :: STRING?, config :: MAP?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?, updateStatistics :: MAP?)
¦apoc.periodic.list() :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)
¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)
¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)
¦apoc.periodic.rock_n_roll(cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (batches :: INTEGER?, total :: INTEGER?, timeTaken :: INTEGER?, committedOperations :: INTEGER?, failedOperations :: INTEGER?, failedBatches :: INTEGER?, retries :: INTEGER?, errorMessages :: MAP?, batch :: MAP?, operations :: MAP?, wasTerminated :: BOOLEAN?, failedParams :: MAP?, updateStatistics :: MAP?)
¦apoc.periodic.rock_n_roll_while(cypherLoop :: STRING?, cypherIterate :: STRING?, cypherAction :: STRING?, batchSize :: INTEGER?) :: (loop :: ANY?, batches :: INTEGER?, total :: INTEGER?)
¦apoc.periodic.submit(name :: STRING?, statement :: STRING?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ apoc.periodic.list - list all jobs
|label:apoc-core[]
|xref::overview/apoc.periodic/apoc.periodic.adoc[apoc.periodic.repeat icon:book[]]

apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.
apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.
|label:procedure[]
|label:apoc-core[]
|xref::overview/apoc.periodic/apoc.periodic.adoc[apoc.periodic.rock_n_roll icon:book[]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
¦signature
¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)
¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
¦type¦qualified name¦signature¦description
¦procedure¦apoc.periodic.repeat¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rate :: INTEGER?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.
¦procedure¦apoc.periodic.repeat¦apoc.periodic.repeat(name :: STRING?, statement :: STRING?, rateOrTime :: OBJECT?, config = {} :: MAP?) :: (name :: STRING?, delay :: INTEGER?, rate :: INTEGER?, done :: BOOLEAN?, cancelled :: BOOLEAN?)¦apoc.periodic.repeat('name',statement, rateOrTime, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.
Loading

0 comments on commit 212a4fc

Please sign in to comment.