From dc213be6d69fb91567e5d097e796d4372373c383 Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Fri, 16 Jun 2023 16:45:50 +0200 Subject: [PATCH 1/2] [drqYtb6r] apoc.cypher.run* procedures don't return results (extended) (#3624) --- .../main/java/apoc/cypher/CypherExtended.java | 6 +- .../cypher/CypherEnterpriseExtendedTest.java | 401 ++++++++++++++++++ .../apoc/cypher/CypherEnterpriseTest.java | 58 --- 3 files changed, 405 insertions(+), 60 deletions(-) create mode 100644 extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java delete mode 100644 extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java diff --git a/extended/src/main/java/apoc/cypher/CypherExtended.java b/extended/src/main/java/apoc/cypher/CypherExtended.java index 84c5b80c65..57a02c2dfa 100644 --- a/extended/src/main/java/apoc/cypher/CypherExtended.java +++ b/extended/src/main/java/apoc/cypher/CypherExtended.java @@ -4,6 +4,7 @@ import apoc.Pools; import apoc.result.MapResult; import apoc.util.CompressionAlgo; +import apoc.util.EntityUtil; import apoc.util.FileUtils; import apoc.util.QueueBasedSpliterator; import apoc.util.QueueUtil; @@ -242,7 +243,8 @@ private Object consumeResult(Result result, BlockingQueue queue, bool int row = 0; while (result.hasNext()) { terminationGuard.check(); - queue.put(new RowResult(row++, result.next())); + Map res = EntityUtil.anyRebind(tx, result.next()); + queue.put(new RowResult(row++, res)); } if (addStatistics) { queue.put(new RowResult(-1, toMap(result.getQueryStatistics(), System.currentTimeMillis() - time, row))); @@ -425,7 +427,7 @@ public Stream parallel2(@Name("fragment") String fragment, @Name("par } return futures.stream().flatMap(f -> { try { - return f.get().stream().map(MapResult::new); + return EntityUtil.anyRebind(tx, f.get()).stream().map(MapResult::new); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Error executing in parallel " + statement, e); } diff --git a/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java b/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java new file mode 100644 index 0000000000..f071492fd6 --- /dev/null +++ b/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java @@ -0,0 +1,401 @@ +package apoc.cypher; + +import apoc.util.Neo4jContainerExtension; +import apoc.util.TestContainerUtil.ApocPackage; +import apoc.util.collection.Iterables; +import apoc.util.collection.Iterators; +import org.apache.commons.io.FileUtils; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import org.neo4j.driver.Session; +import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Relationship; + +import static apoc.util.TestContainerUtil.createEnterpriseDB; +import static apoc.util.TestContainerUtil.testCall; +import static apoc.util.TestContainerUtil.testResult; +import static apoc.util.Util.map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; + +public class CypherEnterpriseExtendedTest { + private static final String CREATE_RETURNQUERY_NODES = "UNWIND range(0,3) as id \n" + + "CREATE (n:ReturnQuery {id:id})-[:REL {idRel: id}]->(:Other {idOther: id})"; + private static final String CREATE_RESULT_NODES = "UNWIND range(0,3) as id \n" + + "CREATE (n:Result {id:id})-[:REL {idRel: id}]->(:Other {idOther: id})"; + private static final String SIMPLE_RETURN_QUERIES = "MATCH (n:ReturnQuery) RETURN n"; + + private static final String SET_RETURN_FILE = "set_and_return.cypher"; + private static final String MATCH_RETURN_FILE = "match_and_return.cypher"; + + private static final File importFolder = new File("import"); + + public static String SET_NODE = """ + MATCH (n:Result)-[:REL]->(:Other) + SET n.updated = true + RETURN n; + """; + + public static String SET_AND_RETURN_QUERIES = """ + MATCH (n:Result)-[:REL]->(:Other) + SET n.updated = true + RETURN n; + + MATCH (n:Result)-[rel:REL]->(o:Other) + SET rel.updated = 1 + RETURN n, o, collect(rel) AS rels; + + MATCH (n:Result)-[rel:REL]->(o:Other) + SET o.updated = 'true' + RETURN collect(n) as nodes, collect(rel) as rels, collect(o) as others; + """; + + private static Neo4jContainerExtension neo4jContainer; + private static Session session; + + @BeforeClass + public static void beforeAll() { + // We build the project, the artifact will be placed into ./build/libs + neo4jContainer = createEnterpriseDB(List.of(ApocPackage.EXTENDED), true) + .withNeo4jConfig("dbms.transaction.timeout", "60s"); + neo4jContainer.start(); + session = neo4jContainer.getSession(); + + // create cypher files + createContainerFile(SET_RETURN_FILE, SET_AND_RETURN_QUERIES); + createContainerFile(MATCH_RETURN_FILE, SIMPLE_RETURN_QUERIES); + } + + private static void createContainerFile(String fileName, String fileContent) { + try { + File file = new File(importFolder, fileName); + FileUtils.writeStringToFile(file, fileContent, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void afterAll() throws IOException { + Stream.of(SET_RETURN_FILE, MATCH_RETURN_FILE) + .forEach(file -> new File(importFolder, file).delete()); + + session.close(); + neo4jContainer.close(); + } + + @After + public void after() { + session.writeTransaction(tx -> tx.run("MATCH (n) DETACH DELETE n")); + } + + @Test + public void testParallelTransactionGuard() { + // given + String parallelQuery = "UNWIND range(0,9) as id CALL apoc.util.sleep(10000) WITH id RETURN id"; + + // when + try { + int size = 10_000; + testResult(neo4jContainer.getSession(), + "CALL apoc.cypher.parallel2('" + parallelQuery + "', {a: range(1, $size)}, 'a')", + map("size", size), + r -> {}); + } catch (Exception ignored) {} + + // then + boolean anyLingeringParallelTx = neo4jContainer.getSession().readTransaction(tx -> { + var currentTxs = tx.run("SHOW TRANSACTIONS").stream(); + return currentTxs.anyMatch( record -> record.get( "currentQuery" ).toString().contains(parallelQuery)); + }); + + assertFalse(anyLingeringParallelTx); + } + + @Test + public void testRunFileWithSetAndResults() { + String query = "CALL apoc.cypher.runFile($file)"; + Map params = Map.of("file", SET_RETURN_FILE); + + testRunProcedureWithSetAndReturnResults(query, params); + } + + @Test + public void testRunFileWithResults() { + String query = "CALL apoc.cypher.runFile($file)"; + Map params = Map.of("file", MATCH_RETURN_FILE); + + testRunProcedureWithSimpleReturnResults(query, params); + } + + @Test + public void testRunFilesWithSetAndResults() { + String query = "CALL apoc.cypher.runFiles([$file])"; + Map params = Map.of("file", SET_RETURN_FILE); + + testRunProcedureWithSetAndReturnResults(query, params); + } + + @Test + public void testRunFilesWithResults() { + String query = "CALL apoc.cypher.runFiles([$file])"; + Map params = Map.of("file", MATCH_RETURN_FILE); + + testRunProcedureWithSimpleReturnResults(query, params); + } + + @Test + public void testCypherParallelWithSetAndResults() { + session.writeTransaction(tx -> tx.run(CREATE_RESULT_NODES)); + + String query = "CALL apoc.cypher.parallel($file, {a: range(1,4)}, 'a')"; + Map params = Map.of("file", SET_NODE); + + RuntimeException e = assertThrows(RuntimeException.class, + () -> testCall(session, query, params, (res) -> {}) + ); + String expectedMessage = "is not allowed for user 'neo4j' with roles [PUBLIC, admin] overridden by READ."; + Assertions.assertThat(e.getMessage()).contains(expectedMessage); + } + + @Test + public void testCypherParallel2WithSetAndResults() { + session.writeTransaction(tx -> tx.run(CREATE_RESULT_NODES)); + + String query = "CALL apoc.cypher.parallel2($file, {a: range(1,4)}, 'a')"; + Map params = Map.of("file", SET_NODE); + + RuntimeException e = assertThrows(RuntimeException.class, + () -> testCall(session, query, params, (res) -> {}) + ); + String expectedMessage = "Creating new property name on database 'neo4j' is not allowed for user 'neo4j' with roles [PUBLIC, admin] overridden by READ"; + Assertions.assertThat(e.getMessage()).contains(expectedMessage); + } + + @Test + public void testCypherParallelWithResults() { + String query = "CALL apoc.cypher.parallel($file, {a: range(1,3)}, 'a')"; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherParallelCommon(query, params); + } + + @Test + public void testCypherParallel2WithResults() { + String query = "CALL apoc.cypher.parallel2($file, {a: range(1,3)}, 'a')"; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherParallelCommon(query, params); + } + + private void testCypherParallelCommon(String query, Map params) { + session.writeTransaction(tx -> tx.run(CREATE_RETURNQUERY_NODES)); + + testResult(session, query, params, r -> { + assertBatchCypherParallel(r); + assertBatchCypherParallel(r); + assertBatchCypherParallel(r); + + assertFalse(r.hasNext()); + }); + } + + private void assertBatchCypherParallel(Iterator> r) { + Map next = r.next(); + assertReturnQueryNode(0L, (Map) next.get("value")); + next = r.next(); + assertReturnQueryNode(1L, (Map) next.get("value")); + next = r.next(); + assertReturnQueryNode(2L, (Map) next.get("value")); + next = r.next(); + assertReturnQueryNode(3L, (Map) next.get("value")); + } + + @Test + public void testCypherMapParallelWithResults() { + String query = """ + MATCH (n:ReturnQuery) WITH COLLECT(n) AS list + CALL apoc.cypher.mapParallel('MATCH (_)-[r:REL]->(o:Other) RETURN r, o', {}, list) + YIELD value RETURN value"""; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherMapParallelCommon(query, params); + } + + @Test + public void testCypherMapParallel2WithResults() { + String query = """ + MATCH (n:ReturnQuery) WITH COLLECT(n) AS list + CALL apoc.cypher.mapParallel2('MATCH (_)-[r:REL]->(o:Other) RETURN r, o', {}, list, 1) + YIELD value RETURN value"""; + Map params = Map.of("file", SIMPLE_RETURN_QUERIES); + + testCypherMapParallelCommon(query, params); + } + + public void testRunProcedureWithSimpleReturnResults(String query, Map params) { + session.writeTransaction(tx -> tx.run(CREATE_RETURNQUERY_NODES)); + testResult(session, query, params, + r -> { + // check that all results from the 1st statement are correctly returned + Map row = r.next(); + assertReturnQueryNode(row, 0L); + row = r.next(); + assertReturnQueryNode(row, 1L); + row = r.next(); + assertReturnQueryNode(row, 2L); + row = r.next(); + assertReturnQueryNode(row, 3L); + + // check `queryStatistics` row + row = r.next(); + Map result = (Map) row.get("result"); + assertEquals(-1L, row.get("row")); + assertEquals(0L, (long) result.get("nodesCreated")); + assertEquals(0L, (long) result.get("propertiesSet")); + + assertFalse(r.hasNext()); + }); + } + + public void testRunProcedureWithSetAndReturnResults(String query, Map params) { + session.writeTransaction(tx -> tx.run(CREATE_RESULT_NODES)); + + testResult(session, query, params, + r -> { + // check that all results from the 1st statement are correctly returned + Map row = r.next(); + assertRunProcNode(row, 0L); + row = r.next(); + assertRunProcNode(row, 1L); + row = r.next(); + assertRunProcNode(row, 2L); + row = r.next(); + assertRunProcNode(row, 3L); + + // check `queryStatistics` row + row = r.next(); + assertRunProcStatistics(row); + + // check that all results from the 2nd statement are correctly returned + row = r.next(); + assertRunProcRel(row, 0L); + row = r.next(); + assertRunProcRel(row, 1L); + row = r.next(); + assertRunProcRel(row, 2L); + row = r.next(); + assertRunProcRel(row, 3L); + + // check `queryStatistics` row + row = r.next(); + assertRunProcStatistics(row); + + // check that all results from the 3rd statement are correctly returned + row = r.next(); + assertEquals(0L, row.get("row") ); + Map result = (Map) row.get("result"); + assertEquals(3, result.size()); + List rels = (List) result.get("rels"); + List nodes = (List) result.get("nodes"); + List others = (List) result.get("others"); + assertEquals(4L, rels.size()); + assertEquals(4L, nodes.size()); + assertEquals(4L, others.size()); + row = r.next(); + + // check `queryStatistics` row + assertRunProcStatistics(row); + assertFalse(r.hasNext()); + }); + + // check that the procedure's SET operations work properly + testResult(session, + "MATCH p=(:Result {updated:true})-[:REL {updated: 1}]->(:Other {updated: 'true'}) RETURN *", + r -> assertEquals(4L, Iterators.count(r)) + ); + } + + private void assertRunProcStatistics(Map row) { + Map result = (Map) row.get("result"); + assertEquals(-1L, row.get("row")); + assertEquals(0L, (long) result.get("nodesCreated")); + assertEquals(4L, (long) result.get("propertiesSet")); + } + + private void testCypherMapParallelCommon(String query, Map params) { + session.writeTransaction(tx -> tx.run(CREATE_RETURNQUERY_NODES)); + + testResult(session, query, params, r -> { + Map next = r.next(); + assertOtherNodeAndRel(0L, (Map) next.get("value")); + next = r.next(); + assertOtherNodeAndRel(1L, (Map) next.get("value")); + next = r.next(); + assertOtherNodeAndRel(2L, (Map) next.get("value")); + next = r.next(); + assertOtherNodeAndRel(3L, (Map) next.get("value")); + + assertFalse(r.hasNext()); + }); + } + + public void assertOtherNodeAndRel(long id, Map result) { + Node n = (Node) result.get("o"); + assertEquals(List.of("Other"), Iterables.asList(n.labels())); + assertEquals(Map.of("idOther", id), n.asMap()); + + Relationship rel = (Relationship) result.get("r"); + assertEquals("REL", rel.type()); + assertEquals(Map.of("idRel", id), rel.asMap()); + } + + private void assertRunProcRel(Map row, long id) { + assertEquals(id, row.get("row") ); + + Map result = (Map) row.get("result"); + assertEquals(3, result.size()); + List n = (List) result.get("rels"); + assertEquals(1L, n.size()); + assertEquals("REL", n.get(0).type()); + assertEquals(Map.of("idRel", id, "updated", 1L), n.get(0).asMap()); + } + + private void assertReturnQueryNode(Map row, long id) { + assertEquals(id, row.get("row") ); + + Map result = (Map) row.get("result"); + assertEquals(1, result.size()); + assertReturnQueryNode(id, result); + } + + public void assertReturnQueryNode(long id, Map result) { + Node n = result.get("n"); + assertEquals(List.of("ReturnQuery"), Iterables.asList(n.labels())); + assertEquals(Map.of("id", id), n.asMap()); + } + + private void assertRunProcNode(Map row, long id) { + assertEquals(id, row.get("row") ); + + Map result = (Map) row.get("result"); + assertEquals(1, result.size()); + + Node n = result.get("n"); + assertEquals(List.of("Result"), Iterables.asList(n.labels())); + assertEquals(Map.of("id", id, "updated", true), n.asMap()); + } +} diff --git a/extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java b/extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java deleted file mode 100644 index 885da04027..0000000000 --- a/extended/src/test/java/apoc/cypher/CypherEnterpriseTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package apoc.cypher; - -import apoc.util.Neo4jContainerExtension; -import apoc.util.TestContainerUtil.ApocPackage; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import java.util.List; -import org.neo4j.driver.Session; - -import static apoc.util.TestContainerUtil.createEnterpriseDB; -import static apoc.util.TestContainerUtil.testResult; -import static apoc.util.Util.map; - -public class CypherEnterpriseTest { - - private static Neo4jContainerExtension neo4jContainer; - private static Session session; - - @BeforeClass - public static void beforeAll() { - // We build the project, the artifact will be placed into ./build/libs - neo4jContainer = createEnterpriseDB(List.of(ApocPackage.EXTENDED), true) - .withNeo4jConfig("dbms.transaction.timeout", "60s"); - neo4jContainer.start(); - session = neo4jContainer.getSession(); - } - - @AfterClass - public static void afterAll() { - session.close(); - neo4jContainer.close(); - } - - @Test - public void testParallelTransactionGuard() { - // given - String parallelQuery = "UNWIND range(0,9) as id CALL apoc.util.sleep(10000) WITH id RETURN id"; - - // when - try { - int size = 10_000; - testResult(neo4jContainer.getSession(), - "CALL apoc.cypher.parallel2('" + parallelQuery + "', {a: range(1, $size)}, 'a')", - map("size", size), - r -> {}); - } catch (Exception ignored) {} - - // then - boolean anyLingeringParallelTx = neo4jContainer.getSession().readTransaction(tx -> { - var currentTxs = tx.run("SHOW TRANSACTIONS").stream(); - return currentTxs.anyMatch( record -> record.get( "currentQuery" ).toString().contains(parallelQuery)); - }); - - Assert.assertFalse(anyLingeringParallelTx); - } -} From f5119704f6741d27b13e37a5a5cbabcf5fc63003 Mon Sep 17 00:00:00 2001 From: vga91 Date: Mon, 24 Jul 2023 09:53:39 +0200 Subject: [PATCH 2/2] [drqYtb6r] changed importFolder with the correct one provided by TestContainerUtil --- .../java/apoc/cypher/CypherEnterpriseExtendedTest.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java b/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java index f071492fd6..fdde91761b 100644 --- a/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java +++ b/extended/src/test/java/apoc/cypher/CypherEnterpriseExtendedTest.java @@ -12,18 +12,17 @@ import org.junit.Test; import java.io.File; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import org.neo4j.driver.Session; import org.neo4j.driver.types.Node; import org.neo4j.driver.types.Relationship; import static apoc.util.TestContainerUtil.createEnterpriseDB; +import static apoc.util.TestContainerUtil.importFolder; import static apoc.util.TestContainerUtil.testCall; import static apoc.util.TestContainerUtil.testResult; import static apoc.util.Util.map; @@ -41,8 +40,6 @@ public class CypherEnterpriseExtendedTest { private static final String SET_RETURN_FILE = "set_and_return.cypher"; private static final String MATCH_RETURN_FILE = "match_and_return.cypher"; - private static final File importFolder = new File("import"); - public static String SET_NODE = """ MATCH (n:Result)-[:REL]->(:Other) SET n.updated = true @@ -89,10 +86,7 @@ private static void createContainerFile(String fileName, String fileContent) { } @AfterClass - public static void afterAll() throws IOException { - Stream.of(SET_RETURN_FILE, MATCH_RETURN_FILE) - .forEach(file -> new File(importFolder, file).delete()); - + public static void afterAll() { session.close(); neo4jContainer.close(); }