Skip to content

Commit

Permalink
Fixes #3426: add apache arrow import procedure to extended (#3978)
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 authored Mar 14, 2024
1 parent 3389251 commit cf7d6ba
Show file tree
Hide file tree
Showing 10 changed files with 715 additions and 144 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
= apoc.import.arrow
:description: This section contains reference documentation for the apoc.import.arrow procedure.

label:procedure[] label:apoc-extended[]

[.emphasis]
apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array

== Signature

[source]
----
apoc.import.arrow(urlOrBinaryFile :: ANY?, config = {} :: MAP?) :: (file :: STRING?, source :: STRING?, format :: STRING?, nodes :: INTEGER?, relationships :: INTEGER?, properties :: INTEGER?, time :: INTEGER?, rows :: INTEGER?, batchSize :: INTEGER?, batches :: INTEGER?, done :: BOOLEAN?, data :: STRING?)
----

== Input parameters
[.procedures, opts=header]
|===
| Name | Type | Default
|urlOrBinaryFile|ANY?|null
|config|MAP?|{}
|===

== Config parameters
This procedure supports the following config parameters:

.Config parameters
[opts=header, cols='1a,1a,1a,3a']
|===
| name | type |default | description
| unwindBatchSize | Integer | `2000` | the batch size of the unwind
| mapping | Map | `{}` | see `Mapping config` example below
|===

== Output parameters
[.procedures, opts=header]
|===
| Name | Type
|file|STRING?
|source|STRING?
|format|STRING?
|nodes|INTEGER?
|relationships|INTEGER?
|properties|INTEGER?
|time|INTEGER?
|rows|INTEGER?
|batchSize|INTEGER?
|batches|INTEGER?
|done|BOOLEAN?
|data|STRING?
|===

[[usage-apoc.import.arrow]]
== Usage Examples

The `apoc.import.arrow` procedure can be used to import arrow files created by the `apoc.export.arrow.*` procedures.


[source,cypher]
----
CALL apoc.import.arrow("fileCreatedViaExportProcedures.arrow")
----

.Results
[opts=header]
|===
| file | source | format | nodes | relationships | properties | time | rows | batchSize | batches | done | data
| "fileCreatedViaExportProcedures.arrow" | "file" | "arrow" | 3 | 1 | 15 | 105 | 4 | -1 | 0 | TRUE | NULL
|===


We can also import a file from a binary `byte[]` created by the `apoc.export.arrow.stream.*` procedures.

[source,cypher]
----
CALL apoc.import.arrow(`<binaryArrow>`)
----

=== Mapping config

In order to import complex types not supported by Parquet, like Point, Duration, List of Duration, etc..
we can use the mapping config to convert to the desired data type.
For example, if we have a node `(:MyLabel {durationProp: duration('P5M1.5D')}`, and we export it in a parquet file/binary,
we can import it by explicit a map with key the property key, and value the property type.

That is in this example, by using the load procedure:
[source,cypher]
----
CALL apoc.load.arrow(fileOrBinary, {mapping: {durationProp: 'Duration'}})
----

Or with the import procedure:
[source,cypher]
----
CALL apoc.import.parquet(fileOrBinary, {mapping: {durationProp: 'Duration'}})
----

The mapping value types can be one of the following:

* `Point`
* `LocalDateTime`
* `LocalTime`
* `DateTime`
* `Time`
* `Date`
* `Duration`
* `Char`
* `Byte`
* `Double`
* `Float`
* `Short`
* `Int`
* `Long`
* `Node`
* `Relationship`
* `BaseType` followed by Array, to map a list of values, where BaseType can be one of the previous type, for example `DurationArray`


12 changes: 12 additions & 0 deletions docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
= apoc.import
:description: This section contains reference documentation for the apoc.import procedures.

[.procedures, opts=header, cols='5a,1a']
|===
| Qualified Name | Type
|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]]

apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array
|label:procedure[]
|===

Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,17 @@ apoc.get.rels(rel\|id\|[ids]) - quickly returns all relationships with these id'
|label:procedure[]
|===

== xref::overview/apoc.import/index.adoc[]

[.procedures, opts=header, cols='5a,1a']
|===
| Qualified Name | Type
|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]]

apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array
|label:procedure[]
|===

== xref::overview/apoc.load/index.adoc[]

[.procedures, opts=header, cols='5a,1a']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ This file is generated by DocsTest, so don't change it!
** xref::overview/apoc.get/index.adoc[]
*** xref::overview/apoc.get/apoc.get.nodes.adoc[]
*** xref::overview/apoc.get/apoc.get.rels.adoc[]
** xref::overview/apoc.import/index.adoc[]
*** xref::overview/apoc.import/apoc.import.arrow.adoc[]
** xref::overview/apoc.load/index.adoc[]
*** xref::overview/apoc.load/apoc.load.csv.adoc[]
*** xref::overview/apoc.load/apoc.load.csvParams.adoc[]
Expand Down
6 changes: 6 additions & 0 deletions extended/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ dependencies {
compileOnly group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.5', {
exclude group: 'com.amazonaws'
}

compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'

testImplementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.5', {
exclude group: 'com.amazonaws'
}
Expand Down
225 changes: 225 additions & 0 deletions extended/src/main/java/apoc/export/arrow/ImportArrow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package apoc.export.arrow;

