From f235b1bf1f7a6dfb17dac514bd6ea3568d769674 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 11 Dec 2020 10:22:51 +0100 Subject: [PATCH 1/5] Incremental CSV destination --- .../seed/destination_definitions.yaml | 2 +- .../connectors/destination-csv/Dockerfile | 2 +- .../destination/csv/CsvDestination.java | 44 +++++++++++++++++-- .../src/main/resources/spec.json | 1 + 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 638753b405897..d6240818a27b0 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -1,7 +1,7 @@ - destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6 name: Local CSV dockerRepository: airbyte/destination-csv - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://hub.docker.com/r/airbyte/integration-singer-csv-destination - destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503 name: Postgres diff --git a/airbyte-integrations/connectors/destination-csv/Dockerfile b/airbyte-integrations/connectors/destination-csv/Dockerfile index 10d1507a33e1e..6384ec4901cd5 100644 --- a/airbyte-integrations/connectors/destination-csv/Dockerfile +++ b/airbyte-integrations/connectors/destination-csv/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/destination-csv diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index ddb8a6b35685a..20211458b85cd 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -40,11 +40,16 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.SyncMode; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -109,11 +114,12 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb final String streamName = stream.getStream().getName(); final String tableName = getNamingResolver().getRawTableName(streamName); final String tmpTableName = getNamingResolver().getTmpTableName(streamName); + final SyncMode syncMode = stream.getSyncMode(); final Path tmpPath = destinationDir.resolve(tmpTableName + ".csv"); final Path finalPath = destinationDir.resolve(tableName + ".csv"); final FileWriter fileWriter = new FileWriter(tmpPath.toFile()); final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_AB_ID, COLUMN_EMITTED_AT, COLUMN_DATA)); - writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath)); + writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath, syncMode)); } return new CsvConsumer(writeConfigs, catalog); @@ -183,14 +189,40 @@ protected void close(boolean hasFailed) throws IOException { // do not persist the data, if there are any failures. if (!hasFailed) { for (final WriteConfig writeConfig : writeConfigs.values()) { - Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); + final boolean fileAlreadyExists = writeConfig.getFinalPath().toFile().exists(); + if (writeConfig.getSyncMode() == SyncMode.FULL_REFRESH || !fileAlreadyExists) { + Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); + } else if (writeConfig.getSyncMode() == SyncMode.INCREMENTAL) { + insertCsvFile(writeConfig.getTmpPath(), writeConfig.getFinalPath()); + } } } // clean up tmp files. for (final WriteConfig writeConfig : writeConfigs.values()) { Files.deleteIfExists(writeConfig.getTmpPath()); } + } + /** + * Copy and append Csv file to another + * @param srcFilePath CSV file to append data from + * @param dstFilePath CSV file to append data to + * @throws IOException + */ + private static void insertCsvFile(Path srcFilePath, Path dstFilePath) throws IOException { + try ( + final BufferedReader reader = Files.newBufferedReader(srcFilePath); + final BufferedWriter writer = Files.newBufferedWriter(dstFilePath, StandardCharsets.UTF_8, StandardOpenOption.CREATE, + StandardOpenOption.APPEND)) { + // Skip header line + reader.readLine(); + String line; + while ((line = reader.readLine()) != null) { + writer.write(line); + writer.newLine(); + } + writer.flush(); + } } } @@ -200,11 +232,13 @@ private static class WriteConfig { private final CSVPrinter writer; private final Path tmpPath; private final Path finalPath; + private final SyncMode syncMode; - public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath) { + public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath, SyncMode syncMode) { this.writer = writer; this.tmpPath = tmpPath; this.finalPath = finalPath; + this.syncMode = syncMode; } public CSVPrinter getWriter() { @@ -219,6 +253,10 @@ public Path getFinalPath() { return finalPath; } + public SyncMode getSyncMode() { + return syncMode; + } + } public static void main(String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json index 4dba528498b03..e51bc2ccb452b 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json @@ -1,5 +1,6 @@ { "documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv", + "supportsIncremental": true, "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "CSV Destination Spec", From 484da49b318ee460eaf85d1a7f569aa1f1f05577 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 11 Dec 2020 12:12:26 +0100 Subject: [PATCH 2/5] Format Code --- .../io/airbyte/integrations/destination/csv/CsvDestination.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 20211458b85cd..dde8779111709 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -205,6 +205,7 @@ protected void close(boolean hasFailed) throws IOException { /** * Copy and append Csv file to another + * * @param srcFilePath CSV file to append data from * @param dstFilePath CSV file to append data to * @throws IOException From 1126d180d97879deb72cf9db8ccccc3b48ef68b4 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 11 Dec 2020 12:13:31 +0100 Subject: [PATCH 3/5] Change version in json file too --- .../8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json index cf88f3bd8a3d3..4aaab96644b09 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6", "name": "Local CSV", "dockerRepository": "airbyte/destination-csv", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.5", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination" } From 4b9ede96616d399867b73dd5fc5d32ca8126591d Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 11 Dec 2020 18:15:27 +0100 Subject: [PATCH 4/5] Handle incremental update in tmp files not in final one --- .../destination/csv/CsvDestination.java | 57 ++++--------------- 1 file changed, 10 insertions(+), 47 deletions(-) diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index dde8779111709..fd1615f3f38c2 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -41,16 +41,11 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.SyncMode; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; -import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -108,18 +103,22 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb FileUtils.forceMkdir(destinationDir.toFile()); - final long now = Instant.now().toEpochMilli(); final Map writeConfigs = new HashMap<>(); for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final String streamName = stream.getStream().getName(); final String tableName = getNamingResolver().getRawTableName(streamName); final String tmpTableName = getNamingResolver().getTmpTableName(streamName); - final SyncMode syncMode = stream.getSyncMode(); final Path tmpPath = destinationDir.resolve(tmpTableName + ".csv"); final Path finalPath = destinationDir.resolve(tableName + ".csv"); - final FileWriter fileWriter = new FileWriter(tmpPath.toFile()); - final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_AB_ID, COLUMN_EMITTED_AT, COLUMN_DATA)); - writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath, syncMode)); + CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(COLUMN_AB_ID, COLUMN_EMITTED_AT, COLUMN_DATA); + final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL; + if (isIncremental && finalPath.toFile().exists()) { + Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING); + csvFormat = csvFormat.withSkipHeaderRecord(); + } + final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), isIncremental); + final CSVPrinter printer = new CSVPrinter(fileWriter, csvFormat); + writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath)); } return new CsvConsumer(writeConfigs, catalog); @@ -189,12 +188,7 @@ protected void close(boolean hasFailed) throws IOException { // do not persist the data, if there are any failures. if (!hasFailed) { for (final WriteConfig writeConfig : writeConfigs.values()) { - final boolean fileAlreadyExists = writeConfig.getFinalPath().toFile().exists(); - if (writeConfig.getSyncMode() == SyncMode.FULL_REFRESH || !fileAlreadyExists) { Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); - } else if (writeConfig.getSyncMode() == SyncMode.INCREMENTAL) { - insertCsvFile(writeConfig.getTmpPath(), writeConfig.getFinalPath()); - } } } // clean up tmp files. @@ -202,30 +196,6 @@ protected void close(boolean hasFailed) throws IOException { Files.deleteIfExists(writeConfig.getTmpPath()); } } - - /** - * Copy and append Csv file to another - * - * @param srcFilePath CSV file to append data from - * @param dstFilePath CSV file to append data to - * @throws IOException - */ - private static void insertCsvFile(Path srcFilePath, Path dstFilePath) throws IOException { - try ( - final BufferedReader reader = Files.newBufferedReader(srcFilePath); - final BufferedWriter writer = Files.newBufferedWriter(dstFilePath, StandardCharsets.UTF_8, StandardOpenOption.CREATE, - StandardOpenOption.APPEND)) { - // Skip header line - reader.readLine(); - String line; - while ((line = reader.readLine()) != null) { - writer.write(line); - writer.newLine(); - } - writer.flush(); - } - } - } private static class WriteConfig { @@ -233,13 +203,11 @@ private static class WriteConfig { private final CSVPrinter writer; private final Path tmpPath; private final Path finalPath; - private final SyncMode syncMode; - public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath, SyncMode syncMode) { + public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath) { this.writer = writer; this.tmpPath = tmpPath; this.finalPath = finalPath; - this.syncMode = syncMode; } public CSVPrinter getWriter() { @@ -253,11 +221,6 @@ public Path getTmpPath() { public Path getFinalPath() { return finalPath; } - - public SyncMode getSyncMode() { - return syncMode; - } - } public static void main(String[] args) throws Exception { From 3d79c7b77c94f53a647299418636dc3135656617 Mon Sep 17 00:00:00 2001 From: Christophe Duong Date: Fri, 11 Dec 2020 18:19:30 +0100 Subject: [PATCH 5/5] Format code --- .../airbyte/integrations/destination/csv/CsvDestination.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index fd1615f3f38c2..f376607384d3c 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -188,14 +188,16 @@ protected void close(boolean hasFailed) throws IOException { // do not persist the data, if there are any failures. if (!hasFailed) { for (final WriteConfig writeConfig : writeConfigs.values()) { - Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); + Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); } } // clean up tmp files. for (final WriteConfig writeConfig : writeConfigs.values()) { Files.deleteIfExists(writeConfig.getTmpPath()); } + } + } private static class WriteConfig { @@ -221,6 +223,7 @@ public Path getTmpPath() { public Path getFinalPath() { return finalPath; } + } public static void main(String[] args) throws Exception {