diff --git a/core/src/main/java/apoc/export/graphml/XmlGraphMLReader.java b/core/src/main/java/apoc/export/graphml/XmlGraphMLReader.java index 5cf075bc4b..13765d359e 100644 --- a/core/src/main/java/apoc/export/graphml/XmlGraphMLReader.java +++ b/core/src/main/java/apoc/export/graphml/XmlGraphMLReader.java @@ -18,6 +18,8 @@ */ package apoc.export.graphml; +import static apoc.util.ConvertUtil.toValidValue; + import apoc.export.util.BatchTransaction; import apoc.export.util.ExportConfig; import apoc.export.util.Reporter; @@ -218,13 +220,39 @@ public Object parseValue(String input) { public static final QName TYPE = QName.valueOf("attr.type"); public static final QName LIST = QName.valueOf("attr.list"); public static final QName KEY = QName.valueOf("key"); + public static final QName VALUE = QName.valueOf("value"); + public static final QName DATA_TYPE = QName.valueOf("type"); + public static final QName KIND = QName.valueOf("kind"); public XmlGraphMLReader(GraphDatabaseService db, Transaction tx) { this.db = db; this.tx = tx; } + public enum ReaderType { + GRAPHML("data", KEY, LABEL, LABELS), + GEXF("attvalue", FOR, KIND, LABEL); + + public String data; + public QName key; + public QName label; + public QName labels; + + ReaderType(String data, QName key, QName label, QName labels) { + this.data = data; + this.key = key; + this.label = label; + this.labels = labels; + } + } + public long parseXML(Reader input, TerminationGuard terminationGuard) throws XMLStreamException { + return parseXML(input, terminationGuard, ReaderType.GRAPHML); + } + + public long parseXML(Reader input, TerminationGuard terminationGuard, ReaderType readerType) + throws XMLStreamException { + Map dataMap = new HashMap<>(); Map cache = new HashMap<>(1024 * 32); XMLInputFactory inputFactory = XMLInputFactory.newInstance(); inputFactory.setProperty("javax.xml.stream.isCoalescing", true); @@ -238,7 +266,6 @@ public long parseXML(Reader input, TerminationGuard terminationGuard) throws XML int count = 0; BatchTransaction tx = new BatchTransaction(db, batchSize * 10, reporter); try { - while (reader.hasNext()) { terminationGuard.check(); XMLEvent event; @@ -257,11 +284,15 @@ public long parseXML(Reader input, TerminationGuard terminationGuard) throws XML continue; } if (event.isStartElement()) { - StartElement element = event.asStartElement(); String name = element.getName().getLocalPart(); - - if (name.equals("graphml") || name.equals("graph")) continue; + boolean isNameGexf = readerType.equals(ReaderType.GEXF) && name.equals("gexf"); + if (name.equals("graphml") || name.equals("graph") || isNameGexf) continue; + if (readerType.equals(ReaderType.GEXF) && name.equals("attribute")) { + String id = getAttribute(element, ID); + String type = getAttribute(element, DATA_TYPE); + dataMap.put(id, type); + } if (name.equals("key")) { String id = getAttribute(element, ID); Key key = new Key( @@ -284,19 +315,24 @@ public long parseXML(Reader input, TerminationGuard terminationGuard) throws XML else relKeys.put(id, key); continue; } - if (name.equals("data")) { + if (name.equals(readerType.data)) { if (last == null) continue; - String id = getAttribute(element, KEY); + String id = getAttribute(element, readerType.key); boolean isNode = last instanceof Node; Key key = isNode ? nodeKeys.get(id) : relKeys.get(id); if (key == null) key = Key.defaultKey(id, isNode); final Map.Entry eventEntry = getDataEventEntry(reader, key); final XMLEvent next = eventEntry.getKey(); - final Object value = eventEntry.getValue(); + Object value = readerType.equals(ReaderType.GRAPHML) + ? eventEntry.getValue() + : getAttribute(element, VALUE); if (value != null) { if (this.labels && isNode && id.equals("labels")) { addLabels((Node) last, value.toString()); } else if (!this.labels || isNode || !id.equals("label")) { + value = readerType.equals(ReaderType.GRAPHML) + ? value + : toValidValue(value, key.name, dataMap); last.setProperty(key.name, value); if (reporter != null) reporter.update(0, 0, 1); } @@ -311,7 +347,7 @@ public long parseXML(Reader input, TerminationGuard terminationGuard) throws XML String id = getAttribute(element, ID); Node node = tx.getTransaction().createNode(); if (this.labels) { - String labels = getAttribute(element, LABELS); + String labels = getAttribute(element, readerType.labels); addLabels(node, labels); } if (storeNodeIds) node.setProperty("id", id); @@ -324,7 +360,7 @@ public long parseXML(Reader input, TerminationGuard terminationGuard) throws XML } if (name.equals("edge")) { tx.increment(); - String label = getAttribute(element, LABEL); + String label = getAttribute(element, readerType.label); Node from = getByNodeId(cache, tx.getTransaction(), element, XmlNodeExport.NodeType.SOURCE); Node to = getByNodeId(cache, tx.getTransaction(), element, XmlNodeExport.NodeType.TARGET); diff --git a/core/src/main/java/apoc/load/Xml.java b/core/src/main/java/apoc/load/Xml.java index 8f8d10d6da..a34bf33f17 100644 --- a/core/src/main/java/apoc/load/Xml.java +++ b/core/src/main/java/apoc/load/Xml.java @@ -115,7 +115,7 @@ public Stream xml( @Name(value = "config", defaultValue = "{}") Map config, @Name(value = "simple", defaultValue = "false") boolean simpleMode) throws Exception { - return xmlXpathToMapResult(urlOrBinary, simpleMode, path, config); + return xmlXpathToMapResult(urlOrBinary, simpleMode, path, config, terminationGuard); } @UserFunction("apoc.xml.parse") @@ -128,14 +128,23 @@ public Map parse( throws Exception { if (config == null) config = Collections.emptyMap(); boolean failOnError = (boolean) config.getOrDefault("failOnError", true); - return parse(new ByteArrayInputStream(data.getBytes(Charset.forName("UTF-8"))), simpleMode, path, failOnError) + return parse( + new ByteArrayInputStream(data.getBytes(Charset.forName("UTF-8"))), + simpleMode, + path, + failOnError, + terminationGuard) .map(mr -> mr.value) .findFirst() .orElse(null); } - private Stream xmlXpathToMapResult( - @Name("urlOrBinary") Object urlOrBinary, boolean simpleMode, String path, Map config) + public static Stream xmlXpathToMapResult( + @Name("urlOrBinary") Object urlOrBinary, + boolean simpleMode, + String path, + Map config, + TerminationGuard terminationGuard) throws Exception { if (config == null) config = Collections.emptyMap(); boolean failOnError = (boolean) config.getOrDefault("failOnError", true); @@ -143,14 +152,15 @@ private Stream xmlXpathToMapResult( Map headers = (Map) config.getOrDefault("headers", Collections.emptyMap()); CountingInputStream is = FileUtils.inputStreamFor( urlOrBinary, headers, null, (String) config.getOrDefault(COMPRESSION, CompressionAlgo.NONE.name())); - return parse(is, simpleMode, path, failOnError); + return parse(is, simpleMode, path, failOnError, terminationGuard); } catch (Exception e) { if (!failOnError) return Stream.of(new MapResult(Collections.emptyMap())); else throw e; } } - private Stream parse(InputStream data, boolean simpleMode, String path, boolean failOnError) + public static Stream parse( + InputStream data, boolean simpleMode, String path, boolean failOnError, TerminationGuard terminationGuard) throws Exception { List result = new ArrayList<>(); try { @@ -173,7 +183,7 @@ private Stream parse(InputStream data, boolean simpleMode, String pat for (int i = 0; i < nodeList.getLength(); i++) { final Deque> stack = new LinkedList<>(); - handleNode(stack, nodeList.item(i), simpleMode); + handleNode(stack, nodeList.item(i), simpleMode, terminationGuard); for (int index = 0; index < stack.size(); index++) { result.add(new MapResult(stack.pollFirst())); } @@ -223,7 +233,8 @@ private boolean proceedReader(XMLStreamReader reader) throws XMLStreamException } } - private void handleNode(Deque> stack, Node node, boolean simpleMode) { + private static void handleNode( + Deque> stack, Node node, boolean simpleMode, TerminationGuard terminationGuard) { terminationGuard.check(); // Handle document node @@ -231,7 +242,7 @@ private void handleNode(Deque> stack, Node node, boolean sim NodeList children = node.getChildNodes(); for (int i = 0; i < children.getLength(); i++) { if (children.item(i).getLocalName() != null) { - handleNode(stack, children.item(i), simpleMode); + handleNode(stack, children.item(i), simpleMode, terminationGuard); return; } } @@ -248,7 +259,7 @@ private void handleNode(Deque> stack, Node node, boolean sim // This is to deal with text between xml tags for example new line characters if (child.getNodeType() != Node.TEXT_NODE && child.getNodeType() != Node.CDATA_SECTION_NODE) { - handleNode(stack, child, simpleMode); + handleNode(stack, child, simpleMode, terminationGuard); count++; } else { // Deal with text nodes @@ -290,7 +301,7 @@ private void handleNode(Deque> stack, Node node, boolean sim * @param node * @param elementMap */ - private void handleTypeAndAttributes(Node node, Map elementMap) { + private static void handleTypeAndAttributes(Node node, Map elementMap) { // Set type if (node.getLocalName() != null) { elementMap.put("_type", node.getLocalName()); @@ -312,7 +323,7 @@ private void handleTypeAndAttributes(Node node, Map elementMap) * @param node * @param elementMap */ - private void handleTextNode(Node node, Map elementMap) { + private static void handleTextNode(Node node, Map elementMap) { Object text = ""; int nodeType = node.getNodeType(); switch (nodeType) { @@ -344,7 +355,7 @@ private void handleTextNode(Node node, Map elementMap) { * @param text * @return */ - private String normalizeText(String text) { + private static String normalizeText(String text) { String[] tokens = StringUtils.split(text, "\n"); for (int i = 0; i < tokens.length; i++) { tokens[i] = tokens[i].trim(); @@ -682,7 +693,7 @@ private void setPropertyIfNotNull(org.neo4j.graphdb.Node root, String propertyKe } } - private RuntimeException generateXmlDoctypeException() { + private static RuntimeException generateXmlDoctypeException() { throw new RuntimeException("XML documents with a DOCTYPE are not allowed."); } } diff --git a/core/src/main/java/apoc/util/ConvertUtil.java b/core/src/main/java/apoc/util/ConvertUtil.java new file mode 100644 index 0000000000..6e1d533a6c --- /dev/null +++ b/core/src/main/java/apoc/util/ConvertUtil.java @@ -0,0 +1,259 @@ +package apoc.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.json.JsonWriteFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.neo4j.exceptions.Neo4jException; +import org.neo4j.values.storable.DateTimeValue; +import org.neo4j.values.storable.DateValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.LocalTimeValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.storable.TimeValue; +import org.neo4j.values.storable.Values; + +public class ConvertUtil { + public static Object toValidValue(Object object, String field, Map mapping) { + Object fieldName = mapping.get(field); + if (object != null && fieldName != null) { + return convertValue(object.toString(), fieldName.toString()); + } + + if (object instanceof Collection) { + // if there isn't a mapping config, we convert the list to a String[] + List list = ((Collection) object) + .stream().map(i -> toValidValue(i, field, mapping)).collect(Collectors.toList()); + + try { + return list.toArray(new String[0]); + } catch (ArrayStoreException e) { + return list.toArray(new Object[0]); + } + } + if (object instanceof Map) { + return ((Map) object) + .entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, e -> toValidValue(e.getValue(), field, mapping))); + } + return getNeo4jValue(object); + } + + private static Object getNeo4jValue(Object object) { + try { + // we test if is a valid Neo4j type + Values.of(object); + return object; + } catch (Exception e) { + // otherwise we try to coerce it + return object.toString(); + } + } + + /** + * In case of complex type non-readable from Parquet, i.e. Duration, Point, List of Neo4j Types... + * we can use the `mapping: {keyToConvert: valueTypeName}` config to convert them. + * For example `mapping: {myPropertyKey: "DateArray"}` + */ + private static Object convertValue(String value, String typeName) { + typeName = typeName.toLowerCase(); // Suitable to work with Parquet/Arrow/Gexf + switch (typeName) { + // {"crs":"wgs-84-3d","latitude":13.1,"longitude":33.46789,"height":100.0} + case "point": + return getPointValue(value); + case "localdatetime": + return LocalDateTimeValue.parse(value).asObjectCopy(); + case "localtime": + return LocalTimeValue.parse(value).asObjectCopy(); + case "datetime": + return DateTimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "time": + return TimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "date": + return DateValue.parse(value).asObjectCopy(); + case "duration": + return DurationValue.parse(value); + case "boolean": + return Boolean.parseBoolean(value); + case "char": + return value.charAt(0); + case "byte": + return value.getBytes(); + case "double": + return Double.parseDouble(value); + case "float": + return Float.parseFloat(value); + case "short": + return Short.parseShort(value); + case "int": + case "integer": + return Integer.parseInt(value); + case "long": + return Long.parseLong(value); + case "node": + case "relationship": + return JsonUtil.parse(value, null, Map.class); + case "no_value": + case "NO_VALUE": + return null; + case "listboolean": + value = StringUtils.removeStart(value, "["); + value = StringUtils.removeEnd(value, "]"); + String dataType = typeName.replace("array", "").replace("list", ""); + + final Object[] arr = getPrototypeFor(dataType); + return Arrays.stream(value.split(",")) + .map(item -> convertValue(StringUtils.trim(item), dataType)) + .collect(Collectors.toList()) + .toArray(arr); + default: + // If ends with "Array", for example StringArray + if (typeName.endsWith("array") || typeName.startsWith("list")) { + value = StringUtils.removeStart(value, "["); + value = StringUtils.removeEnd(value, "]"); + String array = typeName.replace("array", "").replace("list", ""); + + final Object[] prototype = getPrototypeFor(array); + return Arrays.stream(value.split(",")) + .map(item -> convertValue(StringUtils.trim(item), array)) + .collect(Collectors.toList()) + .toArray(prototype); + } + return value; + } + } + + private static PointValue getPointValue(String value) { + try { + return PointValue.parse(value); + } catch (Neo4jException e) { + // fallback in case of double-quotes, e.g. + // {"crs":"wgs-84-3d","latitude":13.1,"longitude":33.46789,"height":100.0} + // we remove the double quotes before parsing the result, e.g. + // {crs:"wgs-84-3d",latitude:13.1,longitude:33.46789,height:100.0} + ObjectMapper objectMapper = new ObjectMapper().disable(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature()); + try { + Map readValue = objectMapper.readValue(value, Map.class); + String stringWithoutKeyQuotes = objectMapper.writeValueAsString(readValue); + return PointValue.parse(stringWithoutKeyQuotes); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + } + + // similar to CsvPropertyConverter + public static Object[] getPrototypeFor(String type) { + type = type.toLowerCase(); // Suitable to work with Parquet/Arrow/Gexf + switch (type) { + case "long": + return new Long[] {}; + case "integer": + return new Integer[] {}; + case "double": + return new Double[] {}; + case "float": + return new Float[] {}; + case "boolean": + return new Boolean[] {}; + case "byte": + return new Byte[] {}; + case "short": + return new Short[] {}; + case "char": + return new Character[] {}; + case "string": + return new String[] {}; + case "datetime": + return new ZonedDateTime[] {}; + case "localtime": + return new LocalTime[] {}; + case "localdatetime": + return new LocalDateTime[] {}; + case "point": + return new PointValue[] {}; + case "time": + return new OffsetTime[] {}; + case "date": + return new LocalDate[] {}; + case "duration": + return new DurationValue[] {}; + default: + throw new IllegalStateException("Type " + type + " not supported."); + } + } + + public static T withBackOffRetries( + Supplier func, + boolean retry, + int backoffRetry, + boolean exponential, + Consumer exceptionHandler) { + T result; + backoffRetry = backoffRetry < 1 ? 5 : backoffRetry; + int countDown = backoffRetry; + exceptionHandler = Objects.requireNonNullElse(exceptionHandler, exe -> {}); + while (true) { + try { + result = func.get(); + break; + } catch (Exception e) { + if (!retry || countDown < 1) throw e; + exceptionHandler.accept(e); + countDown--; + long delay = getDelay(backoffRetry, countDown, exponential); + backoffSleep(delay); + } + } + return result; + } + + private static void backoffSleep(long millis) { + sleep(millis, "Operation interrupted during backoff"); + } + + public static void sleep(long millis, String interruptedMessage) { + try { + Thread.sleep(millis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(interruptedMessage, ie); + } + } + + private static long getDelay(int backoffRetry, int countDown, boolean exponential) { + int backOffTime = backoffRetry - countDown; + long sleepMultiplier = exponential + ? (long) Math.pow(2, backOffTime) + : // Exponential retry progression + backOffTime; // Linear retry progression + return Math.min( + Duration.ofSeconds(1).multipliedBy(sleepMultiplier).toMillis(), + Duration.ofSeconds(30).toMillis() // Max 30s + ); + } + + public static String joinStringLabels(Collection labels) { + return CollectionUtils.isNotEmpty(labels) + ? ":" + labels.stream().map(Util::quote).collect(Collectors.joining(":")) + : ""; + } +} diff --git a/docs/asciidoc/modules/ROOT/nav.adoc b/docs/asciidoc/modules/ROOT/nav.adoc index 479f13bec5..049baa557f 100644 --- a/docs/asciidoc/modules/ROOT/nav.adoc +++ b/docs/asciidoc/modules/ROOT/nav.adoc @@ -32,6 +32,7 @@ include::partial$generated-documentation/nav.adoc[] ** xref::import/import-csv.adoc[] ** xref::import/arrow.adoc[] ** xref::import/import-graphml.adoc[] + ** xref::import/gexf.adoc[] * xref:export/index.adoc[] ** xref::export/web-apis.adoc[] diff --git a/docs/asciidoc/modules/ROOT/pages/import/gexf.adoc b/docs/asciidoc/modules/ROOT/pages/import/gexf.adoc new file mode 100644 index 0000000000..ec06ff1c6a --- /dev/null +++ b/docs/asciidoc/modules/ROOT/pages/import/gexf.adoc @@ -0,0 +1,222 @@ +[[gexf]] += Load GEXF (Graph Exchange XML Format) +:description: This section describes procedures that can be used to import data from GEXF files. + + + +Many existing applications and data integrations use GEXF to describes a graph with nodes and edges. +For further information, you should visit the https://gexf.net/[official documentation]. + +It is possible to load or import nodes and relationship from a GEXF file with the procedures + `apoc.load.gexf` and `apoc.import.gexf`. You need to: + +* provide a path to a GEXF file +* provide configuration (optional) + +The `apoc.import.gexf` read as the `apoc.load.gexf` but also create nodes and relationships in Neo4j. + +For reading from files you'll have to enable the config option: + +---- +apoc.import.file.enabled=true +---- + +By default file paths are global, for paths relative to the `import` directory set: + +---- +apoc.import.file.use_neo4j_config=true +---- + +== Examples for apoc.load.gexf + +.load.gexf +---- + + + + + + + + + + + + +---- + +[source, cypher] +---- +CALL apoc.load.gexf('load.gexf') +---- + +.Results +[opts="header"] +|=== +| value +| {_type: gexf, _children: [{_type: graph, defaultedgetype: directed, _children: [{_type: nodes, _children: [{_type: node, _children: [{_type: attvalues, _children: [{_type: attvalue, for: 0, value: http://gephi.org}]}], foo: bar}]}]}], version: 1.2} +|=== + +== Examples for apoc.import.gexf + +Besides the file you can pass in a config map: + +.Config parameters +[opts=header] +|=== +| name | type | default | description +| readLabels | Boolean | false | Creates node labels based on the value in the `labels` property of `node` elements +| defaultRelationshipType | String | RELATED | The default relationship type to use if none is specified in the GraphML file +| storeNodeIds | Boolean | false | store the `id` property of `node` elements +| batchSize | Integer | 20000 | The number of elements to process per transaction +| compression | `Enum[NONE, BYTES, GZIP, BZIP2, DEFLATE, BLOCK_LZ4, FRAMED_SNAPPY]` | `null` | Allow taking binary data, either not compressed (value: `NONE`) or compressed (other values) +| source | Map | Empty map | See `source / target config` parameter below +| target | Map | Empty map | See `source / target config` parameter below +See the xref::overview/apoc.load/apoc.load.csv.adoc#_binary_file[Binary file example] +|=== + + +With the following file will be created: + +* 1 node with label Gephi +* 2 nodes with label Webatlas +* 1 node with label RTGI +* 1 node with label BarabasiLab +* 6 relationships of kind KNOWS +* 1 relationship of kind HAS_TICKET +* 1 relationship of kind BAZ + +.data.gexf +---- + + + + Gephi.org + A Web network + + + + + + + + + + + + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +---- + +[source, cypher] +---- +CALL apoc.import.gexf('data.gexf', {readLabels:true}) +---- + +.Results +[opts="header"] +|=== +| value +| { +"relationships" : 8, +"batches" : 0, +"file" : "file:/../data.gexf", +"nodes" : 5, +"format" : "gexf", +"source" : "file", +"time" : 9736, +"rows" : 0, +"batchSize" : -1, +"done" : true, +"properties" : 21 +} +|=== + +We can also store the node IDs by executing: +[source, cypher] +---- +CALL apoc.import.gexf('data.gexf', {readLabels:true, storeNodeIds: true}) +---- + +=== source / target config + +Allows the import of relations in case the source and / or target nodes are not present in the file, searching for nodes via a custom label and property. +To do this, we can insert into the config map `source: {label: '', id: `''`}` and/or `source: {label: '', id: `''`}` +In this way, we can search start and end nodes via the source and end attribute of `edge` tag. + +For example, with a config map `{source: {id: 'myId', label: 'Foo'}, target: {id: 'other', label: 'Bar'}}` +with a edge row like `KNOWS` +we search a source node `(:Foo {myId: 'n0'})` and an end node `(:Bar {other: 'n1'})`. +The id key is optional (the default is `'id'`). + + + + diff --git a/docs/asciidoc/modules/ROOT/pages/import/index.adoc b/docs/asciidoc/modules/ROOT/pages/import/index.adoc index 07560c3881..c0af3eb013 100644 --- a/docs/asciidoc/modules/ROOT/pages/import/index.adoc +++ b/docs/asciidoc/modules/ROOT/pages/import/index.adoc @@ -20,4 +20,5 @@ For more information on these procedures, see: * xref::import/load-html.adoc[] * xref::import/import-csv.adoc[] * xref::import/import-graphml.adoc[] +* xref::import/gexf.adoc[] * xref::import/arrow.adoc[] \ No newline at end of file diff --git a/full/src/main/java/apoc/export/arrow/ImportArrow.java b/full/src/main/java/apoc/export/arrow/ImportArrow.java index c3bad891be..e3143daa4d 100644 --- a/full/src/main/java/apoc/export/arrow/ImportArrow.java +++ b/full/src/main/java/apoc/export/arrow/ImportArrow.java @@ -5,6 +5,7 @@ import static apoc.export.arrow.ArrowUtils.FIELD_SOURCE_ID; import static apoc.export.arrow.ArrowUtils.FIELD_TARGET_ID; import static apoc.export.arrow.ArrowUtils.FIELD_TYPE; +import static apoc.util.ConvertUtil.toValidValue; import apoc.Extended; import apoc.Pools; @@ -12,20 +13,10 @@ import apoc.export.util.ProgressReporter; import apoc.result.ProgressInfo; import apoc.util.FileUtils; -import apoc.util.JsonUtil; import apoc.util.Util; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.json.JsonWriteFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.channels.SeekableByteChannel; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -33,7 +24,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BitVector; @@ -42,7 +32,6 @@ import org.apache.arrow.vector.ipc.ArrowFileReader; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.commons.lang3.StringUtils; import org.neo4j.graphdb.Entity; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Label; @@ -54,14 +43,6 @@ import org.neo4j.procedure.Mode; import org.neo4j.procedure.Name; import org.neo4j.procedure.Procedure; -import org.neo4j.values.storable.DateTimeValue; -import org.neo4j.values.storable.DateValue; -import org.neo4j.values.storable.DurationValue; -import org.neo4j.values.storable.LocalDateTimeValue; -import org.neo4j.values.storable.LocalTimeValue; -import org.neo4j.values.storable.PointValue; -import org.neo4j.values.storable.TimeValue; -import org.neo4j.values.storable.Values; @Extended public class ImportArrow { @@ -230,150 +211,4 @@ public Map getMapping() { return mapping; } } - - public static Object toValidValue(Object object, String field, Map mapping) { - Object fieldName = mapping.get(field); - if (object != null && fieldName != null) { - return convertValue(object.toString(), fieldName.toString()); - } - - if (object instanceof Collection) { - // if there isn't a mapping config, we convert the list to a String[] - return ((Collection) object) - .stream() - .map(i -> toValidValue(i, field, mapping)) - .collect(Collectors.toList()) - .toArray(new String[0]); - } - if (object instanceof Map) { - return ((Map) object) - .entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, e -> toValidValue(e.getValue(), field, mapping))); - } - try { - // we test if is a valid Neo4j type - Values.of(object); - return object; - } catch (Exception e) { - // otherwise we try to coerce it - return object.toString(); - } - } - - /** - * In case of complex type non-readable from Parquet, i.e. Duration, Point, List of Neo4j Types... - * we can use the `mapping: {keyToConvert: valueTypeName}` config to convert them. - * For example `mapping: {myPropertyKey: "DateArray"}` - */ - private static Object convertValue(String value, String typeName) { - switch (typeName) { - case "Point": - return getPointValue(value); - case "LocalDateTime": - return LocalDateTimeValue.parse(value).asObjectCopy(); - case "LocalTime": - return LocalTimeValue.parse(value).asObjectCopy(); - case "DateTime": - return DateTimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); - case "Time": - return TimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); - case "Date": - return DateValue.parse(value).asObjectCopy(); - case "Duration": - return DurationValue.parse(value); - case "Char": - return value.charAt(0); - case "Byte": - return value.getBytes(); - case "Double": - return Double.parseDouble(value); - case "Float": - return Float.parseFloat(value); - case "Short": - return Short.parseShort(value); - case "Int": - return Integer.parseInt(value); - case "Long": - return Long.parseLong(value); - case "Node": - case "Relationship": - return JsonUtil.parse(value, null, Map.class); - case "NO_VALUE": - return null; - default: - // If ends with "Array", for example StringArray - if (typeName.endsWith("Array")) { - value = StringUtils.removeStart(value, "["); - value = StringUtils.removeEnd(value, "]"); - String array = typeName.replace("Array", ""); - - final Object[] prototype = getPrototypeFor(array); - return Arrays.stream(value.split(",")) - .map(item -> convertValue(StringUtils.trim(item), array)) - .collect(Collectors.toList()) - .toArray(prototype); - } - return value; - } - } - - private static PointValue getPointValue(String value) { - try { - return PointValue.parse(value); - } catch (RuntimeException e) { - // fallback in case of double-quotes, e.g. - // {"crs":"wgs-84-3d","latitude":13.1,"longitude":33.46789,"height":100.0} - // we remove the double quotes before parsing the result, e.g. - // {crs:"wgs-84-3d",latitude:13.1,longitude:33.46789,height:100.0} - ObjectMapper objectMapper = new ObjectMapper().disable(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature()); - try { - Map readValue = objectMapper.readValue(value, Map.class); - String stringWithoutKeyQuotes = objectMapper.writeValueAsString(readValue); - return PointValue.parse(stringWithoutKeyQuotes); - } catch (JsonProcessingException ex) { - throw new RuntimeException(ex); - } - } - } - - // similar to CsvPropertyConverter - public static Object[] getPrototypeFor(String type) { - switch (type) { - case "Long": - return new Long[] {}; - case "Integer": - return new Integer[] {}; - case "Double": - return new Double[] {}; - case "Float": - return new Float[] {}; - case "Boolean": - return new Boolean[] {}; - case "Byte": - return new Byte[] {}; - case "Short": - return new Short[] {}; - case "Char": - return new Character[] {}; - case "String": - return new String[] {}; - case "DateTime": - return new ZonedDateTime[] {}; - case "LocalTime": - return new LocalTime[] {}; - case "LocalDateTime": - return new LocalDateTime[] {}; - case "Point": - return new PointValue[] {}; - case "Time": - return new OffsetTime[] {}; - case "Date": - return new LocalDate[] {}; - case "Duration": - return new DurationValue[] {}; - default: - throw new IllegalStateException("Type " + type + " not supported."); - } - } } diff --git a/full/src/main/java/apoc/load/Gexf.java b/full/src/main/java/apoc/load/Gexf.java new file mode 100644 index 0000000000..3c59b2e5fb --- /dev/null +++ b/full/src/main/java/apoc/load/Gexf.java @@ -0,0 +1,83 @@ +package apoc.load; + +import static apoc.load.Xml.xmlXpathToMapResult; + +import apoc.Extended; +import apoc.Pools; +import apoc.export.graphml.XmlGraphMLReader; +import apoc.export.util.CountingReader; +import apoc.export.util.ExportConfig; +import apoc.export.util.ProgressReporter; +import apoc.result.MapResult; +import apoc.result.ProgressInfo; +import apoc.util.FileUtils; +import apoc.util.Util; +import java.util.Map; +import java.util.stream.Stream; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Transaction; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; + +@Extended +public class Gexf { + + @Context + public GraphDatabaseService db; + + @Context + public Transaction tx; + + @Context + public TerminationGuard terminationGuard; + + @Context + public Pools pools; + + @Procedure("apoc.load.gexf") + @Description("apoc.load.gexf(urlOrBinary, path, $config) - load Gexf file from URL or binary source") + public Stream gexf( + @Name("urlOrBinary") Object urlOrBinary, + @Name(value = "config", defaultValue = "{}") Map config) + throws Exception { + boolean simpleMode = Util.toBoolean(config.getOrDefault("simpleMode", false)); + String path = (String) config.getOrDefault("path", "/"); + return xmlXpathToMapResult(urlOrBinary, simpleMode, path, config, terminationGuard); + } + + @Procedure(name = "apoc.import.gexf", mode = Mode.WRITE) + @Description("Imports a graph from the provided GraphML file.") + public Stream importGexf( + @Name("urlOrBinaryFile") Object urlOrBinaryFile, @Name("config") Map config) { + ProgressInfo result = Util.inThread(pools, () -> { + ExportConfig exportConfig = new ExportConfig(config); + String file = null; + String source = "binary"; + if (urlOrBinaryFile instanceof String) { + file = (String) urlOrBinaryFile; + source = "file"; + } + ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, source, "gexf")); + XmlGraphMLReader graphReader = new XmlGraphMLReader(db, tx) + .reporter(reporter) + .batchSize(exportConfig.getBatchSize()) + .relType(exportConfig.defaultRelationshipType()) + .source(exportConfig.getSource()) + .target(exportConfig.getTarget()) + .nodeLabels(exportConfig.readLabels()); + + if (exportConfig.storeNodeIds()) graphReader.storeNodeIds(); + + try (CountingReader reader = FileUtils.readerFor(urlOrBinaryFile, exportConfig.getCompressionAlgo())) { + graphReader.parseXML(reader, terminationGuard, XmlGraphMLReader.ReaderType.GEXF); + } + + return reporter.getTotal(); + }); + return Stream.of(result); + } +} diff --git a/full/src/main/resources/extended.txt b/full/src/main/resources/extended.txt index a3a9dcaf7d..4bade733ed 100644 --- a/full/src/main/resources/extended.txt +++ b/full/src/main/resources/extended.txt @@ -82,6 +82,8 @@ apoc.json.validate apoc.graph.filterProperties apoc.graph.filterProperties apoc.load.csv +apoc.load.gexf +apoc.import.gexf apoc.load.csvParams apoc.load.directory apoc.load.directory.async.add diff --git a/full/src/test/java/apoc/load/GexfTest.java b/full/src/test/java/apoc/load/GexfTest.java new file mode 100644 index 0000000000..73be07ae1b --- /dev/null +++ b/full/src/test/java/apoc/load/GexfTest.java @@ -0,0 +1,214 @@ +package apoc.load; + +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_USE_NEO4J_CONFIG; +import static apoc.ApocConfig.apocConfig; +import static apoc.util.ExtendedTestUtil.assertRelationship; +import static apoc.util.MapUtil.map; +import static apoc.util.TestUtil.testCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import apoc.util.TestUtil; +import java.util.List; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.ResourceIterator; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +public class GexfTest { + + @Rule + public DbmsRule db = new ImpermanentDbmsRule(); + + @Before + public void setup() { + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_IMPORT_FILE_USE_NEO4J_CONFIG, false); + TestUtil.registerProcedure(db, Gexf.class); + } + + @After + public void tearDown() { + db.shutdown(); + } + + @Test + public void testLoadGexf() { + final String file = + ClassLoader.getSystemResource("gexf/single-node.gexf").toString(); + testCall(db, "CALL apoc.load.gexf($file)", Map.of("file", file), (row) -> { + Map value = (Map) row.get("value"); + String expected = + "{_children=[{_children=[{_children=[{_children=[{_children=[{_type=attvalue, for=0, value=http://gephi.org}], _type=attvalues}], _type=node, id=0, label=bar}], _type=nodes}], defaultedgetype=directed, _type=graph}], _type=gexf, version=1.2}"; + assertEquals(expected, value.toString()); + }); + } + + @Test + public void testImportGexf() { + final String file = ClassLoader.getSystemResource("gexf/data.gexf").toString(); + TestUtil.testCall(db, "CALL apoc.import.gexf($file, {readLabels:true})", map("file", file), (r) -> { + assertEquals("gexf", r.get("format")); + assertEquals(5L, r.get("nodes")); + assertEquals(8L, r.get("relationships")); + }); + + TestUtil.testCallCount(db, "MATCH (n) RETURN n", 5); + + TestUtil.testResult(db, "MATCH (n:Gephi) RETURN properties(n) as props", r -> { + ResourceIterator propsIterator = r.columnAs("props"); + Map props = propsIterator.next(); + assertEquals("http://gephi.org", props.get("0")); + assertEquals(1.0f, props.get("1")); + + props = propsIterator.next(); + assertEquals("http://test.gephi.org", props.get("0")); + }); + + TestUtil.testResult(db, "MATCH (n:BarabasiLab) RETURN properties(n) as props", r -> { + ResourceIterator propsIterator = r.columnAs("props"); + Map props = propsIterator.next(); + assertEquals("http://barabasilab.com", props.get("0")); + assertEquals(1.0f, props.get("1")); + }); + + Map multiDataTypeNodeProps = Map.of( + "0", + "http://gephi.org", + "1", + 1.0f, + "room", + 10, + "price", + Double.parseDouble("10.02"), + "projects", + 300L, + "members", + new String[] {"Altomare", "Sterpeto", "Lino"}, + "pins", + new boolean[] {true, false, true, false}); + + TestUtil.testResult(db, "MATCH ()-[rel]->() RETURN rel ORDER BY rel.score", r -> { + final ResourceIterator rels = r.columnAs("rel"); + + assertRelationship( + rels.next(), + "KNOWS", + Map.of("score", 1.5f), + List.of("Gephi"), + multiDataTypeNodeProps, + List.of("Webatlas"), + Map.of("0", "http://webatlas.fr", "1", 2.0f)); + + assertRelationship( + rels.next(), + "BAZ", + Map.of("score", 2.0f, "foo", "bar"), + List.of("Gephi"), + multiDataTypeNodeProps, + List.of("Gephi"), + multiDataTypeNodeProps); + + assertRelationship( + rels.next(), + "HAS_TICKET", + Map.of("score", 3f, "ajeje", "brazorf"), + List.of("Gephi"), + multiDataTypeNodeProps, + List.of("RTGI"), + Map.of("0", "http://rtgi.fr", "1", 1.0f)); + + assertRelationship( + rels.next(), + "KNOWS", + Map.of("score", 4.0f), + List.of("Gephi"), + multiDataTypeNodeProps, + List.of("RTGI"), + Map.of("0", "http://rtgi.fr", "1", 1.0f)); + + assertRelationship( + rels.next(), + "KNOWS", + Map.of("score", 5.0f), + List.of("Webatlas"), + Map.of("0", "http://webatlas.fr", "1", 2.0f), + List.of("Gephi"), + multiDataTypeNodeProps); + + assertRelationship( + rels.next(), + "KNOWS", + Map.of("score", 6.0f), + List.of("RTGI"), + Map.of("0", "http://rtgi.fr", "1", 1.0f), + List.of("Webatlas"), + Map.of("0", "http://webatlas.fr", "1", 2.0f)); + + assertRelationship( + rels.next(), + "KNOWS", + Map.of("score", 7.0f), + List.of("Gephi"), + multiDataTypeNodeProps, + List.of("Webatlas", "BarabasiLab"), + Map.of("0", "http://barabasilab.com", "1", 1.0f, "2", false)); + + assertRelationship( + rels.next(), + "KNOWS", + Map.of("score", 8.0f), + List.of("Gephi"), + Map.of("0", "http://test.gephi.org", "1", 2.0f), + List.of("Webatlas", "BarabasiLab"), + Map.of("0", "http://barabasilab.com", "1", 1.0f, "2", false)); + + assertFalse(rels.hasNext()); + }); + } + + @Test + public void testImportGexfWithDefaultRelationshipTypeSourceAndTargetConfigs() { + String defaultRelType = "TEST_DEFAULT"; + final String file = + ClassLoader.getSystemResource("gexf/single-rel.gexf").toString(); + + db.executeTransactionally("CREATE (:Foo {startId: 'start'})"); + db.executeTransactionally("CREATE (:Bar {endId: 'end'})"); + + TestUtil.testCall( + db, + "CALL apoc.import.gexf($file, {defaultRelationshipType: $defaultRelType, source: $source, target: $target})", + map( + "file", + file, + "defaultRelType", + defaultRelType, + "source", + map("label", "Foo", "id", "startId"), + "target", + map("label", "Bar", "id", "endId")), + (r) -> { + assertEquals("gexf", r.get("format")); + assertEquals(1L, r.get("relationships")); + }); + + TestUtil.testCall(db, "MATCH ()-[rel]->() RETURN rel", r -> { + Relationship rel = (Relationship) r.get("rel"); + assertRelationship( + rel, + defaultRelType, + Map.of(), + List.of("Foo"), + Map.of("startId", "start"), + List.of("Bar"), + Map.of("endId", "end")); + }); + } +} diff --git a/full/src/test/java/apoc/util/ExtendedTestUtil.java b/full/src/test/java/apoc/util/ExtendedTestUtil.java index 7bf30c449c..3d5a69c702 100644 --- a/full/src/test/java/apoc/util/ExtendedTestUtil.java +++ b/full/src/test/java/apoc/util/ExtendedTestUtil.java @@ -2,6 +2,9 @@ import static apoc.util.TestUtil.testCall; import static apoc.util.TestUtil.testCallAssertions; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.test.assertion.Assert.assertEventually; @@ -10,16 +13,79 @@ import java.io.IOException; import java.nio.file.Files; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.RelationshipType; import org.neo4j.graphdb.Result; import org.neo4j.graphdb.ResultTransformer; +import org.neo4j.internal.helpers.collection.Iterables; +import org.neo4j.internal.helpers.collection.Iterators; import org.neo4j.test.assertion.Assert; public class ExtendedTestUtil { + public static void assertRelationship( + Relationship rel, + String expectedRelType, + Map expectedProps, + List expectedStartNodeLabels, + Map expectedStartNodeProps, + List expectedEndNodeLabels, + Map expectedEndNodeProps) { + + Node startNode = rel.getStartNode(); + Node endNode = rel.getEndNode(); + assertMapEquals(expectedProps, rel.getAllProperties()); + assertEquals(RelationshipType.withName(expectedRelType), rel.getType()); + Set