Fixes #3589: Add support for Parquet files similar to CSV/Arrow #3711

Merged · 1 commit · Aug 21, 2023
2 changes: 1 addition & 1 deletion build.gradle
@@ -130,7 +130,7 @@ subprojects {

ext {
// NB: due to version.json generation by parsing this file, the next line must not have any if/then/else logic
neo4jVersion = "5.11.0"
neo4jVersion = "5.9.0"
// instead we apply the override logic here
neo4jVersionEffective = project.hasProperty("neo4jVersionOverride") ? project.getProperty("neo4jVersionOverride") : neo4jVersion
testContainersVersion = '1.18.3'
7 changes: 7 additions & 0 deletions extended/build.gradle
@@ -52,6 +52,7 @@ task gitSubmoduleLoad {
}

dependencies {

apt project(':processor')
apt group: 'org.neo4j', name: 'neo4j', version: neo4jVersionEffective // mandatory to run @ServiceProvider based META-INF code generation

@@ -102,6 +103,11 @@ dependencies {
compileOnly group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0'
compileOnly group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '1.6.0'

compileOnly group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers
// an analogous testImplementation dependency is not needed since it is bundled via the `test-utils` submodule
compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.1.0', withoutServers


// These dependencies affect the tests only, they will not be packaged in the resulting .jar
testImplementation project(':test-utils')
testImplementation project(':core')
@@ -130,6 +136,7 @@ dependencies {
testImplementation group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0'
testImplementation group: 'org.postgresql', name: 'postgresql', version: '42.1.4'
testImplementation group: 'org.zapodot', name: 'embedded-ldap-junit', version: '0.9.0'
testImplementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers


configurations.all {
169 changes: 169 additions & 0 deletions extended/src/main/java/apoc/export/parquet/ApocParquetReader.java
@@ -0,0 +1,169 @@
package apoc.export.parquet;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReadStore;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static apoc.export.parquet.ParquetReadUtil.toTimeUnitJava;
import static apoc.export.parquet.ParquetReadUtil.toValidValue;

public final class ApocParquetReader implements Closeable {
private final ParquetFileReader reader;
private final List<ColumnDescriptor> columns;
private final MessageType schema;
private final GroupConverter recordConverter;
private final String createdBy;

private long currentRowGroupSize = -1L;
private List<ColumnReader> currentRowGroupColumnReaders;
private long currentRowIndex = -1L;
private final ParquetConfig config;

public ApocParquetReader(InputFile file, ParquetConfig config) throws IOException {
this.reader = ParquetFileReader.open(file);
FileMetaData meta = reader.getFooter().getFileMetaData();
this.schema = meta.getSchema();
this.recordConverter = new GroupRecordConverter(this.schema).getRootConverter();
this.createdBy = meta.getCreatedBy();

this.columns = schema.getColumns()
.stream()
.collect(Collectors.toList());

this.config = config;
}

private Object readValue(ColumnReader columnReader) {
ColumnDescriptor column = columnReader.getDescriptor();
PrimitiveType primitiveType = column.getPrimitiveType();
int maxDefinitionLevel = column.getMaxDefinitionLevel();

if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) {
switch (primitiveType.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
case INT96:
return columnReader.getBinary().toStringUsingUTF8();
case BOOLEAN:
return columnReader.getBoolean();
case DOUBLE:
return columnReader.getDouble();
case FLOAT:
return columnReader.getFloat();
case INT32:
return columnReader.getInteger();
case INT64:
// convert the long to a Temporal value if the logical type is a timestamp annotation
long recordLong = columnReader.getLong();
LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation();
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation logicalTypeAnnotation1 = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalTypeAnnotation;
if (logicalTypeAnnotation1.isAdjustedToUTC()) {
return Instant.EPOCH.plus(recordLong, toTimeUnitJava(logicalTypeAnnotation1.getUnit()).toChronoUnit());
} else {
return LocalDateTime.ofInstant(Instant.EPOCH.plus(recordLong, toTimeUnitJava(logicalTypeAnnotation1.getUnit()).toChronoUnit()), ZoneId.of("UTC"));
}
}
return recordLong;
default:
throw new IllegalArgumentException("Unsupported type: " + primitiveType);
}
} else {
// definition level below the maximum: the value for this column is null
return null;
}
}
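// Worked example: assuming toTimeUnitJava(MICROS) returns TimeUnit.MICROSECONDS,
// an INT64 value of 1_700_000_000_000_000L in a column annotated as timestamp(MICROS)
// is read above as Instant.EPOCH.plus(1_700_000_000_000_000L, ChronoUnit.MICROS),
// i.e. 2023-11-14T22:13:20Z, when isAdjustedToUTC() is true, and as the corresponding
// LocalDateTime (2023-11-14T22:13:20) when it is false.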

public Map<String, Object> getRecord() throws IOException {
if (currentRowIndex == currentRowGroupSize) {

PageReadStore rowGroup = reader.readNextRowGroup();
if (rowGroup == null) {
return null;
}

ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup, this.recordConverter, this.schema, this.createdBy);

this.currentRowGroupSize = rowGroup.getRowCount();
this.currentRowGroupColumnReaders = columns.stream()
.map(columnReadStore::getColumnReader)
.collect(Collectors.toList());
this.currentRowIndex = 0L;
}

HashMap<String, Object> record = new HashMap<>();
for (ColumnReader columnReader: this.currentRowGroupColumnReaders) {
// if it's a list we have to use columnReader.consume() multiple times (until columnReader.getCurrentRepetitionLevel() == 0, i.e. the value is fully consumed)
// in order to collect all the list elements
do {
addRecord(record, columnReader);
columnReader.consume();
} while (columnReader.getCurrentRepetitionLevel() != 0);
}

this.currentRowIndex++;

return record.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> toValidValue(e.getValue(), e.getKey(), config))
);
}

public void addRecord(Map<String, Object> record, ColumnReader columnReader) {
Object value = readValue(columnReader);
if (value == null) {
return;
}
String[] path = columnReader.getDescriptor().getPath();
String fieldName = path[0];
try {
// if it's a list, create a list of consumed sub-records
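// in Parquet's standard three-level LIST encoding the column path is [<fieldName>, "list", "element"],
// hence the path.length == 3 && path[1].equals("list") check below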
boolean isAList = path.length == 3 && path[1].equals("list");
record.compute(fieldName, (k, v) -> {
if (v == null) {
if (isAList) {
return new ArrayList<>() {{ add(value); }};
}
return value;
}
if (isAList) {
List list = (List) v;
list.add(value);
return list;
}
throw new RuntimeException("Multiple element with the same key found, but the element type is not a list");
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws IOException {
reader.close();
}
}
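
For reference, a minimal sketch of how this reader could be driven end-to-end. The ApocParquetReaderSketch class name, the local file path, and the assumption that ParquetConfig accepts a plain options map are illustrative; HadoopInputFile comes from the parquet-hadoop dependency added above.

import apoc.export.parquet.ApocParquetReader;
import apoc.export.parquet.ParquetConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.util.Collections;
import java.util.Map;

public class ApocParquetReaderSketch {
    public static void main(String[] args) throws Exception {
        // wrap a local file; any org.apache.parquet.io.InputFile implementation works
        HadoopInputFile inputFile = HadoopInputFile.fromPath(
                new Path("file:///tmp/example.parquet"), new Configuration());
        // empty options map: rely on the ParquetConfig defaults
        try (ApocParquetReader reader = new ApocParquetReader(inputFile, new ParquetConfig(Collections.emptyMap()))) {
            Map<String, Object> record;
            // getRecord() returns null once every row group has been consumed
            while ((record = reader.getRecord()) != null) {
                System.out.println(record);
            }
        }
    }
}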

155 changes: 155 additions & 0 deletions extended/src/main/java/apoc/export/parquet/ExportParquet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package apoc.export.parquet;

import apoc.ApocConfig;
import apoc.Description;
import apoc.Extended;
import apoc.Pools;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.result.ByteArrayResult;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED;
import static apoc.ApocConfig.EXPORT_NOT_ENABLED_ERROR;
import static apoc.ApocConfig.apocConfig;
import static apoc.export.parquet.ParquetExportType.Type.from;

@Extended
public class ExportParquet {
public static final String EXPORT_TO_FILE_PARQUET_ERROR = EXPORT_NOT_ENABLED_ERROR +
"\nOtherwise, if you are running in a cloud environment without filesystem access, use the apoc.export.parquet.*.stream procedures to stream the export back to your client.";;

@Context
public Transaction tx;

@Context
public Log log;

@Context
public GraphDatabaseService db;

@Context
public TerminationGuard terminationGuard;

@Context
public ApocConfig apocConfig;

@Context
public Pools pools;


@Procedure("apoc.export.parquet.all.stream")
@Description("Exports the full database to the provided CSV file.")
public Stream<ByteArrayResult> all(@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
return exportParquet(new DatabaseSubGraph(tx), new ParquetConfig(config));
}

@Procedure("apoc.export.parquet.data.stream")
@Description("Exports the given nodes and relationships to the provided CSV file.")
public Stream<ByteArrayResult> data(@Name("nodes") List<Node> nodes, @Name("rels") List<Relationship> rels, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
ParquetConfig conf = new ParquetConfig(config);
return exportParquet(new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.graph.stream")
@Description("Exports the given graph to the provided CSV file.")
public Stream<ByteArrayResult> graph(@Name("graph") Map<String,Object> graph, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
Collection<Node> nodes = (Collection<Node>) graph.get("nodes");
Collection<Relationship> rels = (Collection<Relationship>) graph.get("relationships");
ParquetConfig conf = new ParquetConfig(config);

return exportParquet(new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.query.stream")
@Description("Exports the results from running the given Cypher query to the provided CSV file.")
public Stream<ByteArrayResult> query(@Name("query") String query, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
ParquetConfig exportConfig = new ParquetConfig(config);
Map<String,Object> params = config == null ? Collections.emptyMap() : (Map<String,Object>)config.getOrDefault("params", Collections.emptyMap());
Result result = tx.execute(query,params);

return exportParquet(result, exportConfig);
}

@Procedure("apoc.export.parquet.all")
@Description("Exports the full database to the provided CSV file.")
public Stream<ProgressInfo> all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
return exportParquet(fileName, new DatabaseSubGraph(tx), new ParquetConfig(config));
}

@Procedure("apoc.export.parquet.data")
@Description("Exports the given nodes and relationships to the provided CSV file.")
public Stream<ProgressInfo> data(@Name("nodes") List<Node> nodes, @Name("rels") List<Relationship> rels, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
ParquetConfig conf = new ParquetConfig(config);
return exportParquet(fileName, new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.graph")
@Description("Exports the given graph to the provided CSV file.")
public Stream<ProgressInfo> graph(@Name("graph") Map<String,Object> graph, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
Collection<Node> nodes = (Collection<Node>) graph.get("nodes");
Collection<Relationship> rels = (Collection<Relationship>) graph.get("relationships");
ParquetConfig conf = new ParquetConfig(config);

return exportParquet(fileName, new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.query")
@Description("Exports the results from running the given Cypher query to the provided CSV file.")
public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
ParquetConfig exportConfig = new ParquetConfig(config);
Map<String,Object> params = config == null ? Collections.emptyMap() : (Map<String,Object>)config.getOrDefault("params", Collections.emptyMap());
Result result = tx.execute(query,params);

return exportParquet(fileName, result, exportConfig);
}

public Stream<ProgressInfo> exportParquet(String fileName, Object data, ParquetConfig config) throws IOException {
if (StringUtils.isBlank(fileName)) {
throw new RuntimeException("The fileName must exists. Otherwise, use the `apoc.export.parquet.*.stream.` procedures to stream the export back to your client.");
}
// normalize file url
fileName = FileUtils.changeFileUrlIfImportDirectoryConstrained(fileName);

// we cannot use apocConfig().checkWriteAllowed(..) because the error is confusing
// since it says "... use the `{stream:true}` config", but for these parquet procedures the streaming mode is implemented via separate procedures
if (!apocConfig().getBoolean(APOC_EXPORT_FILE_ENABLED)) {
throw new RuntimeException(EXPORT_TO_FILE_PARQUET_ERROR);
}
ParquetExportType exportType = from(data);
if (data instanceof Result) {
return new ExportParquetResultFileStrategy(fileName, db, pools, terminationGuard, log, exportType).export((Result) data, config);
}
return new ExportParquetGraphFileStrategy(fileName, db, pools, terminationGuard, log, exportType).export((SubGraph) data, config);
}

public Stream<ByteArrayResult> exportParquet(Object data, ParquetConfig config) {

ParquetExportType exportType = from(data);
if (data instanceof Result) {
return new ExportParquetResultStreamStrategy(db, pools, terminationGuard, log, exportType).export((Result) data, config);
}
return new ExportParquetGraphStreamStrategy(db, pools, terminationGuard, log, exportType).export((SubGraph) data, config);
}
}
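
A hedged usage sketch of the file-based export from embedded Java: the procedure name comes from this class, while the ParquetExportSketch class, the all.parquet file name, and the empty config map are illustrative. Exporting to a file requires apoc.export.file.enabled=true, as enforced by the APOC_EXPORT_FILE_ENABLED check above.

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;

public class ParquetExportSketch {
    // exports the whole database to all.parquet and prints the ProgressInfo rows
    public static void exportAll(GraphDatabaseService db) {
        try (Transaction tx = db.beginTx()) {
            try (Result result = tx.execute("CALL apoc.export.parquet.all('all.parquet', {})")) {
                while (result.hasNext()) {
                    System.out.println(result.next()); // e.g. file, nodes, relationships, time
                }
            }
            tx.commit();
        }
    }
}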
