Skip to content

Commit

Permalink
[HFWBmuq2] Fixes #155: Check for correct tx terminations
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Jan 23, 2023
1 parent f08caee commit d5ee589
Show file tree
Hide file tree
Showing 17 changed files with 543 additions and 161 deletions.
13 changes: 11 additions & 2 deletions common/src/main/java/apoc/load/LoadJsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,31 @@
import java.util.Map;
import java.util.stream.Stream;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.TerminationGuard;

public class LoadJsonUtils {
public static Stream<MapResult> loadJsonStream(@Name("urlOrKeyOrBinary") Object urlOrKeyOrBinary, @Name("headers") Map<String, Object> headers, @Name("payload") String payload, String path, boolean failOnError, String compressionAlgo, List<String> pathOptions) {
public static Stream<MapResult> loadJsonStream(@Name("urlOrKeyOrBinary") Object urlOrKeyOrBinary, @Name("headers") Map<String, Object> headers, @Name("payload") String payload, String path, boolean failOnError, String compressionAlgo, List<String> pathOptions, TerminationGuard terminationGuard) {
if (urlOrKeyOrBinary instanceof String) {
headers = null != headers ? headers : new HashMap<>();
headers.putAll(Util.extractCredentialsIfNeeded((String) urlOrKeyOrBinary, failOnError));
}
Stream<Object> stream = JsonUtil.loadJson(urlOrKeyOrBinary,headers,payload, path, failOnError, compressionAlgo, pathOptions);
return stream.flatMap((value) -> {
if (terminationGuard != null) {
terminationGuard.check();
}
if (value instanceof Map) {
return Stream.of(new MapResult((Map) value));
}
if (value instanceof List) {
if (((List)value).isEmpty()) return Stream.empty();
if (((List) value).get(0) instanceof Map)
return ((List) value).stream().map((v) -> new MapResult((Map) v));
return ((List) value).stream().map((v) -> {
if (terminationGuard != null) {
terminationGuard.check();
}
return new MapResult((Map) v);
});
return Stream.of(new MapResult(Collections.singletonMap("result",value)));
}
if(!failOnError)
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/apoc/util/JsonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import apoc.export.util.DurationValueSerializer;
import apoc.export.util.PointSerializer;
import apoc.export.util.TemporalSerializer;
import apoc.result.ProgressInfo;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand Down
262 changes: 131 additions & 131 deletions common/src/main/resources/movies.cypher

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions core/src/main/java/apoc/cypher/Timeboxed.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import apoc.Pools;
import apoc.result.MapResult;
import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
Expand All @@ -11,6 +12,7 @@
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -36,6 +38,9 @@ public class Timeboxed {
@Context
public Pools pools;

@Context
public TerminationGuard terminationGuard;

private final static Map<String,Object> POISON = Collections.singletonMap("__magic", "POISON");

@Procedure("apoc.cypher.runTimeboxed")
Expand All @@ -52,6 +57,12 @@ public Stream<MapResult> runTimeboxed(@Name("statement") String cypher, @Name("p
txAtomic.set(innerTx);
Result result = innerTx.execute(cypher, params == null ? Collections.EMPTY_MAP : params);
while (result.hasNext()) {
if (Util.transactionIsTerminated(terminationGuard)) {
txAtomic.get().close();
offerToQueue(queue, POISON, timeout);
return;
}

final Map<String, Object> map = result.next();
offerToQueue(queue, map, timeout);
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/apoc/export/csv/CsvFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.procedure.TerminationGuard;

import java.io.IOException;
import java.io.PrintWriter;
Expand Down Expand Up @@ -253,6 +253,9 @@ private void writeRow(ExportConfig config, ExportFileManager writer, Set<String>
} else {
csvWriter.writeNext(headerNode.toArray(new String[headerNode.size()]), false);
}

// terminationGuard.check();
// todo - here...
rows.forEach(row -> csvWriter.writeNext(row.toArray(new String[row.size()]), false));
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/apoc/load/LoadJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import apoc.result.ObjectResult;
import apoc.util.CompressionAlgo;
import apoc.util.JsonUtil;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.util.Collections;
import java.util.List;
Expand All @@ -17,6 +19,9 @@
import static apoc.util.CompressionConfig.COMPRESSION;

public class LoadJson {

@Context
public TerminationGuard terminationGuard;

@SuppressWarnings("unchecked")
@Procedure("apoc.load.jsonArray")
Expand Down Expand Up @@ -49,7 +54,7 @@ public Stream<MapResult> jsonParams(@Name("urlOrKeyOrBinary") Object urlOrKeyOrB
boolean failOnError = (boolean) config.getOrDefault("failOnError", true);
String compressionAlgo = (String) config.getOrDefault(COMPRESSION, CompressionAlgo.NONE.name());
List<String> pathOptions = (List<String>) config.get("pathOptions");
return loadJsonStream(urlOrKeyOrBinary, headers, payload, path, failOnError, compressionAlgo, pathOptions);
return loadJsonStream(urlOrKeyOrBinary, headers, payload, path, failOnError, compressionAlgo, pathOptions, terminationGuard);
}

}
6 changes: 6 additions & 0 deletions core/src/main/java/apoc/load/Xml.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import apoc.util.CompressionAlgo;
import apoc.util.CompressionConfig;
import apoc.util.FileUtils;
import apoc.util.Util;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.graphdb.Label;
Expand All @@ -19,6 +20,7 @@
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;
import org.neo4j.procedure.UserFunction;
import org.w3c.dom.CharacterData;
import org.w3c.dom.Document;
Expand Down Expand Up @@ -82,6 +84,9 @@ public class Xml {
@Context
public Log log;

@Context
public TerminationGuard terminationGuard;

@Procedure("apoc.load.xml")
@Description("Loads a single nested map from an XML URL (e.g. web-API).")
public Stream<MapResult> xml(@Name("urlOrBinary") Object urlOrBinary, @Name(value = "path", defaultValue = "/") String path, @Name(value = "config",defaultValue = "{}") Map<String, Object> config, @Name(value = "simple", defaultValue = "false") boolean simpleMode) throws Exception {
Expand Down Expand Up @@ -176,6 +181,7 @@ private XMLStreamReader getXMLStreamReader(Object urlOrBinary, XmlImportConfig c
}

private void handleNode(Deque<Map<String, Object>> stack, Node node, boolean simpleMode) {
terminationGuard.check();

// Handle document node
if (node.getNodeType() == Node.DOCUMENT_NODE) {
Expand Down
40 changes: 40 additions & 0 deletions core/src/test/java/apoc/cypher/CypherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import static apoc.util.TestUtil.testCallEmpty;
import static apoc.util.TestUtil.testFail;
import static apoc.util.TestUtil.testResult;
import static apoc.util.TransactionTestUtil.checkTerminationGuard;
import static apoc.util.TransactionTestUtil.checkTransactionNotInList;
import static apoc.util.TransactionTestUtil.terminateTransactionAsync;
import static apoc.util.Util.map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -158,6 +161,43 @@ public void testWithTimeout() {
result -> result.hasNext()));
}

@Test
public void testWithTermination() {
final String query = "CALL apoc.cypher.runTimeboxed('unwind range (0, 10) as id CALL apoc.util.sleep(2000) return 0', null, 20000)";
checkTerminationGuard(db, query);
}

@Test
public void testWithTerminationInnerTransaction() {
final String innerLongQuery = "CALL apoc.util.sleep(10999) RETURN 0";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, 99999)";

terminateTransactionAsync(db, innerLongQuery);

final long l = System.currentTimeMillis();

// assert query terminated (RETURN 0)
TestUtil.testCall(db, query,
Map.of("innerQuery", innerLongQuery),
row -> assertEquals(Map.of("0", 0L), row.get("value")));

final long l1 = System.currentTimeMillis() - l;
System.out.println("l - System.currentTimeMillis() = " + l1);
checkTransactionNotInList(db, query);
}

@Test
public void testDoItWithTermination() {
final String query = "CALL apoc.cypher.run('unwind range (0, 99) as id CALL apoc.util.sleep(2000) return 0', {})";
checkTerminationGuard(db, query);

final String query2 = "CALL apoc.cypher.doIt('CALL apoc.util.sleep(20000) return 0', {})";
checkTerminationGuard(db, query2);

final String query3 = "CALL apoc.cypher.doIt('unwind range (0, 9999) as id CREATE (n:Test) return n', {})";
checkTerminationGuard(db, query3);
}

@Test
public void testRunMany() {
final Map<String, Object> map = map("name", "John", "name2", "Doe");
Expand Down
112 changes: 112 additions & 0 deletions core/src/test/java/apoc/export/BigGraphTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package apoc.export;

import apoc.export.csv.ExportCSV;
import apoc.export.csv.ImportCsv;
import apoc.export.cypher.ExportCypher;
import apoc.export.graphml.ExportGraphML;
import apoc.export.json.ExportJson;
import apoc.graph.Graphs;
import apoc.meta.Meta;
import apoc.refactor.GraphRefactoring;
import apoc.refactor.rename.Rename;
import apoc.util.TestUtil;
import apoc.util.Util;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.graphdb.Node;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED;
import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED;
import static apoc.ApocConfig.apocConfig;
import static apoc.util.TransactionTestUtil.checkTerminationGuard;
import static org.neo4j.configuration.GraphDatabaseSettings.TransactionStateMemoryAllocation.OFF_HEAP;
import static org.neo4j.configuration.SettingValueParsers.BYTES;

public class BigGraphTest {
private static File directory = new File("target/import");
static { //noinspection ResultOfMethodCallIgnored
directory.mkdirs();
}

@ClassRule
public static DbmsRule db = new ImpermanentDbmsRule()
.withSetting(GraphDatabaseSettings.memory_tracking, true)
.withSetting(GraphDatabaseSettings.tx_state_memory_allocation, OFF_HEAP)
.withSetting(GraphDatabaseSettings.tx_state_max_off_heap_memory, BYTES.parse("500m"))
.withSetting(GraphDatabaseSettings.load_csv_file_url_root, directory.toPath().toAbsolutePath());

@BeforeClass
public static void setUp() throws Exception {
TestUtil.registerProcedure(db, Rename.class, ExportCSV.class, ExportJson.class, ExportCypher.class, ExportGraphML.class, Graphs.class, Meta.class, ImportCsv.class, GraphRefactoring.class);
apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true);
apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true);

final String query = Util.readResourceFile("movies.cypher");
IntStream.range(0, 20000).forEach(__-> db.executeTransactionally(query));
}

@Test
public void testTerminateExportCsv() {
checkTerminationGuard(db, "CALL apoc.export.csv.all('testTerminate.csv',{})");
}

@Test
public void testTerminateExportGraphMl() {
checkTerminationGuard(db, "CALL apoc.export.graphml.all('testTerminate.graphml', null)");
}

@Test
public void testTerminateExportCypher() {
checkTerminationGuard(db, "CALL apoc.export.cypher.all('testTerminate.cypher',{})");
}

@Test
public void testTerminateExportJson() {
checkTerminationGuard(db, "CALL apoc.export.json.all('testTerminate.json',{})");
}

@Test
public void testTerminateRenameNodeProp() {
checkTerminationGuard(db, "CALL apoc.refactor.rename.nodeProperty('name', 'nameTwo')");
}

@Test
public void testTerminateRenameTypeProp() {
checkTerminationGuard(db, "CALL apoc.refactor.rename.typeProperty('roles', 'rolesTwo')");
}

@Test
public void testTerminateRenameType() {
checkTerminationGuard(db, "CALL apoc.refactor.rename.type('DIRECTED', 'DIRECTED_TWO')");
}

@Test
public void testTerminateRenameLabel() {
checkTerminationGuard(db, "CALL apoc.refactor.rename.label('Other', 'OtherTwo')");
}

@Test
public void testTerminateRefactorProcs() {
List<Node> nodes = db.executeTransactionally("MATCH (n:Person) RETURN collect(n) as nodes", Collections.emptyMap(),
r -> r.<List<Node>>columnAs("nodes").next());

checkTerminationGuard(db, "CALL apoc.refactor.cloneNodes($nodes)",
Map.of("nodes", nodes));

checkTerminationGuard(db, "CALL apoc.refactor.cloneSubgraph($nodes)",
Map.of("nodes", nodes));

db.executeTransactionally("CREATE CONSTRAINT FOR (n:BornLabel) REQUIRE n.targetKey IS UNIQUE");
checkTerminationGuard(db, "CALL apoc.refactor.categorize('id', 'SOMETHING', true, 'BornLabel', 'targetKey', [], 1)");
}
}
27 changes: 27 additions & 0 deletions core/src/test/java/apoc/export/csv/ExportCsvTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED;
import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED;
Expand All @@ -34,7 +35,9 @@
import static apoc.util.CompressionAlgo.GZIP;
import static apoc.util.CompressionAlgo.NONE;
import static apoc.util.MapUtil.map;
import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static apoc.util.TransactionTestUtil.checkTerminationGuard;
import static java.nio.charset.StandardCharsets.UTF_8;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertArrayEquals;
Expand Down Expand Up @@ -142,6 +145,30 @@ private String readFile(String fileName) {
private String readFile(String fileName, Charset charset, CompressionAlgo compression) {
return BinaryTestUtil.readFileToString(new File(directory, fileName), charset, compression);
}

// todo - db.executeTransactionally(Util.readResourceFile("movies.cypher")); - basta questo?
@Test
public void testExportInvalidQuoteValue1() throws Exception {
final String query = Util.readResourceFile("movies.cypher");
IntStream.range(0, 3999).forEach(__-> db.executeTransactionally(query));
String fileName = "allEEEE.csv";
System.out.println("im here");
final long l = System.currentTimeMillis();
testCall(db, "CALL apoc.export.csv.all($file,{})", Map.of("file", fileName), r -> {
System.out.println("r" + r.values());
});
System.out.println("time=" + (System.currentTimeMillis() - l));
// checkTerminationGuard(db, "CALL apoc.export.csv.all($file,{})", Map.of("file", fileName));
// try {
// TestUtil.testCall(db, "CALL apoc.export.csv.all($file,{})",
// map("file", fileName),
// (r) -> assertResults(fileName, r, "database"));
// fail();
// } catch (RuntimeException e) {
// final String expectedMessage = "Failed to invoke procedure `apoc.export.csv.all`: Caused by: java.lang.RuntimeException: The string value of the field quote is not valid";
// assertEquals(expectedMessage, e.getMessage());
// }
}

@Test
public void testExportInvalidQuoteValue() {
Expand Down
Loading

0 comments on commit d5ee589

Please sign in to comment.