Skip to content

Commit

Permalink
moved in full
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Aug 26, 2022
1 parent d6d3282 commit c2978fa
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 219 deletions.
18 changes: 0 additions & 18 deletions core/src/main/java/apoc/nodes/Nodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -678,22 +678,4 @@ private int getDegreeSafe(Node node, RelationshipType relType, Direction directi
return node.getDegree(relType, direction);
}

@UserFunction("apoc.node.rebind")
@Description("apoc.node.rebind(node - to rebind a node (i.e. executing a Transaction.getNodeById(node.getId()) ")
public Node nodeRebind(@Name("node") Node node) {
return Util.rebind(tx, node);
}

@UserFunction("apoc.rel.rebind")
@Description("apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) ")
public Relationship relationshipRebind(@Name("rel") Relationship rel) {
return Util.rebind(tx, rel);
}

@UserFunction("apoc.any.rebind")
@Description("apoc.any.rebind(Object) - to rebind any rel, node, path, map, list or combination of them (i.e. executing a Transaction.getNodeById(node.getId()) / Transaction.getRelationshipById(rel.getId()))")
public Object anyRebind(@Name("any") Object any) {
return Util.anyRebind(tx, any);
}

}
14 changes: 2 additions & 12 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,29 +271,19 @@ public Stream<BatchAndTotalResult> iterate(
BatchMode batchMode = BatchMode.fromConfig(config);
Map<String,Object> params = (Map<String, Object>) config.getOrDefault("params", Collections.emptyMap());

final boolean rebind = Util.toBoolean(config.get("rebind"));
if (rebind) {
params = Util.anyRebind(tx, params);
}

try (Result result = tx.execute(slottedRuntime(cypherIterate),params)) {
final List<String> columns = result.columns();
if (rebind) {
cypherAction = Util.withMapping(columns.stream(), (c) -> "apoc.any.rebind(" + Util.quote(c) + ") AS " + Util.quote(c)) + cypherAction;
}
Pair<String,Boolean> prepared = PeriodicUtils.prepareInnerStatement(cypherAction, batchMode, columns, "_batch");
Pair<String,Boolean> prepared = PeriodicUtils.prepareInnerStatement(cypherAction, batchMode, result.columns(), "_batch");
String innerStatement = applyPlanner(prepared.first(), Planner.valueOf((String) config.getOrDefault("planner", Planner.DEFAULT.name())));
boolean iterateList = prepared.other();
String periodicId = UUID.randomUUID().toString();
if (log.isDebugEnabled()) {
log.debug("Starting periodic iterate from `%s` operation using iteration `%s` in separate thread with id: `%s`", cypherIterate,cypherAction, periodicId);
}
Map<String, Object> finalParams = params;
return PeriodicUtils.iterateAndExecuteBatchedInSeparateThread(
db, terminationGuard, log, pools,
(int)batchSize, parallel, iterateList, retries, result,
(tx, p) -> {
final Result r = tx.execute(innerStatement, merge(finalParams, p));
final Result r = tx.execute(innerStatement, merge(params, p));
Iterators.count(r); // XXX: consume all results
return r.getQueryStatistics();
},
Expand Down
23 changes: 0 additions & 23 deletions core/src/main/java/apoc/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -852,29 +852,6 @@ public static Map<String, Object> flattenMap(Map<String, Object> map, String pre
})
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}

public static <T> T anyRebind(Transaction tx, T any) {
if (any instanceof Map) {
return (T) ((Map<String, Object>) any).entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> anyRebind(tx, e.getValue())));
}
if (any instanceof Path) {
final Path path = (Path) any;
PathImpl.Builder builder = new PathImpl.Builder(Util.rebind(tx, path.startNode()));
for (Relationship rel: path.relationships()) {
builder = builder.push(Util.rebind(tx, rel));
}
return (T) builder.build();
}
if (any instanceof Iterable) {
return (T) Iterables.stream((Iterable) any)
.map(i -> anyRebind(tx, i)).collect(Collectors.toList());
}
if (any instanceof Entity) {
return (T) Util.rebind(tx, (Entity) any);
}
return any;
}

public static Node rebind(Transaction tx, Node node) {
return node instanceof VirtualNode ? node : tx.getNodeById(node.getId());
Expand Down
39 changes: 0 additions & 39 deletions core/src/test/java/apoc/nodes/NodesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.internal.helpers.collection.Iterables;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.test.rule.DbmsRule;
Expand Down Expand Up @@ -71,43 +69,6 @@ public void isDense() throws Exception {
"WHERE n:Foo AND dense OR n:Bar AND NOT dense RETURN count(*) as c",
(row) -> assertEquals(2L, row.get("c")));
}

