fixes neo4j-contrib#1276: Add streaming support to export JSON and GraphML
conker84 committed Oct 31, 2019
1 parent c426bb2 commit d5d7b3f
Showing 11 changed files with 207 additions and 57 deletions.
5 changes: 4 additions & 1 deletion docs/asciidoc/export/exportJson.adoc
@@ -24,8 +24,11 @@ include::../../../build/generated-documentation/apoc.export.json.csv[]

.Config

[opts=header]
|===
| writeNodeProperties | true/false, if true export properties too.
| name | type | default | description
| writeNodeProperties | boolean | false | if true export properties too.
| stream | boolean | false | stream the json directly to the client into the `data` field
|===
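A rough usage sketch (not part of this commit): with `stream: true` and a null or blank file name, the exported JSON comes back in the `data` column instead of being written to disk. The snippet below assumes an embedded Neo4j 3.5 GraphDatabaseService `db` with APOC installed and the documented `apoc.export.json.all(file, config)` signature.

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;

// Hedged sketch: stream the whole graph as JSON back to the caller, no file written.
public class StreamJsonExportSketch {
    public static void printJson(GraphDatabaseService db) {
        try (Transaction tx = db.beginTx();
             Result result = db.execute(
                     "CALL apoc.export.json.all(null, {stream: true}) YIELD data RETURN data")) {
            while (result.hasNext()) {
                // each row carries one streamed chunk of the exported JSON
                System.out.println(result.next().get("data"));
            }
            tx.success();
        }
    }
}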

[NOTE]
1 change: 1 addition & 0 deletions docs/asciidoc/export/graphml.adoc
@@ -63,6 +63,7 @@ The output of `labels()` function is not sorted, use it in combination with `apo
| cypherFormat | create | In export to cypher script, define the cypher format (for example use `MERGE` instead of `CREATE`). Possible values are: "create", "updateAll", "addStructure", "updateStructure".
| bulkImport | true | In export it creates files for Neo4j Admin import
| separateHeader | false | In export it creates two file one for header and one for data
| stream | false | stream the xml directly to the client into the `data` field
|===
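The GraphML flag works the same way; a minimal, hedged sketch with the same embedded 3.5 assumptions as the JSON example above (`useTypes` is an existing GraphML export option):

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;

// Hedged sketch: stream the graph as GraphML XML into the `data` column, no file written.
public class StreamGraphMlExportSketch {
    public static void printXml(GraphDatabaseService db) {
        try (Transaction tx = db.beginTx();
             Result result = db.execute(
                     "CALL apoc.export.graphml.all(null, {stream: true, useTypes: true}) YIELD data RETURN data")) {
            result.forEachRemaining(row -> System.out.println(row.get("data")));
            tx.success();
        }
    }
}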

Values for the `quotes` configuration:
3 changes: 2 additions & 1 deletion src/main/java/apoc/export/csv/ExportCSV.java
@@ -9,6 +9,7 @@
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
@@ -88,7 +89,7 @@ private void preventBulkImport(ExportConfig config) {
}

private Stream<ProgressInfo> exportCsv(@Name("file") String fileName, String source, Object data, ExportConfig exportConfig) throws Exception {
checkWriteAllowed(exportConfig);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
ProgressInfo progressInfo = new ProgressInfo(fileName, source, "csv");
progressInfo.batchSize = exportConfig.getBatchSize();
ProgressReporter reporter = new ProgressReporter(null, null, progressInfo);
3 changes: 2 additions & 1 deletion src/main/java/apoc/export/cypher/ExportCypher.java
@@ -7,6 +7,7 @@
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
@@ -96,7 +97,7 @@ public Stream<DataProgressInfo> schema(@Name(value = "file",defaultValue = "") S
}

private Stream<DataProgressInfo> exportCypher(@Name("file") String fileName, String source, SubGraph graph, ExportConfig c, boolean onlySchema) throws IOException {
if (fileName != null) checkWriteAllowed(c);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(c);

ProgressInfo progressInfo = new ProgressInfo(fileName, source, "cypher");
progressInfo.batchSize = c.getBatchSize();
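Both hunks above relax the write guard the same way: a null or blank file name now means "stream the result back to the caller", so the check against the `apoc.export.file.enabled` setting only has to run when a real path is supplied. A generic, self-contained sketch of that convention (the flag handling and message below are illustrative stand-ins, not APOC's exact code):

import org.apache.commons.lang3.StringUtils;

// Generic sketch of the blank-file-name convention: no path means "stream to the
// client", so the file-write permission check is skipped entirely.
public class WriteGuardSketch {
    static boolean exportFileEnabled = false; // assume file export is disabled

    static void checkWriteAllowed() {
        if (!exportFileEnabled) {
            throw new RuntimeException("Export to files not enabled");
        }
    }

    static void export(String fileName) {
        if (StringUtils.isNotBlank(fileName)) checkWriteAllowed();
        System.out.println(StringUtils.isNotBlank(fileName)
                ? "writing to " + fileName
                : "streaming to client");
    }

    public static void main(String[] args) {
        export(null);       // fine: streamed, no file access needed
        export("");         // fine: blank also means stream
        export("all.csv");  // throws unless file export has been enabled
    }
}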
14 changes: 11 additions & 3 deletions src/main/java/apoc/export/cypher/FileManagerFactory.java
@@ -82,15 +82,23 @@ public StringExportCypherFileManager(boolean separatedFiles) {
@Override
public PrintWriter getPrintWriter(String type) throws IOException {
if (this.separatedFiles) {
return new PrintWriter(writers.compute(type, (key, writer) -> writer == null ? new StringWriter() : writer));
return new PrintWriter(getStringWriter(type));
} else {
return new PrintWriter(writers.compute(type.equals("csv") ? type : "cypher", (key, writer) -> writer == null ? new StringWriter() : writer));
switch (type) {
case "csv":
case "json":
case "graphml":
break;
default:
type = "cypher";
}
return new PrintWriter(getStringWriter(type));
}
}

@Override
public StringWriter getStringWriter(String type) {
return writers.get(type);
return writers.computeIfAbsent(type, (key) -> new StringWriter());
}

@Override
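The move to `computeIfAbsent` makes the per-type `StringWriter` lazily created and shared: the exporter writes into it through `getPrintWriter`, and the streaming consumer later reads the same buffer back through `getStringWriter`. A generic sketch of that idea (plain Java, not APOC's classes):

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Generic sketch: one lazily created in-memory writer per export type,
// retrievable later so its buffered content can be handed to the client.
public class LazyWriterSketch {
    private final ConcurrentMap<String, StringWriter> writers = new ConcurrentHashMap<>();

    public PrintWriter getPrintWriter(String type) {
        return new PrintWriter(getStringWriter(type));
    }

    public StringWriter getStringWriter(String type) {
        // created on demand; repeated calls return the same buffer
        return writers.computeIfAbsent(type, key -> new StringWriter());
    }

    public static void main(String[] args) {
        LazyWriterSketch manager = new LazyWriterSketch();
        PrintWriter out = manager.getPrintWriter("json");
        out.print("{\"type\":\"node\"}");
        out.flush();
        // same underlying buffer, so the buffered content is visible here
        System.out.println(manager.getStringWriter("json"));
    }
}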
60 changes: 50 additions & 10 deletions src/main/java/apoc/export/graphml/ExportGraphML.java
@@ -1,11 +1,16 @@
package apoc.export.graphml;

import apoc.Pools;
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
@@ -14,17 +19,22 @@
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.procedure.*;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import javax.xml.stream.XMLStreamException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static apoc.util.FileUtils.checkWriteAllowed;
import static apoc.util.FileUtils.getPrintWriter;

/**
* @author mh
Expand All @@ -34,6 +44,9 @@ public class ExportGraphML {
@Context
public GraphDatabaseService db;

@Context
public TerminationGuard terminationGuard;

@Procedure(name = "apoc.import.graphml",mode = Mode.WRITE)
@Description("apoc.import.graphml(file,config) - imports graphml file")
public Stream<ProgressInfo> file(@Name("file") String fileName, @Name("config") Map<String, Object> config) throws Exception {
@@ -90,14 +103,41 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportGraphML(fileName, source, graph, c);
}

private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig config) throws Exception, XMLStreamException {
if (fileName != null) checkWriteAllowed();
private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig exportConfig) throws Exception {
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "graphml"));
PrintWriter printWriter = getPrintWriter(fileName, null);
XmlGraphMLWriter exporter = new XmlGraphMLWriter();
exporter.write(graph, printWriter, reporter, config);
printWriter.flush();
printWriter.close();
return reporter.stream();
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, exportConfig.streamStatements());
final PrintWriter graphMl = cypherFileManager.getPrintWriter("graphml");
if (exportConfig.streamStatements()) {
long timeout = exportConfig.getTimeoutSeconds();
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> {
Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("graphml")), timeout);
closeWriter(graphMl);
}
);
Util.inTxFuture(Pools.DEFAULT, db, () -> {
exporter.write(graph, graphMl, reporterWithConsumer, exportConfig);
return true;
});
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
} else {
exporter.write(graph, graphMl, reporter, exportConfig);
closeWriter(graphMl);
return reporter.stream();
}
}

private void closeWriter(PrintWriter writer) {
writer.flush();
try {
writer.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
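The streaming branch above runs the exporter in a background transaction (`Util.inTxFuture`), pushes `ProgressInfo` rows onto a bounded queue as the reporter makes progress, and returns a `Stream` backed by `QueueBasedSpliterator`, so the client starts receiving rows before the export finishes. A self-contained, generic sketch of that producer/consumer shape (simplified names, plain strings in place of `ProgressInfo`; the APOC helpers are mirrored here with plain JDK classes):

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Generic sketch of queue-backed streaming: a producer thread emits one value
// per finished batch plus a sentinel, and the caller gets a lazy Stream that
// drains the queue as it iterates.
public class QueueStreamSketch {
    private static final String SENTINEL = "EOF"; // stands in for ProgressInfo.EMPTY

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Producer: analogous to Util.inTxFuture(...) running the exporter.
        pool.submit(() -> {
            for (int batch = 0; batch < 3; batch++) {
                queue.put("batch-" + batch);
            }
            queue.put(SENTINEL);
            return null;
        });

        // Consumer: analogous to QueueBasedSpliterator, pulling until the sentinel.
        Spliterator<String> spliterator =
                new Spliterators.AbstractSpliterator<String>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super String> action) {
                        try {
                            String next = queue.take();
                            if (SENTINEL.equals(next)) {
                                return false;
                            }
                            action.accept(next);
                            return true;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                };
        StreamSupport.stream(spliterator, false).forEach(System.out::println);
        pool.shutdown();
    }
}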
1 change: 1 addition & 0 deletions src/main/java/apoc/export/graphml/XmlGraphMLWriter.java
@@ -36,6 +36,7 @@ public void write(SubGraph graph, Writer writer, Reporter reporter, ExportConfig
reporter.update(0, 1, props);
}
writeFooter(xmlWriter);
reporter.done();
}

private void writeKey(XMLStreamWriter writer, SubGraph ops, ExportConfig config) throws Exception {
48 changes: 35 additions & 13 deletions src/main/java/apoc/export/json/ExportJson.java
@@ -1,12 +1,15 @@
package apoc.export.json;

import apoc.Pools;
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
@@ -17,20 +20,25 @@
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static apoc.util.FileUtils.checkWriteAllowed;

public class ExportJson {
@Context
public GraphDatabaseService db;

@Context
public TerminationGuard terminationGuard;

public ExportJson(GraphDatabaseService db) {
this.db = db;
}
@@ -72,20 +80,34 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportJson(fileName, source,result,config);
}

private Stream<ProgressInfo> exportJson(@Name("file") String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig c = new ExportConfig(config);
checkWriteAllowed(c);
private Stream<ProgressInfo> exportJson(String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig exportConfig = new ExportConfig(config);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "json"));
JsonFormat exporter = new JsonFormat(db);

ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, c.streamStatements());

try (PrintWriter printWriter = cypherFileManager.getPrintWriter("json")) {
if (data instanceof SubGraph)
exporter.dump(((SubGraph)data),cypherFileManager,reporter,c);
if (data instanceof Result)
exporter.dump(((Result)data),printWriter,reporter,c);
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, exportConfig.streamStatements());
if (exportConfig.streamStatements()) {
long timeout = exportConfig.getTimeoutSeconds();
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("json")), timeout)
);
Util.inTxFuture(Pools.DEFAULT, db, () -> {
dump(data, exportConfig, reporterWithConsumer, exporter, cypherFileManager);
return true;
});
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
} else {
dump(data, exportConfig, reporter, exporter, cypherFileManager);
return reporter.stream();
}
return reporter.stream();
}

private void dump(Object data, ExportConfig c, ProgressReporter reporter, JsonFormat exporter, ExportFileManager cypherFileManager) throws Exception {
if (data instanceof SubGraph)
exporter.dump(((SubGraph)data),cypherFileManager,reporter,c);
if (data instanceof Result)
exporter.dump(((Result)data),cypherFileManager,reporter,c);
}
}
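`new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("json"))` is what moves the buffered output into the `data` field of each emitted row. A purely illustrative sketch of that drain idea (an assumption about its behaviour, not APOC's actual implementation):

import java.io.StringWriter;

// Illustrative sketch of what "drain" does conceptually: copy whatever has
// accumulated in the StringWriter into the row's data payload and clear the
// buffer, so each emitted batch carries only its own chunk of JSON.
public class DrainSketch {
    static String drain(StringWriter writer) {
        StringBuffer buffer = writer.getBuffer();
        String chunk = buffer.toString();
        buffer.setLength(0); // reset so the next batch starts from an empty buffer
        return chunk;
    }

    public static void main(String[] args) {
        StringWriter writer = new StringWriter();
        writer.write("{\"type\":\"node\",\"id\":\"0\"}");
        System.out.println(drain(writer)); // first batch
        writer.write("{\"type\":\"node\",\"id\":\"1\"}");
        System.out.println(drain(writer)); // second batch, the first is not repeated
    }
}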
29 changes: 18 additions & 11 deletions src/main/java/apoc/export/json/JsonFormat.java
@@ -11,7 +11,12 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;

import java.io.IOException;
import java.io.Reader;
@@ -34,12 +39,15 @@ public ProgressInfo load(Reader reader, Reporter reporter, ExportConfig config)
}

private ProgressInfo dump(Writer writer, Reporter reporter, Consumer<JsonGenerator> consumer) throws Exception {
try (Transaction tx = db.beginTx(); JsonGenerator jsonGenerator = getJsonGenerator(writer);) {

try (Transaction tx = db.beginTx();
JsonGenerator jsonGenerator = getJsonGenerator(writer)) {
consumer.accept(jsonGenerator);

jsonGenerator.flush();
tx.success();
reporter.done();
return reporter.getTotal();
} finally {
writer.close();
}
}

@@ -56,7 +64,7 @@ public ProgressInfo dump(SubGraph graph, ExportFileManager writer, Reporter repo
return dump(writer.getPrintWriter("json"), reporter, consumer);
}

public ProgressInfo dump(Result result, Writer writer, Reporter reporter, ExportConfig config) throws Exception {
public ProgressInfo dump(Result result, ExportFileManager writer, Reporter reporter, ExportConfig config) throws Exception {
Consumer<JsonGenerator> consumer = (jsonGenerator) -> {
try {
String[] header = result.columns().toArray(new String[result.columns().size()]);
@@ -69,14 +77,14 @@ public ProgressInfo dump(Result result, Writer writer, Reporter reporter, Export
throw new RuntimeException(e);
}
};
return dump(writer, reporter, consumer);
return dump(writer.getPrintWriter("json"), reporter, consumer);
}

private JsonGenerator getJsonGenerator(Writer writer) throws IOException {
JsonFactory jsonF = new JsonFactory();
JsonGenerator jsonGenerator = jsonF.createGenerator(writer);
jsonGenerator.setCodec(JsonUtil.OBJECT_MAPPER);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
JsonGenerator jsonGenerator = new JsonFactory()
.createGenerator(writer)
.setCodec(JsonUtil.OBJECT_MAPPER)
.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
return jsonGenerator;
}

Expand Down Expand Up @@ -158,7 +166,6 @@ private void write(Reporter reporter, JsonGenerator jsonGenerator, ExportConfig
JsonFormatSerializer.DEFAULT.serializeProperty(jsonGenerator, keyName, value, writeKey);
reporter.update(0, 0, 1);
break;

}
}

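The reworked `getJsonGenerator` is a fluent restatement of the old configuration: a `MinimalPrettyPrinter` with a newline root separator, which produces the exporter's JSON-lines style output. A stand-alone sketch of that generator setup (Jackson 2.x; the codec is omitted here because plain string fields need none):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import java.io.StringWriter;

// Small sketch of the generator configuration: each top-level JSON value is
// separated by a newline, giving newline-delimited JSON output.
public class GeneratorSketch {
    public static void main(String[] args) throws Exception {
        StringWriter out = new StringWriter();
        JsonGenerator gen = new JsonFactory()
                .createGenerator(out)
                .setPrettyPrinter(new MinimalPrettyPrinter("\n"));
        gen.writeStartObject();
        gen.writeStringField("type", "node");
        gen.writeEndObject();
        gen.writeStartObject();
        gen.writeStringField("type", "relationship");
        gen.writeEndObject();
        gen.flush();
        System.out.println(out); // two JSON objects, newline separated
    }
}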