Skip to content

Commit

Permalink
Fixes neo4j/apoc#126: apoc.periodic.submit fails with schema operatio…
Browse files Browse the repository at this point in the history
…ns (neo4j/apoc#208)

* Fixes neo4j/apoc#126: apoc.periodic.submit fails with schema operations (core)
* removed unused import
  • Loading branch information
vga91 committed Oct 18, 2022
1 parent 60e228e commit d6a3f32
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 40 deletions.
50 changes: 11 additions & 39 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_ONLY;
import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_WRITE;
import static org.neo4j.graphdb.QueryExecutionType.QueryType.WRITE;
import static apoc.periodic.PeriodicUtils.submitJob;
import static apoc.periodic.PeriodicUtils.submitProc;
import static apoc.periodic.PeriodicUtils.wrapTask;
import static apoc.util.Util.merge;

public class Periodic {
Expand Down Expand Up @@ -166,16 +172,7 @@ public Stream<JobInfo> cancel(@Name("name") String name) {
@Description("apoc.periodic.submit('name',statement,params) - creates a background job which executes a Cypher statement once. The parameter 'params' is optional and can contain query parameters for the Cypher statement")
public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map<String,Object> config) {
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = submit(name, () -> {
try {
db.executeTransactionally(statement, params);
} catch(Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
}
}, log);
return Stream.of(info);
return submitProc(name, statement, config, db, log, pools);
}

@Procedure(mode = Mode.WRITE)
Expand All @@ -190,31 +187,19 @@ public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") Stri
}

private void validateQuery(String statement) {
Util.validateQuery(db, statement);
Util.validateQuery(db, statement,
READ_ONLY, WRITE, READ_WRITE);
}

@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) creates a background job that will repeatedly execute the given Cypher statement until it returns 0.")
public Stream<JobInfo> countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) {
validateQuery(statement);
JobInfo info = submit(name, new Countdown(name, statement, rate, log), log);
JobInfo info = submitJob(name, new Countdown(name, statement, rate, log), log, pools);
info.rate = rate;
return Stream.of(info);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public <T> JobInfo submit(String name, Runnable task, Log log) {
JobInfo info = new JobInfo(name);
Future<T> future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask);
pools.getJobList().put(info,newFuture);
return info;
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
Expand All @@ -230,19 +215,6 @@ public JobInfo schedule(String name, Runnable task, long delay, long repeat) {
return info;
}

private static Runnable wrapTask(String name, Runnable task, Log log) {
return () -> {
log.debug("Executing task " + name);
try {
task.run();
} catch (Exception e) {
log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e);
throw e;
}
log.debug("Executed task " + name);
};
}

/**
* Invoke cypherAction in batched transactions being fed from cypherIteration running in main thread
* @param cypherIterate
Expand Down Expand Up @@ -453,7 +425,7 @@ public Countdown(String name, String statement, long rate, Log log) {
@Override
public void run() {
if (Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap()) > 0) {
pools.getScheduledExecutorService().schedule(() -> submit(name, this, log), rate, TimeUnit.SECONDS);
pools.getScheduledExecutorService().schedule(() -> submitJob(name, this, log, pools), rate, TimeUnit.SECONDS);
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions core/src/main/java/apoc/periodic/PeriodicUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.neo4j.procedure.TerminationGuard;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -23,6 +24,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static apoc.periodic.Periodic.JobInfo;

public class PeriodicUtils {

private PeriodicUtils() {
Expand Down Expand Up @@ -116,6 +119,46 @@ public static Stream<BatchAndTotalResult> iterateAndExecuteBatchedInSeparateThre
}
return Stream.of(collector.getResult());
}

public static Stream<JobInfo> submitProc(String name, String statement, Map<String, Object> config, GraphDatabaseService db, Log log, Pools pools) {
Map<String,Object> params = (Map) config.getOrDefault("params", Collections.emptyMap());
JobInfo info = submitJob(name, () -> {
try {
db.executeTransactionally(statement, params);
} catch(Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
}
}, log, pools);
return Stream.of(info);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public static <T> JobInfo submitJob(String name, Runnable task, Log log, Pools pools) {
JobInfo info = new JobInfo(name);
Future<T> future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask);
pools.getJobList().put(info,newFuture);
return info;
}

public static Runnable wrapTask(String name, Runnable task, Log log) {
return () -> {
log.debug("Executing task " + name);
try {
task.run();
} catch (Exception e) {
log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e);
throw e;
}
log.debug("Executed task " + name);
};
}
}

/*
Expand Down
30 changes: 29 additions & 1 deletion core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package apoc.periodic;

import apoc.schema.Schemas;
import apoc.util.MapUtil;
import apoc.util.TestUtil;
import org.junit.Before;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class PeriodicTest {

@Before
public void initDb() throws Exception {
TestUtil.registerProcedure(db, Periodic.class);
TestUtil.registerProcedure(db, Periodic.class, Schemas.class);
db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)");
}

Expand All @@ -79,6 +80,33 @@ public void testSubmitStatement() throws Exception {
testCall(db, callList, (r) -> assertEquals(true, r.get("done")));
}

@Test
public void testSubmitWithCreateIndexSchemaOperation() {
try {
testCall(db, "CALL apoc.periodic.submit('subSchema','CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)')",
(row) -> fail("Should fail because of unsupported schema operation"));
} catch (RuntimeException e) {
final String expected = "Failed to invoke procedure `apoc.periodic.submit`: " +
"Caused by: java.lang.RuntimeException: Supported query types for the operation are [READ_ONLY, WRITE, READ_WRITE]";
assertEquals(expected, e.getMessage());
}
}

@Test
public void testSubmitWithCreateUniqueConstraintSchemaOperation() {
try {
db.executeTransactionally("CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)");
final String createConstraint = "CALL db.createUniquePropertyConstraint('uniqueConsName', ['Alpha', 'Beta'], ['foo', 'bar'], 'lucene-1.0')";
testCall(db, "CALL apoc.periodic.submit('subSchema', $createConstraint)",
Map.of("createConstraint", createConstraint),
(row) -> fail("Should fail because of unsupported schema operation"));
} catch (RuntimeException e) {
final String expected = "Failed to invoke procedure `apoc.periodic.submit`: " +
"Caused by: java.lang.RuntimeException: Supported query types for the operation are [READ_ONLY, WRITE, READ_WRITE]";
assertEquals(expected, e.getMessage());
}
}

@Test
public void testSubmitStatementWithParams() throws Exception {
String callList = "CALL apoc.periodic.list()";
Expand Down

0 comments on commit d6a3f32

Please sign in to comment.