Skip to content

Commit

Permalink
[HFWBmuq2] Fixes neo4j/apoc#155: Check for correct tx terminations (neo4j/apoc#256) (neo4j-contrib#3534)
Browse files Browse the repository at this point in the history

* [HFWBmuq2] Fixes neo4j/apoc#155: Check for correct tx terminations (neo4j/apoc#256)

* [HFWBmuq2] Fixes neo4j/apoc#155: Check for correct tx terminations

* [HFWBmuq2] Code clean

* [HFWBmuq2] removed unused code

* [HFWBmuq2] try solving flaky tests

* [HFWBmuq2] changes review - added time check

* [HFWBmuq2] added local file tests - small changes

* [HFWBmuq2] removed unused imports after rebase

* [HFWBmuq2] fix flaky transaction not found error

* [HFWBmuq2] Fix heapspace and flaky errors

* [HFWBmuq2] [Bc2lkk3N] Fixed testImportCsvTerminate and added TerminationGuard to apoc.import.csv (neo4j/apoc#343)

* [Bc2lkk3N] Fix testImportCsvTerminate and add TerminationGuard to apoc.import.csv

* [Bc2lkk3N] Fix failing tests

* [Bc2lkk3N] Fix typo

* [HFWBmuq2] fix compile error

* [HFWBmuq2] remove imports

* [HFWBmuq2] fix flaky Timeboxed error

* [HFWBmuq2] removed unused imports
  • Loading branch information
vga91 authored and BennuFire committed Jul 10, 2023
1 parent 70501e6 commit c817cfa
Show file tree
Hide file tree
Showing 21 changed files with 66,059 additions and 114 deletions.
13 changes: 12 additions & 1 deletion core/src/main/java/apoc/cypher/Timeboxed.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import apoc.Pools;
import apoc.result.MapResult;
import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
Expand All @@ -29,6 +30,7 @@
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -54,6 +56,9 @@ public class Timeboxed {
@Context
public Pools pools;

@Context
public TerminationGuard terminationGuard;

private final static Map<String,Object> POISON = Collections.singletonMap("__magic", "POISON");

@Procedure
Expand All @@ -70,14 +75,20 @@ public Stream<MapResult> runTimeboxed(@Name("cypher") String cypher, @Name("para
txAtomic.set(innerTx);
Result result = innerTx.execute(cypher, params == null ? Collections.EMPTY_MAP : params);
while (result.hasNext()) {
if (Util.transactionIsTerminated(terminationGuard)) {
txAtomic.get().close();
offerToQueue(queue, POISON, timeout);
return;
}

final Map<String, Object> map = result.next();
offerToQueue(queue, map, timeout);
}
offerToQueue(queue, POISON, timeout);
innerTx.commit();
} catch (TransactionTerminatedException e) {
log.warn("query " + cypher + " has been terminated");
} finally {
offerToQueue(queue, POISON, timeout);
txAtomic.set(null);
}
});
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/apoc/export/csv/CsvEntityLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.opencsv.CSVReaderBuilder;
import org.neo4j.graphdb.*;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;

import java.io.IOException;
import java.util.*;
Expand All @@ -43,14 +44,17 @@ public class CsvEntityLoader {
private final ProgressReporter reporter;
private final Log log;

private final TerminationGuard terminationGuard;

/**
* @param clc configuration object
* @param reporter
*/
public CsvEntityLoader(CsvLoaderConfig clc, ProgressReporter reporter, Log log) {
public CsvEntityLoader(CsvLoaderConfig clc, ProgressReporter reporter, Log log, TerminationGuard terminationGuard) {
this.clc = clc;
this.reporter = reporter;
this.log = log;
this.terminationGuard = terminationGuard;
}

/**
Expand Down Expand Up @@ -100,6 +104,7 @@ public void loadNodes(final Object fileName, final List<String> labels, final Gr
BatchTransaction btx = new BatchTransaction(db, clc.getBatchSize(), reporter);
try {
csv.forEach(line -> {
terminationGuard.check();
lineNo.getAndIncrement();

final EnumSet<Results> results = EnumSet.of(Results.map);
Expand Down Expand Up @@ -215,6 +220,7 @@ public void loadRelationships(
BatchTransaction btx = new BatchTransaction(db, clc.getBatchSize(), reporter);
try {
csv.forEach(line -> {
terminationGuard.check();
lineNo.getAndIncrement();

final EnumSet<Results> results = EnumSet.of(Results.map);
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/apoc/export/csv/ImportCsv.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class ImportCsv {
@Context
public Log log;

@Context
public TerminationGuard terminationGuard;

public ImportCsv(GraphDatabaseService db) {
this.db = db;
}
Expand All @@ -66,7 +69,7 @@ public Stream<ProgressInfo> importCsv(
}
final CsvLoaderConfig clc = CsvLoaderConfig.from(config);
final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, source, "csv"));
final CsvEntityLoader loader = new CsvEntityLoader(clc, reporter, log);
final CsvEntityLoader loader = new CsvEntityLoader(clc, reporter, log, terminationGuard);

final Map<String, Map<String, Long>> idMapping = new HashMap<>();
for (Map<String, Object> node : nodes) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/apoc/export/graphml/ExportGraphML.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public Stream<ProgressInfo> file(@Name("urlOrBinaryFile") Object urlOrBinaryFile

if (exportConfig.storeNodeIds()) graphMLReader.storeNodeIds();

graphMLReader.parseXML(FileUtils.readerFor(urlOrBinaryFile, exportConfig.getCompressionAlgo()));
graphMLReader.parseXML(FileUtils.readerFor(urlOrBinaryFile, exportConfig.getCompressionAlgo()), terminationGuard);
return reporter.getTotal();
});
return Stream.of(result);
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/apoc/export/graphml/XmlGraphMLReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import apoc.util.JsonUtil;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.graphdb.*;
import org.neo4j.procedure.TerminationGuard;

import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventReader;
Expand Down Expand Up @@ -219,7 +220,7 @@ public XmlGraphMLReader(GraphDatabaseService db, Transaction tx) {
this.tx = tx;
}

public long parseXML(Reader input) throws XMLStreamException {
public long parseXML(Reader input, TerminationGuard terminationGuard) throws XMLStreamException {
Map<String, Long> cache = new HashMap<>(1024*32);
XMLInputFactory inputFactory = XMLInputFactory.newInstance();
inputFactory.setProperty("javax.xml.stream.isCoalescing", true);
Expand All @@ -235,6 +236,7 @@ public long parseXML(Reader input) throws XMLStreamException {
try {

while (reader.hasNext()) {
terminationGuard.check();
XMLEvent event;
try {
event = (XMLEvent) reader.next();
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/java/apoc/load/LoadJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -42,6 +43,9 @@ public class LoadJson {
private static final String AUTH_HEADER_KEY = "Authorization";
private static final String LOAD_TYPE = "json";

@Context
public TerminationGuard terminationGuard;

@Context
public GraphDatabaseService db;

Expand Down Expand Up @@ -75,26 +79,34 @@ public Stream<MapResult> jsonParams(@Name("urlOrKeyOrBinary") Object urlOrKeyOrB
boolean failOnError = (boolean) config.getOrDefault("failOnError", true);
String compressionAlgo = (String) config.getOrDefault(COMPRESSION, CompressionAlgo.NONE.name());
List<String> pathOptions = (List<String>) config.get("pathOptions");
return loadJsonStream(urlOrKeyOrBinary, headers, payload, path, failOnError, compressionAlgo, pathOptions);
return loadJsonStream(urlOrKeyOrBinary, headers, payload, path, failOnError, compressionAlgo, pathOptions, terminationGuard);
}

public static Stream<MapResult> loadJsonStream(@Name("url") Object url, @Name("headers") Map<String, Object> headers, @Name("payload") String payload) {
return loadJsonStream(url, headers, payload, "", true, null, null);
return loadJsonStream(url, headers, payload, "", true, null, null, null);
}
public static Stream<MapResult> loadJsonStream(@Name("urlOrKeyOrBinary") Object urlOrKeyOrBinary, @Name("headers") Map<String, Object> headers, @Name("payload") String payload, String path, boolean failOnError, String compressionAlgo, List<String> pathOptions) {
public static Stream<MapResult> loadJsonStream(@Name("urlOrKeyOrBinary") Object urlOrKeyOrBinary, @Name("headers") Map<String, Object> headers, @Name("payload") String payload, String path, boolean failOnError, String compressionAlgo, List<String> pathOptions, TerminationGuard terminationGuard) {
if (urlOrKeyOrBinary instanceof String) {
headers = null != headers ? headers : new HashMap<>();
headers.putAll(Util.extractCredentialsIfNeeded((String) urlOrKeyOrBinary, failOnError));
}
Stream<Object> stream = JsonUtil.loadJson(urlOrKeyOrBinary,headers,payload, path, failOnError, compressionAlgo, pathOptions);
return stream.flatMap((value) -> {
if (terminationGuard != null) {
terminationGuard.check();
}
if (value instanceof Map) {
return Stream.of(new MapResult((Map) value));
}
if (value instanceof List) {
if (((List)value).isEmpty()) return Stream.empty();
if (((List) value).get(0) instanceof Map)
return ((List) value).stream().map((v) -> new MapResult((Map) v));
return ((List) value).stream().map((v) -> {
if (terminationGuard != null) {
terminationGuard.check();
}
return new MapResult((Map) v);
});
return Stream.of(new MapResult(Collections.singletonMap("result",value)));
}
if(!failOnError)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/apoc/load/Xml.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;
import org.neo4j.procedure.UserFunction;
import org.w3c.dom.CharacterData;
import org.w3c.dom.Document;
Expand Down Expand Up @@ -102,6 +103,9 @@ public class Xml {
@Context
public Log log;

@Context
public TerminationGuard terminationGuard;

@Procedure
@Description("apoc.load.xml('http://example.com/test.xml', 'xPath',config, false) YIELD value as doc CREATE (p:Person) SET p.name = doc.name - load from XML URL (e.g. web-api) to import XML as single nested map with attributes and _type, _text and _childrenx fields.")
public Stream<MapResult> xml(@Name("urlOrBinary") Object urlOrBinary, @Name(value = "path", defaultValue = "/") String path, @Name(value = "config",defaultValue = "{}") Map<String, Object> config, @Name(value = "simple", defaultValue = "false") boolean simpleMode) throws Exception {
Expand Down Expand Up @@ -212,6 +216,7 @@ private boolean proceedReader(XMLStreamReader reader) throws XMLStreamException
}

private void handleNode(Deque<Map<String, Object>> stack, Node node, boolean simpleMode) {
terminationGuard.check();

// Handle document node
if (node.getNodeType() == Node.DOCUMENT_NODE) {
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/apoc/refactor/GraphRefactoring.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class GraphRefactoring {
@Context
public Pools pools;

@Context
public TerminationGuard terminationGuard;

private Stream<NodeRefactorResult> doCloneNodes(@Name("nodes") List<Node> nodes, @Name("withRelationships") boolean withRelationships, List<String> skipProperties) {
if (nodes == null) return Stream.empty();
return nodes.stream().map(node -> Util.rebind(tx, node)).map(node -> {
Expand Down Expand Up @@ -230,6 +233,7 @@ public Stream<NodeRefactorResult> cloneSubgraph(@Name("nodes") List<Node> nodes,

// clone nodes and populate copy map
for (Node node : nodes) {
terminationGuard.check();
if (node == null || standinMap.containsKey(node)) continue;
// standinNodes will NOT be cloned

Expand All @@ -254,6 +258,8 @@ public Stream<NodeRefactorResult> cloneSubgraph(@Name("nodes") List<Node> nodes,

// clone relationships, will be between cloned nodes and/or standins
for (Relationship rel : rels) {
terminationGuard.check();

if (rel == null) continue;

Node oldStart = rel.getStartNode();
Expand Down
Loading

0 comments on commit c817cfa

Please sign in to comment.