From ddd13aaccffdc697a521df7007c536d5df60398b Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Thu, 31 Oct 2019 17:58:32 +0100 Subject: [PATCH] fixes #1276: Add streaming support to export JSON and GraphML --- src/main/java/apoc/export/csv/ExportCSV.java | 3 +- .../java/apoc/export/cypher/ExportCypher.java | 3 +- .../export/cypher/FileManagerFactory.java | 14 ++++- .../apoc/export/graphml/ExportGraphML.java | 60 +++++++++++++++---- .../apoc/export/graphml/XmlGraphMLWriter.java | 1 + .../java/apoc/export/json/ExportJson.java | 48 +++++++++++---- .../java/apoc/export/json/JsonFormat.java | 29 +++++---- .../export/graphml/ExportGraphMLTest.java | 40 ++++++++----- .../java/apoc/export/json/ExportJsonTest.java | 60 ++++++++++++++++++- 9 files changed, 202 insertions(+), 56 deletions(-) diff --git a/src/main/java/apoc/export/csv/ExportCSV.java b/src/main/java/apoc/export/csv/ExportCSV.java index c62fac020c..7948c0fcf4 100644 --- a/src/main/java/apoc/export/csv/ExportCSV.java +++ b/src/main/java/apoc/export/csv/ExportCSV.java @@ -9,6 +9,7 @@ import apoc.result.ProgressInfo; import apoc.util.QueueBasedSpliterator; import apoc.util.Util; +import org.apache.commons.lang3.StringUtils; import org.neo4j.cypher.export.DatabaseSubGraph; import org.neo4j.cypher.export.SubGraph; import org.neo4j.graphdb.GraphDatabaseService; @@ -88,7 +89,7 @@ private void preventBulkImport(ExportConfig config) { } private Stream exportCsv(@Name("file") String fileName, String source, Object data, ExportConfig exportConfig) throws Exception { - checkWriteAllowed(exportConfig); + if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig); ProgressInfo progressInfo = new ProgressInfo(fileName, source, "csv"); progressInfo.batchSize = exportConfig.getBatchSize(); ProgressReporter reporter = new ProgressReporter(null, null, progressInfo); diff --git a/src/main/java/apoc/export/cypher/ExportCypher.java b/src/main/java/apoc/export/cypher/ExportCypher.java index 0d5c470006..48626784c0 100644 --- a/src/main/java/apoc/export/cypher/ExportCypher.java +++ b/src/main/java/apoc/export/cypher/ExportCypher.java @@ -7,6 +7,7 @@ import apoc.result.ProgressInfo; import apoc.util.QueueBasedSpliterator; import apoc.util.Util; +import org.apache.commons.lang3.StringUtils; import org.neo4j.cypher.export.CypherResultSubGraph; import org.neo4j.cypher.export.DatabaseSubGraph; import org.neo4j.cypher.export.SubGraph; @@ -96,7 +97,7 @@ public Stream schema(@Name(value = "file",defaultValue = "") S } private Stream exportCypher(@Name("file") String fileName, String source, SubGraph graph, ExportConfig c, boolean onlySchema) throws IOException { - if (fileName != null) checkWriteAllowed(c); + if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(c); ProgressInfo progressInfo = new ProgressInfo(fileName, source, "cypher"); progressInfo.batchSize = c.getBatchSize(); diff --git a/src/main/java/apoc/export/cypher/FileManagerFactory.java b/src/main/java/apoc/export/cypher/FileManagerFactory.java index 5ef01bc682..681ff2f8ec 100644 --- a/src/main/java/apoc/export/cypher/FileManagerFactory.java +++ b/src/main/java/apoc/export/cypher/FileManagerFactory.java @@ -82,15 +82,23 @@ public StringExportCypherFileManager(boolean separatedFiles) { @Override public PrintWriter getPrintWriter(String type) throws IOException { if (this.separatedFiles) { - return new PrintWriter(writers.compute(type, (key, writer) -> writer == null ? new StringWriter() : writer)); + return new PrintWriter(getStringWriter(type)); } else { - return new PrintWriter(writers.compute(type.equals("csv") ? type : "cypher", (key, writer) -> writer == null ? new StringWriter() : writer)); + switch (type) { + case "csv": + case "json": + case "graphml": + break; + default: + type = "cypher"; + } + return new PrintWriter(getStringWriter(type)); } } @Override public StringWriter getStringWriter(String type) { - return writers.get(type); + return writers.computeIfAbsent(type, (key) -> new StringWriter()); } @Override diff --git a/src/main/java/apoc/export/graphml/ExportGraphML.java b/src/main/java/apoc/export/graphml/ExportGraphML.java index 4e688aada0..68c43528f5 100644 --- a/src/main/java/apoc/export/graphml/ExportGraphML.java +++ b/src/main/java/apoc/export/graphml/ExportGraphML.java @@ -1,11 +1,16 @@ package apoc.export.graphml; +import apoc.Pools; +import apoc.export.cypher.ExportFileManager; +import apoc.export.cypher.FileManagerFactory; import apoc.export.util.ExportConfig; import apoc.export.util.NodesAndRelsSubGraph; import apoc.export.util.ProgressReporter; import apoc.result.ProgressInfo; import apoc.util.FileUtils; +import apoc.util.QueueBasedSpliterator; import apoc.util.Util; +import org.apache.commons.lang3.StringUtils; import org.neo4j.cypher.export.CypherResultSubGraph; import org.neo4j.cypher.export.DatabaseSubGraph; import org.neo4j.cypher.export.SubGraph; @@ -14,17 +19,22 @@ import org.neo4j.graphdb.Relationship; import org.neo4j.graphdb.Result; import org.neo4j.helpers.collection.Iterables; -import org.neo4j.procedure.*; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; -import javax.xml.stream.XMLStreamException; import java.io.PrintWriter; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static apoc.util.FileUtils.checkWriteAllowed; -import static apoc.util.FileUtils.getPrintWriter; /** * @author mh @@ -34,6 +44,9 @@ public class ExportGraphML { @Context public GraphDatabaseService db; + @Context + public TerminationGuard terminationGuard; + @Procedure(name = "apoc.import.graphml",mode = Mode.WRITE) @Description("apoc.import.graphml(file,config) - imports graphml file") public Stream file(@Name("file") String fileName, @Name("config") Map config) throws Exception { @@ -90,14 +103,41 @@ public Stream query(@Name("query") String query, @Name("file") Str return exportGraphML(fileName, source, graph, c); } - private Stream exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig config) throws Exception, XMLStreamException { - if (fileName != null) checkWriteAllowed(); + private Stream exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig exportConfig) throws Exception { + if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig); ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "graphml")); - PrintWriter printWriter = getPrintWriter(fileName, null); XmlGraphMLWriter exporter = new XmlGraphMLWriter(); - exporter.write(graph, printWriter, reporter, config); - printWriter.flush(); - printWriter.close(); - return reporter.stream(); + ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, exportConfig.streamStatements()); + final PrintWriter graphMl = cypherFileManager.getPrintWriter("graphml"); + if (exportConfig.streamStatements()) { + long timeout = exportConfig.getTimeoutSeconds(); + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000); + ProgressReporter reporterWithConsumer = reporter.withConsumer( + (pi) -> { + Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("graphml")), timeout); + closeWriter(graphMl); + } + ); + Util.inTxFuture(Pools.DEFAULT, db, () -> { + exporter.write(graph, graphMl, reporterWithConsumer, exportConfig); + return true; + }); + QueueBasedSpliterator spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout); + return StreamSupport.stream(spliterator, false); + } else { + exporter.write(graph, graphMl, reporter, exportConfig); + closeWriter(graphMl); + return reporter.stream(); + } } + + private void closeWriter(PrintWriter writer) { + writer.flush(); + try { + writer.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/src/main/java/apoc/export/graphml/XmlGraphMLWriter.java b/src/main/java/apoc/export/graphml/XmlGraphMLWriter.java index e97b80a9cd..0b2c7eb5cb 100644 --- a/src/main/java/apoc/export/graphml/XmlGraphMLWriter.java +++ b/src/main/java/apoc/export/graphml/XmlGraphMLWriter.java @@ -36,6 +36,7 @@ public void write(SubGraph graph, Writer writer, Reporter reporter, ExportConfig reporter.update(0, 1, props); } writeFooter(xmlWriter); + reporter.done(); } private void writeKey(XMLStreamWriter writer, SubGraph ops, ExportConfig config) throws Exception { diff --git a/src/main/java/apoc/export/json/ExportJson.java b/src/main/java/apoc/export/json/ExportJson.java index 646621126f..9d3bdf0fde 100644 --- a/src/main/java/apoc/export/json/ExportJson.java +++ b/src/main/java/apoc/export/json/ExportJson.java @@ -1,12 +1,15 @@ package apoc.export.json; +import apoc.Pools; import apoc.export.cypher.ExportFileManager; import apoc.export.cypher.FileManagerFactory; import apoc.export.util.ExportConfig; import apoc.export.util.NodesAndRelsSubGraph; import apoc.export.util.ProgressReporter; import apoc.result.ProgressInfo; +import apoc.util.QueueBasedSpliterator; import apoc.util.Util; +import org.apache.commons.lang3.StringUtils; import org.neo4j.cypher.export.DatabaseSubGraph; import org.neo4j.cypher.export.SubGraph; import org.neo4j.graphdb.GraphDatabaseService; @@ -17,13 +20,15 @@ import org.neo4j.procedure.Description; import org.neo4j.procedure.Name; import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; -import java.io.PrintWriter; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static apoc.util.FileUtils.checkWriteAllowed; @@ -31,6 +36,9 @@ public class ExportJson { @Context public GraphDatabaseService db; + @Context + public TerminationGuard terminationGuard; + public ExportJson(GraphDatabaseService db) { this.db = db; } @@ -72,20 +80,34 @@ public Stream query(@Name("query") String query, @Name("file") Str return exportJson(fileName, source,result,config); } - private Stream exportJson(@Name("file") String fileName, String source, Object data, Map config) throws Exception { - ExportConfig c = new ExportConfig(config); - checkWriteAllowed(c); + private Stream exportJson(String fileName, String source, Object data, Map config) throws Exception { + ExportConfig exportConfig = new ExportConfig(config); + if (StringUtils.isNotBlank(fileName)) checkWriteAllowed(exportConfig); ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "json")); JsonFormat exporter = new JsonFormat(db); - - ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, c.streamStatements()); - - try (PrintWriter printWriter = cypherFileManager.getPrintWriter("json")) { - if (data instanceof SubGraph) - exporter.dump(((SubGraph)data),cypherFileManager,reporter,c); - if (data instanceof Result) - exporter.dump(((Result)data),printWriter,reporter,c); + ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, exportConfig.streamStatements()); + if (exportConfig.streamStatements()) { + long timeout = exportConfig.getTimeoutSeconds(); + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1000); + ProgressReporter reporterWithConsumer = reporter.withConsumer( + (pi) -> Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("json")), timeout) + ); + Util.inTxFuture(Pools.DEFAULT, db, () -> { + dump(data, exportConfig, reporterWithConsumer, exporter, cypherFileManager); + return true; + }); + QueueBasedSpliterator spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout); + return StreamSupport.stream(spliterator, false); + } else { + dump(data, exportConfig, reporter, exporter, cypherFileManager); + return reporter.stream(); } - return reporter.stream(); + } + + private void dump(Object data, ExportConfig c, ProgressReporter reporter, JsonFormat exporter, ExportFileManager cypherFileManager) throws Exception { + if (data instanceof SubGraph) + exporter.dump(((SubGraph)data),cypherFileManager,reporter,c); + if (data instanceof Result) + exporter.dump(((Result)data),cypherFileManager,reporter,c); } } diff --git a/src/main/java/apoc/export/json/JsonFormat.java b/src/main/java/apoc/export/json/JsonFormat.java index 19dedf24f8..e877d77e1b 100644 --- a/src/main/java/apoc/export/json/JsonFormat.java +++ b/src/main/java/apoc/export/json/JsonFormat.java @@ -11,7 +11,12 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; import org.neo4j.cypher.export.SubGraph; -import org.neo4j.graphdb.*; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Path; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.Result; +import org.neo4j.graphdb.Transaction; import java.io.IOException; import java.io.Reader; @@ -34,12 +39,15 @@ public ProgressInfo load(Reader reader, Reporter reporter, ExportConfig config) } private ProgressInfo dump(Writer writer, Reporter reporter, Consumer consumer) throws Exception { - try (Transaction tx = db.beginTx(); JsonGenerator jsonGenerator = getJsonGenerator(writer);) { - + try (Transaction tx = db.beginTx(); + JsonGenerator jsonGenerator = getJsonGenerator(writer)) { consumer.accept(jsonGenerator); - + jsonGenerator.flush(); tx.success(); + reporter.done(); return reporter.getTotal(); + } finally { + writer.close(); } } @@ -56,7 +64,7 @@ public ProgressInfo dump(SubGraph graph, ExportFileManager writer, Reporter repo return dump(writer.getPrintWriter("json"), reporter, consumer); } - public ProgressInfo dump(Result result, Writer writer, Reporter reporter, ExportConfig config) throws Exception { + public ProgressInfo dump(Result result, ExportFileManager writer, Reporter reporter, ExportConfig config) throws Exception { Consumer consumer = (jsonGenerator) -> { try { String[] header = result.columns().toArray(new String[result.columns().size()]); @@ -69,14 +77,14 @@ public ProgressInfo dump(Result result, Writer writer, Reporter reporter, Export throw new RuntimeException(e); } }; - return dump(writer, reporter, consumer); + return dump(writer.getPrintWriter("json"), reporter, consumer); } private JsonGenerator getJsonGenerator(Writer writer) throws IOException { - JsonFactory jsonF = new JsonFactory(); - JsonGenerator jsonGenerator = jsonF.createGenerator(writer); - jsonGenerator.setCodec(JsonUtil.OBJECT_MAPPER); - jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + JsonGenerator jsonGenerator = new JsonFactory() + .createGenerator(writer) + .setCodec(JsonUtil.OBJECT_MAPPER) + .setPrettyPrinter(new MinimalPrettyPrinter("\n")); return jsonGenerator; } @@ -158,7 +166,6 @@ private void write(Reporter reporter, JsonGenerator jsonGenerator, ExportConfig JsonFormatSerializer.DEFAULT.serializeProperty(jsonGenerator, keyName, value, writeKey); reporter.update(0, 0, 1); break; - } } diff --git a/src/test/java/apoc/export/graphml/ExportGraphMLTest.java b/src/test/java/apoc/export/graphml/ExportGraphMLTest.java index 44e0f328e3..0b65156435 100644 --- a/src/test/java/apoc/export/graphml/ExportGraphMLTest.java +++ b/src/test/java/apoc/export/graphml/ExportGraphMLTest.java @@ -155,16 +155,7 @@ public void testImportGraphML() throws Exception { fw.write(EXPECTED_TYPES); fw.close(); TestUtil.testCall(db, "CALL apoc.import.graphml({file},{readLabels:true})", map("file", output.getAbsolutePath()), (r) -> { - assertEquals(3L, r.get("nodes")); - assertEquals(1L, r.get("relationships")); - assertEquals(8L, r.get("properties")); - assertEquals(output.getAbsolutePath(), r.get("file")); - if (r.get("source").toString().contains(":")) - assertEquals("statement" + ": nodes(2), rels(1)", r.get("source")); - else - assertEquals("file", r.get("source")); - assertEquals("graphml", r.get("format")); - assertTrue("Should get time greater than 0",((long) r.get("time")) > 0); + assertResults(output, r, "statement"); }); TestUtil.testCall(db, "MATCH (c:Bar {age: 12, values: [1,2,3]}) RETURN COUNT(c) AS c", null, (r) -> assertEquals(1L, r.get("c"))); @@ -253,7 +244,7 @@ public void testExportGraphGraphML() throws Exception { assertXMLEquals(output, EXPECTED_FALSE); } - private void assertXMLEquals(File output, String xmlString) { + private void assertXMLEquals(Object output, String xmlString) { Diff myDiff = DiffBuilder.compare(xmlString) .withTest(output) .checkForSimilar() @@ -420,15 +411,36 @@ public void testExportGraphmlQueryWithStringCaptionCamelCase() throws FileNotFou } private void assertResults(File output, Map r, final String source) { - assertEquals(3L, r.get("nodes")); - assertEquals(1L, r.get("relationships")); - assertEquals(8L, r.get("properties")); + assertCommons(r); assertEquals(output.getAbsolutePath(), r.get("file")); if (r.get("source").toString().contains(":")) assertEquals(source + ": nodes(3), rels(1)", r.get("source")); else assertEquals("file", r.get("source")); + assertNull("data should be null", r.get("data")); + } + + private void assertCommons(Map r) { + assertEquals(3L, r.get("nodes")); + assertEquals(1L, r.get("relationships")); + assertEquals(8L, r.get("properties")); assertEquals("graphml", r.get("format")); assertTrue("Should get time greater than 0",((long) r.get("time")) > 0); } + + private void assertStreamResults(Map r, final String source) { + assertCommons(r); + assertEquals(source + ": nodes(3), rels(1)", r.get("source")); + assertNull("file should be null", r.get("file")); + assertNotNull("data should be not null", r.get("data")); + } + + @Test + public void testExportAllGraphMLStream() throws Exception { + TestUtil.testCall(db, "CALL apoc.export.graphml.all(null, {stream: true})", + (r) -> { + assertStreamResults(r, "database"); + assertXMLEquals(r.get("data"), EXPECTED_FALSE); + }); + } } \ No newline at end of file diff --git a/src/test/java/apoc/export/json/ExportJsonTest.java b/src/test/java/apoc/export/json/ExportJsonTest.java index fbd4ef1971..d8517a061f 100644 --- a/src/test/java/apoc/export/json/ExportJsonTest.java +++ b/src/test/java/apoc/export/json/ExportJsonTest.java @@ -15,6 +15,8 @@ import static apoc.util.MapUtil.map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class ExportJsonTest { @@ -55,6 +57,18 @@ public void testExportAllJson() throws Exception { assertFileEquals(filename); } + @Test + public void testExportAllJsonStream() throws Exception { + String filename = "all.json"; + TestUtil.testCall(db, "CALL apoc.export.json.all(null, {stream: true})", + map("file", filename), + (r) -> { + assertStreamResults(r, "database"); + assertStreamEquals(filename, r.get("data").toString()); + } + ); + } + @Test public void testExportPointMapDatetimeJson() throws Exception { String filename = "mapPointDatetime.json"; @@ -71,6 +85,26 @@ public void testExportPointMapDatetimeJson() throws Exception { assertTrue("Should get statement",r.get("source").toString().contains("statement: cols(7)")); assertEquals(filename, r.get("file")); assertEquals("json", r.get("format")); + assertFileEquals(filename); + }); + assertFileEquals(filename); + } + + @Test + public void testExportPointMapDatetimeStreamJson() throws Exception { + String filename = "mapPointDatetime.json"; + String query = "return {data: 1, value: {age: 12, name:'Mike', data: {number: [1,3,5], born: date('2018-10-29'), place: point({latitude: 13.1, longitude: 33.46789})}}} as map, " + + "datetime('2015-06-24T12:50:35.556+0100') AS theDateTime, " + + "localdatetime('2015185T19:32:24') AS theLocalDateTime," + + "point({latitude: 13.1, longitude: 33.46789}) as point," + + "date('+2015-W13-4') as date," + + "time('125035.556+0100') as time," + + "localTime('12:50:35.556') as localTime"; + TestUtil.testCall(db, "CALL apoc.export.json.query({query}, null, {stream: true})", + map("file", filename, "query", query), + (r) -> { + assertTrue("Should get statement", r.get("source").toString().contains("statement: cols(7)")); + assertStreamEquals(filename, r.get("data").toString()); }); assertFileEquals(filename); } @@ -81,7 +115,7 @@ public void testExportListNode() throws Exception { String query = "MATCH (u:User) RETURN COLLECT(u) as list"; - TestUtil.testCall(db, "CALL apoc.export.json.query({query},{file})", + TestUtil.testCall(db, "CALL apoc.export.json.query({query}, {file})", map("file", filename, "query", query), (r) -> { assertTrue("Should get statement",r.get("source").toString().contains("statement: cols(1)")); @@ -338,8 +372,28 @@ private void assertResults(String filename, Map r, final String } private void assertFileEquals(String fileName) { - String expectedText = TestUtil.readFileToString(new File(directoryExpected, fileName)); String actualText = TestUtil.readFileToString(new File(directory, fileName)); - assertEquals(JsonUtil.parse(expectedText,null,Object.class), JsonUtil.parse(actualText,null,Object.class)); + assertStreamEquals(fileName, actualText); + } + + private void assertStreamResults(Map r, final String source) { + assertEquals(3L, r.get("nodes")); + assertEquals(1L, r.get("relationships")); + assertEquals(10L, r.get("properties")); + assertEquals(source + ": nodes(3), rels(1)", r.get("source")); + assertNull("file should be null", r.get("file")); + assertNotNull("data should be not null", r.get("data")); + assertEquals("json", r.get("format")); + assertTrue("Should get time greater than 0",((long) r.get("time")) >= 0); + } + + private void assertStreamEquals(String fileName, String actualText) { + String expectedText = TestUtil.readFileToString(new File(directoryExpected, fileName)); + String[] actualArray = actualText.split("\n"); + String[] expectArray = expectedText.split("\n"); + assertEquals(expectArray.length, actualArray.length); + for (int i = 0; i < actualArray.length; i++) { + assertEquals(JsonUtil.parse(expectArray[i],null, Object.class), JsonUtil.parse(actualArray[i],null, Object.class)); + } } }