diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json
index f2a687154155..a0d5a1f0c9e6 100644
--- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json
+++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json
@@ -2,6 +2,6 @@
   "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
   "name": "Snowflake",
   "dockerRepository": "airbyte/destination-snowflake",
-  "dockerImageTag": "0.3.15",
+  "dockerImageTag": "0.3.16",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
 }
diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json
index ffd94c24f212..a3384e94df6d 100644
--- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json
+++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json
@@ -2,7 +2,7 @@
   "destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
   "name": "Redshift",
   "dockerRepository": "airbyte/destination-redshift",
-  "dockerImageTag": "0.3.17",
+  "dockerImageTag": "0.3.18",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
   "icon": "redshift.svg"
 }
diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 8b2ce00effa4..e95b46ee7d4f 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -42,7 +42,7 @@
 - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   name: Snowflake
   dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.3.15
+  dockerImageTag: 0.3.16
   documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
 - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
   name: S3
@@ -52,7 +52,7 @@
 - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   name: Redshift
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.17
+  dockerImageTag: 0.3.18
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
 - destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
diff --git a/airbyte-integrations/connectors/destination-jdbc/Dockerfile b/airbyte-integrations/connectors/destination-jdbc/Dockerfile
index 843bf880b97a..3607f5055c43 100644
--- a/airbyte-integrations/connectors/destination-jdbc/Dockerfile
+++ b/airbyte-integrations/connectors/destination-jdbc/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.3.6
+LABEL io.airbyte.version=0.3.7
 LABEL io.airbyte.name=airbyte/destination-jdbc
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/StagingFilenameGenerator.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/StagingFilenameGenerator.java
new file mode 100644
index 000000000000..4033a570218a
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/StagingFilenameGenerator.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.jdbc;
+
+/**
+ * The staging file is uploaded to cloud storage in multiple parts. This class keeps track of the
+ * filename, and returns a new one when the old file has had enough parts.
+ */
+public class StagingFilenameGenerator {
+
+  private final String streamName;
+  private final int maxPartsPerFile;
+
+  // the file suffix will change after the max number of file
+  // parts have been generated for the current suffix;
+  // its value starts from 0.
+  private int currentFileSuffix = 0;
+  // the number of parts that have been generated for the current
+  // file suffix; its value range will be [1, maxPartsPerFile]
+  private int currentFileSuffixPartCount = 0;
+
+  public StagingFilenameGenerator(final String streamName, final int maxPartsPerFile) {
+    this.streamName = streamName;
+    this.maxPartsPerFile = maxPartsPerFile;
+  }
+
+  /**
+   * This method should be called every time a new file part is about to be created. The
+   * currentFileSuffix increments from 0. The currentFileSuffixPartCount cycles from 1 to
+   * maxPartsPerFile.
+   */
+  public String getStagingFilename() {
+    if (currentFileSuffixPartCount < maxPartsPerFile) {
+      // when the number of parts for the file has not reached the max,
+      // keep using the same file (i.e. keep the suffix)
+      currentFileSuffixPartCount += 1;
+    } else {
+      // otherwise, reset the part counter, and use a different file
+      // (i.e. update the suffix)
+      currentFileSuffix += 1;
+      currentFileSuffixPartCount = 1;
+    }
+    return String.format("%s_%05d", streamName, currentFileSuffix);
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java
index 796ec0475a75..94f35c6f4ce2 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java
@@ -11,10 +11,10 @@
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.string.Strings;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.DestinationSyncMode;
@@ -40,6 +40,15 @@ public abstract class GcsStreamCopier implements StreamCopier {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class);
 
+  // It is optimal to write every 10,000,000 records (BATCH_SIZE * MAX_PARTS_PER_FILE) to a new
+  // file.
+  // The BATCH_SIZE is defined in CopyConsumerFactory.
+  // The average size of such a file will be about 1 GB.
+  // This makes the files easier to work with and speeds up writing large amounts of data.
+  // In addition, with a large number of records, we avoid the COPY request hitting
+  // QUERY_TIMEOUT when the records from the file
+  // are copied to the staging table.
+  public static final int MAX_PARTS_PER_FILE = 1000;
   private final Storage storageClient;
   private final GcsConfig gcsConfig;
@@ -54,6 +63,7 @@ public abstract class GcsStreamCopier implements StreamCopier {
   private final HashMap<String, WriteChannel> channels = new HashMap<>();
   private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
   private final String stagingFolder;
+  private final StagingFilenameGenerator filenameGenerator;
 
   public GcsStreamCopier(final String stagingFolder,
                          final DestinationSyncMode destSyncMode,
@@ -74,28 +84,31 @@ public GcsStreamCopier(final String stagingFolder,
     this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.storageClient = storageClient;
     this.gcsConfig = gcsConfig;
+    this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
   }
 
   private String prepareGcsStagingFile() {
-    return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 6) + "_" + streamName);
+    return String.join("/", stagingFolder, schemaName, filenameGenerator.getStagingFilename());
   }
 
   @Override
   public String prepareStagingFile() {
     final var name = prepareGcsStagingFile();
-    gcsStagingFiles.add(name);
-    final var blobId = BlobId.of(gcsConfig.getBucketName(), name);
-    final var blobInfo = BlobInfo.newBuilder(blobId).build();
-    final var blob = storageClient.create(blobInfo);
-    final var channel = blob.writer();
-    channels.put(name, channel);
-    final OutputStream outputStream = Channels.newOutputStream(channel);
-
-    final var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
-    try {
-      csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
+    if (!gcsStagingFiles.contains(name)) {
+      gcsStagingFiles.add(name);
+      final var blobId = BlobId.of(gcsConfig.getBucketName(), name);
+      final var blobInfo = BlobInfo.newBuilder(blobId).build();
+      final var blob = storageClient.create(blobInfo);
+      final var channel = blob.writer();
+      channels.put(name, channel);
+      final OutputStream outputStream = Channels.newOutputStream(channel);
+
+      final var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
+      try {
+        csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
     }
     return name;
   }
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
index f96c28011588..22f8cdfc9bb7 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
@@ -17,6 +17,7 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.DestinationSyncMode;
@@ -46,8 +47,15 @@ public abstract class S3StreamCopier implements StreamCopier {
   // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
   // WARNING: Too large a part size can cause potential OOM errors.
   public static final int DEFAULT_PART_SIZE_MB = 10;
+  // It is optimal to write every 10,000,000 records (BATCH_SIZE * MAX_PARTS_PER_FILE) to a new file.
+  // The BATCH_SIZE is defined in CopyConsumerFactory.
+  // The average size of such a file will be about 1 GB.
+  // This makes the files easier to work with and speeds up writing large amounts of data.
+  // In addition, with a large number of records, we avoid the COPY request hitting
+  // QUERY_TIMEOUT when the records from the file
+  // are copied to the staging table.
+  public static final int MAX_PARTS_PER_FILE = 1000;
 
-  public final Map<String, Integer> filePrefixIndexMap = new HashMap<>();
   protected final AmazonS3 s3Client;
   protected final S3Config s3Config;
   protected final String tmpTableName;
@@ -63,6 +71,7 @@ public abstract class S3StreamCopier implements StreamCopier {
   private final Map<String, CSVPrinter> csvPrinters = new HashMap<>();
   private final String s3FileName;
   protected final String stagingFolder;
+  private final StagingFilenameGenerator filenameGenerator;
 
   public S3StreamCopier(final String stagingFolder,
                         final DestinationSyncMode destSyncMode,
@@ -85,49 +94,41 @@ public S3StreamCopier(final String stagingFolder,
     this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.s3Client = client;
     this.s3Config = s3Config;
+    this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
   }
 
   private String prepareS3StagingFile() {
-    return String.join("/", stagingFolder, schemaName, getFilePrefixIndex() + "_" + s3FileName);
-  }
-
-  private Integer getFilePrefixIndex() {
-    int result = 0;
-    if (filePrefixIndexMap.containsKey(s3FileName)) {
-      result = filePrefixIndexMap.get(s3FileName) + 1;
-      filePrefixIndexMap.put(s3FileName, result);
-    } else {
-      filePrefixIndexMap.put(s3FileName, 0);
-    }
-    return result;
+    return String.join("/", stagingFolder, schemaName, filenameGenerator.getStagingFilename());
   }
 
   @Override
   public String prepareStagingFile() {
     final var name = prepareS3StagingFile();
-    s3StagingFiles.add(name);
-    LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
-    // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
-    // have support for streaming multipart uploads;
-    // The alternative is first writing the entire output to disk before loading into S3. This is not
-    // feasible with large tables.
-    // Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached it's
-    // configured part size.
-    // Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations.
-    final var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client)
-        .numUploadThreads(DEFAULT_UPLOAD_THREADS)
-        .queueCapacity(DEFAULT_QUEUE_CAPACITY)
-        .partSize(s3Config.getPartSize());
-    multipartUploadManagers.put(name, manager);
-    final var outputStream = manager.getMultiPartOutputStreams().get(0);
-    // We only need one output stream as we only have one input stream. This is reasonably performant.
-    // See the above comment.
-    outputStreams.put(name, outputStream);
-    final var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
-    try {
-      csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
+    if (!s3StagingFiles.contains(name)) {
+      s3StagingFiles.add(name);
+      LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
+      // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
+      // have support for streaming multipart uploads;
+      // The alternative is first writing the entire output to disk before loading into S3. This is not
+      // feasible with large tables.
+      // Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached its
+      // configured part size.
+      // Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations.
+      final var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client)
+          .numUploadThreads(DEFAULT_UPLOAD_THREADS)
+          .queueCapacity(DEFAULT_QUEUE_CAPACITY)
+          .partSize(s3Config.getPartSize());
+      multipartUploadManagers.put(name, manager);
+      final var outputStream = manager.getMultiPartOutputStreams().get(0);
+      // We only need one output stream as we only have one input stream. This is reasonably performant.
+      // See the above comment.
+      outputStreams.put(name, outputStream);
+      final var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
+      try {
+        csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
     }
     return name;
   }
@@ -207,7 +208,6 @@ public void removeFileAndDropTmpTable() throws Exception {
     LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
     sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);
     LOGGER.info("{} tmp table in destination cleaned.", tmpTableName);
-    filePrefixIndexMap.clear();
   }
 
   protected static String getFullS3Path(final String s3BucketName, final String s3StagingFile) {
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/StagingFilenameGeneratorTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/StagingFilenameGeneratorTest.java
new file mode 100644
index 000000000000..90e65423525e
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/StagingFilenameGeneratorTest.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.jdbc;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.Test;
+
+class StagingFilenameGeneratorTest {
+
+  private static final String STREAM_NAME = RandomStringUtils.randomAlphabetic(5).toLowerCase();
+  private static final int MAX_PARTS_PER_FILE = 3;
+  private static final StagingFilenameGenerator FILENAME_GENERATOR =
+      new StagingFilenameGenerator(STREAM_NAME, MAX_PARTS_PER_FILE);
+
+  @Test
+  public void testGetStagingFilename() {
+    // the file suffix increments after every MAX_PARTS_PER_FILE method calls
+    for (int suffix = 0; suffix < 10; ++suffix) {
+      for (int part = 0; part < MAX_PARTS_PER_FILE; ++part) {
+        assertEquals(STREAM_NAME + "_0000" + suffix, FILENAME_GENERATOR.getStagingFilename());
+      }
+    }
+    for (int suffix = 10; suffix < 20; ++suffix) {
+      for (int part = 0; part < MAX_PARTS_PER_FILE; ++part) {
+        assertEquals(STREAM_NAME + "_000" + suffix, FILENAME_GENERATOR.getStagingFilename());
+      }
+    }
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile
index b62bbf3f360f..e69b69767764 100644
--- a/airbyte-integrations/connectors/destination-redshift/Dockerfile
+++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.3.17
+LABEL io.airbyte.version=0.3.18
 LABEL io.airbyte.name=airbyte/destination-redshift
diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
index 4dc84d53cf8a..89fdb18238f1 100644
--- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile
+++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.3.15
+LABEL io.airbyte.version=0.3.16
 LABEL io.airbyte.name=airbyte/destination-snowflake
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java
index bbb6a9b9ac5e..c9c087c6ab97 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java
@@ -9,16 +9,26 @@
 import com.google.common.base.Preconditions;
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.resources.MoreResources;
 import io.airbyte.commons.string.Strings;
 import io.airbyte.db.jdbc.JdbcUtils;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
+import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
 import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
+import io.airbyte.protocol.models.AirbyteCatalog;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.CatalogHelpers;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ArgumentsSource;
 
 public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAcceptanceTest {
 
@@ -133,4 +143,23 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
     SnowflakeDatabase.getDatabase(baseConfig).execute(createSchemaQuery);
   }
 
+  /**
+   * This test is disabled because it is very slow and should only be run manually for now.
+   */
+  @Disabled
+  @ParameterizedTest
+  @ArgumentsSource(DataArgumentsProvider.class)
+  public void testSyncWithBillionRecords(final String messagesFilename, final String catalogFilename) throws Exception {
+    final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
+    final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
+    final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
+        .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
+
+    final List<AirbyteMessage> largeNumberRecords =
+        Collections.nCopies(15000000, messages).stream().flatMap(List::stream).collect(Collectors.toList());
+
+    final JsonNode config = getConfig();
+    runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
+  }
+
 }