Skip to content

Commit

Permalink
Fixes #1582: rebind flag in apoc.periodic.iterate
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Aug 2, 2022
1 parent 194ca2a commit d6d3282
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 6 deletions.
18 changes: 18 additions & 0 deletions core/src/main/java/apoc/nodes/Nodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -678,4 +678,22 @@ private int getDegreeSafe(Node node, RelationshipType relType, Direction directi
return node.getDegree(relType, direction);
}

@UserFunction("apoc.node.rebind")
@Description("apoc.node.rebind(node - to rebind a node (i.e. executing a Transaction.getNodeById(node.getId()) ")
public Node nodeRebind(@Name("node") Node node) {
return Util.rebind(tx, node);
}

@UserFunction("apoc.rel.rebind")
@Description("apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) ")
public Relationship relationshipRebind(@Name("rel") Relationship rel) {
return Util.rebind(tx, rel);
}

@UserFunction("apoc.any.rebind")
@Description("apoc.any.rebind(Object) - to rebind any rel, node, path, map, list or combination of them (i.e. executing a Transaction.getNodeById(node.getId()) / Transaction.getRelationshipById(rel.getId()))")
public Object anyRebind(@Name("any") Object any) {
return Util.anyRebind(tx, any);
}

}
14 changes: 12 additions & 2 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,19 +271,29 @@ public Stream<BatchAndTotalResult> iterate(
BatchMode batchMode = BatchMode.fromConfig(config);
Map<String,Object> params = (Map<String, Object>) config.getOrDefault("params", Collections.emptyMap());

final boolean rebind = Util.toBoolean(config.get("rebind"));
if (rebind) {
params = Util.anyRebind(tx, params);
}

try (Result result = tx.execute(slottedRuntime(cypherIterate),params)) {
Pair<String,Boolean> prepared = PeriodicUtils.prepareInnerStatement(cypherAction, batchMode, result.columns(), "_batch");
final List<String> columns = result.columns();
if (rebind) {
cypherAction = Util.withMapping(columns.stream(), (c) -> "apoc.any.rebind(" + Util.quote(c) + ") AS " + Util.quote(c)) + cypherAction;
}
Pair<String,Boolean> prepared = PeriodicUtils.prepareInnerStatement(cypherAction, batchMode, columns, "_batch");
String innerStatement = applyPlanner(prepared.first(), Planner.valueOf((String) config.getOrDefault("planner", Planner.DEFAULT.name())));
boolean iterateList = prepared.other();
String periodicId = UUID.randomUUID().toString();
if (log.isDebugEnabled()) {
log.debug("Starting periodic iterate from `%s` operation using iteration `%s` in separate thread with id: `%s`", cypherIterate,cypherAction, periodicId);
}
Map<String, Object> finalParams = params;
return PeriodicUtils.iterateAndExecuteBatchedInSeparateThread(
db, terminationGuard, log, pools,
(int)batchSize, parallel, iterateList, retries, result,
(tx, p) -> {
final Result r = tx.execute(innerStatement, merge(params, p));
final Result r = tx.execute(innerStatement, merge(finalParams, p));
Iterators.count(r); // XXX: consume all results
return r.getQueryStatistics();
},
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/apoc/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.collections.api.iterator.LongIterator;
import org.neo4j.graphalgo.impl.util.PathImpl;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.Entity;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.internal.helpers.collection.Iterables;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.internal.kernel.api.procs.ProcedureCallContext;
Expand Down Expand Up @@ -849,6 +852,29 @@ public static Map<String, Object> flattenMap(Map<String, Object> map, String pre
})
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}

public static <T> T anyRebind(Transaction tx, T any) {
if (any instanceof Map) {
return (T) ((Map<String, Object>) any).entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> anyRebind(tx, e.getValue())));
}
if (any instanceof Path) {
final Path path = (Path) any;
PathImpl.Builder builder = new PathImpl.Builder(Util.rebind(tx, path.startNode()));
for (Relationship rel: path.relationships()) {
builder = builder.push(Util.rebind(tx, rel));
}
return (T) builder.build();
}
if (any instanceof Iterable) {
return (T) Iterables.stream((Iterable) any)
.map(i -> anyRebind(tx, i)).collect(Collectors.toList());
}
if (any instanceof Entity) {
return (T) Util.rebind(tx, (Entity) any);
}
return any;
}

