Skip to content

Commit

Permalink
Fixes #126: apoc.periodic.submit fails with schema operations (core) (#…
Browse files Browse the repository at this point in the history
…208)

* Fixes #126: apoc.periodic.submit fails with schema operations (core)
* removed unused import
  • Loading branch information
vga91 authored Oct 18, 2022
1 parent fa0104a commit 3700669
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 39 deletions.
41 changes: 41 additions & 0 deletions common/src/main/java/apoc/periodic/PeriodicUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.neo4j.procedure.TerminationGuard;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -245,6 +246,46 @@ public static Stream<BatchAndTotalResult> iterateAndExecuteBatchedInSeparateThre
}
return Stream.of(collector.getResult());
}

public static Stream<JobInfo> submitProc(String name, String statement, Map<String, Object> config, GraphDatabaseService db, Log log, Pools pools) {
Map<String,Object> params = (Map) config.getOrDefault("params", Collections.emptyMap());
JobInfo info = submitJob(name, () -> {
try {
db.executeTransactionally(statement, params);
} catch(Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
}
}, log, pools);
return Stream.of(info);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public static <T> JobInfo submitJob(String name, Runnable task, Log log, Pools pools) {
JobInfo info = new JobInfo(name);
Future<T> future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask);
pools.getJobList().put(info,newFuture);
return info;
}

public static Runnable wrapTask(String name, Runnable task, Log log) {
return () -> {
log.debug("Executing task " + name);
try {
task.run();
} catch (Exception e) {
log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e);
throw e;
}
log.debug("Executed task " + name);
};
}
}

/*
Expand Down
50 changes: 11 additions & 39 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_ONLY;
import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_WRITE;
import static org.neo4j.graphdb.QueryExecutionType.QueryType.WRITE;
import static apoc.periodic.PeriodicUtils.recordError;
import static apoc.periodic.PeriodicUtils.submitJob;
import static apoc.periodic.PeriodicUtils.submitProc;
import static apoc.periodic.PeriodicUtils.wrapTask;
import static apoc.util.Util.merge;

public class Periodic {
Expand Down Expand Up @@ -156,16 +162,7 @@ public Stream<JobInfo> cancel(@Name("name") String name) {
@Description("apoc.periodic.submit('name',statement,params) - submit a one-off background statement; parameter 'params' is optional and can contain query parameters for Cypher statement")
public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map<String,Object> config) {
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = submit(name, () -> {
try {
db.executeTransactionally(statement, params);
} catch(Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
}
}, log);
return Stream.of(info);
return submitProc(name, statement, config, db, log, pools);
}

@Procedure(mode = Mode.WRITE)
Expand All @@ -180,31 +177,19 @@ public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") Stri
}

private void validateQuery(String statement) {
Util.validateQuery(db, statement);
Util.validateQuery(db, statement,
READ_ONLY, WRITE, READ_WRITE);
}

@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0")
public Stream<JobInfo> countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) {
validateQuery(statement);
JobInfo info = submit(name, new Countdown(name, statement, rate, log), log);
JobInfo info = submitJob(name, new Countdown(name, statement, rate, log), log, pools);
info.rate = rate;
return Stream.of(info);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public <T> JobInfo submit(String name, Runnable task, Log log) {
JobInfo info = new JobInfo(name);
Future<T> future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask);
pools.getJobList().put(info,newFuture);
return info;
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
Expand All @@ -220,19 +205,6 @@ public JobInfo schedule(String name, Runnable task, long delay, long repeat) {
return info;
}

private static Runnable wrapTask(String name, Runnable task, Log log) {
return () -> {
log.debug("Executing task " + name);
try {
task.run();
} catch (Exception e) {
log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e);
throw e;
}
log.debug("Executed task " + name);
};
}

/**
* Invoke cypherAction in batched transactions being fed from cypherIteration running in main thread
* @param cypherIterate
Expand Down Expand Up @@ -325,7 +297,7 @@ public Countdown(String name, String statement, long rate, Log log) {
@Override
public void run() {
if (Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap()) > 0) {
pools.getScheduledExecutorService().schedule(() -> submit(name, this, log), rate, TimeUnit.SECONDS);
pools.getScheduledExecutorService().schedule(() -> submitJob(name, this, log, pools), rate, TimeUnit.SECONDS);
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ public void testSubmitStatement() throws Exception {
testCall(db, callList, (r) -> assertEquals(true, r.get("done")));
}

@Test
public void testSubmitWithSchemaOperation() {
try {
testCall(db, "CALL apoc.periodic.submit('subSchema','CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)')",
(row) -> fail("Should fail because of unsupported schema operation"));
} catch (RuntimeException e) {
final String expected = "Failed to invoke procedure `apoc.periodic.submit`: " +
"Caused by: java.lang.RuntimeException: Supported query types for the operation are [READ_ONLY, WRITE, READ_WRITE]";
assertEquals(expected, e.getMessage());
}
}

@Test
public void testSubmitStatementWithParams() throws Exception {
String callList = "CALL apoc.periodic.list()";
Expand Down

0 comments on commit 3700669

Please sign in to comment.