@Test
public void rebind() {
TestUtil.testCall(db, "CREATE (a:Foo)-[r1:MY_REL]->(b:Bar)-[r2:ANOTHER_REL]->(c:Baz) WITH a,b,c,r1,r2 \n" +
"RETURN apoc.any.rebind({first: a, second: b, third: c, rels: [r1, r2]}) as rebind",
(row) -> {
final Map<String, Object> rebind = (Map<String, Object>) row.get("rebind");
final List<Relationship> rels = (List<Relationship>) rebind.get("rels");
final Relationship firstRel = rels.get(0);
final Relationship secondRel = rels.get(1);
assertEquals(firstRel.getStartNode(), rebind.get("first"));
assertEquals(firstRel.getEndNode(), rebind.get("second"));
assertEquals(secondRel.getStartNode(), rebind.get("second"));
assertEquals(secondRel.getEndNode(), rebind.get("third"));
});

TestUtil.testCall(db, "CREATE p1=(a:Foo)-[r1:MY_REL]->(b:Bar), p2=(:Bar)-[r2:ANOTHER_REL]->(c:Baz) \n" +
"RETURN apoc.any.rebind([p1, p2]) as rebind",
(row) -> {
final List<Path> rebindList = (List<Path>) row.get("rebind");
assertEquals(2, rebindList.size());
final Path firstPath = rebindList.get(0);
assertPath(firstPath, List.of("Foo", "Bar"), List.of("MY_REL"));
final Path secondPath = rebindList.get(1);
assertPath(secondPath, List.of("Bar", "Baz"), List.of("ANOTHER_REL"));
});
}

private void assertPath(Path rebind, List<String> labels, List<String> relTypes) {
final List<String> actualLabels = Iterables.stream(rebind.nodes())
.map(i -> i.getLabels().iterator().next())
.map(Label::name).collect(Collectors.toList());
assertEquals(labels, actualLabels);
final List<String> actualRelTypes = Iterables.stream(rebind.relationships()).map(Relationship::getType)
.map(RelationshipType::name).collect(Collectors.toList());
assertEquals(relTypes, actualRelTypes);
}

@Test
public void cycles() {
Expand Down
106 changes: 2 additions & 104 deletions core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
package apoc.periodic;

import apoc.cypher.Cypher;
import apoc.nodes.Nodes;
import apoc.util.MapUtil;
import apoc.util.TestUtil;
import apoc.util.Util;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.common.DependencyResolver;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransientTransactionFailureException;
Expand All @@ -39,7 +32,6 @@
import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static apoc.util.Util.map;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;
import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -57,12 +49,11 @@ public class PeriodicTest {
public static final int BATCH_SIZE = 399;

@Rule
public DbmsRule db = new ImpermanentDbmsRule()
.withSetting(GraphDatabaseSettings.procedure_unrestricted, singletonList("apoc.*"));
public DbmsRule db = new ImpermanentDbmsRule();

@Before
public void initDb() throws Exception {
TestUtil.registerProcedure(db, Periodic.class, Nodes.class, Cypher.class);
TestUtil.registerProcedure(db, Periodic.class);
db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)");
}

Expand All @@ -88,99 +79,6 @@ public void testSubmitStatement() throws Exception {
testCall(db, callList, (r) -> assertEquals(true, r.get("done")));
}

@Test
public void testSetWithoutRebindShouldFailsOnlyWithTxSetPropertyInsteadOfSetViaCypher() {
// we create 2 nodes via Transaction, and we rebind only one of them
Node firstNode;
try (Transaction tx = db.beginTx()) {
firstNode = tx.createNode(Label.label("FirstNode"));
tx.commit();
}

try (Transaction tx = db.beginTx()) {
// without rebind
firstNode.setProperty("alpha", true);
fail("Should fails due to 'The transaction has been closed.'");
tx.commit();
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("The transaction has been closed."));
}

Node secondNode;
try (Transaction tx = db.beginTx()) {
secondNode = tx.createNode(Label.label("SecondNode"));
tx.commit();
}

try (Transaction tx = db.beginTx()) {
// with rebind
secondNode = Util.rebind(tx, secondNode);
secondNode.setProperty("alpha", true);
tx.commit();
}

// try periodic and runMany procs (which open a new tx) with a set statement and without rebind
db.executeTransactionally("CREATE (:Rebind)-[r:REL_ONE]->(:Baz)");

final Map<String, Object> runManyParams = map("cypher", "SET $n.first = 1;");
testCall(db, "MATCH (n:Rebind) WITH n CALL apoc.cypher.runMany($cypher, {n: n}) YIELD row RETURN row",
runManyParams, (row) -> {});
testCall(db, "MATCH (:Rebind)-[n:REL_ONE]->(:Baz) WITH n CALL apoc.cypher.runMany($cypher, {n: n}) YIELD row RETURN row",
runManyParams, (row) -> {});

// periodic iterate
final String cypherAction = "CALL apoc.cypher.runMany(\"SET $entity.test = true;\", {entity: n}) YIELD row RETURN row";

testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, {})",
map("cypherIterate", "MATCH (n:Rebind) RETURN n", "cypherAction", cypherAction), (row) -> {});
testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, {})",
map("cypherIterate", "MATCH (:Rebind)-[n:REL_ONE]->(:Baz) RETURN n", "cypherAction", cypherAction), (row) -> {});