public static Node rebind(Transaction tx, Node node) {
return node instanceof VirtualNode ? node : tx.getNodeById(node.getId());
Expand Down
39 changes: 39 additions & 0 deletions core/src/test/java/apoc/nodes/NodesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.internal.helpers.collection.Iterables;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.test.rule.DbmsRule;
Expand Down Expand Up @@ -69,6 +71,43 @@ public void isDense() throws Exception {
"WHERE n:Foo AND dense OR n:Bar AND NOT dense RETURN count(*) as c",
(row) -> assertEquals(2L, row.get("c")));
}

@Test
public void rebind() {
TestUtil.testCall(db, "CREATE (a:Foo)-[r1:MY_REL]->(b:Bar)-[r2:ANOTHER_REL]->(c:Baz) WITH a,b,c,r1,r2 \n" +
"RETURN apoc.any.rebind({first: a, second: b, third: c, rels: [r1, r2]}) as rebind",
(row) -> {
final Map<String, Object> rebind = (Map<String, Object>) row.get("rebind");
final List<Relationship> rels = (List<Relationship>) rebind.get("rels");
final Relationship firstRel = rels.get(0);
final Relationship secondRel = rels.get(1);
assertEquals(firstRel.getStartNode(), rebind.get("first"));
assertEquals(firstRel.getEndNode(), rebind.get("second"));
assertEquals(secondRel.getStartNode(), rebind.get("second"));
assertEquals(secondRel.getEndNode(), rebind.get("third"));
});

TestUtil.testCall(db, "CREATE p1=(a:Foo)-[r1:MY_REL]->(b:Bar), p2=(:Bar)-[r2:ANOTHER_REL]->(c:Baz) \n" +
"RETURN apoc.any.rebind([p1, p2]) as rebind",
(row) -> {
final List<Path> rebindList = (List<Path>) row.get("rebind");
assertEquals(2, rebindList.size());
final Path firstPath = rebindList.get(0);
assertPath(firstPath, List.of("Foo", "Bar"), List.of("MY_REL"));
final Path secondPath = rebindList.get(1);
assertPath(secondPath, List.of("Bar", "Baz"), List.of("ANOTHER_REL"));
});
}

private void assertPath(Path rebind, List<String> labels, List<String> relTypes) {
final List<String> actualLabels = Iterables.stream(rebind.nodes())
.map(i -> i.getLabels().iterator().next())
.map(Label::name).collect(Collectors.toList());
assertEquals(labels, actualLabels);
final List<String> actualRelTypes = Iterables.stream(rebind.relationships()).map(Relationship::getType)
.map(RelationshipType::name).collect(Collectors.toList());
assertEquals(relTypes, actualRelTypes);
}

