Skip to content

Commit

Permalink
[hDJkPkOQ] Fixes neo4j/apoc#126: apoc.periodic.submit fails with sche…
Browse files Browse the repository at this point in the history
…ma operations (extended) (#3211) (#3508) (#3691)
  • Loading branch information
vga91 authored Jul 27, 2023
1 parent b9ed5fa commit f0c71dd
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 1 deletion.
15 changes: 14 additions & 1 deletion full/src/main/java/apoc/periodic/PeriodicExtended.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_ONLY;
import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_WRITE;
import static org.neo4j.graphdb.QueryExecutionType.QueryType.SCHEMA_WRITE;
import static org.neo4j.graphdb.QueryExecutionType.QueryType.WRITE;

@Extended
public class PeriodicExtended {
@Context public GraphDatabaseService db;
Expand All @@ -51,8 +56,16 @@ private void recordError(Map<String, Long> executionErrors, Exception e) {
executionErrors.compute(msg, (s, i) -> i == null ? 1 : i + 1);
}

@Procedure(mode = Mode.SCHEMA)
@Description("apoc.periodic.submitSchema(name, statement, $config) - equivalent to apoc.periodic.submit which can also accept schema operations")
public Stream<Periodic.JobInfo> submitSchema(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map<String,Object> config) {
validateQuery(statement);
return PeriodicUtils.submitProc(name, statement, config, db, log, pools);
}

private void validateQuery(String statement) {
db.executeTransactionally("EXPLAIN " + statement);
Util.validateQuery(db, statement,
READ_ONLY, WRITE, READ_WRITE, SCHEMA_WRITE);
}

/**
Expand Down
1 change: 1 addition & 0 deletions full/src/main/resources/extended.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ apoc.nlp.gcp.entities.graph
apoc.nlp.gcp.entities.stream
apoc.periodic.rock_n_roll
apoc.periodic.rock_n_roll_while
apoc.periodic.submitSchema
apoc.redis.append
apoc.redis.configGet
apoc.redis.configSet
Expand Down
40 changes: 40 additions & 0 deletions full/src/test/java/apoc/periodic/PeriodicExtendedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static apoc.util.TestUtil.testCall;
Expand All @@ -43,6 +44,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class PeriodicExtendedTest {

Expand All @@ -53,6 +55,44 @@ public class PeriodicExtendedTest {
public void initDb() {
TestUtil.registerProcedure(db, Periodic.class, NodesExtended.class, GCPProcedures.class, Create.class, PeriodicExtended.class, Jdbc.class);
}

@Test
public void testSubmitSchema() {
testCall(db, "CALL apoc.periodic.submitSchema('subSchema','CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)')",
(row) -> {
assertEquals("subSchema", row.get("name"));
assertEquals(false, row.get("done"));
});

assertEventually(() -> db.executeTransactionally("SHOW INDEXES YIELD name WHERE name = 'periodicIdx' RETURN count(*) AS count",
Collections.emptyMap(),
(res) -> res.<Long>columnAs("count").next()),
val -> val == 1L, 15L, TimeUnit.SECONDS);

testCall(db, "CALL apoc.periodic.list()", (row) -> {
assertEquals("subSchema", row.get("name"));
assertEquals(true, row.get("done"));
});
}

@Test
public void testSubmitSchemaWithWriteOperation() {
testCall(db, "CALL apoc.periodic.submitSchema('subSchema','CREATE (:SchemaLabel)')",
(row) -> {
assertEquals("subSchema", row.get("name"));
assertEquals(false, row.get("done"));
});

assertEventually(() -> db.executeTransactionally("MATCH (n:SchemaLabel) RETURN count(n) AS count",
Collections.emptyMap(),
(res) -> res.<Long>columnAs("count").next()),
val -> val == 1L, 15L, TimeUnit.SECONDS);

testCall(db, "CALL apoc.periodic.list()", (row) -> {
assertEquals("subSchema", row.get("name"));
assertEquals(true, row.get("done"));
});
}

@Test
public void testRebindWithNlpWriteProcedure() {
Expand Down

0 comments on commit f0c71dd

Please sign in to comment.