diff --git a/full/src/main/java/apoc/periodic/PeriodicExtended.java b/full/src/main/java/apoc/periodic/PeriodicExtended.java index 63ca705616..035d92589b 100644 --- a/full/src/main/java/apoc/periodic/PeriodicExtended.java +++ b/full/src/main/java/apoc/periodic/PeriodicExtended.java @@ -37,6 +37,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_ONLY; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.READ_WRITE; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.SCHEMA_WRITE; +import static org.neo4j.graphdb.QueryExecutionType.QueryType.WRITE; + @Extended public class PeriodicExtended { @Context public GraphDatabaseService db; @@ -51,8 +56,16 @@ private void recordError(Map executionErrors, Exception e) { executionErrors.compute(msg, (s, i) -> i == null ? 1 : i + 1); } + @Procedure(mode = Mode.SCHEMA) + @Description("apoc.periodic.submitSchema(name, statement, $config) - equivalent to apoc.periodic.submit which can also accept schema operations") + public Stream submitSchema(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map config) { + validateQuery(statement); + return PeriodicUtils.submitProc(name, statement, config, db, log, pools); + } + private void validateQuery(String statement) { - db.executeTransactionally("EXPLAIN " + statement); + Util.validateQuery(db, statement, + READ_ONLY, WRITE, READ_WRITE, SCHEMA_WRITE); } /** diff --git a/full/src/main/resources/extended.txt b/full/src/main/resources/extended.txt index 7efc8a46c2..46dc107fc6 100644 --- a/full/src/main/resources/extended.txt +++ b/full/src/main/resources/extended.txt @@ -119,6 +119,7 @@ apoc.nlp.gcp.entities.graph apoc.nlp.gcp.entities.stream apoc.periodic.rock_n_roll apoc.periodic.rock_n_roll_while +apoc.periodic.submitSchema apoc.redis.append apoc.redis.configGet apoc.redis.configSet diff --git a/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java b/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java index 72bf44ef33..e08171941a 100644 --- a/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java +++ b/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java @@ -34,6 +34,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static apoc.util.TestUtil.testCall; @@ -43,6 +44,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.neo4j.test.assertion.Assert.assertEventually; public class PeriodicExtendedTest { @@ -53,6 +55,44 @@ public class PeriodicExtendedTest { public void initDb() { TestUtil.registerProcedure(db, Periodic.class, NodesExtended.class, GCPProcedures.class, Create.class, PeriodicExtended.class, Jdbc.class); } + + @Test + public void testSubmitSchema() { + testCall(db, "CALL apoc.periodic.submitSchema('subSchema','CREATE INDEX periodicIdx FOR (n:Bar) ON (n.first_name, n.last_name)')", + (row) -> { + assertEquals("subSchema", row.get("name")); + assertEquals(false, row.get("done")); + }); + + assertEventually(() -> db.executeTransactionally("SHOW INDEXES YIELD name WHERE name = 'periodicIdx' RETURN count(*) AS count", + Collections.emptyMap(), + (res) -> res.columnAs("count").next()), + val -> val == 1L, 15L, TimeUnit.SECONDS); + + testCall(db, "CALL apoc.periodic.list()", (row) -> { + assertEquals("subSchema", row.get("name")); + assertEquals(true, row.get("done")); + }); + } + + @Test + public void testSubmitSchemaWithWriteOperation() { + testCall(db, "CALL apoc.periodic.submitSchema('subSchema','CREATE (:SchemaLabel)')", + (row) -> { + assertEquals("subSchema", row.get("name")); + assertEquals(false, row.get("done")); + }); + + assertEventually(() -> db.executeTransactionally("MATCH (n:SchemaLabel) RETURN count(n) AS count", + Collections.emptyMap(), + (res) -> res.columnAs("count").next()), + val -> val == 1L, 15L, TimeUnit.SECONDS); + + testCall(db, "CALL apoc.periodic.list()", (row) -> { + assertEquals("subSchema", row.get("name")); + assertEquals(true, row.get("done")); + }); + } @Test public void testRebindWithNlpWriteProcedure() {