@Test
public void cycles() {
Expand Down
106 changes: 104 additions & 2 deletions core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package apoc.periodic;

import apoc.cypher.Cypher;
import apoc.nodes.Nodes;
import apoc.util.MapUtil;
import apoc.util.TestUtil;
import apoc.util.Util;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.common.DependencyResolver;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransientTransactionFailureException;
Expand All @@ -32,6 +39,7 @@
import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static apoc.util.Util.map;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;
import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -49,11 +57,12 @@ public class PeriodicTest {
public static final int BATCH_SIZE = 399;

@Rule
public DbmsRule db = new ImpermanentDbmsRule();
public DbmsRule db = new ImpermanentDbmsRule()
.withSetting(GraphDatabaseSettings.procedure_unrestricted, singletonList("apoc.*"));

@Before
public void initDb() throws Exception {
TestUtil.registerProcedure(db, Periodic.class);
TestUtil.registerProcedure(db, Periodic.class, Nodes.class, Cypher.class);
db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)");
}

Expand All @@ -79,6 +88,99 @@ public void testSubmitStatement() throws Exception {
testCall(db, callList, (r) -> assertEquals(true, r.get("done")));
}

@Test
public void testSetWithoutRebindShouldFailsOnlyWithTxSetPropertyInsteadOfSetViaCypher() {
// we create 2 nodes via Transaction, and we rebind only one of them
Node firstNode;
try (Transaction tx = db.beginTx()) {
firstNode = tx.createNode(Label.label("FirstNode"));
tx.commit();
}

try (Transaction tx = db.beginTx()) {
// without rebind
firstNode.setProperty("alpha", true);
fail("Should fails due to 'The transaction has been closed.'");
tx.commit();
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("The transaction has been closed."));
}

Node secondNode;
try (Transaction tx = db.beginTx()) {
secondNode = tx.createNode(Label.label("SecondNode"));
tx.commit();
}

try (Transaction tx = db.beginTx()) {
// with rebind
secondNode = Util.rebind(tx, secondNode);
secondNode.setProperty("alpha", true);
tx.commit();
}

// try periodic and runMany procs (which open a new tx) with a set statement and without rebind
db.executeTransactionally("CREATE (:Rebind)-[r:REL_ONE]->(:Baz)");

final Map<String, Object> runManyParams = map("cypher", "SET $n.first = 1;");
testCall(db, "MATCH (n:Rebind) WITH n CALL apoc.cypher.runMany($cypher, {n: n}) YIELD row RETURN row",
runManyParams, (row) -> {});
testCall(db, "MATCH (:Rebind)-[n:REL_ONE]->(:Baz) WITH n CALL apoc.cypher.runMany($cypher, {n: n}) YIELD row RETURN row",
runManyParams, (row) -> {});

// periodic iterate
final String cypherAction = "CALL apoc.cypher.runMany(\"SET $entity.test = true;\", {entity: n}) YIELD row RETURN row";

testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, {})",
map("cypherIterate", "MATCH (n:Rebind) RETURN n", "cypherAction", cypherAction), (row) -> {});
testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, {})",
map("cypherIterate", "MATCH (:Rebind)-[n:REL_ONE]->(:Baz) RETURN n", "cypherAction", cypherAction), (row) -> {});

testCall(db, "MATCH (n:Rebind)-[r:REL_ONE]->(:Baz) RETURN n, r", r -> {
final Node node = (Node) r.get("n");
assertEquals(true, node.getProperty("test"));
assertEquals(1L, node.getProperty("first"));
final Relationship rel = (Relationship) r.get("r");
assertEquals(true, rel.getProperty("test"));
assertEquals(1L, rel.getProperty("first"));
});
}

@Test
public void testIterateRebind() throws Exception {

Node node;
try (Transaction tx = db.beginTx()) {
node = tx.createNode(Label.label("Ajeje"));
tx.commit();
}

final Map<String, Object> config = map("params", map("b", node));

final Map<String, Object> params = map("cypherIterate", "WITH $b as b RETURN b",
"cypherAction", "SET b.test = 1",
"config", config);
try {
testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config )", params, (row) -> fail());
} catch (Exception e) {
assertTrue(e.getMessage().contains("NotInTransactionException"));
}

// with rebind flag
config.put("rebind", true);

testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config )", params, (row) -> {
assertEquals(1L, row.get("batches"));
assertEquals(map(), row.get("errorMessages"));
assertEquals(0L, row.get("failedBatches"));
});

testCall(db, "MATCH (b:Ajeje) RETURN b", r -> {
assertEquals(1L, ((Node) r.get("b")).getProperty("test"));
});

}

@Test
public void testSubmitStatementWithParams() throws Exception {
String callList = "CALL apoc.periodic.list()";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
////
This file is generated by DocsTest, so don't change it!
////

= apoc.rel.rebind
:description: This section contains reference documentation for the apoc.rel.rebind function.

label:function[] label:apoc-core[]

[.emphasis]
apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId())

== Signature

[source]
----
apoc.rel.rebind(rel :: RELATIONSHIP?) :: (RELATIONSHIP?)
----

== Input parameters
[.procedures, opts=header]
|===
| Name | Type | Default
|rel|RELATIONSHIP?|null
|===

5 changes: 5 additions & 0 deletions docs/asciidoc/modules/ROOT/pages/overview/apoc.rel/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ returns endNode for (virtual) relationships
returns id for (virtual) relationships
|label:function[]
|label:apoc-core[]
|xref::overview/apoc.rel/apoc.rel.rebind.adoc[apoc.rel.rebind icon:book[]]

apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId())
|label:function[]
|label:apoc-core[]
|xref::overview/apoc.rel/apoc.rel.startNode.adoc[apoc.rel.startNode icon:book[]]

returns startNode for (virtual) relationships
Expand Down
Loading

0 comments on commit d6d3282

Please sign in to comment.