From 460e4df5363ac7d41b99fdf048d1c721cf78cef1 Mon Sep 17 00:00:00 2001 From: vga91 Date: Tue, 6 Jun 2023 09:53:31 +0200 Subject: [PATCH] [drqYtb6r] apoc.cypher.run* procedures don't return results (extended) --- .../main/java/apoc/cypher/CypherExtended.java | 6 +- .../src/main/java/apoc/util/EntityUtil.java | 37 --- .../cypher/CypherEnterpriseExtendedTest.java | 250 ++++++++++++++++++ .../apoc/cypher/CypherEnterpriseTest.java | 58 ---- 4 files changed, 254 insertions(+), 97 deletions(-) delete mode 100644 extended/src/main/java/apoc/util/EntityUtil.java create mode 100644 extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java delete mode 100644 extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java diff --git a/extended/src/main/java/apoc/cypher/CypherExtended.java b/extended/src/main/java/apoc/cypher/CypherExtended.java index aca0866e0f..9ed827a95a 100644 --- a/extended/src/main/java/apoc/cypher/CypherExtended.java +++ b/extended/src/main/java/apoc/cypher/CypherExtended.java @@ -4,6 +4,7 @@ import apoc.Pools; import apoc.result.MapResult; import apoc.util.CompressionAlgo; +import apoc.util.EntityUtil; import apoc.util.FileUtils; import apoc.util.QueueBasedSpliterator; import apoc.util.Util; @@ -202,7 +203,8 @@ private Object consumeResult(Result result, BlockingQueue queue, bool int row = 0; while (result.hasNext()) { terminationGuard.check(); - queue.put(new RowResult(row++, result.next())); + Map res = EntityUtil.anyRebind(tx, result.next()); + queue.put(new RowResult(row++, res)); } if (addStatistics) { queue.put(new RowResult(-1, toMap(result.getQueryStatistics(), System.currentTimeMillis() - time, row))); @@ -385,7 +387,7 @@ public Stream parallel2(@Name("fragment") String fragment, @Name("par } return futures.stream().flatMap(f -> { try { - return f.get().stream().map(MapResult::new); + return EntityUtil.anyRebind(tx, f.get()).stream().map(MapResult::new); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Error executing in parallel " + statement, e); } diff --git a/extended/src/main/java/apoc/util/EntityUtil.java b/extended/src/main/java/apoc/util/EntityUtil.java deleted file mode 100644 index 3c0a2d5e58..0000000000 --- a/extended/src/main/java/apoc/util/EntityUtil.java +++ /dev/null @@ -1,37 +0,0 @@ -package apoc.util; - -import org.neo4j.graphalgo.impl.util.PathImpl; -import org.neo4j.graphdb.Entity; -import org.neo4j.graphdb.Path; -import org.neo4j.graphdb.Relationship; -import org.neo4j.graphdb.Transaction; -import org.neo4j.internal.helpers.collection.Iterables; - -import java.util.Map; -import java.util.stream.Collectors; - -public class EntityUtil { - - public static T anyRebind(Transaction tx, T any) { - if (any instanceof Map) { - return (T) ((Map) any).entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> anyRebind(tx, e.getValue()))); - } - if (any instanceof Path) { - final Path path = (Path) any; - PathImpl.Builder builder = new PathImpl.Builder(Util.rebind(tx, path.startNode())); - for (Relationship rel: path.relationships()) { - builder = builder.push(Util.rebind(tx, rel)); - } - return (T) builder.build(); - } - if (any instanceof Iterable) { - return (T) Iterables.stream((Iterable) any) - .map(i -> anyRebind(tx, i)).collect(Collectors.toList()); - } - if (any instanceof Entity) { - return (T) Util.rebind(tx, (Entity) any); - } - return any; - } -} diff --git a/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java b/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java new file mode 100644 index 0000000000..3362071764 --- /dev/null +++ b/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java @@ -0,0 +1,250 @@ +package apoc.cypher; + +import apoc.util.Neo4jContainerExtension; +import apoc.util.TestContainerUtil.ApocPackage; +import apoc.util.collection.Iterables; +import org.apache.commons.io.FileUtils; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.neo4j.driver.Session; +import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Relationship; + +import static apoc.cypher.CypherTestUtil.CREATE_RESULT_NODES; +import static apoc.cypher.CypherTestUtil.CREATE_RETURNQUERY_NODES; +import static apoc.cypher.CypherTestUtil.SET_AND_RETURN_QUERIES; +import static apoc.cypher.CypherTestUtil.SET_NODE; +import static apoc.cypher.CypherTestUtil.SIMPLE_RETURN_QUERIES; +import static apoc.cypher.CypherTestUtil.assertReturnQueryNode; +import static apoc.cypher.CypherTestUtil.testRunProcedureWithSetAndReturnResults; +import static apoc.cypher.CypherTestUtil.testRunProcedureWithSimpleReturnResults; +import static apoc.util.TestContainerUtil.createEnterpriseDB; +import static apoc.util.TestContainerUtil.importFolder; +import static apoc.util.TestContainerUtil.testCall; +import static apoc.util.TestContainerUtil.testResult; +import static apoc.util.Util.map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; + +public class CypherEnterpriseExtendedTest { + private static final String SET_RETURN_FILE = "set_and_return.cypher"; + private static final String MATCH_RETURN_FILE = "match_and_return.cypher"; + + private static Neo4jContainerExtension neo4jContainer; + private static Session session; + + @BeforeClass + public static void beforeAll() { + // We build the project, the artifact will be placed into ./build/libs + neo4jContainer = createEnterpriseDB(List.of(ApocPackage.EXTENDED), true) + .withNeo4jConfig("dbms.transaction.timeout", "60s"); + neo4jContainer.start(); + session = neo4jContainer.getSession(); + + // create cypher files + createContainerFile(SET_RETURN_FILE, SET_AND_RETURN_QUERIES); + createContainerFile(MATCH_RETURN_FILE, SIMPLE_RETURN_QUERIES); + } + + private static void createContainerFile(String fileName, String fileContent) { + try { + File file = new File(importFolder, fileName); + FileUtils.writeStringToFile(file, fileContent, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void afterAll() { + session.close(); + neo4jContainer.close(); + } + + @After + public void after() { + session.writeTransaction(tx -> tx.run("MATCH (n) DETACH DELETE n")); + } + + @Test + public void testParallelTransactionGuard() { + // given + String parallelQuery = "UNWIND range(0,9) as id CALL apoc.util.sleep(10000) WITH id RETURN id"; + + // when + try { + int size = 10_000; + testResult(neo4jContainer.getSession(), + "CALL apoc.cypher.parallel2('" + parallelQuery + "', {a: range(1, $size)}, 'a')", + map("size", size), + r -> {}); + } catch (Exception ignored) {} + + // then + boolean anyLingeringParallelTx = neo4jContainer.getSession().readTransaction(tx -> { + var currentTxs = tx.run("SHOW TRANSACTIONS").stream(); + return currentTxs.anyMatch( record -> record.get( "currentQuery" ).toString().contains(parallelQuery)); + }); + + assertFalse(anyLingeringParallelTx); + } + + @Test + public void testRunFileWithSetAndResults() { + String query = "CALL apoc.cypher.runFile($file)"; + Map params = Map.of("file", SET_RETURN_FILE); + + testRunProcedureWithSetAndReturnResults(session, query, params); + } + + @Test + public void testRunFileWithResults() { + String query = "CALL apoc.cypher.runFile($file)"; + Map params = Map.of("file", MATCH_RETURN_FILE); + + testRunProcedureWithSimpleReturnResults(session, query, params); + } + + @Test + public void testRunFilesWithSetAndResults() { + String query = "CALL apoc.cypher.runFiles([$file])"; + Map params = Map.of("file", SET_RETURN_FILE); + + testRunProcedureWithSetAndReturnResults(session, query, params); + } + + @Test + public void testRunFilesWithResults() { + String query = "CALL apoc.cypher.runFiles([$file])"; + Map params = Map.of("file", MATCH_RETURN_FILE); + + testRunProcedureWithSimpleReturnResults(session, query, params); + } + + @Test + public void testCypherParallelWithSetAndResults() { + session.writeTransaction(tx -> tx.run(CREATE_RESULT_NODES)); + + String query = "CALL apoc.cypher.parallel($file, {a: range(1,4)}, 'a')"; + Map params = Map.of("file", SET_NODE); + + RuntimeException e = assertThrows(RuntimeException.class, + () -> testCall(session, query, params, (res) -> {}) + ); + String expectedMessage = "Set property for property 'updated' on database 'neo4j' is not allowed for user 'neo4j' with roles [PUBLIC, admin] overridden by READ."; + Assertions.assertThat(e.getMessage()).contains(expectedMessage); + } + + @Test + public void testCypherParallel2WithSetAndResults() { + session.writeTransaction(tx -> tx.run(CREATE_RESULT_NODES)); + + String query = "CALL apoc.cypher.parallel2($file, {a: range(1,4)}, 'a')"; + Map params = Map.of("file", SET_NODE); + + RuntimeException e = assertThrows(RuntimeException.class, + () -> testCall(session, query, params, (res) -> {}) + ); + String expectedMessage = "Creating new property name on database 'neo4j' is not allowed for user 'neo4j' with roles [PUBLIC, admin] overridden by READ"; + Assertions.assertThat(e.getMessage()).contains(expectedMessage); + } + + @Test + public void testCypherParallelWithResults() { + String query = "CALL apoc.cypher.parallel($file, {a: range(1,3)}, 'a')"; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherParallelCommon(query, params); + } + + @Test + public void testCypherParallel2WithResults() { + String query = "CALL apoc.cypher.parallel2($file, {a: range(1,3)}, 'a')"; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherParallelCommon(query, params); + } + + private void testCypherParallelCommon(String query, Map params) { + session.writeTransaction(tx -> tx.run(CREATE_RETURNQUERY_NODES)); + + testResult(session, query, params, r -> { + assertBatchCypherParallel(r); + assertBatchCypherParallel(r); + assertBatchCypherParallel(r); + + assertFalse(r.hasNext()); + }); + } + + private void assertBatchCypherParallel(Iterator> r) { + Map next = r.next(); + assertReturnQueryNode(0L, (Map) next.get("value")); + next = r.next(); + assertReturnQueryNode(1L, (Map) next.get("value")); + next = r.next(); + assertReturnQueryNode(2L, (Map) next.get("value")); + next = r.next(); + assertReturnQueryNode(3L, (Map) next.get("value")); + } + + @Test + public void testCypherMapParallelWithResults() { + String query = """ + MATCH (n:ReturnQuery) WITH COLLECT(n) AS list + CALL apoc.cypher.mapParallel('MATCH (_)-[r:REL]->(o:Other) RETURN r, o', {}, list) + YIELD value RETURN value"""; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherMapParallelCommon(query, params); + } + + @Test + public void testCypherMapParallel2WithResults() { + String query = """ + MATCH (n:ReturnQuery) WITH COLLECT(n) AS list + CALL apoc.cypher.mapParallel2('MATCH (_)-[r:REL]->(o:Other) RETURN r, o', {}, list, 1) + YIELD value RETURN value"""; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherMapParallelCommon(query, params); + } + + private void testCypherMapParallelCommon(String query, Map params) { + session.writeTransaction(tx -> tx.run(CREATE_RETURNQUERY_NODES)); + + testResult(session, query, params, r -> { + Map next = r.next(); + assertOtherNodeAndRel(0L, (Map) next.get("value")); + next = r.next(); + assertOtherNodeAndRel(1L, (Map) next.get("value")); + next = r.next(); + assertOtherNodeAndRel(2L, (Map) next.get("value")); + next = r.next(); + assertOtherNodeAndRel(3L, (Map) next.get("value")); + + assertFalse(r.hasNext()); + }); + } + + public static void assertOtherNodeAndRel(long id, Map result) { + Node n = (Node) result.get("o"); + assertEquals(List.of("Other"), Iterables.asList(n.labels())); + assertEquals(Map.of("idOther", id), n.asMap()); + + Relationship rel = (Relationship) result.get("r"); + assertEquals("REL", rel.type()); + assertEquals(Map.of("idRel", id), rel.asMap()); + } +} diff --git a/extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java b/extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java deleted file mode 100644 index 885da04027..0000000000 --- a/extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package apoc.cypher; - -import apoc.util.Neo4jContainerExtension; -import apoc.util.TestContainerUtil.ApocPackage; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import java.util.List; -import org.neo4j.driver.Session; - -import static apoc.util.TestContainerUtil.createEnterpriseDB; -import static apoc.util.TestContainerUtil.testResult; -import static apoc.util.Util.map; - -public class CypherEnterpriseTest { - - private static Neo4jContainerExtension neo4jContainer; - private static Session session; - - @BeforeClass - public static void beforeAll() { - // We build the project, the artifact will be placed into ./build/libs - neo4jContainer = createEnterpriseDB(List.of(ApocPackage.EXTENDED), true) - .withNeo4jConfig("dbms.transaction.timeout", "60s"); - neo4jContainer.start(); - session = neo4jContainer.getSession(); - } - - @AfterClass - public static void afterAll() { - session.close(); - neo4jContainer.close(); - } - - @Test - public void testParallelTransactionGuard() { - // given - String parallelQuery = "UNWIND range(0,9) as id CALL apoc.util.sleep(10000) WITH id RETURN id"; - - // when - try { - int size = 10_000; - testResult(neo4jContainer.getSession(), - "CALL apoc.cypher.parallel2('" + parallelQuery + "', {a: range(1, $size)}, 'a')", - map("size", size), - r -> {}); - } catch (Exception ignored) {} - - // then - boolean anyLingeringParallelTx = neo4jContainer.getSession().readTransaction(tx -> { - var currentTxs = tx.run("SHOW TRANSACTIONS").stream(); - return currentTxs.anyMatch( record -> record.get( "currentQuery" ).toString().contains(parallelQuery)); - }); - - Assert.assertFalse(anyLingeringParallelTx); - } -}