Skip to content

Commit

Permalink
fixes neo4j-contrib#1276: Add streaming support to export JSON and Gr…
Browse files Browse the repository at this point in the history
…aphML
  • Loading branch information
conker84 committed Nov 15, 2019
1 parent c426bb2 commit 06a35f9
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 80 deletions.
5 changes: 4 additions & 1 deletion docs/asciidoc/export/exportJson.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ include::../../../build/generated-documentation/apoc.export.json.csv[]

.Config

[opts=header]
|===
| writeNodeProperties | true/false, if true export properties too.
| name | type | default | description
| writeNodeProperties | boolean | false | if true export properties too.
| stream | boolean | false | stream the JSON directly to the client into the `data` field
|===

[NOTE]
Expand Down
1 change: 1 addition & 0 deletions docs/asciidoc/export/graphml.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ The output of `labels()` function is not sorted, use it in combination with `apo
| cypherFormat | create | In export to cypher script, define the cypher format (for example use `MERGE` instead of `CREATE`). Possible values are: "create", "updateAll", "addStructure", "updateStructure".
| bulkImport | true | In export it creates files for Neo4j Admin import
| separateHeader | false | In export it creates two files: one for the header and one for the data
| stream | false | stream the XML directly to the client into the `data` field
|===

Values for the `quotes` configuration:
Expand Down
46 changes: 22 additions & 24 deletions src/main/java/apoc/export/csv/ExportCSV.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
package apoc.export.csv;

import apoc.Pools;
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.ExportUtils;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.procedure.*;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static apoc.util.FileUtils.checkWriteAllowed;

Expand Down Expand Up @@ -88,37 +90,33 @@ private void preventBulkImport(ExportConfig config) {
}

private Stream<ProgressInfo> exportCsv(@Name("file") String fileName, String source, Object data, ExportConfig exportConfig) throws Exception {
checkWriteAllowed(exportConfig);
ProgressInfo progressInfo = new ProgressInfo(fileName, source, "csv");
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
final String format = "csv";
ProgressInfo progressInfo = new ProgressInfo(fileName, source, format);
progressInfo.batchSize = exportConfig.getBatchSize();
ProgressReporter reporter = new ProgressReporter(null, null, progressInfo);
CsvFormat exporter = new CsvFormat(db);

ExportFileManager cypherFileManager = FileManagerFactory
.createFileManager(fileName, exportConfig.isBulkImport(), exportConfig.streamStatements());
.createFileManager(fileName, exportConfig.isBulkImport());

if (exportConfig.streamStatements()) {
long timeout = exportConfig.getTimeoutSeconds();
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("csv")), timeout)
);
Util.inTxFuture(Pools.DEFAULT, db, () -> {
dump(data, exportConfig, reporterWithConsumer, cypherFileManager, exporter);
return true;
});
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
return ExportUtils.getProgressInfoStream(db, terminationGuard, format, exportConfig, reporter, cypherFileManager,
(reporterWithConsumer) -> dump(data, exportConfig, reporterWithConsumer, cypherFileManager, exporter));
} else {
dump(data, exportConfig, reporter, cypherFileManager, exporter);
return reporter.stream();
}
}

private void dump(Object data, ExportConfig c, ProgressReporter reporter, ExportFileManager printWriter, CsvFormat exporter) throws Exception {
if (data instanceof SubGraph)
exporter.dump((SubGraph)data,printWriter,reporter,c);
if (data instanceof Result)
exporter.dump((Result)data,printWriter,reporter,c);
/**
 * Hands the export payload to the {@code CsvFormat.dump} overload that matches its runtime type.
 *
 * <p>A payload that is neither a {@code SubGraph} nor a {@code Result} is ignored.
 *
 * @param data        the payload to export; handled when it is a {@code SubGraph} or a {@code Result}
 * @param c           export configuration forwarded unchanged to the exporter
 * @param reporter    progress reporter forwarded unchanged to the exporter
 * @param printWriter file manager the exporter obtains its writers from
 * @param exporter    the CSV exporter doing the actual work
 * @throws RuntimeException wrapping any exception raised by the exporter, so that this method
 *                          declares no checked exception
 */
private void dump(Object data, ExportConfig c, ProgressReporter reporter, ExportFileManager printWriter, CsvFormat exporter) {
    final boolean isSubGraph = data instanceof SubGraph;
    final boolean isResult = data instanceof Result;
    try {
        // Two independent checks (not else-if), mirroring the original dispatch exactly.
        if (isSubGraph) {
            exporter.dump((SubGraph) data, printWriter, reporter, c);
        }
        if (isResult) {
            exporter.dump((Result) data, printWriter, reporter, c);
        }
    } catch (Exception cause) {
        throw new RuntimeException(cause);
    }
}
}
5 changes: 3 additions & 2 deletions src/main/java/apoc/export/cypher/ExportCypher.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
Expand Down Expand Up @@ -96,13 +97,13 @@ public Stream<DataProgressInfo> schema(@Name(value = "file",defaultValue = "") S
}

private Stream<DataProgressInfo> exportCypher(@Name("file") String fileName, String source, SubGraph graph, ExportConfig c, boolean onlySchema) throws IOException {
if (fileName != null) checkWriteAllowed(c);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(c);

ProgressInfo progressInfo = new ProgressInfo(fileName, source, "cypher");
progressInfo.batchSize = c.getBatchSize();
ProgressReporter reporter = new ProgressReporter(null, null, progressInfo);
boolean separatedFiles = !onlySchema && c.separateFiles();
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, separatedFiles, c.streamStatements());
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, separatedFiles);

if (c.streamStatements()) {
long timeout = c.getTimeoutSeconds();
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/apoc/export/cypher/FileManagerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* @since 06.12.17
*/
public class FileManagerFactory {
public static ExportFileManager createFileManager(String fileName, boolean separatedFiles, boolean b) {
public static ExportFileManager createFileManager(String fileName, boolean separatedFiles) {
if (fileName == null) {
return new StringExportCypherFileManager(separatedFiles);
}
Expand Down Expand Up @@ -82,15 +82,23 @@ public StringExportCypherFileManager(boolean separatedFiles) {
@Override
public PrintWriter getPrintWriter(String type) throws IOException {
if (this.separatedFiles) {
return new PrintWriter(writers.compute(type, (key, writer) -> writer == null ? new StringWriter() : writer));
return new PrintWriter(getStringWriter(type));
} else {
return new PrintWriter(writers.compute(type.equals("csv") ? type : "cypher", (key, writer) -> writer == null ? new StringWriter() : writer));
switch (type) {
case "csv":
case "json":
case "graphml":
break;
default:
type = "cypher";
}
return new PrintWriter(getStringWriter(type));
}
}

@Override
public StringWriter getStringWriter(String type) {
return writers.get(type);
return writers.computeIfAbsent(type, (key) -> new StringWriter());
}

@Override
Expand Down
54 changes: 43 additions & 11 deletions src/main/java/apoc/export/graphml/ExportGraphML.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package apoc.export.graphml;

import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.ExportUtils;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
Expand All @@ -14,17 +18,20 @@
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.procedure.*;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import javax.xml.stream.XMLStreamException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static apoc.util.FileUtils.checkWriteAllowed;
import static apoc.util.FileUtils.getPrintWriter;

/**
* @author mh
Expand All @@ -34,6 +41,9 @@ public class ExportGraphML {
@Context
public GraphDatabaseService db;

@Context
public TerminationGuard terminationGuard;

@Procedure(name = "apoc.import.graphml",mode = Mode.WRITE)
@Description("apoc.import.graphml(file,config) - imports graphml file")
public Stream<ProgressInfo> file(@Name("file") String fileName, @Name("config") Map<String, Object> config) throws Exception {
Expand Down Expand Up @@ -90,14 +100,36 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportGraphML(fileName, source, graph, c);
}

private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig config) throws Exception, XMLStreamException {
if (fileName != null) checkWriteAllowed();
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "graphml"));
PrintWriter printWriter = getPrintWriter(fileName, null);
private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig exportConfig) throws Exception {
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
final String format = "graphml";
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, format));
XmlGraphMLWriter exporter = new XmlGraphMLWriter();
exporter.write(graph, printWriter, reporter, config);
printWriter.flush();
printWriter.close();
return reporter.stream();
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false);
final PrintWriter graphMl = cypherFileManager.getPrintWriter(format);
if (exportConfig.streamStatements()) {
return ExportUtils.getProgressInfoStream(db, terminationGuard, format, exportConfig, reporter, cypherFileManager,
(reporterWithConsumer) -> {
try {
exporter.write(graph, graphMl, reporterWithConsumer, exportConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} else {
exporter.write(graph, graphMl, reporter, exportConfig);
closeWriter(graphMl);
return reporter.stream();
}
}

/**
 * Flushes and closes the export writer, failing loudly if any write was lost.
 *
 * <p>{@link PrintWriter} never throws {@code IOException}: it records failures in an internal
 * error flag instead. Wrapping {@code close()} in a try/catch therefore cannot detect a failed
 * export, so the flag is checked explicitly via {@link PrintWriter#checkError()}.
 *
 * @param writer the writer the export was written to; must not be {@code null}
 * @throws RuntimeException if the writer reported an error while writing, flushing or closing
 */
private void closeWriter(PrintWriter writer) {
    // checkError() flushes the still-open stream and reports any failure recorded so far.
    final boolean failedBeforeClose = writer.checkError();
    writer.close();
    // A second check picks up a failure raised by close() itself.
    if (failedBeforeClose || writer.checkError()) {
        throw new RuntimeException("Error while writing the export: the underlying writer reported an I/O failure");
    }
}

}
1 change: 1 addition & 0 deletions src/main/java/apoc/export/graphml/XmlGraphMLWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void write(SubGraph graph, Writer writer, Reporter reporter, ExportConfig
reporter.update(0, 1, props);
}
writeFooter(xmlWriter);
reporter.done();
}

private void writeKey(XMLStreamWriter writer, SubGraph ops, ExportConfig config) throws Exception {
Expand Down
35 changes: 25 additions & 10 deletions src/main/java/apoc/export/json/ExportJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.ExportUtils;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
Expand All @@ -17,8 +19,8 @@
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -31,6 +33,9 @@ public class ExportJson {
@Context
public GraphDatabaseService db;

@Context
public TerminationGuard terminationGuard;

public ExportJson(GraphDatabaseService db) {
this.db = db;
}
Expand Down Expand Up @@ -72,20 +77,30 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportJson(fileName, source,result,config);
}

private Stream<ProgressInfo> exportJson(@Name("file") String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig c = new ExportConfig(config);
checkWriteAllowed(c);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "json"));
private Stream<ProgressInfo> exportJson(String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig exportConfig = new ExportConfig(config);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
final String format = "json";
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, format));
JsonFormat exporter = new JsonFormat(db);
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false);
if (exportConfig.streamStatements()) {
return ExportUtils.getProgressInfoStream(db, terminationGuard, format, exportConfig, reporter, cypherFileManager,
(reporterWithConsumer) -> dump(data, exportConfig, reporterWithConsumer, exporter, cypherFileManager));
} else {
dump(data, exportConfig, reporter, exporter, cypherFileManager);
return reporter.stream();
}
}

ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, c.streamStatements());

try (PrintWriter printWriter = cypherFileManager.getPrintWriter("json")) {
private void dump(Object data, ExportConfig c, ProgressReporter reporter, JsonFormat exporter, ExportFileManager cypherFileManager) {
try {
if (data instanceof SubGraph)
exporter.dump(((SubGraph)data),cypherFileManager,reporter,c);
if (data instanceof Result)
exporter.dump(((Result)data),printWriter,reporter,c);
exporter.dump(((Result)data),cypherFileManager,reporter,c);
} catch (Exception e) {
throw new RuntimeException(e);
}
return reporter.stream();
}
}
Loading

0 comments on commit 06a35f9

Please sign in to comment.