import apoc.Extended;
import apoc.Pools;
import apoc.export.util.BatchTransaction;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import apoc.util.Util;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.neo4j.graphdb.Entity;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.security.URLAccessChecker;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static apoc.util.ExtendedUtil.toValidValue;

@Extended
public class ImportArrow {

// TODO: field similar to the one placed in ArrowUtils (placed in core)
// when the Arrow procedures will be placed in extended remove these lines
// and replace FIELD_ID with FIELD_ID.getName(), FIELD_LABELS with FIELD_LABELS.getName(), etc..
public static String FIELD_ID = "<id>";
public static String FIELD_LABELS = "labels";
public static String FIELD_SOURCE_ID = "<source.id>";
public static String FIELD_TARGET_ID = "<target.id>";
public static String FIELD_TYPE = "<type>";
// -- end ArrowUtils fields

@Context
public Pools pools;

@Context
public GraphDatabaseService db;

@Context
public URLAccessChecker urlAccessChecker;


@Procedure(name = "apoc.import.arrow", mode = Mode.WRITE)
@Description("Imports arrow from the provided arrow file or byte array")
public Stream<ProgressInfo> importFile(@Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {

ProgressInfo result =
Util.inThread(pools, () -> {
String file = null;
String sourceInfo = "binary";
if (input instanceof String) {
file = (String) input;
sourceInfo = "file";
}

final ArrowConfig conf = new ArrowConfig(config);

final Map<Long, Long> idMapping = new HashMap<>();

AtomicInteger counter = new AtomicInteger();
try (ArrowReader reader = getReader(input);
VectorSchemaRoot schemaRoot = reader.getVectorSchemaRoot()) {

final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "arrow"));
BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter);
try {
while (hasElements(counter, reader, schemaRoot)) {

final Map<String, Object> row = schemaRoot.getFieldVectors()
.stream()
.collect(
HashMap::new,
(map, fieldVector) -> {
Object read = read(fieldVector, counter.get(), conf);
if (read == null) {
return;
}
map.put(fieldVector.getName(), read);
},
HashMap::putAll);

String relType = (String) row.remove(FIELD_TYPE);
if (relType == null) {
// is node
String[] stringLabels = (String[]) row.remove(FIELD_LABELS);
Label[] labels = Optional.ofNullable(stringLabels)
.map(l -> Arrays.stream(l).map(Label::label).toArray(Label[]::new))
.orElse(new Label[]{});
final Node node = btx.getTransaction().createNode(labels);

long id = (long) row.remove(FIELD_ID);
idMapping.put(id, node.getId());

addProps(row, node);
reporter.update(1, 0, row.size());
} else {
// is relationship
long sourceId = (long) row.remove(FIELD_SOURCE_ID);
Long idSource = idMapping.get(sourceId);
final Node source = btx.getTransaction().getNodeById(idSource);

long targetId = (long) row.remove(FIELD_TARGET_ID);
Long idTarget = idMapping.get(targetId);
final Node target = btx.getTransaction().getNodeById(idTarget);

final Relationship rel = source.createRelationshipTo(target, RelationshipType.withName(relType));
addProps(row, rel);
reporter.update(0, 1, row.size());
}

counter.incrementAndGet();
btx.increment();
}

btx.doCommit();
} catch (RuntimeException e) {
btx.rollback();
throw e;
} finally {
btx.close();
}

return reporter.getTotal();
}
});

return Stream.of(result);
}


private ArrowReader getReader(Object input) throws IOException {
RootAllocator allocator = new RootAllocator();
if (input instanceof String) {
final SeekableByteChannel channel = FileUtils.inputStreamFor(input, null, null, null, urlAccessChecker)
.asChannel();
return new ArrowFileReader(channel, allocator);
}
ByteArrayInputStream inputStream = new ByteArrayInputStream((byte[]) input);
return new ArrowStreamReader(inputStream, allocator);
}


private static boolean hasElements(AtomicInteger counter, ArrowReader reader, VectorSchemaRoot schemaRoot) throws IOException {
if (counter.get() >= schemaRoot.getRowCount()) {
if (reader.loadNextBatch()) {
counter.set(0);
} else {
return false;
}
}

return true;
}

private static Object read(FieldVector fieldVector, int index, ArrowConfig conf) {

if (fieldVector.isNull(index)) {
return null;
} else if (fieldVector instanceof BitVector) {
BitVector fe = (BitVector) fieldVector;
return fe.get(index) == 1;
} else {
Object object = fieldVector.getObject(index);
if (object instanceof Collection coll && coll.isEmpty()) {
return null;
}
return toValidValue(object, fieldVector.getName(), conf.getMapping());
}
}

private void addProps(Map<String, Object> row, Entity rel) {
row.forEach(rel::setProperty);
}

/**
* Analog to ArrowConfig present in APOC Core.
* TODO Merge these 2 classes when arrow procedure will be moved to APOC Extended
*/
public static class ArrowConfig {

private final int batchSize;
private final Map<String, Object> mapping;

public ArrowConfig(Map<String, Object> config) {
if (config == null) {
config = Collections.emptyMap();
}
this.mapping = (Map<String, Object>) config.getOrDefault("mapping", Map.of());
this.batchSize = Util.toInteger(config.getOrDefault("batchSize", 2000));
}

public int getBatchSize() {
return batchSize;
}

public Map<String, Object> getMapping() {
return mapping;
}
}

}
Loading

0 comments on commit cf7d6ba

Please sign in to comment.