diff --git a/core/src/main/java/apoc/nodes/Nodes.java b/core/src/main/java/apoc/nodes/Nodes.java index e831dcecb9..ccefd342e3 100644 --- a/core/src/main/java/apoc/nodes/Nodes.java +++ b/core/src/main/java/apoc/nodes/Nodes.java @@ -678,4 +678,22 @@ private int getDegreeSafe(Node node, RelationshipType relType, Direction directi return node.getDegree(relType, direction); } + @UserFunction("apoc.node.rebind") + @Description("apoc.node.rebind(node - to rebind a node (i.e. executing a Transaction.getNodeById(node.getId()) ") + public Node nodeRebind(@Name("node") Node node) { + return Util.rebind(tx, node); + } + + @UserFunction("apoc.rel.rebind") + @Description("apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) ") + public Relationship relationshipRebind(@Name("rel") Relationship rel) { + return Util.rebind(tx, rel); + } + + @UserFunction("apoc.any.rebind") + @Description("apoc.any.rebind(Object) - to rebind any rel, node, path, map, list or combination of them (i.e. executing a Transaction.getNodeById(node.getId()) / Transaction.getRelationshipById(rel.getId()))") + public Object anyRebind(@Name("any") Object any) { + return Util.anyRebind(tx, any); + } + } diff --git a/core/src/main/java/apoc/periodic/Periodic.java b/core/src/main/java/apoc/periodic/Periodic.java index ebde235d76..9e361f5608 100644 --- a/core/src/main/java/apoc/periodic/Periodic.java +++ b/core/src/main/java/apoc/periodic/Periodic.java @@ -271,19 +271,29 @@ public Stream iterate( BatchMode batchMode = BatchMode.fromConfig(config); Map params = (Map) config.getOrDefault("params", Collections.emptyMap()); + final boolean rebind = Util.toBoolean(config.get("rebind")); + if (rebind) { + params = Util.anyRebind(tx, params); + } + try (Result result = tx.execute(slottedRuntime(cypherIterate),params)) { - Pair prepared = PeriodicUtils.prepareInnerStatement(cypherAction, batchMode, result.columns(), "_batch"); + final List columns = result.columns(); + if (rebind) { + cypherAction = Util.withMapping(columns.stream(), (c) -> "apoc.any.rebind(" + Util.quote(c) + ") AS " + Util.quote(c)) + cypherAction; + } + Pair prepared = PeriodicUtils.prepareInnerStatement(cypherAction, batchMode, columns, "_batch"); String innerStatement = applyPlanner(prepared.first(), Planner.valueOf((String) config.getOrDefault("planner", Planner.DEFAULT.name()))); boolean iterateList = prepared.other(); String periodicId = UUID.randomUUID().toString(); if (log.isDebugEnabled()) { log.debug("Starting periodic iterate from `%s` operation using iteration `%s` in separate thread with id: `%s`", cypherIterate,cypherAction, periodicId); } + Map finalParams = params; return PeriodicUtils.iterateAndExecuteBatchedInSeparateThread( db, terminationGuard, log, pools, (int)batchSize, parallel, iterateList, retries, result, (tx, p) -> { - final Result r = tx.execute(innerStatement, merge(params, p)); + final Result r = tx.execute(innerStatement, merge(finalParams, p)); Iterators.count(r); // XXX: consume all results return r.getQueryStatistics(); }, diff --git a/core/src/main/java/apoc/util/Util.java b/core/src/main/java/apoc/util/Util.java index d06eac91ca..0014fc4048 100644 --- a/core/src/main/java/apoc/util/Util.java +++ b/core/src/main/java/apoc/util/Util.java @@ -10,12 +10,14 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.eclipse.collections.api.iterator.LongIterator; +import org.neo4j.graphalgo.impl.util.PathImpl; import org.neo4j.graphdb.Direction; import org.neo4j.graphdb.Entity; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.NotInTransactionException; +import org.neo4j.graphdb.Path; import org.neo4j.graphdb.QueryExecutionException; import org.neo4j.graphdb.QueryExecutionType; import org.neo4j.graphdb.Relationship; @@ -23,6 +25,7 @@ import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.TransactionTerminatedException; +import org.neo4j.internal.helpers.collection.Iterables; import org.neo4j.internal.helpers.collection.Iterators; import org.neo4j.internal.helpers.collection.Pair; import org.neo4j.internal.kernel.api.procs.ProcedureCallContext; @@ -849,6 +852,29 @@ public static Map flattenMap(Map map, String pre }) .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); } + + public static T anyRebind(Transaction tx, T any) { + if (any instanceof Map) { + return (T) ((Map) any).entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> anyRebind(tx, e.getValue()))); + } + if (any instanceof Path) { + final Path path = (Path) any; + PathImpl.Builder builder = new PathImpl.Builder(Util.rebind(tx, path.startNode())); + for (Relationship rel: path.relationships()) { + builder = builder.push(Util.rebind(tx, rel)); + } + return (T) builder.build(); + } + if (any instanceof Iterable) { + return (T) Iterables.stream((Iterable) any) + .map(i -> anyRebind(tx, i)).collect(Collectors.toList()); + } + if (any instanceof Entity) { + return (T) Util.rebind(tx, (Entity) any); + } + return any; + } public static Node rebind(Transaction tx, Node node) { return node instanceof VirtualNode ? node : tx.getNodeById(node.getId()); diff --git a/core/src/test/java/apoc/nodes/NodesTest.java b/core/src/test/java/apoc/nodes/NodesTest.java index 9543c9985b..1430f1d497 100644 --- a/core/src/test/java/apoc/nodes/NodesTest.java +++ b/core/src/test/java/apoc/nodes/NodesTest.java @@ -12,6 +12,8 @@ import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Path; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.RelationshipType; import org.neo4j.internal.helpers.collection.Iterables; import org.neo4j.internal.helpers.collection.Iterators; import org.neo4j.test.rule.DbmsRule; @@ -69,6 +71,43 @@ public void isDense() throws Exception { "WHERE n:Foo AND dense OR n:Bar AND NOT dense RETURN count(*) as c", (row) -> assertEquals(2L, row.get("c"))); } + + @Test + public void rebind() { + TestUtil.testCall(db, "CREATE (a:Foo)-[r1:MY_REL]->(b:Bar)-[r2:ANOTHER_REL]->(c:Baz) WITH a,b,c,r1,r2 \n" + + "RETURN apoc.any.rebind({first: a, second: b, third: c, rels: [r1, r2]}) as rebind", + (row) -> { + final Map rebind = (Map) row.get("rebind"); + final List rels = (List) rebind.get("rels"); + final Relationship firstRel = rels.get(0); + final Relationship secondRel = rels.get(1); + assertEquals(firstRel.getStartNode(), rebind.get("first")); + assertEquals(firstRel.getEndNode(), rebind.get("second")); + assertEquals(secondRel.getStartNode(), rebind.get("second")); + assertEquals(secondRel.getEndNode(), rebind.get("third")); + }); + + TestUtil.testCall(db, "CREATE p1=(a:Foo)-[r1:MY_REL]->(b:Bar), p2=(:Bar)-[r2:ANOTHER_REL]->(c:Baz) \n" + + "RETURN apoc.any.rebind([p1, p2]) as rebind", + (row) -> { + final List rebindList = (List) row.get("rebind"); + assertEquals(2, rebindList.size()); + final Path firstPath = rebindList.get(0); + assertPath(firstPath, List.of("Foo", "Bar"), List.of("MY_REL")); + final Path secondPath = rebindList.get(1); + assertPath(secondPath, List.of("Bar", "Baz"), List.of("ANOTHER_REL")); + }); + } + + private void assertPath(Path rebind, List labels, List relTypes) { + final List actualLabels = Iterables.stream(rebind.nodes()) + .map(i -> i.getLabels().iterator().next()) + .map(Label::name).collect(Collectors.toList()); + assertEquals(labels, actualLabels); + final List actualRelTypes = Iterables.stream(rebind.relationships()).map(Relationship::getType) + .map(RelationshipType::name).collect(Collectors.toList()); + assertEquals(relTypes, actualRelTypes); + } @Test public void cycles() { diff --git a/core/src/test/java/apoc/periodic/PeriodicTest.java b/core/src/test/java/apoc/periodic/PeriodicTest.java index df9732340d..824559adeb 100644 --- a/core/src/test/java/apoc/periodic/PeriodicTest.java +++ b/core/src/test/java/apoc/periodic/PeriodicTest.java @@ -1,12 +1,19 @@ package apoc.periodic; +import apoc.cypher.Cypher; +import apoc.nodes.Nodes; import apoc.util.MapUtil; import apoc.util.TestUtil; +import apoc.util.Util; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.neo4j.common.DependencyResolver; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; import org.neo4j.graphdb.QueryExecutionException; +import org.neo4j.graphdb.Relationship; import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.TransientTransactionFailureException; @@ -32,6 +39,7 @@ import static apoc.util.TestUtil.testCall; import static apoc.util.TestUtil.testResult; import static apoc.util.Util.map; +import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static java.util.stream.StreamSupport.stream; import static org.hamcrest.CoreMatchers.equalTo; @@ -49,11 +57,12 @@ public class PeriodicTest { public static final int BATCH_SIZE = 399; @Rule - public DbmsRule db = new ImpermanentDbmsRule(); + public DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseSettings.procedure_unrestricted, singletonList("apoc.*")); @Before public void initDb() throws Exception { - TestUtil.registerProcedure(db, Periodic.class); + TestUtil.registerProcedure(db, Periodic.class, Nodes.class, Cypher.class); db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)"); } @@ -79,6 +88,99 @@ public void testSubmitStatement() throws Exception { testCall(db, callList, (r) -> assertEquals(true, r.get("done"))); } + @Test + public void testSetWithoutRebindShouldFailsOnlyWithTxSetPropertyInsteadOfSetViaCypher() { + // we create 2 nodes via Transaction, and we rebind only one of them + Node firstNode; + try (Transaction tx = db.beginTx()) { + firstNode = tx.createNode(Label.label("FirstNode")); + tx.commit(); + } + + try (Transaction tx = db.beginTx()) { + // without rebind + firstNode.setProperty("alpha", true); + fail("Should fails due to 'The transaction has been closed.'"); + tx.commit(); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("The transaction has been closed.")); + } + + Node secondNode; + try (Transaction tx = db.beginTx()) { + secondNode = tx.createNode(Label.label("SecondNode")); + tx.commit(); + } + + try (Transaction tx = db.beginTx()) { + // with rebind + secondNode = Util.rebind(tx, secondNode); + secondNode.setProperty("alpha", true); + tx.commit(); + } + + // try periodic and runMany procs (which open a new tx) with a set statement and without rebind + db.executeTransactionally("CREATE (:Rebind)-[r:REL_ONE]->(:Baz)"); + + final Map runManyParams = map("cypher", "SET $n.first = 1;"); + testCall(db, "MATCH (n:Rebind) WITH n CALL apoc.cypher.runMany($cypher, {n: n}) YIELD row RETURN row", + runManyParams, (row) -> {}); + testCall(db, "MATCH (:Rebind)-[n:REL_ONE]->(:Baz) WITH n CALL apoc.cypher.runMany($cypher, {n: n}) YIELD row RETURN row", + runManyParams, (row) -> {}); + + // periodic iterate + final String cypherAction = "CALL apoc.cypher.runMany(\"SET $entity.test = true;\", {entity: n}) YIELD row RETURN row"; + + testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, {})", + map("cypherIterate", "MATCH (n:Rebind) RETURN n", "cypherAction", cypherAction), (row) -> {}); + testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, {})", + map("cypherIterate", "MATCH (:Rebind)-[n:REL_ONE]->(:Baz) RETURN n", "cypherAction", cypherAction), (row) -> {}); + + testCall(db, "MATCH (n:Rebind)-[r:REL_ONE]->(:Baz) RETURN n, r", r -> { + final Node node = (Node) r.get("n"); + assertEquals(true, node.getProperty("test")); + assertEquals(1L, node.getProperty("first")); + final Relationship rel = (Relationship) r.get("r"); + assertEquals(true, rel.getProperty("test")); + assertEquals(1L, rel.getProperty("first")); + }); + } + + @Test + public void testIterateRebind() throws Exception { + + Node node; + try (Transaction tx = db.beginTx()) { + node = tx.createNode(Label.label("Ajeje")); + tx.commit(); + } + + final Map config = map("params", map("b", node)); + + final Map params = map("cypherIterate", "WITH $b as b RETURN b", + "cypherAction", "SET b.test = 1", + "config", config); + try { + testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config )", params, (row) -> fail()); + } catch (Exception e) { + assertTrue(e.getMessage().contains("NotInTransactionException")); + } + + // with rebind flag + config.put("rebind", true); + + testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config )", params, (row) -> { + assertEquals(1L, row.get("batches")); + assertEquals(map(), row.get("errorMessages")); + assertEquals(0L, row.get("failedBatches")); + }); + + testCall(db, "MATCH (b:Ajeje) RETURN b", r -> { + assertEquals(1L, ((Node) r.get("b")).getProperty("test")); + }); + + } + @Test public void testSubmitStatementWithParams() throws Exception { String callList = "CALL apoc.periodic.list()"; diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/apoc.rel.rebind.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/apoc.rel.rebind.adoc new file mode 100644 index 0000000000..41d0ae5cec --- /dev/null +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/apoc.rel.rebind.adoc @@ -0,0 +1,26 @@ +//// +This file is generated by DocsTest, so don't change it! +//// + += apoc.rel.rebind +:description: This section contains reference documentation for the apoc.rel.rebind function. + +label:function[] label:apoc-core[] + +[.emphasis] +apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) + +== Signature + +[source] +---- +apoc.rel.rebind(rel :: RELATIONSHIP?) :: (RELATIONSHIP?) +---- + +== Input parameters +[.procedures, opts=header] +|=== +| Name | Type | Default +|rel|RELATIONSHIP?|null +|=== + diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/index.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/index.adoc index 58d5fccaf5..b39f1f3bb3 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/index.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/index.adoc @@ -18,6 +18,11 @@ returns endNode for (virtual) relationships returns id for (virtual) relationships |label:function[] |label:apoc-core[] +|xref::overview/apoc.rel/apoc.rel.rebind.adoc[apoc.rel.rebind icon:book[]] + +apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) +|label:function[] +|label:apoc-core[] |xref::overview/apoc.rel/apoc.rel.startNode.adoc[apoc.rel.startNode icon:book[]] returns startNode for (virtual) relationships diff --git a/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java b/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java index 284f6912cb..24cb25302c 100644 --- a/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java +++ b/full/src/test/java/apoc/periodic/PeriodicExtendedTest.java @@ -1,6 +1,9 @@ package apoc.periodic; +import apoc.create.Create; import apoc.load.Jdbc; +import apoc.nlp.gcp.GCPProcedures; +import apoc.nodes.Nodes; import apoc.util.TestUtil; import org.junit.Before; import org.junit.Rule; @@ -15,22 +18,78 @@ import org.neo4j.test.rule.ImpermanentDbmsRule; import java.sql.SQLException; +import java.util.Collections; import java.util.Map; +import java.util.function.Consumer; import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testCallCount; import static apoc.util.TestUtil.testResult; import static apoc.util.Util.map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.neo4j.configuration.GraphDatabaseSettings.procedure_unrestricted; public class PeriodicExtendedTest { @Rule - public DbmsRule db = new ImpermanentDbmsRule(); + public DbmsRule db = new ImpermanentDbmsRule() + .withSetting(procedure_unrestricted, Collections.singletonList("apoc.*")); @Before public void initDb() { - TestUtil.registerProcedure(db, Periodic.class, PeriodicExtended.class, Jdbc.class); + TestUtil.registerProcedure(db, Periodic.class, GCPProcedures.class, Create.class, Nodes.class, PeriodicExtended.class, Jdbc.class); + } + + @Test + public void testRebindWithNlpWriteProcedure() { + // use case: https://community.neo4j.com/t5/neo4j-graph-platform/use-of-apoc-periodic-iterate-with-apoc-nlp-gcp-classify-graph/m-p/56846#M33854 + final Map conf = map("rebind", true); + final String iterate = "MATCH (node:Article) RETURN node"; + final String action = "CALL apoc.nlp.gcp.classify.graph(node, $nlpConf) YIELD graph RETURN null"; + + testRebindCommon(conf, iterate, action, 2, r -> assertEquals(Collections.emptyMap(), r.get("errorMessages"))); + + testRebindCommon(map(), iterate, action, 0, this::assertNodeDeletedErr); + } + + @Test + public void testManualRebindWithNlpWriteProcedure() { + final String iterate = "MATCH (node:Article) RETURN id(node) AS id"; + final String action = "MATCH (node) WHERE id(node) = id CALL apoc.nlp.gcp.classify.graph(node, $nlpConf) YIELD graph RETURN null"; + + testRebindCommon(map(), iterate, action, 2, r -> assertEquals(Collections.emptyMap(), r.get("errorMessages"))); + } + + @Test + public void testRebindWithMapIterationAndCreateRelationshipProcedure() { + final Map conf = map("rebind", true); + final String iterate = "MATCH (art:Article) RETURN {key: art, key2: 'another'} as map"; + final String action = "CREATE (node:Category) with map.key as art, node call apoc.create.relationship(art, 'CATEGORY', {b: 1}, node) yield rel return rel"; + + testRebindCommon(conf, iterate, action, 1, r -> assertEquals(Collections.emptyMap(), r.get("errorMessages"))); + + testRebindCommon(map(), iterate, action, 0, this::assertNodeDeletedErr); + } + + private void assertNodeDeletedErr(Map r) { + assertTrue(((Map) r.get("errorMessages")) + .keySet().stream() + .anyMatch(k -> k.matches("Node\\[\\d+] is deleted and cannot be used to create a relationship"))); + } + + private void testRebindCommon(Map conf, String iterate, String action, int expected, Consumer> assertions) { + final Map nlpConf = map("key", "myKey", "nodeProperty", "content", "write", true, "unsupportedDummyClient", true); + final Map baseConf = map("params", map("nlpConf", nlpConf)); + baseConf.putAll(conf); + db.executeTransactionally("CREATE (:Article {content: 'contentBody'})"); + testCall(db,"CALL apoc.periodic.iterate($iterate, $action, $config)", + map( "iterate" , iterate, "action", action, "config", baseConf), + assertions); + + testCallCount(db, "MATCH p=(:Category)<-[:CATEGORY]-(:Article) RETURN p", expected); + db.executeTransactionally("MATCH (n) DETACH DELETE n"); } @Test