testCall(db, "MATCH (n:Rebind)-[r:REL_ONE]->(:Baz) RETURN n, r", r -> {
final Node node = (Node) r.get("n");
assertEquals(true, node.getProperty("test"));
assertEquals(1L, node.getProperty("first"));
final Relationship rel = (Relationship) r.get("r");
assertEquals(true, rel.getProperty("test"));
assertEquals(1L, rel.getProperty("first"));
});
}

@Test
public void testIterateRebind() throws Exception {

Node node;
try (Transaction tx = db.beginTx()) {
node = tx.createNode(Label.label("Ajeje"));
tx.commit();
}

final Map<String, Object> config = map("params", map("b", node));

final Map<String, Object> params = map("cypherIterate", "WITH $b as b RETURN b",
"cypherAction", "SET b.test = 1",
"config", config);
try {
testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config )", params, (row) -> fail());
} catch (Exception e) {
assertTrue(e.getMessage().contains("NotInTransactionException"));
}

// with rebind flag
config.put("rebind", true);

testCall(db, "CALL apoc.periodic.iterate($cypherIterate, $cypherAction, $config )", params, (row) -> {
assertEquals(1L, row.get("batches"));
assertEquals(map(), row.get("errorMessages"));
assertEquals(0L, row.get("failedBatches"));
});

testCall(db, "MATCH (b:Ajeje) RETURN b", r -> {
assertEquals(1L, ((Node) r.get("b")).getProperty("test"));
});

}

@Test
public void testSubmitStatementWithParams() throws Exception {
String callList = "CALL apoc.periodic.list()";
Expand Down
37 changes: 37 additions & 0 deletions full/src/main/java/apoc/nodes/NodesExtended.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package apoc.nodes;

import apoc.Extended;
import apoc.util.EntityUtil;
import apoc.util.Util;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Transaction;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.UserFunction;

@Extended
public class NodesExtended {

@Context
public Transaction tx;

@UserFunction("apoc.node.rebind")
@Description("apoc.node.rebind(node - to rebind a node (i.e. executing a Transaction.getNodeById(node.getId()) ")
public Node nodeRebind(@Name("node") Node node) {
return Util.rebind(tx, node);
}

@UserFunction("apoc.rel.rebind")
@Description("apoc.rel.rebind(rel) - to rebind a rel (i.e. executing a Transaction.getRelationshipById(rel.getId()) ")
public Relationship relationshipRebind(@Name("rel") Relationship rel) {
return Util.rebind(tx, rel);
}

@UserFunction("apoc.any.rebind")
@Description("apoc.any.rebind(Object) - to rebind any rel, node, path, map, list or combination of them (i.e. executing a Transaction.getNodeById(node.getId()) / Transaction.getRelationshipById(rel.getId()))")
public Object anyRebind(@Name("any") Object any) {
return EntityUtil.anyRebind(tx, any);
}
}
37 changes: 37 additions & 0 deletions full/src/main/java/apoc/util/EntityUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package apoc.util;

import org.neo4j.graphalgo.impl.util.PathImpl;
import org.neo4j.graphdb.Entity;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Transaction;
import org.neo4j.internal.helpers.collection.Iterables;

import java.util.Map;
import java.util.stream.Collectors;

public class EntityUtil {

public static <T> T anyRebind(Transaction tx, T any) {
if (any instanceof Map) {
return (T) ((Map<String, Object>) any).entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> anyRebind(tx, e.getValue())));
}
if (any instanceof Path) {
final Path path = (Path) any;
PathImpl.Builder builder = new PathImpl.Builder(Util.rebind(tx, path.startNode()));
for (Relationship rel: path.relationships()) {
builder = builder.push(Util.rebind(tx, rel));
}
return (T) builder.build();
}
if (any instanceof Iterable) {
return (T) Iterables.stream((Iterable) any)
.map(i -> anyRebind(tx, i)).collect(Collectors.toList());
}
if (any instanceof Entity) {
return (T) Util.rebind(tx, (Entity) any);
}
return any;
}
}
Loading

0 comments on commit c2978fa

Please sign in to comment.