From ce244390b5d5ed9ba4dd7cab6f14c45d6389af88 Mon Sep 17 00:00:00 2001
From: Giuseppe Villani
Date: Mon, 21 Aug 2023 10:15:13 +0200
Subject: [PATCH] Fixes #3589: Add support for Parquet files similar to CSV/Arrow (#3711)

---
 build.gradle                                  |   2 +-
 extended/build.gradle                         |   7 +
 .../export/parquet/ApocParquetReader.java     | 169 ++++++++++
 .../apoc/export/parquet/ExportParquet.java    | 155 +++++++++
 .../parquet/ExportParquetFileStrategy.java    | 109 +++++++
 .../ExportParquetGraphFileStrategy.java       |  43 +++
 .../ExportParquetGraphStreamStrategy.java     |  25 ++
 .../ExportParquetResultFileStrategy.java      |  52 +++
 .../ExportParquetResultStreamStrategy.java    |  23 ++
 .../export/parquet/ExportParquetStrategy.java |  44 +++
 .../parquet/ExportParquetStreamStrategy.java  | 156 +++++++++
 .../apoc/export/parquet/ImportParquet.java    | 138 ++++++++
 .../apoc/export/parquet/ParquetConfig.java    |  34 ++
 .../export/parquet/ParquetExportType.java     | 173 ++++++++++
 .../apoc/export/parquet/ParquetReadUtil.java  | 233 +++++++++++++
 .../java/apoc/export/parquet/ParquetUtil.java | 227 +++++++++++++
 .../src/main/java/apoc/load/LoadParquet.java  |  73 +++++
 .../java/apoc/ComparePerformancesTest.java    |  95 ++++++
 .../apoc/export/parquet/ParquetHdfsTest.java  |  80 +++++
 .../apoc/export/parquet/ParquetS3Test.java    |  59 ++++
 .../java/apoc/export/parquet/ParquetTest.java | 308 ++++++++++++++++++
 .../apoc/export/parquet/ParquetTestUtil.java  | 178 ++++++++++
 extra-dependencies/hadoop/build.gradle        |   4 +
 23 files changed, 2386 insertions(+), 1 deletion(-)
 create mode 100644 extended/src/main/java/apoc/export/parquet/ApocParquetReader.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquet.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquetGraphStreamStrategy.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquetResultStreamStrategy.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquetStrategy.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ExportParquetStreamStrategy.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ImportParquet.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ParquetConfig.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ParquetExportType.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ParquetReadUtil.java
 create mode 100644 extended/src/main/java/apoc/export/parquet/ParquetUtil.java
 create mode 100644 extended/src/main/java/apoc/load/LoadParquet.java
 create mode 100644 extended/src/test/java/apoc/ComparePerformancesTest.java
 create mode 100644 extended/src/test/java/apoc/export/parquet/ParquetHdfsTest.java
 create mode 100644 extended/src/test/java/apoc/export/parquet/ParquetS3Test.java
 create mode 100644 extended/src/test/java/apoc/export/parquet/ParquetTest.java
 create mode 100644 extended/src/test/java/apoc/export/parquet/ParquetTestUtil.java

diff --git a/build.gradle b/build.gradle
index cb01dbc133..2324b847a9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -130,7 +130,7 @@ subprojects {
     ext {
         // NB: due to version.json generation by parsing this file, the next line must not have any if/then/else logic
-        neo4jVersion = "5.9.0"
+        neo4jVersion = "5.11.0"
         //
instead we apply the override logic here neo4jVersionEffective = project.hasProperty("neo4jVersionOverride") ? project.getProperty("neo4jVersionOverride") : neo4jVersion testContainersVersion = '1.18.3' diff --git a/extended/build.gradle b/extended/build.gradle index c5387b798b..c03695670f 100644 --- a/extended/build.gradle +++ b/extended/build.gradle @@ -52,6 +52,7 @@ task gitSubmoduleLoad { } dependencies { + apt project(':processor') apt group: 'org.neo4j', name: 'neo4j', version: neo4jVersionEffective // mandatory to run @ServiceProvider based META-INF code generation @@ -102,6 +103,11 @@ dependencies { compileOnly group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0' compileOnly group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '1.6.0' + compileOnly group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers + // testImplementation analogous is not needed since is bundled via `test-utils` submodule + compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.1.0', withoutServers + + // These dependencies affect the tests only, they will not be packaged in the resulting .jar testImplementation project(':test-utils') testImplementation project(':core') @@ -130,6 +136,7 @@ dependencies { testImplementation group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0' testImplementation group: 'org.postgresql', name: 'postgresql', version: '42.1.4' testImplementation group: 'org.zapodot', name: 'embedded-ldap-junit', version: '0.9.0' + testImplementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers configurations.all { diff --git a/extended/src/main/java/apoc/export/parquet/ApocParquetReader.java b/extended/src/main/java/apoc/export/parquet/ApocParquetReader.java new file mode 100644 index 0000000000..fa02b2d198 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ApocParquetReader.java @@ -0,0 +1,169 @@ +package apoc.export.parquet; + +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ColumnReadStore; +import org.apache.parquet.column.ColumnReader; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static apoc.export.parquet.ParquetReadUtil.toTimeUnitJava; +import static apoc.export.parquet.ParquetReadUtil.toValidValue; + +public final class ApocParquetReader implements Closeable { + private final ParquetFileReader reader; + private final List columns; + private final MessageType schema; + private final GroupConverter recordConverter; + private final String createdBy; + + private long currentRowGroupSize = -1L; + private List currentRowGroupColumnReaders; + private long currentRowIndex = -1L; + private final ParquetConfig config; + + public 
ApocParquetReader(InputFile file, ParquetConfig config) throws IOException { + this.reader = ParquetFileReader.open(file); + FileMetaData meta = reader.getFooter().getFileMetaData(); + this.schema = meta.getSchema(); + this.recordConverter = new GroupRecordConverter(this.schema).getRootConverter(); + this.createdBy = meta.getCreatedBy(); + + this.columns = schema.getColumns() + .stream() + .collect(Collectors.toList()); + + this.config = config; + } + + private Object readValue(ColumnReader columnReader) { + ColumnDescriptor column = columnReader.getDescriptor(); + PrimitiveType primitiveType = column.getPrimitiveType(); + int maxDefinitionLevel = column.getMaxDefinitionLevel(); + + if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) { + switch (primitiveType.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + case INT96: + return columnReader.getBinary().toStringUsingUTF8(); + case BOOLEAN: + return columnReader.getBoolean(); + case DOUBLE: + return columnReader.getDouble(); + case FLOAT: + return columnReader.getFloat(); + case INT32: + return columnReader.getInteger(); + case INT64: + // convert int to Temporal, if logical type is not null + long recordLong = columnReader.getLong(); + LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation logicalTypeAnnotation1 = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalTypeAnnotation; + if (logicalTypeAnnotation1.isAdjustedToUTC()) { + return Instant.EPOCH.plus(recordLong, toTimeUnitJava(logicalTypeAnnotation1.getUnit()).toChronoUnit()); + } else { + return LocalDateTime.ofInstant(Instant.EPOCH.plus(recordLong, toTimeUnitJava(logicalTypeAnnotation1.getUnit()).toChronoUnit()), ZoneId.of("UTC")); + } + } + return recordLong; + default: + throw new IllegalArgumentException("Unsupported type: " + primitiveType); + } + } else { + // fallback + return null; + } + } + + public Map getRecord() throws IOException { + if (currentRowIndex == currentRowGroupSize) { + + PageReadStore rowGroup = reader.readNextRowGroup(); + if (rowGroup == null) { + return null; + } + + ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup, this.recordConverter, this.schema, this.createdBy); + + this.currentRowGroupSize = rowGroup.getRowCount(); + this.currentRowGroupColumnReaders = columns.stream() + .map(columnReadStore::getColumnReader) + .collect(Collectors.toList()); + this.currentRowIndex = 0L; + } + + HashMap record = new HashMap<>(); + for (ColumnReader columnReader: this.currentRowGroupColumnReaders) { + // if it's a list we have use columnReader.consume() multiple times (until columnReader.getCurrentRepetitionLevel() == 0, i.e. 
totally consumed) + // to collect the list elements + do { + addRecord(record, columnReader); + columnReader.consume(); + } while (columnReader.getCurrentRepetitionLevel() != 0); + } + + this.currentRowIndex++; + + return record.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> toValidValue(e.getValue(), e.getKey(), config)) + ); + } + + public void addRecord(Map record, ColumnReader columnReader) { + Object value = readValue(columnReader); + if (value== null) { + return; + } + String[] path = columnReader.getDescriptor().getPath(); + String fieldName = path[0]; + try { + // if it's a list, create a list of consumed sub-records + boolean isAList = path.length == 3 && path[1].equals("list"); + record.compute(fieldName, (k, v) -> { + if (v == null) { + if (isAList) { + return new ArrayList<>() {{ add(value); }}; + } + return value; + } + if (isAList) { + List list = (List) v; + list.add(value); + return list; + } + throw new RuntimeException("Multiple element with the same key found, but the element type is not a list"); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + reader.close(); + } +} + diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquet.java b/extended/src/main/java/apoc/export/parquet/ExportParquet.java new file mode 100644 index 0000000000..79dbfb6884 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquet.java @@ -0,0 +1,155 @@ +package apoc.export.parquet; + +import apoc.ApocConfig; +import apoc.Description; +import apoc.Extended; +import apoc.Pools; +import apoc.export.util.NodesAndRelsSubGraph; +import apoc.result.ByteArrayResult; +import apoc.result.ProgressInfo; +import apoc.util.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.neo4j.cypher.export.DatabaseSubGraph; +import org.neo4j.cypher.export.SubGraph; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.Result; +import org.neo4j.graphdb.Transaction; +import org.neo4j.logging.Log; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.EXPORT_NOT_ENABLED_ERROR; +import static apoc.ApocConfig.apocConfig; +import static apoc.export.parquet.ParquetExportType.Type.from; + +@Extended +public class ExportParquet { + public static final String EXPORT_TO_FILE_PARQUET_ERROR = EXPORT_NOT_ENABLED_ERROR + + "\nOtherwise, if you are running in a cloud environment without filesystem access, use the apoc.export.parquet.*.stream procedures to stream the export back to your client.";; + + @Context + public Transaction tx; + + @Context + public Log log; + + @Context + public GraphDatabaseService db; + + @Context + public TerminationGuard terminationGuard; + + @Context + public ApocConfig apocConfig; + + @Context + public Pools pools; + + + @Procedure("apoc.export.parquet.all.stream") + @Description("Exports the full database to the provided CSV file.") + public Stream all(@Name(value = "config", defaultValue = "{}") Map config) { + return exportParquet(new DatabaseSubGraph(tx), new ParquetConfig(config)); + } + + 
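    // Note on the API shape: the `apoc.export.parquet.*.stream` procedures in this class return the
    // Parquet output as byte-array batches (a Stream of ByteArrayResult), while the file-based
    // variants further below write to disk and return ProgressInfo rows. A minimal, hypothetical
    // test-style call (fixture names such as `db`, TestUtil and assertTrue follow the tests shipped
    // with this patch, not this class):
    //
    //   TestUtil.testResult(db,
    //       "CALL apoc.export.parquet.all.stream({batchSize: 10000}) YIELD value RETURN value",
    //       r -> assertTrue(r.hasNext()));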
@Procedure("apoc.export.parquet.data.stream") + @Description("Exports the given nodes and relationships to the provided CSV file.") + public Stream data(@Name("nodes") List nodes, @Name("rels") List rels, @Name(value = "config", defaultValue = "{}") Map config) { + ParquetConfig conf = new ParquetConfig(config); + return exportParquet(new NodesAndRelsSubGraph(tx, nodes, rels), conf); + } + + @Procedure("apoc.export.parquet.graph.stream") + @Description("Exports the given graph to the provided CSV file.") + public Stream graph(@Name("graph") Map graph, @Name(value = "config", defaultValue = "{}") Map config) { + Collection nodes = (Collection) graph.get("nodes"); + Collection rels = (Collection) graph.get("relationships"); + ParquetConfig conf = new ParquetConfig(config); + + return exportParquet(new NodesAndRelsSubGraph(tx, nodes, rels), conf); + } + + @Procedure("apoc.export.parquet.query.stream") + @Description("Exports the results from running the given Cypher query to the provided CSV file.") + public Stream query(@Name("query") String query, @Name(value = "config", defaultValue = "{}") Map config) { + ParquetConfig exportConfig = new ParquetConfig(config); + Map params = config == null ? Collections.emptyMap() : (Map)config.getOrDefault("params", Collections.emptyMap()); + Result result = tx.execute(query,params); + + return exportParquet(result, exportConfig); + } + + @Procedure("apoc.export.parquet.all") + @Description("Exports the full database to the provided CSV file.") + public Stream all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws IOException { + return exportParquet(fileName, new DatabaseSubGraph(tx), new ParquetConfig(config)); + } + + @Procedure("apoc.export.parquet.data") + @Description("Exports the given nodes and relationships to the provided CSV file.") + public Stream data(@Name("nodes") List nodes, @Name("rels") List rels, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws IOException { + ParquetConfig conf = new ParquetConfig(config); + return exportParquet(fileName, new NodesAndRelsSubGraph(tx, nodes, rels), conf); + } + + @Procedure("apoc.export.parquet.graph") + @Description("Exports the given graph to the provided CSV file.") + public Stream graph(@Name("graph") Map graph, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws IOException { + Collection nodes = (Collection) graph.get("nodes"); + Collection rels = (Collection) graph.get("relationships"); + ParquetConfig conf = new ParquetConfig(config); + + return exportParquet(fileName, new NodesAndRelsSubGraph(tx, nodes, rels), conf); + } + + @Procedure("apoc.export.parquet.query") + @Description("Exports the results from running the given Cypher query to the provided CSV file.") + public Stream query(@Name("query") String query, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws IOException { + ParquetConfig exportConfig = new ParquetConfig(config); + Map params = config == null ? Collections.emptyMap() : (Map)config.getOrDefault("params", Collections.emptyMap()); + Result result = tx.execute(query,params); + + return exportParquet(fileName, result, exportConfig); + } + + public Stream exportParquet(String fileName, Object data, ParquetConfig config) throws IOException { + if (StringUtils.isBlank(fileName)) { + throw new RuntimeException("The fileName must exists. 
Otherwise, use the `apoc.export.parquet.*.stream.` procedures to stream the export back to your client."); + } + // normalize file url + fileName = FileUtils.changeFileUrlIfImportDirectoryConstrained(fileName); + + // we cannot use apocConfig().checkWriteAllowed(..) because the error is confusing + // since it says "... use the `{stream:true}` config", but with arrow procedures the streaming mode is implemented via different procedures + if (!apocConfig().getBoolean(APOC_EXPORT_FILE_ENABLED)) { + throw new RuntimeException(EXPORT_TO_FILE_PARQUET_ERROR); + } + ParquetExportType exportType = from(data); + if (data instanceof Result) { + return new ExportParquetResultFileStrategy(fileName, db, pools, terminationGuard, log, exportType).export((Result) data, config); + } + return new ExportParquetGraphFileStrategy(fileName, db, pools, terminationGuard, log, exportType).export((SubGraph) data, config); + } + + public Stream exportParquet(Object data, ParquetConfig config) { + + ParquetExportType exportType = from(data); + if (data instanceof Result) { + return new ExportParquetResultStreamStrategy(db, pools, terminationGuard, log, exportType).export((Result) data, config); + } + return new ExportParquetGraphStreamStrategy(db, pools, terminationGuard, log, exportType).export((SubGraph) data, config); + } +} + diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java new file mode 100644 index 0000000000..50fd21b809 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java @@ -0,0 +1,109 @@ +package apoc.export.parquet; + +import apoc.Pools; +import apoc.export.util.ProgressReporter; +import apoc.result.ProgressInfo; +import apoc.util.QueueBasedSpliterator; +import apoc.util.QueueUtil; +import apoc.util.Util; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.schema.MessageType; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + + +public abstract class ExportParquetFileStrategy implements ExportParquetStrategy> { + + private final String fileName; + private final GraphDatabaseService db; + private final Pools pools; + private final TerminationGuard terminationGuard; + private final Log logger; + private final ParquetExportType exportType; + ParquetWriter writer; + + public ExportParquetFileStrategy(String fileName, GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger, ParquetExportType exportType) { + this.fileName = fileName; + this.db = db; + this.pools = pools; + this.terminationGuard = terminationGuard; + this.logger = logger; + this.exportType = exportType; + } + + public Stream export(IN data, ParquetConfig config) { + + ProgressInfo progressInfo = new ProgressInfo(fileName, getSource(data), "parquet"); + progressInfo.batchSize = config.getBatchSize(); + ProgressReporter reporter = new ProgressReporter(null, null, progressInfo); + + Path fileToWrite = new Path(fileName); + final BlockingQueue queue = new ArrayBlockingQueue<>(10); + 
Util.inTxFuture(pools.getDefaultExecutorService(), db, tx -> { + int batchCount = 0; + List rows = new ArrayList<>(config.getBatchSize()); + ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(fileToWrite); + + try { + Iterator it = toIterator(reporter, data); + while (!Util.transactionIsTerminated(terminationGuard) && it.hasNext()) { + rows.add(it.next()); + + if (batchCount > 0 && batchCount % config.getBatchSize() == 0) { + writeBatch(builder, rows, data, config); + } + ++batchCount; + } + if (!rows.isEmpty()) { + writeBatch(builder, rows, data, config); + } + QueueUtil.put(queue, progressInfo, 10); + return true; + } catch (Exception e) { + logger.error("Exception while extracting Parquet data:", e); + } finally { + closeWriter(); + reporter.done(); + QueueUtil.put(queue, ProgressInfo.EMPTY, 10); + } + return true; + }); + + QueueBasedSpliterator spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, Integer.MAX_VALUE); + return StreamSupport.stream(spliterator, false); + } + + private void closeWriter() { + if (this.writer == null) return; + try { + this.writer.close(); + } catch (IOException ignored) {} + } + + private void writeBatch(ExampleParquetWriter.Builder builder, List rows, IN data, ParquetConfig config) { + + List conf = exportType.createConfig(rows, data, config); + MessageType schema = exportType.schemaFor(db, conf); + + if (writer == null) { + this.writer = getBuild(schema, builder); + } + writeRows(rows, writer, exportType, schema); + } + + public abstract String getSource(IN data); + + public abstract Iterator toIterator(ProgressReporter reporter, IN data); +} diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java new file mode 100644 index 0000000000..57729218fb --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java @@ -0,0 +1,43 @@ +package apoc.export.parquet; + +import apoc.Pools; +import apoc.export.util.ProgressReporter; +import apoc.result.ProgressInfo; +import apoc.util.collection.Iterables; +import org.neo4j.cypher.export.SubGraph; +import org.neo4j.graphdb.Entity; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Iterator; +import java.util.stream.Stream; + +public class ExportParquetGraphFileStrategy extends ExportParquetFileStrategy { + public ExportParquetGraphFileStrategy(String fileName, GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger, ParquetExportType exportType) { + super(fileName, db, pools, terminationGuard, logger, exportType); + } + + @Override + public Stream export(SubGraph data, ParquetConfig config) { + return super.export(data, config); + } + + @Override + public String getSource(SubGraph subGraph) { + return String.format("graph: nodes(%d), rels(%d)", Iterables.count(subGraph.getNodes()), Iterables.count(subGraph.getRelationships())); + } + + @Override + public Iterator toIterator(ProgressReporter reporter, SubGraph data) { + return Stream.concat(Iterables.stream(data.getNodes()), Iterables.stream(data.getRelationships())) + .map(entity -> { + reporter.update(entity instanceof Node ? 1 : 0, + entity instanceof Relationship ? 
1 : 0, 0); + return entity; + }) + .iterator(); + } +} diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetGraphStreamStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetGraphStreamStrategy.java new file mode 100644 index 0000000000..ba3ab23952 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetGraphStreamStrategy.java @@ -0,0 +1,25 @@ +package apoc.export.parquet; + +import apoc.Pools; +import apoc.util.collection.Iterables; +import org.neo4j.cypher.export.SubGraph; +import org.neo4j.graphdb.Entity; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Iterator; +import java.util.stream.Stream; + + +public class ExportParquetGraphStreamStrategy extends ExportParquetStreamStrategy { + public ExportParquetGraphStreamStrategy(GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger, ParquetExportType exportType) { + super(db, pools, terminationGuard, logger, exportType); + } + + @Override + public Iterator toIterator(SubGraph data) { + return Stream.concat(Iterables.stream(data.getNodes()), Iterables.stream(data.getRelationships())) + .iterator(); + } +} diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java new file mode 100644 index 0000000000..04d26a6d6a --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java @@ -0,0 +1,52 @@ +package apoc.export.parquet; + +import apoc.Pools; +import apoc.export.util.ProgressReporter; +import apoc.result.ProgressInfo; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.Result; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Stream; + + +public class ExportParquetResultFileStrategy extends ExportParquetFileStrategy, Result> { + public ExportParquetResultFileStrategy(String fileName, GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger, ParquetExportType exportType) { + super(fileName, db, pools, terminationGuard, logger, exportType); + } + + @Override + public String getSource(Result result) { + return String.format("statement: cols(%d)", result.columns().size()); + } + + @Override + public Iterator> toIterator(ProgressReporter reporter, Result data) { + + return data.stream() + .peek(row -> { + row.forEach((key, val) -> { + final boolean notNodeNorRelationship = !(val instanceof Node) && !(val instanceof Relationship); + reporter.update(val instanceof Node ? 1 : 0, + val instanceof Relationship ? 1 : 0, + notNodeNorRelationship ? 
1 : 0); + if (notNodeNorRelationship) { + reporter.nextRow(); + } + }); + }) + .iterator(); + } + + @Override + public Stream export(Result data, ParquetConfig config) { + return super.export(data, config); + } + + +} diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetResultStreamStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetResultStreamStrategy.java new file mode 100644 index 0000000000..4af89343ad --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetResultStreamStrategy.java @@ -0,0 +1,23 @@ +package apoc.export.parquet; + +import apoc.Pools; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Result; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Iterator; +import java.util.Map; + + +public class ExportParquetResultStreamStrategy extends ExportParquetStreamStrategy, Result> { + public ExportParquetResultStreamStrategy(GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger, ParquetExportType exportType) { + super(db, pools, terminationGuard, logger, exportType); + } + + @Override + public Iterator> toIterator(Result data) { + return data.stream() + .iterator(); + } +} diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetStrategy.java new file mode 100644 index 0000000000..dcd7a9e242 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetStrategy.java @@ -0,0 +1,44 @@ +package apoc.export.parquet; + +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.List; + + +public interface ExportParquetStrategy { + + OUT export(IN data, ParquetConfig config); + + default void writeRows(List rows, ParquetWriter writer, ParquetExportType type, MessageType schema) { + rows.stream() + .map(i -> type.toRecord(schema, i)) + .forEach(i -> { + try { + writer.write(i); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + rows.clear(); + } + + default ParquetWriter getBuild(MessageType schema, ExampleParquetWriter.Builder builder) { + try { + return builder + .withType(schema) + // TODO - configurable. This generate a .crc file + .withValidation(false) + // TODO - check other configs, e.g. .enableDictionaryEncoding(), .withDictionaryPageSize(2*1024) etc.. 
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetStreamStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetStreamStrategy.java new file mode 100644 index 0000000000..4781ffed88 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetStreamStrategy.java @@ -0,0 +1,156 @@ +package apoc.export.parquet; + +import apoc.Pools; +import apoc.result.ByteArrayResult; +import apoc.util.QueueBasedSpliterator; +import apoc.util.QueueUtil; +import apoc.util.Util; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import org.apache.parquet.schema.MessageType; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + + +public abstract class ExportParquetStreamStrategy implements ExportParquetStrategy> { + + private final GraphDatabaseService db; + private final Pools pools; + private final TerminationGuard terminationGuard; + private final Log logger; + private final ParquetExportType exportType; + + public ExportParquetStreamStrategy(GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger, ParquetExportType exportType) { + this.db = db; + this.pools = pools; + this.terminationGuard = terminationGuard; + this.logger = logger; + this.exportType = exportType; + } + + public Stream export(IN data, ParquetConfig config) { + final BlockingQueue queue = new ArrayBlockingQueue<>(100); + + Util.inTxFuture(pools.getDefaultExecutorService(), db, tx -> { + int batchCount = 0; + List rows = new ArrayList<>(config.getBatchSize()); + + try { + Iterator it = toIterator(data); + while (!Util.transactionIsTerminated(terminationGuard) && it.hasNext()) { + rows.add(it.next()); + + if (batchCount > 0 && batchCount % config.getBatchSize() == 0) { + byte[] bytes = writeBatch(rows, data, config); + QueueUtil.put(queue, new ByteArrayResult(bytes), 10); + } + ++batchCount; + } + if (!rows.isEmpty()) { + byte[] bytes = writeBatch(rows, data, config); + QueueUtil.put(queue, new ByteArrayResult(bytes), 10); + } + return true; + } catch (Exception e) { + logger.error("Exception while extracting Parquet data:", e); + } finally { + QueueUtil.put(queue, ByteArrayResult.NULL, 10); + } + return true; + }); + + QueueBasedSpliterator spliterator = new QueueBasedSpliterator<>(queue, ByteArrayResult.NULL, terminationGuard, Integer.MAX_VALUE); + return StreamSupport.stream(spliterator, false); + } + + private byte[] writeBatch(List rows, IN data, ParquetConfig config) { + List conf = exportType.createConfig(rows, data, config); + MessageType schema = exportType.schemaFor(db, conf); + try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream()) { + ParquetBufferedWriter out = new ParquetBufferedWriter(bytesOut); + + try (ParquetWriter writer = getBuild(schema, ExampleParquetWriter.builder(out))) { + 
writeRows(rows, writer, exportType, schema); + } + + return bytesOut.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public abstract Iterator toIterator(IN data); + + // create OutputFile + private record ParquetBufferedWriter(OutputStream out) implements OutputFile { + + @Override + public PositionOutputStream create(long blockSizeHint) { + return createPositionOutputstream(); + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { + return createPositionOutputstream(); + } + + private PositionOutputStream createPositionOutputstream() { + return new PositionOutputStream() { + + int pos = 0; + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + pos++; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + pos += len; + } + }; + } + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return 0; + } + } +} diff --git a/extended/src/main/java/apoc/export/parquet/ImportParquet.java b/extended/src/main/java/apoc/export/parquet/ImportParquet.java new file mode 100644 index 0000000000..86b1d35f1c --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ImportParquet.java @@ -0,0 +1,138 @@ +package apoc.export.parquet; + +import apoc.Pools; +import apoc.export.util.BatchTransaction; +import apoc.export.util.ProgressReporter; +import apoc.result.ProgressInfo; +import apoc.util.Util; +import org.neo4j.graphdb.Entity; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.RelationshipType; +import org.neo4j.logging.Log; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.values.storable.Value; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static apoc.export.parquet.ParquetReadUtil.getReader; +import static apoc.export.parquet.ParquetUtil.FIELD_ID; +import static apoc.export.parquet.ParquetUtil.FIELD_LABELS; +import static apoc.export.parquet.ParquetUtil.FIELD_SOURCE_ID; +import static apoc.export.parquet.ParquetUtil.FIELD_TARGET_ID; +import static apoc.export.parquet.ParquetUtil.FIELD_TYPE; + +public class ImportParquet { + + @Context + public GraphDatabaseService db; + + @Context + public Pools pools; + + @Context + public Log log; + + @Procedure(name = "apoc.import.parquet", mode = Mode.WRITE) + @Description("Imports nodes and relationships with the given labels and types from the provided CSV file.") + public Stream importParquet( + @Name("input") Object input, + @Name(value = "config", defaultValue = "{}") Map config) { + ProgressInfo result = + Util.inThread(pools, () -> { + + String file = null; + String sourceInfo = "binary"; + if (input instanceof String) { + file = (String) input; + sourceInfo = "file"; + } + final ParquetConfig conf = new ParquetConfig(config); + + final Map idMapping = new HashMap<>(); + try 
(ApocParquetReader reader = getReader(input, conf)) { + + final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "parquet")); + + BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter); + + try { + Map recordMap; + while ((recordMap = reader.getRecord()) != null) { + + String relType = (String) recordMap.remove(FIELD_TYPE); + if (relType == null) { + // is node + Object[] stringLabels = (Object[]) recordMap.remove(FIELD_LABELS); + Label[] labels = Optional.ofNullable(stringLabels) + .map(l -> Arrays.stream(l).map(Object::toString).map(Label::label).toArray(Label[]::new)) + .orElse(new Label[]{}); + final Node node = btx.getTransaction().createNode(labels); + + long id = (long) recordMap.remove(FIELD_ID); + idMapping.put(id, node.getId()); + + addProps(recordMap, node); + reporter.update(1, 0, recordMap.size()); + } else { + // is relationship + long sourceId = (long) recordMap.remove(FIELD_SOURCE_ID); + Long idSource = idMapping.get(sourceId); + final Node source = btx.getTransaction().getNodeById(idSource); + + long targetId = (long) recordMap.remove(FIELD_TARGET_ID); + Long idTarget = idMapping.get(targetId); + final Node target = btx.getTransaction().getNodeById(idTarget); + + final Relationship rel = source.createRelationshipTo(target, RelationshipType.withName(relType)); + addProps(recordMap, rel); + reporter.update(0, 1, recordMap.size()); + } + + btx.increment(); + } + btx.doCommit(); + } catch (RuntimeException e) { + btx.rollback(); + throw e; + } finally { + btx.close(); + } + + return reporter.getTotal(); + } + }); + return Stream.of(result); + } + + private void addProps(Map recordMap, Entity rel) { + recordMap.forEach((k, v)-> { + Object value = getNeo4jObject(v); + rel.setProperty(k, value); + }); + } + + private Object getNeo4jObject(Object object) { + if (object instanceof Value) { + return ((Value) object).asObject(); + } + if (object instanceof Collection) { + // convert to String[], other array types can be converted via `mapping` config + return ((Collection) object) + .stream() + .map(Object::toString).toArray(String[]::new); + } + return object; + } +} diff --git a/extended/src/main/java/apoc/export/parquet/ParquetConfig.java b/extended/src/main/java/apoc/export/parquet/ParquetConfig.java new file mode 100644 index 0000000000..dcf3f2fd13 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ParquetConfig.java @@ -0,0 +1,34 @@ +package apoc.export.parquet; + +import apoc.util.Util; +import org.apache.parquet.hadoop.ParquetFileWriter; + +import java.util.Collections; +import java.util.Map; + +public class ParquetConfig { + + private final int batchSize; + + private final Map config; + private final Map mapping; + + public ParquetConfig(Map config) { + this.config = config == null ? 
Collections.emptyMap() : config; + this.batchSize = Util.toInteger(this.config.getOrDefault("batchSize", 20000)); + this.mapping = (Map) this.config.getOrDefault("mapping", Map.of()); + } + + public int getBatchSize() { + return batchSize; + } + + public Map getConfig() { + return config; + } + + public Map getMapping() { + return mapping; + } +} + diff --git a/extended/src/main/java/apoc/export/parquet/ParquetExportType.java b/extended/src/main/java/apoc/export/parquet/ParquetExportType.java new file mode 100644 index 0000000000..c8c536ec1d --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ParquetExportType.java @@ -0,0 +1,173 @@ +package apoc.export.parquet; + +import apoc.meta.Types; +import apoc.util.Util; +import apoc.util.collection.Iterables; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.neo4j.cypher.export.SubGraph; +import org.neo4j.graphdb.Entity; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.RelationshipType; +import org.neo4j.graphdb.Result; +import org.neo4j.graphdb.ResultTransformer; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static apoc.export.parquet.ParquetUtil.*; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; + +public interface ParquetExportType { + enum Type { + RESULT(new ResultType()), + GRAPH(new GraphType()); + + private final ParquetExportType graphType; + + Type(ParquetExportType graphType) { + this.graphType = graphType; + } + + public static ParquetExportType from(Object data) { + Type type = data instanceof Result + ? 
Type.RESULT + : Type.GRAPH; + + return type.graphType; + } + } + + MessageType schemaFor(GraphDatabaseService db, List> type); + Group toRecord(MessageType schema, ROW data); + List> createConfig(List row, TYPE data, ParquetConfig config); + + class GraphType implements ParquetExportType { + + private MessageType schema; + private List> config; + + @Override + public MessageType schemaFor(GraphDatabaseService db, List> type) { + + if (this.schema != null) { + return this.schema; + } + org.apache.parquet.schema.Types.GroupBuilder messageTypeBuilder = org.apache.parquet.schema.Types.buildMessage(); + + final Predicate> filterStream = m -> m.get("propertyName") != null; + final ResultTransformer parsePropertiesResult = result -> { + result.stream() + .filter(filterStream) + .forEach(m -> { + String propertyName = (String) m.get("propertyName"); + List propertyTypes = ((List>) m.get("types")) + .stream().flatMap(List::stream) + .collect(Collectors.toList()); + toField(propertyName, new HashSet<>(propertyTypes), messageTypeBuilder); + }); + return null; + }; + + Map confMap = type.get(0); + final Map parameters = Map.of("config", confMap); + + // group by `propertyName` in order to + String query = "CALL apoc.meta.%s($config) " + + "YIELD propertyName, propertyTypes " + + "RETURN propertyName, collect(propertyTypes) as types"; + + db.executeTransactionally(String.format(query, "nodeTypeProperties"), + parameters, parsePropertiesResult); + + getField(messageTypeBuilder, INT64, FIELD_ID); + addListItem(FIELD_LABELS, messageTypeBuilder); + + if (confMap.containsKey("includeRels")) { + db.executeTransactionally(String.format(query, "relTypeProperties"), + parameters, parsePropertiesResult); + getField(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT64, FIELD_SOURCE_ID); + getField(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT64, FIELD_TARGET_ID); + getField(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.BINARY, FIELD_TYPE); + } + + this.schema = messageTypeBuilder.named("apocExport"); + return this.schema; + } + + @Override + public Group toRecord(MessageType schema, Entity entity) { + + Group group = mapToRecord(schema, entity.getAllProperties()); + if (entity instanceof Node) { + group.append(FIELD_ID, entity.getId()); + appendList(group, FIELD_LABELS, Util.labelStrings((Node) entity)); + } else { + Relationship rel = (Relationship) entity; + group.append(FIELD_TYPE, rel.getType().name()); + group.append(FIELD_SOURCE_ID, rel.getStartNodeId()); + group.append(FIELD_TARGET_ID, rel.getEndNodeId()); + } + + return group; + } + + @Override + public List> createConfig(List entity, SubGraph data, ParquetConfig config) { + if (this.config != null) { + return this.config; + } + final List allLabelsInUse = Iterables.stream(data.getAllLabelsInUse()) + .map(Label::name) + .collect(Collectors.toList()); + final List allRelationshipTypesInUse = Iterables.stream(data.getAllRelationshipTypesInUse()) + .map(RelationshipType::name) + .collect(Collectors.toList()); + Map configMap = new HashMap<>(); + configMap.put("includeLabels", allLabelsInUse); + if (!allRelationshipTypesInUse.isEmpty()) { + configMap.put("includeRels", allRelationshipTypesInUse); + } + configMap.putAll(config.getConfig()); + this.config = List.of(configMap); + return this.config; + } + } + + class ResultType implements ParquetExportType> { + + @Override + public MessageType schemaFor(GraphDatabaseService db, List> type) { + // we re-calculate the schema for each batch + org.apache.parquet.schema.Types.GroupBuilder 
messageTypeBuilder = org.apache.parquet.schema.Types.buildMessage(); + + type.stream() + .flatMap(m -> m.entrySet().stream()) + .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), fromMetaType(Types.of(e.getValue())))) + .collect(Collectors.groupingBy(AbstractMap.SimpleEntry::getKey, Collectors.mapping(AbstractMap.SimpleEntry::getValue, Collectors.toSet()))) + .forEach((key, value) -> toField(key, value, messageTypeBuilder)); + + return messageTypeBuilder.named("apocExport"); + } + + @Override + public Group toRecord(MessageType schema, Map map) { + return mapToRecord(schema, map); + } + + @Override + public List> createConfig(List> row, Result data, ParquetConfig config) { + return row; + } + } + +} diff --git a/extended/src/main/java/apoc/export/parquet/ParquetReadUtil.java b/extended/src/main/java/apoc/export/parquet/ParquetReadUtil.java new file mode 100644 index 0000000000..4f6dd49922 --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ParquetReadUtil.java @@ -0,0 +1,233 @@ +package apoc.export.parquet; + +import apoc.ApocConfig; +import apoc.load.LoadParquet; +import apoc.util.JsonUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.neo4j.values.storable.DateTimeValue; +import org.neo4j.values.storable.DateValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.LocalTimeValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.storable.TimeValue; +import org.neo4j.values.storable.Values; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static apoc.util.FileUtils.changeFileUrlIfImportDirectoryConstrained; + +public class ParquetReadUtil { + + public static Object toValidValue(Object object, String field, ParquetConfig config) { + Object fieldName = config.getMapping().get(field); + if (object != null && fieldName != null) { + return convertValue(object.toString(), fieldName.toString()); + } + + if (object instanceof Collection) { + // if there isn't a mapping config, we convert the list to a String[] + return ((Collection) object).stream() + .map(i -> toValidValue(i, field, config)) + .collect(Collectors.toList()) + .toArray(new String[0]); + } + if (object instanceof Map) { + return ((Map) object).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> toValidValue(e.getValue(), field, config))); + } + try { + // we test if is a valid Neo4j type + Values.of(object); + return object; + } catch (Exception e) { + // otherwise we try to coerce it + return object.toString(); + } + } + + /** + * In case of complex type non-readable from Parquet, i.e. Duration, Point, List of Neo4j Types... + * we can use the `mapping: {keyToConvert: valueTypeName}` config to convert them. 
+ * For example `mapping: {myPropertyKey: "DateArray"}` + */ + private static Object convertValue(String value, String typeName) { + switch (typeName) { + case "Point": + return PointValue.parse(value); + case "LocalDateTime": + return LocalDateTimeValue.parse(value).asObjectCopy(); + case "LocalTime": + return LocalTimeValue.parse(value).asObjectCopy(); + case "DateTime": + return DateTimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "Time": + return TimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "Date": + return DateValue.parse(value).asObjectCopy(); + case "Duration": + return DurationValue.parse(value); + case "Char": + return value.charAt(0); + case "Byte": + return value.getBytes(); + case "Double": + return Double.parseDouble(value); + case "Float": + return Float.parseFloat(value); + case "Short": + return Short.parseShort(value); + case "Int": + return Integer.parseInt(value); + case "Long": + return Long.parseLong(value); + case "Node", "Relationship": + return JsonUtil.parse(value, null, Map.class); + case "NO_VALUE": + return null; + default: + // If ends with "Array", for example StringArray + if (typeName.endsWith("Array")) { + value = StringUtils.removeStart(value, "["); + value = StringUtils.removeEnd(value, "]"); + String array = typeName.replace("Array", ""); + + final Object[] prototype = getPrototypeFor(array); + return Arrays.stream(value.split(",")) + .map(item -> convertValue(StringUtils.trim(item), array)) + .toList() + .toArray(prototype); + } + return value; + } + } + + // similar to CsvPropertyConverter + static Object[] getPrototypeFor(String type) { + switch (type) { + case "Long": + return new Long[]{}; + case "Integer": + return new Integer[]{}; + case "Double": + return new Double[]{}; + case "Float": + return new Float[]{}; + case "Boolean": + return new Boolean[]{}; + case "Byte": + return new Byte[]{}; + case "Short": + return new Short[]{}; + case "Char": + return new Character[]{}; + case "String": + return new String[]{}; + case "DateTime": + return new ZonedDateTime[]{}; + case "LocalTime": + return new LocalTime[]{}; + case "LocalDateTime": + return new LocalDateTime[]{}; + case "Point": + return new PointValue[]{}; + case "Time": + return new OffsetTime[]{}; + case "Date": + return new LocalDate[]{}; + case "Duration": + return new DurationValue[]{}; + default: + throw new IllegalStateException("Type " + type + " not supported."); + } + } + + public static java.util.concurrent.TimeUnit toTimeUnitJava(LogicalTypeAnnotation.TimeUnit unit) { + return switch (unit) { + case NANOS -> TimeUnit.NANOSECONDS; + case MICROS -> TimeUnit.MICROSECONDS; + case MILLIS -> TimeUnit.MILLISECONDS; + }; + } + + + public static InputFile getInputFile(Object source) throws IOException { + if (source instanceof String) { + ApocConfig.apocConfig().isImportFileEnabled(); + String fileName = changeFileUrlIfImportDirectoryConstrained((String) source); + Path file = new Path(fileName); + return HadoopInputFile.fromPath(file, new Configuration()); + } + return new ParquetStream((byte[]) source); + } + + public static ApocParquetReader getReader(Object source, ParquetConfig conf) { + + try { + return new ApocParquetReader(getInputFile(source), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static class ParquetStream implements InputFile { + private final byte[] data; + + private static class SeekableByteArrayInputStream extends ByteArrayInputStream { + public SeekableByteArrayInputStream(byte[] buf) { 
+ super(buf); + } + + public void setPos(int pos) { + this.pos = pos; + } + + public int getPos() { + return this.pos; + } + } + + public ParquetStream(byte[] stream) { + this.data = stream; + } + + @Override + public long getLength() { + return this.data.length; + } + + @Override + public SeekableInputStream newStream() { + return new DelegatingSeekableInputStream(new SeekableByteArrayInputStream(this.data)) { + @Override + public void seek(long newPos) { + ((SeekableByteArrayInputStream) this.getStream()).setPos((int) newPos); + } + + @Override + public long getPos() { + return ((SeekableByteArrayInputStream) this.getStream()).getPos(); + } + }; + } + } +} diff --git a/extended/src/main/java/apoc/export/parquet/ParquetUtil.java b/extended/src/main/java/apoc/export/parquet/ParquetUtil.java new file mode 100644 index 0000000000..1ae3eb619b --- /dev/null +++ b/extended/src/main/java/apoc/export/parquet/ParquetUtil.java @@ -0,0 +1,227 @@ +package apoc.export.parquet; + + +import apoc.convert.ConvertUtils; +import apoc.util.JsonUtil; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; + +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Date; +import java.util.Map; +import java.util.Set; + +import static apoc.util.Util.labelStrings; +import static org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import static org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation; +import static org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.GroupBuilder; +import static org.apache.parquet.schema.Types.optional; +import static org.apache.parquet.schema.Types.optionalList; + +public class ParquetUtil { + public static String FIELD_ID = "__id"; + public static String FIELD_LABELS = "__labels"; + public static String FIELD_SOURCE_ID = "__source_id"; + public static String FIELD_TARGET_ID = "__target_id"; + public static String FIELD_TYPE = "__type"; + + public static String fromMetaType(apoc.meta.Types type) { + switch (type) { + case INTEGER: + return "LONG"; + case FLOAT: + return "DOUBLE"; + case LIST: + String inner = type.toString().substring("LIST OF ".length()).trim(); + final apoc.meta.Types innerType = apoc.meta.Types.from(inner); + if (innerType == apoc.meta.Types.LIST || innerType == apoc.meta.Types.MAP ) { + return "ANYARRAY"; + } + return fromMetaType(innerType) + "ARRAY"; + default: + return type.name().replaceAll("_", "").toUpperCase(); + } + } + + public static Group mapToRecord(MessageType schema, Map 
map) { + GroupFactory factory = new SimpleGroupFactory(schema); + Group group = factory.newGroup(); + + map.forEach((k, v)-> { + try { + Type type = schema.getType(k); + if (type.getLogicalTypeAnnotation() instanceof ListLogicalTypeAnnotation) { + appendList(group, k, v); + } else { + appendElement(group, k, v, schema); + } + } catch (Exception e2) { + throw new RuntimeException(e2); + } + }); + return group; + } + + public static void appendList(Group group, String k, Object v) { + Group group1 = group.addGroup(k); + ConvertUtils.convertToList(v).forEach(item -> { + Group group2 = group1.addGroup(0); + group2.add(0, item.toString()); + }); + } + + private static long writeDateMilliVector(Object value) { + if (value instanceof Date) { + return ((Date) value).getTime(); + } else if (value instanceof LocalDateTime) { + return ((LocalDateTime) value) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } else if (value instanceof ZonedDateTime) { + return ((ZonedDateTime) value) + .toInstant() + .toEpochMilli(); + } else if (value instanceof OffsetDateTime) { + return ((OffsetDateTime) value) + .toInstant() + .toEpochMilli(); + } else { + return (long) value; + } + } + + public static void appendElement(Group group, String fieldName, Object value, MessageType schema) { + if (value == null) { + return; + } + + PrimitiveType.PrimitiveTypeName typeName = schema.getType(fieldName) + .asPrimitiveType() + .getPrimitiveTypeName(); + if (typeName.equals(INT64)) { + group.append(fieldName, writeDateMilliVector(value)); + } else if (typeName.equals(BINARY)) { + group.append(fieldName, serializeValue(value)); + } else if (value instanceof Integer) { + group.append(fieldName, (int) value); + } else if (value instanceof Float) { + group.append(fieldName, (float) value); + } else if (value instanceof Double) { + group.append(fieldName, (double) value); + } else if (value instanceof Long) { + group.append(fieldName, (long) value); + } else if (value instanceof NanoTime) { + group.append(fieldName, (NanoTime) value); + } else if (value instanceof Boolean) { + group.append(fieldName, (boolean) value); + } else { + // fallback + group.append(fieldName, serializeValue(value)); + } + + } + + private static String serializeValue(Object val){ + if (val instanceof Node) { + Node value = (Node) val; + Map allProperties = value.getAllProperties(); + allProperties.put(FIELD_ID, value.getId()); + allProperties.put(FIELD_LABELS, labelStrings(value)); + return JsonUtil.writeValueAsString(allProperties); + } + if (val instanceof Relationship) { + Relationship value = (Relationship) val; + Map allProperties = value.getAllProperties(); + allProperties.put(FIELD_ID, value.getId()); + allProperties.put(FIELD_SOURCE_ID, value.getStartNodeId()); + allProperties.put(FIELD_TARGET_ID, value.getEndNodeId()); + allProperties.put(FIELD_TYPE, value.getType().name()); + return JsonUtil.writeValueAsString(allProperties); + } + if (val instanceof Map) { + return JsonUtil.writeValueAsString(val); + } + return val.toString(); + } + + public static void addListItem(String fieldName, GroupBuilder test) { + PrimitiveType element = optional(BINARY).named("element"); + GroupType groupType = optionalList() + .element(element) + .named(fieldName); + test.addField(groupType); + } + + static void toField(String fieldName, Set propertyTypes, GroupBuilder builder) { + + if (propertyTypes.size() > 1) { + // multi type handled as a string + getSchemaFieldAssembler(builder, fieldName, "String"); + } else { + getSchemaFieldAssembler(builder, fieldName, 
propertyTypes.iterator().next()); + } + } + + public static void getField(GroupBuilder builder, PrimitiveType.PrimitiveTypeName type, String fieldName) { + builder.addField(optional(type).named(fieldName)); + } + + private static void getSchemaFieldAssembler(GroupBuilder builder, String fieldName, String propertyType) { + propertyType = propertyType.toUpperCase(); + + switch (propertyType) { + case "BOOLEAN" -> builder.addField(optional(BOOLEAN).named(fieldName)); + case "LONG" -> builder.addField(optional(INT64).named(fieldName)); + case "DOUBLE" -> builder.addField(optional(DOUBLE).named(fieldName)); + case "DATETIME" -> addDateTimeField(builder, fieldName, true); + case "LOCALDATETIME" -> addDateTimeField(builder, fieldName, false); + case "DATE" -> { + PrimitiveType type = optional(INT64) + .as(DateLogicalTypeAnnotation.dateType()) + .named(fieldName); + builder.addField(type); + } + case "DURATION", "NODE", "RELATIONSHIP", "POINT" -> { + // convert each type not manageable from parquet to string, + // which can be re-imported via mapping config + builder.addField(optional(BINARY).named(fieldName)); + } + default -> { + if (propertyType.endsWith("ARRAY")) { + // convert each type not manageable from parquet to string, + // which can be re-imported via mapping config + addListItem(fieldName, builder); + } else { + builder.addField(optional(BINARY).named(fieldName)); + } + } + } + } + + private static Types.BaseGroupBuilder addDateTimeField(GroupBuilder builder, String fieldName, boolean isAdjustedToUTC) { + TimestampLogicalTypeAnnotation type = TimestampLogicalTypeAnnotation.timestampType(isAdjustedToUTC, LogicalTypeAnnotation.TimeUnit.MILLIS); + PrimitiveType primitiveType = optional(INT64) + .as(type) + .named(fieldName); + return builder.addField(primitiveType); + } +} diff --git a/extended/src/main/java/apoc/load/LoadParquet.java b/extended/src/main/java/apoc/load/LoadParquet.java new file mode 100644 index 0000000000..a685285142 --- /dev/null +++ b/extended/src/main/java/apoc/load/LoadParquet.java @@ -0,0 +1,73 @@ +package apoc.load; + +import apoc.export.parquet.ApocParquetReader; +import apoc.export.parquet.ParquetConfig; +import apoc.result.MapResult; +import apoc.util.Util; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; +import org.neo4j.logging.Log; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static apoc.export.parquet.ParquetReadUtil.getReader; + +public class LoadParquet { + + @Context public Log log; + + + private static class ParquetSpliterator extends Spliterators.AbstractSpliterator { + + private final ApocParquetReader reader; + + public ParquetSpliterator(ApocParquetReader reader) { + super(Long.MAX_VALUE, Spliterator.ORDERED); + this.reader = reader; + } + + @Override + public synchronized boolean tryAdvance(Consumer action) { + try { + Map read = reader.getRecord(); + if (read != null) { + MapResult result = new MapResult(read); + action.accept(result); + return true; + } + + return false; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Procedure(name = 
"apoc.load.parquet") + @Description("Load parquet from the provided file or binary") + public Stream load( + @Name("input") Object input, + @Name(value = "config", defaultValue = "{}") Map config) throws IOException { + + ParquetConfig conf = new ParquetConfig(config); + + ApocParquetReader reader = getReader(input, conf); + return StreamSupport.stream(new ParquetSpliterator(reader),false) + .onClose(() -> Util.close(reader)); + } + + + +} diff --git a/extended/src/test/java/apoc/ComparePerformancesTest.java b/extended/src/test/java/apoc/ComparePerformancesTest.java new file mode 100644 index 0000000000..f9c5dfd9d1 --- /dev/null +++ b/extended/src/test/java/apoc/ComparePerformancesTest.java @@ -0,0 +1,95 @@ +package apoc; + +import apoc.export.parquet.ImportParquet; +import apoc.export.csv.ExportCSV; +import apoc.export.csv.ImportCsv; +import apoc.export.parquet.ExportParquet; +import apoc.load.LoadParquet; +import apoc.meta.Meta; +import apoc.util.TestUtil; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +import java.io.File; +import java.util.Map; +import java.util.stream.IntStream; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static org.junit.Assert.assertEquals; +import static org.neo4j.configuration.GraphDatabaseSettings.TransactionStateMemoryAllocation.OFF_HEAP; +import static org.neo4j.configuration.SettingValueParsers.BYTES; + +@Ignore("This test compare import/export procedures performances, we ignore it since it's slow and just log the times spent") +public class ComparePerformancesTest { + private static final File directory = new File("target/import"); + static { //noinspection ResultOfMethodCallIgnored + directory.mkdirs(); + } + + @Rule + public DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseSettings.memory_tracking, true) + .withSetting(GraphDatabaseSettings.tx_state_memory_allocation, OFF_HEAP) + .withSetting(GraphDatabaseSettings.tx_state_max_off_heap_memory, BYTES.parse("4G")) + .withSetting(GraphDatabaseSettings.load_csv_file_url_root, directory.toPath().toAbsolutePath()); + + @Before + public void setUp() throws Exception { + TestUtil.registerProcedure(db, ImportParquet.class, ExportParquet.class, ExportCSV.class, Meta.class, ImportCsv.class, LoadParquet.class); + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true); + + IntStream.range(0, 50) + .forEach(__-> db.executeTransactionally("UNWIND range(0, 19999) as id WITH id " + + "CREATE (:Start {idStart: id})-[:REL {idRel: id}]->(:End {idEnd: id})")); + } + + @Test + public void testPerformanceImportAndExportCsv() { + exportCsv(); + importCsv(); + } + + @Test + public void testPerformanceImportAndExportParquet() { + exportParquet(); + importParquet(); + } + + private void exportParquet() { + testPerformanceCommon("CALL apoc.export.parquet.all('test.parquet')", "endExportParquet = "); + } + + private void exportCsv() { + testPerformanceCommon("CALL apoc.export.csv.all('test.csv', {bulkImport: true})", "endExportCsv = "); + } + + private void importCsv() { + testPerformanceCommon("CALL apoc.import.csv([{fileName: 'test.nodes.Start.csv', labels: ['Start']}," + + "{fileName: 'test.nodes.End.csv', labels: ['End']}], [{fileName: 
'test.relationships.REL.csv', type: 'REL'}], {}) ", "endImportCsv = "); + } + + private void importParquet() { + testPerformanceCommon("CALL apoc.import.parquet('test.parquet')", "endImportParquet = "); + } + + private void testPerformanceCommon(String call, String printTime) { + long start = System.currentTimeMillis(); + TestUtil.testCall(db, call, this::progressInfoAssertion); + long end = System.currentTimeMillis() - start; + System.out.println(printTime + end); + } + + private void progressInfoAssertion(Map r) { + assertEquals(2000000L, r.get("nodes")); + assertEquals(1000000L, r.get("relationships")); + } + +} diff --git a/extended/src/test/java/apoc/export/parquet/ParquetHdfsTest.java b/extended/src/test/java/apoc/export/parquet/ParquetHdfsTest.java new file mode 100644 index 0000000000..0bbee9093f --- /dev/null +++ b/extended/src/test/java/apoc/export/parquet/ParquetHdfsTest.java @@ -0,0 +1,80 @@ +package apoc.export.parquet; + +import apoc.util.HdfsTestUtils; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +import java.io.File; +import java.util.Map; + +import static apoc.export.parquet.ParquetTest.MAPPING_ALL; +import static apoc.export.parquet.ParquetTestUtil.beforeClassCommon; +import static apoc.export.parquet.ParquetTestUtil.beforeCommon; +import static apoc.export.parquet.ParquetTestUtil.testImportAllCommon; +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertEquals; + +public class ParquetHdfsTest { + + private static final File directory = new File("target/hdfs-parquet-import"); + static { //noinspection ResultOfMethodCallIgnored + directory.mkdirs(); + } + + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseSettings.load_csv_file_url_root, directory.toPath().toAbsolutePath()); + + private static MiniDFSCluster miniDFSCluster; + + @BeforeClass + public static void setUp() throws Exception { + beforeClassCommon(db); + miniDFSCluster = HdfsTestUtils.getLocalHDFSCluster(); + } + + @Before + public void before() { + beforeCommon(db); + } + + @AfterClass + public static void tearDown() { + if (miniDFSCluster!= null) { + miniDFSCluster.shutdown(); + } + db.shutdown(); + } + + @Test + public void testFileRoundtripParquetAll() { + String hdfsUrl = String.format("%s/user/%s/all.parquet", miniDFSCluster.getURI().toString(), System.getProperty("user.name")); + + // check export procedure + String file = db.executeTransactionally("CALL apoc.export.parquet.all($url) YIELD file", + Map.of("url", hdfsUrl), + ParquetTestUtil::extractFileName); + + // check that file extracted from apoc.export is equals to `hdfs://path/to/file` url + assertEquals(hdfsUrl, file); + + // check load procedure + final String query = "CALL apoc.load.parquet($file, $config) YIELD value " + + "RETURN value"; + + testResult(db, query, Map.of("file", file, "config", MAPPING_ALL), + ParquetTestUtil::roundtripLoadAllAssertions); + + // check import procedure + Map params = Map.of("file", file, "config", MAPPING_ALL); + testImportAllCommon(db, params); + + } +} diff --git a/extended/src/test/java/apoc/export/parquet/ParquetS3Test.java b/extended/src/test/java/apoc/export/parquet/ParquetS3Test.java new file mode 100644 index 0000000000..a6bee21c8e --- /dev/null +++ 
b/extended/src/test/java/apoc/export/parquet/ParquetS3Test.java @@ -0,0 +1,59 @@ +package apoc.export.parquet; + +import apoc.util.s3.S3BaseTest; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +import java.util.Map; + +import static apoc.export.parquet.ParquetTest.MAPPING_ALL; +import static apoc.export.parquet.ParquetTest.MAPPING_QUERY; +import static apoc.export.parquet.ParquetTest.testReturnNodeAndRelCommon; +import static apoc.export.parquet.ParquetTestUtil.beforeClassCommon; +import static apoc.export.parquet.ParquetTestUtil.beforeCommon; +import static apoc.util.TestUtil.testResult; + +public class ParquetS3Test extends S3BaseTest { + + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule(); + + @BeforeClass + public static void beforeClass() { + beforeClassCommon(db); + } + + @Before + public void before() { + beforeCommon(db); + } + + @Test + public void testFileRoundtripParquetAll() { + // given - when + String file = db.executeTransactionally("CALL apoc.export.parquet.all('test_all.parquet') YIELD file", + Map.of(), + ParquetTestUtil::extractFileName); + + // then + final String query = "CALL apoc.load.parquet($file, $config) YIELD value " + + "RETURN value"; + + testResult(db, query, Map.of("file", file, "config", MAPPING_ALL), + ParquetTestUtil::roundtripLoadAllAssertions); + } + + @Test + public void testReturnNodeAndRel() { + testReturnNodeAndRelCommon(() -> db.executeTransactionally( + "CALL apoc.export.parquet.query('MATCH (n:ParquetNode)-[r:BAR]->(o:Other) RETURN n,r,o ORDER BY n.idStart', " + + "'volume_test.parquet', $config) YIELD file ", + Map.of("config", MAPPING_QUERY), + ParquetTestUtil::extractFileName)); + } + +} diff --git a/extended/src/test/java/apoc/export/parquet/ParquetTest.java b/extended/src/test/java/apoc/export/parquet/ParquetTest.java new file mode 100644 index 0000000000..57e1a1e81c --- /dev/null +++ b/extended/src/test/java/apoc/export/parquet/ParquetTest.java @@ -0,0 +1,308 @@ +package apoc.export.parquet; + +import apoc.graph.Graphs; +import apoc.load.LoadParquet; +import apoc.meta.Meta; +import apoc.util.TestUtil; +import apoc.util.collection.Iterators; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.ResourceIterator; +import org.neo4j.graphdb.Result; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; +import org.neo4j.values.storable.DateValue; +import org.neo4j.values.storable.LocalDateTimeValue; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.LOAD_FROM_FILE_ERROR; +import static apoc.ApocConfig.apocConfig; +import static apoc.export.parquet.ExportParquet.EXPORT_TO_FILE_PARQUET_ERROR; +import static apoc.export.parquet.ParquetTestUtil.assertBarRel; +import static apoc.export.parquet.ParquetTestUtil.assertNodeAndLabel; +import static apoc.export.parquet.ParquetTestUtil.beforeClassCommon; +import static apoc.export.parquet.ParquetTestUtil.beforeCommon; +import static 
apoc.export.parquet.ParquetTestUtil.testImportAllCommon; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + + +public class ParquetTest { + + public static final Map> MAPPING_ALL = Map.of("mapping", + Map.of("bffSince", "Duration", "place", "Point", + "listDate", "DateArray", "listInt", "LongArray") + ); + public static final Map> MAPPING_QUERY = Map.of("mapping", + Map.of("n", "Node", "r", "Relationship", "o", "Node") + ); + private static File directory = new File("target/parquet import"); + static { //noinspection ResultOfMethodCallIgnored + directory.mkdirs(); + } + + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseSettings.load_csv_file_url_root, directory.toPath().toAbsolutePath()); + + @BeforeClass + public static void beforeClass() { + beforeClassCommon(db); + } + + @Before + public void before() { + beforeCommon(db); + } + + @Test + public void testStreamRoundtripParquetQueryMultipleTypes() { + List values = List.of(1L, "", 7.0, DateValue.parse("1999"), LocalDateTimeValue.parse("2023-06-14T08:38:28.193000000")); + + final byte[] byteArray = db.executeTransactionally( + "CALL apoc.export.parquet.query.stream('UNWIND $values AS item RETURN item', {params: {values: $values}})", + Map.of("values", values), + ParquetTest::extractByteArray); + + // then + final String query = "CALL apoc.load.parquet($byteArray, $config) YIELD value " + + "RETURN value"; + testResult(db, query, Map.of("byteArray", byteArray, "config", MAPPING_QUERY), result -> { + List> value = Iterators.asList(result.columnAs("value")); + Set expected = Set.of("", "1", "7.0", "2023-06-14T08:38:28.193", "1999-01-01"); + Set actual = value.stream() + .flatMap(i -> i.values().stream()) + .collect(Collectors.toSet()); + assertEquals(expected, actual); + }); + } + + @Test + public void testFileRoundtripParquetGraph() { + // given - when + String file = db.executeTransactionally("CALL apoc.graph.fromDB('neo4j',{}) yield graph " + + "CALL apoc.export.parquet.graph(graph, 'graph_test.parquet') YIELD file " + + "RETURN file", + Map.of(), + ParquetTestUtil::extractFileName); + + // then + final String query = "CALL apoc.load.parquet($file, $config) YIELD value " + + "RETURN value"; + testResult(db, query, Map.of("file", file, "config", MAPPING_ALL), + ParquetTestUtil::roundtripLoadAllAssertions); + } + + @Test + public void testStreamRoundtripParquetAllWithImportExportConfsDisabled() { + // disable both export and import configs + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, false); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, false); + + // should work regardless of the previous config + testStreamRoundtripAllCommon(); + } + + @Test + public void testExportFileWithConfigDisabled() { + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, false); + + assertFails("CALL apoc.export.parquet.all('ignore.parquet')", EXPORT_TO_FILE_PARQUET_ERROR); + } + + @Test + public void testLoadImportFiletWithConfigDisabled() { + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, false); + + assertFails("CALL apoc.load.parquet('ignore.parquet')", LOAD_FROM_FILE_ERROR); + assertFails("CALL apoc.import.parquet('ignore.parquet')", LOAD_FROM_FILE_ERROR); + } + + private static void assertFails(String call, String expectedErrMsg) { + try { + testCall(db, call, r -> fail("Should fail 
due to " + expectedErrMsg)); + } catch (Exception e) { + String actualErrMsg = e.getMessage(); + assertTrue("Actual err. message is: " + actualErrMsg, actualErrMsg.contains(expectedErrMsg)); + } + } + + @Test + public void testStreamRoundtripParquetAll() { + testStreamRoundtripAllCommon(); + } + + private static void testStreamRoundtripAllCommon() { + // given - when + final byte[] bytes = db.executeTransactionally("CALL apoc.export.parquet.all.stream()", + Map.of(), + ParquetTest::extractByteArray); + + // then + final String query = "CALL apoc.load.parquet($bytes, $config) YIELD value " + + "RETURN value"; + + testResult(db, query, Map.of("bytes", bytes, "config", MAPPING_ALL), + ParquetTestUtil::roundtripLoadAllAssertions); + } + + @Test + public void testStreamRoundtripWithAnotherpleBatches() { + final List bytes = db.executeTransactionally("CALL apoc.export.parquet.all.stream({batchSize:1})", + Map.of(), + r -> Iterators.asList(r.columnAs("value"))); + + // then + final String query = "UNWIND $bytes AS byte CALL apoc.load.parquet(byte, $config) YIELD value " + + "RETURN value"; + + testResult(db, query, Map.of("bytes", bytes, "config", MAPPING_ALL), + ParquetTestUtil::roundtripLoadAllAssertions); + } + + @Test + public void testRoundtripWithMultipleBatches() { + final String fileName = db.executeTransactionally("CALL apoc.export.parquet.all('test.parquet', {batchSize:1})", + Map.of(), + ParquetTestUtil::extractFileName); + + // then + final String query = "CALL apoc.load.parquet($file, $config) YIELD value " + + "RETURN value"; + + testResult(db, query, Map.of("file", fileName, "config", MAPPING_ALL), + ParquetTestUtil::roundtripLoadAllAssertions); + } + + @Test + public void testFileRoundtripImportParquetAll() { + // given - when + String file = db.executeTransactionally("CALL apoc.export.parquet.all('test_all.parquet') YIELD file", + Map.of(), + ParquetTestUtil::extractFileName); + + + // then + Map params = Map.of("file", file, "config", MAPPING_ALL); + testImportAllCommon(db, params); + } + + @Test + public void testFileRoundtripParquetAll() { + // given - when + String file = db.executeTransactionally("CALL apoc.export.parquet.all('test_all.parquet') YIELD file", + Map.of(), + ParquetTestUtil::extractFileName); + + // then + final String query = "CALL apoc.load.parquet($file, $config) YIELD value " + + "RETURN value"; + + testResult(db, query, Map.of("file", file, "config", MAPPING_ALL), + ParquetTestUtil::roundtripLoadAllAssertions); + } + + @Test + public void testReturnNodeAndRelStream() { + testReturnNodeAndRelCommon(() -> db.executeTransactionally("CALL apoc.export.parquet.query.stream('MATCH (n:ParquetNode)-[r:BAR]->(o:Other) RETURN n,r,o ORDER BY n.idStart') ", + Map.of(), + ParquetTest::extractByteArray)); + } + + @Test + public void testReturnNodeAndRel() { + testReturnNodeAndRelCommon(() -> db.executeTransactionally( + "CALL apoc.export.parquet.query('MATCH (n:ParquetNode)-[r:BAR]->(o:Other) RETURN n,r,o ORDER BY n.idStart', " + + "'volume_test.parquet', $config) YIELD file ", + Map.of("config", MAPPING_QUERY), + ParquetTestUtil::extractFileName)); + } + + public static void testReturnNodeAndRelCommon(Supplier supplier) { + db.executeTransactionally("CREATE (:ParquetNode{idStart:1})-[:BAR {idRel: 'one'}]->(:Other {idOther: datetime('2020')})"); + db.executeTransactionally("CREATE (:ParquetNode{idStart:2})-[:BAR {idRel: 'two'}]->(:Other {idOther: datetime('1999')})"); + + Object fileOrBinary = supplier.get(); + + // then + final String query = "CALL 
apoc.load.parquet($file, $config)"; + + testResult(db, query, Map.of("file", fileOrBinary, "config", MAPPING_QUERY), + res -> { + ResourceIterator> value = res.columnAs("value"); + Map row = value.next(); + Map relTwo = (Map) row.get("r"); + assertBarRel("one", relTwo); + + Map startTwo = (Map) row.get("n"); + assertNodeAndLabel(startTwo, "ParquetNode"); + assertEquals(1L, startTwo.get("idStart")); + + Map endTwo = (Map) row.get("o"); + assertNodeAndLabel(endTwo, "Other"); + assertEquals("2020-01-01T00:00Z", endTwo.get("idOther")); + + row = value.next(); + Map rel = (Map) row.get("r"); + assertBarRel("two", rel); + + Map start = (Map) row.get("n"); + assertNodeAndLabel(start, "ParquetNode"); + assertEquals(2L, start.get("idStart")); + + Map end = (Map) row.get("o"); + assertNodeAndLabel(end, "Other"); + assertEquals("1999-01-01T00:00Z", end.get("idOther")); + + assertFalse(res.hasNext()); + }); + + db.executeTransactionally("MATCH (n:ParquetNode), (o:Other) DETACH DELETE n, o"); + } + + @Test + public void testFileVolumeParquetAll() { + // given - when + db.executeTransactionally("UNWIND range(0, 10000 - 1) AS id CREATE (:ParquetNode{id:id})"); + + String file = db.executeTransactionally("CALL apoc.export.parquet.query('MATCH (n:ParquetNode) RETURN n.id AS id', 'volume_test.parquet') YIELD file ", + Map.of(), + ParquetTestUtil::extractFileName); + + final List expected = LongStream.range(0, 10000) + .boxed() + .collect(Collectors.toList()); + + // then + final String query = "CALL apoc.load.parquet($file, $config) YIELD value " + + "WITH value.id AS id ORDER BY id RETURN collect(id) as ids"; + + testCall(db, query, Map.of("file", file, "config", MAPPING_ALL), + r -> assertEquals(expected, r.get("ids"))); + + db.executeTransactionally("MATCH (n:ParquetNode) DELETE n"); + } + + private static byte[] extractByteArray(Result result) { + ResourceIterator value = result.columnAs("value"); + return value.next(); + } + +} \ No newline at end of file diff --git a/extended/src/test/java/apoc/export/parquet/ParquetTestUtil.java b/extended/src/test/java/apoc/export/parquet/ParquetTestUtil.java new file mode 100644 index 0000000000..e760e687f5 --- /dev/null +++ b/extended/src/test/java/apoc/export/parquet/ParquetTestUtil.java @@ -0,0 +1,178 @@ +package apoc.export.parquet; + +import apoc.convert.ConvertUtils; +import apoc.graph.Graphs; +import apoc.load.LoadParquet; +import apoc.meta.Meta; +import apoc.util.TestUtil; +import apoc.util.collection.Iterators; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.ResourceIterator; +import org.neo4j.graphdb.Result; +import org.neo4j.kernel.impl.util.ValueUtils; +import org.neo4j.values.AnyValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.virtual.VirtualValues; + +import java.time.LocalDate; +import java.util.List; +import java.util.Map; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static apoc.export.parquet.ParquetUtil.FIELD_ID; +import static apoc.export.parquet.ParquetUtil.FIELD_LABELS; +import static apoc.export.parquet.ParquetUtil.FIELD_SOURCE_ID; +import static apoc.export.parquet.ParquetUtil.FIELD_TARGET_ID; +import static apoc.export.parquet.ParquetUtil.FIELD_TYPE; +import static apoc.util.TestUtil.testCall; 
+import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ParquetTestUtil { + + public static void beforeClassCommon(GraphDatabaseService db) { + TestUtil.registerProcedure(db, ExportParquet.class, ImportParquet.class, LoadParquet.class, Graphs.class, Meta.class); + } + + public static void beforeCommon(GraphDatabaseService db) { + db.executeTransactionally("MATCH (n) DETACH DELETE n"); + + db.executeTransactionally("CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace', 'Qwe'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})"); + db.executeTransactionally("CREATE (:Another {foo:1, listDate: [date('1999'), date('2000')], listInt: [1,2]}), (:Another {bar:'Sam'})"); + + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true); + } + + public static void testImportAllCommon(GraphDatabaseService db, Map params) { + // remove current data + db.executeTransactionally("MATCH (n) DETACH DELETE n"); + + final String query = "CALL apoc.import.parquet($file, $config)"; + testCall(db, query, params, + r -> { + assertEquals(4L, r.get("nodes")); + assertEquals(1L, r.get("relationships")); + }); + + testCall(db, "MATCH (start:User)-[rel:KNOWS]->(end:User) RETURN start, rel, end", r -> { + Node start = (Node) r.get("start"); + assertFirstUserNodeProps(start.getAllProperties()); + Node end = (Node) r.get("end"); + assertSecondUserNodeProps(end.getAllProperties()); + Relationship rel = (Relationship) r.get("rel"); + assertRelationshipProps(rel.getAllProperties()); + }); + + testResult(db, "MATCH (m:Another) RETURN m", r -> { + ResourceIterator m = r.columnAs("m"); + Node node = m.next(); + assertFirstAnotherNodeProps(node.getAllProperties()); + node = m.next(); + assertSecondAnotherNodeProps(node.getAllProperties()); + assertFalse(r.hasNext()); + }); + } + + public static void roundtripLoadAllAssertions(Result result) { + ResourceIterator> value = result.columnAs("value"); + Map actual = value.next(); + assertFirstUserNode(actual); + actual = value.next(); + assertSecondUserNode(actual); + actual = value.next(); + assertFirstAnotherNode(actual); + actual = value.next(); + assertSecondAnotherNode(actual); + actual = value.next(); + assertRelationship(actual); + assertFalse(value.hasNext()); + } + + public static void assertNodeAndLabel(Map startTwo, String label) { + assertTrue(startTwo.get(FIELD_ID) instanceof Long); + assertEquals(ValueUtils.of(List.of(label)), ValueUtils.of(startTwo.get(FIELD_LABELS))); + } + + public static void assertBarRel(String one, Map relTwo) { + assertEquals(one, relTwo.get("idRel")); + assertEquals("BAR", relTwo.get(FIELD_TYPE)); + assertTrue(relTwo.get(FIELD_ID) instanceof Long); + assertTrue(relTwo.get(FIELD_SOURCE_ID) instanceof Long); + assertTrue(relTwo.get(FIELD_TARGET_ID) instanceof Long); + } + + public static String extractFileName(Result result) { + return Iterators.single(result.columnAs("file")); + } + + private static void assertFirstUserNode(Map map) { + assertNodeAndLabel(map, "User"); + assertFirstUserNodeProps(map); + } + + private static void assertSecondUserNode(Map map) { + assertNodeAndLabel(map, "User"); + assertSecondUserNodeProps(map); + } + + 
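+    // The assertions below pin down the shape of a full roundtrip through
+    // apoc.export.parquet.* and apoc.load.parquet / apoc.import.parquet with MAPPING_ALL:
+    // string and long arrays come back as String[]/long[], the localdatetime and date
+    // properties come back as LocalDateTime/LocalDate values, the point is rebuilt from
+    // its latitude/longitude/height components, and the duration written as 'P5M1.5D'
+    // in beforeCommon is checked against its normalized form 'P5M1DT12H'.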
private static void assertFirstUserNodeProps(Map props) { + assertEquals("Adam", props.get("name")); + assertEquals(42L, props.get("age")); + assertEquals( true, props.get("male")); + assertArrayEquals(new String[] { "Sam", "Anna", "Grace", "Qwe" }, (String[]) props.get("kids")); + Map latitude = Map.of("latitude", 13.1D, "longitude", 33.46789D, "height", 100.0D); + assertEquals(PointValue.fromMap(VirtualValues.map(latitude.keySet().toArray(new String[0]), latitude.values().stream().map(ValueUtils::of).toArray(AnyValue[]::new))), + props.get("place")); + assertEquals(LocalDateTimeValue.parse("2015-05-18T19:32:24.000").asObject(), props.get("born")); + } + + private static void assertSecondUserNodeProps(Map props) { + assertEquals( "Jim", props.get("name")); + assertEquals(42L, props.get("age")); + } + + private static void assertFirstAnotherNode(Map map) { + assertNodeAndLabel(map, "Another"); + assertFirstAnotherNodeProps(map); + } + + private static void assertFirstAnotherNodeProps(Map map) { + assertEquals(1L, map.get("foo")); + List listDate = ConvertUtils.convertToList(map.get("listDate")); + assertEquals(2, listDate.size()); + assertEquals(LocalDate.of(1999, 1, 1), listDate.get(0)); + assertEquals(LocalDate.of(2000, 1, 1), listDate.get(1)); + assertArrayEquals(new long[] {1L, 2L}, (long[]) map.get("listInt")); + } + + private static void assertSecondAnotherNode(Map map) { + assertNodeAndLabel(map, "Another"); + assertSecondAnotherNodeProps(map); + } + + private static void assertSecondAnotherNodeProps(Map map) { + assertEquals("Sam", map.get("bar")); + } + + private static void assertRelationship(Map map) { + assertTrue(map.get(FIELD_SOURCE_ID) instanceof Long); + assertTrue(map.get(FIELD_TARGET_ID) instanceof Long); + assertRelationshipProps(map); + } + + private static void assertRelationshipProps(Map props) { + assertEquals(DurationValue.parse("P5M1DT12H"), props.get("bffSince")); + assertEquals(1993L, props.get("since")); + } + +} diff --git a/extra-dependencies/hadoop/build.gradle b/extra-dependencies/hadoop/build.gradle index 3871155103..0c3c503310 100644 --- a/extra-dependencies/hadoop/build.gradle +++ b/extra-dependencies/hadoop/build.gradle @@ -36,6 +36,10 @@ dependencies { implementation group: 'org.apache.hadoop', name: 'hadoop-hdfs-client', version: '3.3.5', commonExclusions implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.5', commonExclusions implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.23.1', commonExclusions + + implementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', commonExclusions + implementation group: 'org.apache.parquet', name: 'parquet-column', version: '1.13.1', commonExclusions + implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.5', commonExclusions }
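For reference, a minimal usage sketch (illustrative only, not part of the patch): assuming a GraphDatabaseService `db` with ExportParquet, LoadParquet and ImportParquet registered (as ParquetTestUtil.beforeClassCommon does) and apoc.export.file.enabled / apoc.import.file.enabled set to true, the file roundtrip exercised by ParquetTest.testFileRoundtripParquetAll looks roughly like this:

import java.util.Map;

import org.neo4j.graphdb.GraphDatabaseService;

public class ParquetRoundtripSketch {

    // Sketch only: mirrors the Cypher used in ParquetTest; non-primitive types are written
    // as strings/longs and need an explicit mapping config to be rebuilt on load.
    static void fileRoundtrip(GraphDatabaseService db) {
        Map<String, Object> config = Map.of("mapping",
                Map.of("bffSince", "Duration", "place", "Point",
                        "listDate", "DateArray", "listInt", "LongArray"));

        // export the whole database to a single parquet file under the configured import directory
        String file = db.executeTransactionally(
                "CALL apoc.export.parquet.all('all.parquet') YIELD file RETURN file",
                Map.of(),
                r -> (String) r.next().get("file"));

        // read it back: one row per exported node/relationship, exposed as a map under `value`
        db.executeTransactionally(
                "CALL apoc.load.parquet($file, $config) YIELD value RETURN value",
                Map.of("file", file, "config", config),
                r -> {
                    r.forEachRemaining(row -> System.out.println(row.get("value")));
                    return null;
                });
    }
}

apoc.import.parquet accepts the same file and mapping config, but recreates the nodes and relationships instead of returning rows.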
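The stream variants skip the filesystem entirely: apoc.export.parquet.all.stream yields one byte[] per batch (batchSize controls how the rows are split), and each byte[] can be passed straight back to apoc.load.parquet, which the tests show works even with the export/import file settings disabled. A rough sketch under the same assumptions as above, with a hypothetical `mappingConfig` argument:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.neo4j.graphdb.GraphDatabaseService;

public class ParquetStreamSketch {

    // Sketch only: in-memory roundtrip via byte arrays, no parquet file is written to disk.
    static void streamRoundtrip(GraphDatabaseService db, Map<String, Object> mappingConfig) {
        // with batchSize:1 the export returns one byte[] row per batch of entities
        List<byte[]> batches = db.executeTransactionally(
                "CALL apoc.export.parquet.all.stream({batchSize: 1}) YIELD value RETURN value",
                Map.of(),
                r -> r.stream().map(row -> (byte[]) row.get("value")).collect(Collectors.toList()));

        // feed every batch back through apoc.load.parquet
        db.executeTransactionally(
                "UNWIND $batches AS bytes CALL apoc.load.parquet(bytes, $config) YIELD value RETURN value",
                Map.of("batches", batches, "config", mappingConfig),
                r -> {
                    r.forEachRemaining(row -> System.out.println(row.get("value")));
                    return null;
                });
    }
}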