Skip to content

Commit

Permalink
Fixes the Cypher files execution not happening sequentially (#2707) (#…
Browse files Browse the repository at this point in the history
…2726)

Co-authored-by: Nacho Cordón <[email protected]>
  • Loading branch information
github-actions[bot] and ncordon authored Apr 12, 2022
1 parent 016d6ae commit cac5192
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
12 changes: 6 additions & 6 deletions full/src/main/java/apoc/cypher/CypherExtended.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,19 @@ public Stream<RowResult> runFiles(@Name("file") List<String> fileNames, @Name(va
return runFiles(fileNames, config, parameters, schemaOperation);
}

// This runs the files sequentially
private Stream<RowResult> runFiles(List<String> fileNames, Map<String, Object> config, Map<String, Object> parameters, boolean schemaOperation) {
boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics",true));
int timeout = Util.toInteger(config.getOrDefault("timeout",10));
int queueCapacity = Util.toInteger(config.getOrDefault("queueCapacity",100));
List<Stream<RowResult>> result = new ArrayList<>();
for (final String fileName : fileNames) {
var result = fileNames.stream().flatMap(fileName -> {
final Reader reader = readerForFile(fileName);
final Scanner scanner = createScannerFor(reader);
final Stream<RowResult> stream = runManyStatements(scanner, parameters, schemaOperation, addStatistics, timeout, queueCapacity)
return runManyStatements(scanner, parameters, schemaOperation, addStatistics, timeout, queueCapacity)
.onClose(() -> Util.close(scanner, (e) -> log.info("Cannot close the scanner for file " + fileName + " because the following exception", e)));
result.add(stream);
}
return result.stream().reduce(Stream::concat).orElse(Stream.empty());
});

return result;
}

@Procedure(mode=Mode.SCHEMA)
Expand Down
5 changes: 4 additions & 1 deletion full/src/test/java/apoc/cypher/CypherExtendedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ public void testRunFileWithParameters() throws Exception {

@Test
public void testRunFilesMultiple() throws Exception {
testResult(db, "CALL apoc.cypher.runFiles(['create.cypher', 'create_delete.cypher'])",
// The execution of both these files should happen sequentially
// There was a bug before that made both executions to
// interleave and at some point we were seeing nodesDeleted = 4
testResult(db, "CALL apoc.cypher.runFiles(['create_with_sleep.cypher', 'create_delete_with_sleep.cypher'])",
r -> {
Map<String, Object> row = r.next();
assertEquals(row.get("row"),((Map)row.get("result")).get("id"));
Expand Down
6 changes: 6 additions & 0 deletions full/src/test/resources/create_delete_with_sleep.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE (n:Node {id:1});

CALL apoc.util.sleep(2000)

MATCH (n)
DELETE n;
7 changes: 7 additions & 0 deletions full/src/test/resources/create_with_sleep.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UNWIND RANGE(0,2) as id
CREATE (n:Node {id:id})
RETURN n.id as id;

CALL apoc.util.sleep(1000)

MATCH (n) DELETE n;

0 comments on commit cac5192

Please sign in to comment.