
Commit

Fixes #3589: Add support for Parquet files similar to CSV/Arrow (#3711) (#3731)
vga91 authored Aug 22, 2023
1 parent 62351b3 commit 9a00d99
Showing 21 changed files with 2,378 additions and 0 deletions.
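
As a rough idea of what the new procedures enable, here is a minimal sketch that calls apoc.export.parquet.all through the Neo4j Java driver. The connection URI, credentials, and output file name are placeholders, and the sketch assumes a Neo4j instance with APOC Extended (including this commit) installed and apoc.export.file.enabled set to true:

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;

public class ParquetExportExample {
    public static void main(String[] args) {
        // placeholder connection details
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"));
             Session session = driver.session()) {
            // export the whole database to a Parquet file in the server's import/export directory
            session.run("CALL apoc.export.parquet.all('all.parquet', {})")
                   .list()
                   .forEach(record -> System.out.println(record.asMap()));
        }
    }
}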
169 changes: 169 additions & 0 deletions extended/src/main/java/apoc/export/parquet/ApocParquetReader.java
@@ -0,0 +1,169 @@
package apoc.export.parquet;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReadStore;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static apoc.export.parquet.ParquetReadUtil.toTimeUnitJava;
import static apoc.export.parquet.ParquetReadUtil.toValidValue;

public final class ApocParquetReader implements Closeable {
    private final ParquetFileReader reader;
    private final List<ColumnDescriptor> columns;
    private final MessageType schema;
    private final GroupConverter recordConverter;
    private final String createdBy;

    private long currentRowGroupSize = -1L;
    private List<ColumnReader> currentRowGroupColumnReaders;
    private long currentRowIndex = -1L;
    private final ParquetConfig config;

    public ApocParquetReader(InputFile file, ParquetConfig config) throws IOException {
        this.reader = ParquetFileReader.open(file);
        FileMetaData meta = reader.getFooter().getFileMetaData();
        this.schema = meta.getSchema();
        this.recordConverter = new GroupRecordConverter(this.schema).getRootConverter();
        this.createdBy = meta.getCreatedBy();

        this.columns = schema.getColumns()
                .stream()
                .collect(Collectors.toList());

        this.config = config;
    }

    private Object readValue(ColumnReader columnReader) {
        ColumnDescriptor column = columnReader.getDescriptor();
        PrimitiveType primitiveType = column.getPrimitiveType();
        int maxDefinitionLevel = column.getMaxDefinitionLevel();

        if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) {
            switch (primitiveType.getPrimitiveTypeName()) {
                case BINARY:
                case FIXED_LEN_BYTE_ARRAY:
                case INT96:
                    return columnReader.getBinary().toStringUsingUTF8();
                case BOOLEAN:
                    return columnReader.getBoolean();
                case DOUBLE:
                    return columnReader.getDouble();
                case FLOAT:
                    return columnReader.getFloat();
                case INT32:
                    return columnReader.getInteger();
                case INT64:
                    // convert the long to a Temporal value if a timestamp logical type annotation is present
                    long recordLong = columnReader.getLong();
                    LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation();
                    if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
                        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampAnnotation = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalTypeAnnotation;
                        if (timestampAnnotation.isAdjustedToUTC()) {
                            return Instant.EPOCH.plus(recordLong, toTimeUnitJava(timestampAnnotation.getUnit()).toChronoUnit());
                        } else {
                            return LocalDateTime.ofInstant(Instant.EPOCH.plus(recordLong, toTimeUnitJava(timestampAnnotation.getUnit()).toChronoUnit()), ZoneId.of("UTC"));
                        }
                    }
                    return recordLong;
                default:
                    throw new IllegalArgumentException("Unsupported type: " + primitiveType);
            }
        } else {
            // a definition level below the maximum means the value is null
            return null;
        }
    }

    public Map<String, Object> getRecord() throws IOException {
        if (currentRowIndex == currentRowGroupSize) {

            PageReadStore rowGroup = reader.readNextRowGroup();
            if (rowGroup == null) {
                return null;
            }

            ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup, this.recordConverter, this.schema, this.createdBy);

            this.currentRowGroupSize = rowGroup.getRowCount();
            this.currentRowGroupColumnReaders = columns.stream()
                    .map(columnReadStore::getColumnReader)
                    .collect(Collectors.toList());
            this.currentRowIndex = 0L;
        }

        HashMap<String, Object> record = new HashMap<>();
        for (ColumnReader columnReader: this.currentRowGroupColumnReaders) {
            // if it's a list, we have to call columnReader.consume() repeatedly (until columnReader.getCurrentRepetitionLevel() == 0, i.e. fully consumed)
            // to collect the list elements
            do {
                addRecord(record, columnReader);
                columnReader.consume();
            } while (columnReader.getCurrentRepetitionLevel() != 0);
        }

        this.currentRowIndex++;

        return record.entrySet()
                .stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> toValidValue(e.getValue(), e.getKey(), config))
                );
    }

    public void addRecord(Map<String, Object> record, ColumnReader columnReader) {
        Object value = readValue(columnReader);
        if (value == null) {
            return;
        }
        String[] path = columnReader.getDescriptor().getPath();
        String fieldName = path[0];
        try {
            // if it's a list, create a list of consumed sub-records
            boolean isAList = path.length == 3 && path[1].equals("list");
            record.compute(fieldName, (k, v) -> {
                if (v == null) {
                    if (isAList) {
                        return new ArrayList<>() {{ add(value); }};
                    }
                    return value;
                }
                if (isAList) {
                    List list = (List) v;
                    list.add(value);
                    return list;
                }
                throw new RuntimeException("Multiple elements with the same key found, but the element type is not a list");
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
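
For orientation, a minimal sketch of how this reader could be driven outside the APOC procedures, assuming a local Parquet file opened through parquet-hadoop's HadoopInputFile and a ParquetConfig built from an empty option map (both the path and the empty config are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

import java.util.Map;

public class ApocParquetReaderExample {
    public static void main(String[] args) throws Exception {
        // illustrative path; any readable Parquet file works
        InputFile input = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration());
        try (ApocParquetReader reader = new ApocParquetReader(input, new ParquetConfig(Map.of()))) {
            Map<String, Object> record;
            // getRecord() returns null once every row group has been consumed
            while ((record = reader.getRecord()) != null) {
                System.out.println(record);
            }
        }
    }
}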

155 changes: 155 additions & 0 deletions extended/src/main/java/apoc/export/parquet/ExportParquet.java
@@ -0,0 +1,155 @@
package apoc.export.parquet;

import apoc.ApocConfig;
import apoc.Description;
import apoc.Extended;
import apoc.Pools;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.result.ByteArrayResult;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED;
import static apoc.ApocConfig.EXPORT_NOT_ENABLED_ERROR;
import static apoc.ApocConfig.apocConfig;
import static apoc.export.parquet.ParquetExportType.Type.from;

@Extended
public class ExportParquet {
    public static final String EXPORT_TO_FILE_PARQUET_ERROR = EXPORT_NOT_ENABLED_ERROR +
            "\nOtherwise, if you are running in a cloud environment without filesystem access, use the apoc.export.parquet.*.stream procedures to stream the export back to your client.";

    @Context
    public Transaction tx;

    @Context
    public Log log;

    @Context
    public GraphDatabaseService db;

    @Context
    public TerminationGuard terminationGuard;

    @Context
    public ApocConfig apocConfig;

    @Context
    public Pools pools;

@Procedure("apoc.export.parquet.all.stream")
@Description("Exports the full database to the provided CSV file.")
public Stream<ByteArrayResult> all(@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
return exportParquet(new DatabaseSubGraph(tx), new ParquetConfig(config));
}

@Procedure("apoc.export.parquet.data.stream")
@Description("Exports the given nodes and relationships to the provided CSV file.")
public Stream<ByteArrayResult> data(@Name("nodes") List<Node> nodes, @Name("rels") List<Relationship> rels, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
ParquetConfig conf = new ParquetConfig(config);
return exportParquet(new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.graph.stream")
@Description("Exports the given graph to the provided CSV file.")
public Stream<ByteArrayResult> graph(@Name("graph") Map<String,Object> graph, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
Collection<Node> nodes = (Collection<Node>) graph.get("nodes");
Collection<Relationship> rels = (Collection<Relationship>) graph.get("relationships");
ParquetConfig conf = new ParquetConfig(config);

return exportParquet(new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.query.stream")
@Description("Exports the results from running the given Cypher query to the provided CSV file.")
public Stream<ByteArrayResult> query(@Name("query") String query, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
ParquetConfig exportConfig = new ParquetConfig(config);
Map<String,Object> params = config == null ? Collections.emptyMap() : (Map<String,Object>)config.getOrDefault("params", Collections.emptyMap());
Result result = tx.execute(query,params);

return exportParquet(result, exportConfig);
}

@Procedure("apoc.export.parquet.all")
@Description("Exports the full database to the provided CSV file.")
public Stream<ProgressInfo> all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
return exportParquet(fileName, new DatabaseSubGraph(tx), new ParquetConfig(config));
}

@Procedure("apoc.export.parquet.data")
@Description("Exports the given nodes and relationships to the provided CSV file.")
public Stream<ProgressInfo> data(@Name("nodes") List<Node> nodes, @Name("rels") List<Relationship> rels, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
ParquetConfig conf = new ParquetConfig(config);
return exportParquet(fileName, new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.graph")
@Description("Exports the given graph to the provided CSV file.")
public Stream<ProgressInfo> graph(@Name("graph") Map<String,Object> graph, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
Collection<Node> nodes = (Collection<Node>) graph.get("nodes");
Collection<Relationship> rels = (Collection<Relationship>) graph.get("relationships");
ParquetConfig conf = new ParquetConfig(config);

return exportParquet(fileName, new NodesAndRelsSubGraph(tx, nodes, rels), conf);
}

@Procedure("apoc.export.parquet.query")
@Description("Exports the results from running the given Cypher query to the provided CSV file.")
public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws IOException {
ParquetConfig exportConfig = new ParquetConfig(config);
Map<String,Object> params = config == null ? Collections.emptyMap() : (Map<String,Object>)config.getOrDefault("params", Collections.emptyMap());
Result result = tx.execute(query,params);

return exportParquet(fileName, result, exportConfig);
}

    public Stream<ProgressInfo> exportParquet(String fileName, Object data, ParquetConfig config) throws IOException {
        if (StringUtils.isBlank(fileName)) {
            throw new RuntimeException("The fileName must be specified. Otherwise, use the `apoc.export.parquet.*.stream` procedures to stream the export back to your client.");
        }
        // normalize file url
        fileName = FileUtils.changeFileUrlIfImportDirectoryConstrained(fileName);

        // we cannot use apocConfig().checkWriteAllowed(..) because its error message would be confusing:
        // it says "... use the `{stream:true}` config", but for these Parquet procedures streaming mode is implemented via dedicated procedures
        if (!apocConfig().getBoolean(APOC_EXPORT_FILE_ENABLED)) {
            throw new RuntimeException(EXPORT_TO_FILE_PARQUET_ERROR);
        }
        ParquetExportType exportType = from(data);
        if (data instanceof Result) {
            return new ExportParquetResultFileStrategy(fileName, db, pools, terminationGuard, log, exportType).export((Result) data, config);
        }
        return new ExportParquetGraphFileStrategy(fileName, db, pools, terminationGuard, log, exportType).export((SubGraph) data, config);
    }

    public Stream<ByteArrayResult> exportParquet(Object data, ParquetConfig config) {

        ParquetExportType exportType = from(data);
        if (data instanceof Result) {
            return new ExportParquetResultStreamStrategy(db, pools, terminationGuard, log, exportType).export((Result) data, config);
        }
        return new ExportParquetGraphStreamStrategy(db, pools, terminationGuard, log, exportType).export((SubGraph) data, config);
    }
}
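
A companion sketch for the streaming variant, again via the Neo4j Java driver. It assumes the ByteArrayResult rows expose their payload under a column named value; the query, credentials, and output path are illustrative:

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;

import java.io.FileOutputStream;
import java.io.OutputStream;

public class ParquetStreamExportExample {
    public static void main(String[] args) throws Exception {
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"));
             Session session = driver.session();
             OutputStream out = new FileOutputStream("/tmp/people.parquet")) {
            // stream the query result back as Parquet bytes and write them to a local file
            session.run("CALL apoc.export.parquet.query.stream('MATCH (p:Person) RETURN p', {}) YIELD value RETURN value")
                   .forEachRemaining(record -> {
                       try {
                           out.write(record.get("value").asByteArray());
                       } catch (Exception e) {
                           throw new RuntimeException(e);
                       }
                   });
        }
    }
}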

