Skip to content

Commit

Permalink
fixes neo4j-contrib#1276: Add streaming support to export JSON and Gr…
Browse files Browse the repository at this point in the history
…aphML
  • Loading branch information
conker84 committed Oct 31, 2019
1 parent c426bb2 commit ddd13aa
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 56 deletions.
3 changes: 2 additions & 1 deletion src/main/java/apoc/export/csv/ExportCSV.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
Expand Down Expand Up @@ -88,7 +89,7 @@ private void preventBulkImport(ExportConfig config) {
}

private Stream<ProgressInfo> exportCsv(@Name("file") String fileName, String source, Object data, ExportConfig exportConfig) throws Exception {
checkWriteAllowed(exportConfig);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
ProgressInfo progressInfo = new ProgressInfo(fileName, source, "csv");
progressInfo.batchSize = exportConfig.getBatchSize();
ProgressReporter reporter = new ProgressReporter(null, null, progressInfo);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/apoc/export/cypher/ExportCypher.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
Expand Down Expand Up @@ -96,7 +97,7 @@ public Stream<DataProgressInfo> schema(@Name(value = "file",defaultValue = "") S
}

private Stream<DataProgressInfo> exportCypher(@Name("file") String fileName, String source, SubGraph graph, ExportConfig c, boolean onlySchema) throws IOException {
if (fileName != null) checkWriteAllowed(c);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(c);

ProgressInfo progressInfo = new ProgressInfo(fileName, source, "cypher");
progressInfo.batchSize = c.getBatchSize();
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/apoc/export/cypher/FileManagerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,23 @@ public StringExportCypherFileManager(boolean separatedFiles) {
@Override
public PrintWriter getPrintWriter(String type) throws IOException {
if (this.separatedFiles) {
return new PrintWriter(writers.compute(type, (key, writer) -> writer == null ? new StringWriter() : writer));
return new PrintWriter(getStringWriter(type));
} else {
return new PrintWriter(writers.compute(type.equals("csv") ? type : "cypher", (key, writer) -> writer == null ? new StringWriter() : writer));
switch (type) {
case "csv":
case "json":
case "graphml":
break;
default:
type = "cypher";
}
return new PrintWriter(getStringWriter(type));
}
}

@Override
public StringWriter getStringWriter(String type) {
return writers.get(type);
return writers.computeIfAbsent(type, (key) -> new StringWriter());
}

@Override
Expand Down
60 changes: 50 additions & 10 deletions src/main/java/apoc/export/graphml/ExportGraphML.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package apoc.export.graphml;

import apoc.Pools;
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
Expand All @@ -14,17 +19,22 @@
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.procedure.*;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import javax.xml.stream.XMLStreamException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static apoc.util.FileUtils.checkWriteAllowed;
import static apoc.util.FileUtils.getPrintWriter;

/**
* @author mh
Expand All @@ -34,6 +44,9 @@ public class ExportGraphML {
@Context
public GraphDatabaseService db;

@Context
public TerminationGuard terminationGuard;

@Procedure(name = "apoc.import.graphml",mode = Mode.WRITE)
@Description("apoc.import.graphml(file,config) - imports graphml file")
public Stream<ProgressInfo> file(@Name("file") String fileName, @Name("config") Map<String, Object> config) throws Exception {
Expand Down Expand Up @@ -90,14 +103,41 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportGraphML(fileName, source, graph, c);
}

private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig config) throws Exception, XMLStreamException {
if (fileName != null) checkWriteAllowed();
private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig exportConfig) throws Exception {
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "graphml"));
PrintWriter printWriter = getPrintWriter(fileName, null);
XmlGraphMLWriter exporter = new XmlGraphMLWriter();
exporter.write(graph, printWriter, reporter, config);
printWriter.flush();
printWriter.close();
return reporter.stream();
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, exportConfig.streamStatements());
final PrintWriter graphMl = cypherFileManager.getPrintWriter("graphml");
if (exportConfig.streamStatements()) {
long timeout = exportConfig.getTimeoutSeconds();
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> {
Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("graphml")), timeout);
closeWriter(graphMl);
}
);
Util.inTxFuture(Pools.DEFAULT, db, () -> {
exporter.write(graph, graphMl, reporterWithConsumer, exportConfig);
return true;
});
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
} else {
exporter.write(graph, graphMl, reporter, exportConfig);
closeWriter(graphMl);
return reporter.stream();
}
}

private void closeWriter(PrintWriter writer) {
writer.flush();
try {
writer.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
1 change: 1 addition & 0 deletions src/main/java/apoc/export/graphml/XmlGraphMLWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void write(SubGraph graph, Writer writer, Reporter reporter, ExportConfig
reporter.update(0, 1, props);
}
writeFooter(xmlWriter);
reporter.done();
}

private void writeKey(XMLStreamWriter writer, SubGraph ops, ExportConfig config) throws Exception {
Expand Down
48 changes: 35 additions & 13 deletions src/main/java/apoc/export/json/ExportJson.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package apoc.export.json;

import apoc.Pools;
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
Expand All @@ -17,20 +20,25 @@
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static apoc.util.FileUtils.checkWriteAllowed;

public class ExportJson {
@Context
public GraphDatabaseService db;

@Context
public TerminationGuard terminationGuard;

public ExportJson(GraphDatabaseService db) {
this.db = db;
}
Expand Down Expand Up @@ -72,20 +80,34 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportJson(fileName, source,result,config);
}

private Stream<ProgressInfo> exportJson(@Name("file") String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig c = new ExportConfig(config);
checkWriteAllowed(c);
private Stream<ProgressInfo> exportJson(String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig exportConfig = new ExportConfig(config);
if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "json"));
JsonFormat exporter = new JsonFormat(db);

ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, c.streamStatements());

try (PrintWriter printWriter = cypherFileManager.getPrintWriter("json")) {
if (data instanceof SubGraph)
exporter.dump(((SubGraph)data),cypherFileManager,reporter,c);
if (data instanceof Result)
exporter.dump(((Result)data),printWriter,reporter,c);
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, exportConfig.streamStatements());
if (exportConfig.streamStatements()) {
long timeout = exportConfig.getTimeoutSeconds();
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("json")), timeout)
);
Util.inTxFuture(Pools.DEFAULT, db, () -> {
dump(data, exportConfig, reporterWithConsumer, exporter, cypherFileManager);
return true;
});
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
} else {
dump(data, exportConfig, reporter, exporter, cypherFileManager);
return reporter.stream();
}
return reporter.stream();
}

private void dump(Object data, ExportConfig c, ProgressReporter reporter, JsonFormat exporter, ExportFileManager cypherFileManager) throws Exception {
if (data instanceof SubGraph)
exporter.dump(((SubGraph)data),cypherFileManager,reporter,c);
if (data instanceof Result)
exporter.dump(((Result)data),cypherFileManager,reporter,c);
}
}
29 changes: 18 additions & 11 deletions src/main/java/apoc/export/json/JsonFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;

import java.io.IOException;
import java.io.Reader;
Expand All @@ -34,12 +39,15 @@ public ProgressInfo load(Reader reader, Reporter reporter, ExportConfig config)
}

private ProgressInfo dump(Writer writer, Reporter reporter, Consumer<JsonGenerator> consumer) throws Exception {
try (Transaction tx = db.beginTx(); JsonGenerator jsonGenerator = getJsonGenerator(writer);) {

try (Transaction tx = db.beginTx();
JsonGenerator jsonGenerator = getJsonGenerator(writer)) {
consumer.accept(jsonGenerator);

jsonGenerator.flush();
tx.success();
reporter.done();
return reporter.getTotal();
} finally {
writer.close();
}
}

Expand All @@ -56,7 +64,7 @@ public ProgressInfo dump(SubGraph graph, ExportFileManager writer, Reporter repo
return dump(writer.getPrintWriter("json"), reporter, consumer);
}

public ProgressInfo dump(Result result, Writer writer, Reporter reporter, ExportConfig config) throws Exception {
public ProgressInfo dump(Result result, ExportFileManager writer, Reporter reporter, ExportConfig config) throws Exception {
Consumer<JsonGenerator> consumer = (jsonGenerator) -> {
try {
String[] header = result.columns().toArray(new String[result.columns().size()]);
Expand All @@ -69,14 +77,14 @@ public ProgressInfo dump(Result result, Writer writer, Reporter reporter, Export
throw new RuntimeException(e);
}
};
return dump(writer, reporter, consumer);
return dump(writer.getPrintWriter("json"), reporter, consumer);
}

private JsonGenerator getJsonGenerator(Writer writer) throws IOException {
JsonFactory jsonF = new JsonFactory();
JsonGenerator jsonGenerator = jsonF.createGenerator(writer);
jsonGenerator.setCodec(JsonUtil.OBJECT_MAPPER);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
JsonGenerator jsonGenerator = new JsonFactory()
.createGenerator(writer)
.setCodec(JsonUtil.OBJECT_MAPPER)
.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
return jsonGenerator;
}

Expand Down Expand Up @@ -158,7 +166,6 @@ private void write(Reporter reporter, JsonGenerator jsonGenerator, ExportConfig
JsonFormatSerializer.DEFAULT.serializeProperty(jsonGenerator, keyName, value, writeKey);
reporter.update(0, 0, 1);
break;

}
}

Expand Down
40 changes: 26 additions & 14 deletions src/test/java/apoc/export/graphml/ExportGraphMLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,7 @@ public void testImportGraphML() throws Exception {
fw.write(EXPECTED_TYPES); fw.close();
TestUtil.testCall(db, "CALL apoc.import.graphml({file},{readLabels:true})", map("file", output.getAbsolutePath()),
(r) -> {
assertEquals(3L, r.get("nodes"));
assertEquals(1L, r.get("relationships"));
assertEquals(8L, r.get("properties"));
assertEquals(output.getAbsolutePath(), r.get("file"));
if (r.get("source").toString().contains(":"))
assertEquals("statement" + ": nodes(2), rels(1)", r.get("source"));
else
assertEquals("file", r.get("source"));
assertEquals("graphml", r.get("format"));
assertTrue("Should get time greater than 0",((long) r.get("time")) > 0);
assertResults(output, r, "statement");
});

TestUtil.testCall(db, "MATCH (c:Bar {age: 12, values: [1,2,3]}) RETURN COUNT(c) AS c", null, (r) -> assertEquals(1L, r.get("c")));
Expand Down Expand Up @@ -253,7 +244,7 @@ public void testExportGraphGraphML() throws Exception {
assertXMLEquals(output, EXPECTED_FALSE);
}

private void assertXMLEquals(File output, String xmlString) {
private void assertXMLEquals(Object output, String xmlString) {
Diff myDiff = DiffBuilder.compare(xmlString)
.withTest(output)
.checkForSimilar()
Expand Down Expand Up @@ -420,15 +411,36 @@ public void testExportGraphmlQueryWithStringCaptionCamelCase() throws FileNotFou
}

private void assertResults(File output, Map<String, Object> r, final String source) {
assertEquals(3L, r.get("nodes"));
assertEquals(1L, r.get("relationships"));
assertEquals(8L, r.get("properties"));
assertCommons(r);
assertEquals(output.getAbsolutePath(), r.get("file"));
if (r.get("source").toString().contains(":"))
assertEquals(source + ": nodes(3), rels(1)", r.get("source"));
else
assertEquals("file", r.get("source"));
assertNull("data should be null", r.get("data"));
}

private void assertCommons(Map<String, Object> r) {
assertEquals(3L, r.get("nodes"));
assertEquals(1L, r.get("relationships"));
assertEquals(8L, r.get("properties"));
assertEquals("graphml", r.get("format"));
assertTrue("Should get time greater than 0",((long) r.get("time")) > 0);
}

private void assertStreamResults(Map<String, Object> r, final String source) {
assertCommons(r);
assertEquals(source + ": nodes(3), rels(1)", r.get("source"));
assertNull("file should be null", r.get("file"));
assertNotNull("data should be not null", r.get("data"));
}

@Test
public void testExportAllGraphMLStream() throws Exception {
TestUtil.testCall(db, "CALL apoc.export.graphml.all(null, {stream: true})",
(r) -> {
assertStreamResults(r, "database");
assertXMLEquals(r.get("data"), EXPECTED_FALSE);
});
}
}
Loading

0 comments on commit ddd13aa

Please sign in to comment.