diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java index baccf3fa2fde..4ea0529a85ce 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java @@ -13,7 +13,7 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.integrations.destination.s3.S3Destination; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.function.Consumer; @@ -30,8 +30,8 @@ public static void main(final String[] args) throws Exception { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config); return CopyConsumerFactory.create( outputRecordCollector, @@ -47,7 +47,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, @Override public void checkPersistence(final JsonNode config) { final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config); - S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config()); + S3Destination.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig(), ""); } @Override diff --git a/airbyte-integrations/connectors/destination-jdbc/build.gradle b/airbyte-integrations/connectors/destination-jdbc/build.gradle index 86be56686e97..fb46472e57f0 100644 --- a/airbyte-integrations/connectors/destination-jdbc/build.gradle +++ b/airbyte-integrations/connectors/destination-jdbc/build.gradle @@ -10,6 +10,7 @@ dependencies { implementation project(':airbyte-db:lib') implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:connectors:destination-s3') implementation project(':airbyte-protocol:models') implementation 'org.apache.commons:commons-lang3:3.11' diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java deleted file mode 100644 index 7a43610953e2..000000000000 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.jdbc.copy.s3; - -import com.fasterxml.jackson.databind.JsonNode; - -public class S3Config { - - private final String endpoint; - private final String bucketName; - private final String accessKeyId; - private final String secretAccessKey; - private final String region; - private final Integer partSize; - - public S3Config( - final String endpoint, - final String bucketName, - final String accessKeyId, - final String secretAccessKey, - final String region, - final Integer partSize) { - this.endpoint = endpoint; - this.bucketName = bucketName; - this.accessKeyId = accessKeyId; - this.secretAccessKey = secretAccessKey; - this.region = region; - this.partSize = partSize; - } - - public String getEndpoint() { - return endpoint; - } - - public String getBucketName() { - return bucketName; - } - - public String getAccessKeyId() { - return accessKeyId; - } - - public String getSecretAccessKey() { - return secretAccessKey; - } - - public String getRegion() { - return region; - } - - public Integer getPartSize() { - return partSize; - } - - public static S3Config getS3Config(final JsonNode config) { - var partSize = S3StreamCopier.DEFAULT_PART_SIZE_MB; - if (config.get("part_size") != null) { - partSize = config.get("part_size").asInt(); - } - return new S3Config( - config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(), - config.get("s3_bucket_name").asText(), - config.get("access_key_id").asText(), - config.get("secret_access_key").asText(), - config.get("s3_bucket_region").asText(), - partSize); - } - -} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 22f8cdfc9bb7..43911c4bd7ff 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -6,12 +6,7 @@ import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.db.jdbc.JdbcDatabase; @@ -19,6 +14,7 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.DestinationSyncMode; import java.io.IOException; @@ -43,10 +39,6 @@ public abstract class S3StreamCopier implements StreamCopier { private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default. private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS; - // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. 
This gives - // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. - // WARNING: Too large a part size can cause potential OOM errors. - public static final int DEFAULT_PART_SIZE_MB = 10; // It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file. // The BATCH_SIZE is defined in CopyConsumerFactory. // The average size of such a file will be about 1 GB. @@ -57,7 +49,7 @@ public abstract class S3StreamCopier implements StreamCopier { public static final int MAX_PARTS_PER_FILE = 1000; protected final AmazonS3 s3Client; - protected final S3Config s3Config; + protected final S3DestinationConfig s3Config; protected final String tmpTableName; private final DestinationSyncMode destSyncMode; protected final String schemaName; @@ -74,15 +66,15 @@ public abstract class S3StreamCopier implements StreamCopier { private final StagingFilenameGenerator filenameGenerator; public S3StreamCopier(final String stagingFolder, - final DestinationSyncMode destSyncMode, - final String schema, - final String streamName, - final String s3FileName, - final AmazonS3 client, - final JdbcDatabase db, - final S3Config s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) { + final DestinationSyncMode destSyncMode, + final String schema, + final String streamName, + final String s3FileName, + final AmazonS3 client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) { this.destSyncMode = destSyncMode; this.schemaName = schema; this.streamName = streamName; @@ -231,58 +223,11 @@ private void closeAndWaitForUpload() throws IOException { LOGGER.info("All data for {} stream uploaded.", streamName); } - public static void attemptS3WriteAndDelete(final S3Config s3Config) { - attemptS3WriteAndDelete(s3Config, ""); - } - - public static void attemptS3WriteAndDelete(final S3Config s3Config, final String bucketPath) { - final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? 
"" : "/"); - final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); - attemptWriteAndDeleteS3Object(s3Config, outputTableName); - } - - private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) { - final var s3 = getAmazonS3(s3Config); - final var s3Bucket = s3Config.getBucketName(); - - s3.putObject(s3Bucket, outputTableName, "check-content"); - s3.deleteObject(s3Bucket, outputTableName); - } - - public static AmazonS3 getAmazonS3(final S3Config s3Config) { - final var endpoint = s3Config.getEndpoint(); - final var region = s3Config.getRegion(); - final var accessKeyId = s3Config.getAccessKeyId(); - final var secretAccessKey = s3Config.getSecretAccessKey(); - - final var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); - - if (endpoint.isEmpty()) { - return AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(s3Config.getRegion()) - .build(); - - } else { - - final ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setSignerOverride("AWSS3V4SignerType"); - - return AmazonS3ClientBuilder - .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) - .withPathStyleAccessEnabled(true) - .withClientConfiguration(clientConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .build(); - } - } - public abstract void copyS3CsvFileIntoTable(JdbcDatabase database, - String s3FileLocation, - String schema, - String tableName, - S3Config s3Config) + String s3FileLocation, + String schema, + String tableName, + S3DestinationConfig s3Config) throws SQLException; } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java index 29aad46767c7..2c03497f42fa 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java @@ -10,28 +10,29 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; -public abstract class S3StreamCopierFactory implements StreamCopierFactory { +public abstract class S3StreamCopierFactory implements StreamCopierFactory { /** * Used by the copy consumer. 
*/ @Override public StreamCopier create(final String configuredSchema, - final S3Config s3Config, - final String stagingFolder, - final ConfiguredAirbyteStream configuredStream, - final ExtendedNameTransformer nameTransformer, - final JdbcDatabase db, - final SqlOperations sqlOperations) { + final S3DestinationConfig s3Config, + final String stagingFolder, + final ConfiguredAirbyteStream configuredStream, + final ExtendedNameTransformer nameTransformer, + final JdbcDatabase db, + final SqlOperations sqlOperations) { try { final AirbyteStream stream = configuredStream.getStream(); final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer); - final AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config); + final AmazonS3 s3Client = s3Config.getS3Client(); return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations); } catch (final Exception e) { @@ -43,14 +44,14 @@ public StreamCopier create(final String configuredSchema, * For specific copier suppliers to implement. */ public abstract StreamCopier create(String stagingFolder, - DestinationSyncMode syncMode, - String schema, - String streamName, - AmazonS3 s3Client, - JdbcDatabase db, - S3Config s3Config, - ExtendedNameTransformer nameTransformer, - SqlOperations sqlOperations) + DestinationSyncMode syncMode, + String schema, + String streamName, + AmazonS3 s3Client, + JdbcDatabase db, + S3DestinationConfig s3Config, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations) throws Exception; } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test-integration/java/io/airbyte/integrations/destination/jdbc/JdbcDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test-integration/java/io/airbyte/integrations/destination/jdbc/JdbcDestinationAcceptanceTest.java deleted file mode 100644 index a49b1664be4f..000000000000 --- a/airbyte-integrations/connectors/destination-jdbc/src/test-integration/java/io/airbyte/integrations/destination/jdbc/JdbcDestinationAcceptanceTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.jdbc; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; -import io.airbyte.db.Databases; -import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; -import org.testcontainers.containers.PostgreSQLContainer; - -public class JdbcDestinationAcceptanceTest extends DestinationAcceptanceTest { - - private PostgreSQLContainer db; - private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); - - @Override - protected String getImageName() { - return "airbyte/destination-jdbc:dev"; - } - - @Override - protected JsonNode getConfig() { - return Jsons.jsonNode(ImmutableMap.builder() - .put("username", db.getUsername()) - .put("password", db.getPassword()) - .put("schema", "public") - .put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s", - db.getHost(), - db.getFirstMappedPort(), - db.getDatabaseName())) - .build()); - } - - @Override - protected JsonNode getFailCheckConfig() { - return Jsons.jsonNode(ImmutableMap.builder() - .put("username", db.getUsername()) - .put("password", "wrong password") - .put("schema", "public") - .put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s", - db.getHost(), - db.getFirstMappedPort(), - db.getDatabaseName())) - .build()); - } - - @Override - protected List retrieveRecords(final TestDestinationEnv env, - final String streamName, - final String namespace, - final JsonNode streamSchema) - throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace)) - .stream() - .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) - .collect(Collectors.toList()); - } - - @Override - protected List retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace) - throws Exception { - String tableName = namingResolver.getIdentifier(streamName); - if (!tableName.startsWith("\"")) { - // Currently, Normalization always quote tables identifiers - tableName = "\"" + tableName + "\""; - } - return retrieveRecordsFromTable(tableName, namingResolver.getIdentifier(namespace)); - } - - @Override - protected List resolveIdentifier(final String identifier) { - final List result = new ArrayList<>(); - final String resolved = namingResolver.getIdentifier(identifier); - result.add(identifier); - result.add(resolved); - if (!resolved.startsWith("\"")) { - result.add(resolved.toLowerCase()); - result.add(resolved.toUpperCase()); - } - return result; - } - - private List retrieveRecordsFromTable(final String tableName, final String schema) throws SQLException { - return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(), - db.getJdbcUrl()).query( - ctx -> ctx - .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .stream() - .map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat())) - .map(Jsons::deserialize) - .collect(Collectors.toList())); - } - - @Override - protected void setup(final TestDestinationEnv testEnv) { - db = new PostgreSQLContainer<>("postgres:13-alpine"); - db.start(); - } - - 
@Override - protected void tearDown(final TestDestinationEnv testEnv) { - db.stop(); - db.close(); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index bfb9867e23b8..237cc220373d 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-protocol:models') implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation project(':airbyte-integrations:connectors:destination-s3') implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978' diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java index 993885834282..74e43c05dc79 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java @@ -13,36 +13,33 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.integrations.destination.s3.S3Destination; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.function.Consumer; /** - * A more efficient Redshift Destination than the sql-based {@link RedshiftDestination}. Instead of - * inserting data as batched SQL INSERTs, we follow Redshift best practices and, 1) Stream the data - * to S3, creating multiple compressed files per stream. 2) Create a manifest file to load the data - * files in parallel. See: - * https://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-use-copy.html for more info. - * - * Creating multiple files per stream currently has the naive approach of one file per batch on a - * stream up to the max limit of (26 * 26 * 26) 17576 files. Each batch is randomly prefixed by 3 - * Alpha characters and on a collision the batch is appended to the existing file. + * A more efficient Redshift Destination than the sql-based {@link RedshiftDestination}. Instead of inserting data as batched SQL INSERTs, we follow + * Redshift best practices and, 1) Stream the data to S3, creating multiple compressed files per stream. 2) Create a manifest file to load the data + * files in parallel. See: https://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-use-copy.html for more info. + *

+ * Creating multiple files per stream currently has the naive approach of one file per batch on a stream up to the max limit of (26 * 26 * 26) 17576 + * files. Each batch is randomly prefixed by 3 Alpha characters and on a collision the batch is appended to the existing file. */ public class RedshiftCopyS3Destination extends CopyDestination { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) throws Exception { return CopyConsumerFactory.create( outputRecordCollector, getDatabase(config), getSqlOperations(), getNameTransformer(), - getS3Config(config), + getS3DestinationConfig(config), catalog, new RedshiftStreamCopierFactory(), getConfiguredSchema(config)); @@ -50,7 +47,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, @Override public void checkPersistence(final JsonNode config) throws Exception { - S3StreamCopier.attemptS3WriteAndDelete(getS3Config(config)); + S3Destination.attemptS3WriteAndDelete(getS3DestinationConfig(config), ""); } @Override @@ -72,8 +69,8 @@ private String getConfiguredSchema(final JsonNode config) { return config.get("schema").asText(); } - private S3Config getS3Config(final JsonNode config) { - return S3Config.getS3Config(config); + private S3DestinationConfig getS3DestinationConfig(final JsonNode config) { + return S3DestinationConfig.getS3DestinationConfig(config); } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java index 348c3cc0c874..edce6fd09828 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java @@ -11,10 +11,10 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; import io.airbyte.integrations.destination.redshift.manifest.Entry; import io.airbyte.integrations.destination.redshift.manifest.Manifest; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.DestinationSyncMode; import java.util.Optional; import java.util.UUID; @@ -31,14 +31,14 @@ public class RedshiftStreamCopier extends S3StreamCopier { private String manifestFilePath = null; public RedshiftStreamCopier(final String stagingFolder, - final DestinationSyncMode destSyncMode, - final String schema, - final String streamName, - final AmazonS3 client, - final JdbcDatabase db, - final S3Config s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) { + final DestinationSyncMode destSyncMode, + final String schema, + final String streamName, + final AmazonS3 client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) { super(stagingFolder, destSyncMode, schema, streamName, 
Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations); objectMapper = new ObjectMapper(); @@ -56,11 +56,11 @@ public void copyStagingFileToTemporaryTable() { @Override public void copyS3CsvFileIntoTable( - final JdbcDatabase database, - final String s3FileLocation, - final String schema, - final String tableName, - final S3Config s3Config) { + final JdbcDatabase database, + final String s3FileLocation, + final String schema, + final String tableName, + final S3DestinationConfig s3Config) { throw new RuntimeException("Redshift Stream Copier should not copy individual files without use of a manifest"); } @@ -127,7 +127,7 @@ private void executeCopy(final String manifestPath) { getFullS3Path(s3Config.getBucketName(), manifestPath), s3Config.getAccessKeyId(), s3Config.getSecretAccessKey(), - s3Config.getRegion()); + s3Config.getBucketRegion()); Exceptions.toRuntime(() -> db.execute(copyQuery)); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java index 7620512d50b1..d665eb7adb01 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java @@ -9,22 +9,22 @@ import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.DestinationSyncMode; public class RedshiftStreamCopierFactory extends S3StreamCopierFactory { @Override public StreamCopier create(final String stagingFolder, - final DestinationSyncMode syncMode, - final String schema, - final String streamName, - final AmazonS3 s3Client, - final JdbcDatabase db, - final S3Config s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) + final DestinationSyncMode syncMode, + final String schema, + final String streamName, + final AmazonS3 s3Client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) throws Exception { return new RedshiftStreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations); } diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index 547e83765d54..c8a3fd05c5cf 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -13,7 +13,6 @@ dependencies { implementation project(':airbyte-config:models') implementation project(':airbyte-protocol:models') implementation project(':airbyte-integrations:bases:base-java') - implementation project(':airbyte-integrations:connectors:destination-jdbc') implementation 
files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) // csv diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java index fafa4e43f8d4..1724c0600b3f 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java @@ -4,19 +4,20 @@ package io.airbyte.integrations.destination.s3; +import com.amazonaws.services.s3.AmazonS3; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory; import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.UUID; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ public static void main(final String[] args) throws Exception { @Override public AirbyteConnectionStatus check(final JsonNode config) { try { - S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config), config.get("s3_bucket_path").asText()); + attemptS3WriteAndDelete(S3DestinationConfig.getS3DestinationConfig(config), config.get("s3_bucket_path").asText()); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); } catch (final Exception e) { LOGGER.error("Exception attempting to access the S3 bucket: ", e); @@ -45,10 +46,30 @@ public AirbyteConnectionStatus check(final JsonNode config) { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog configuredCatalog, - final Consumer outputRecordCollector) { + final ConfiguredAirbyteCatalog configuredCatalog, + final Consumer outputRecordCollector) { final S3WriterFactory formatterFactory = new ProductionWriterFactory(); return new S3Consumer(S3DestinationConfig.getS3DestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector); } + /** + * Note that this method completely ignores s3Config.getBucketPath(), in favor of the bucketPath parameter. + */ + public static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config, final String bucketPath) { + attemptS3WriteAndDelete(s3Config, bucketPath, s3Config.getS3Client()); + } + + @VisibleForTesting + static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config, final String bucketPath, final AmazonS3 s3) { + final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? 
"" : "/"); + final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); + attemptWriteAndDeleteS3Object(s3Config, outputTableName, s3); + } + + private static void attemptWriteAndDeleteS3Object(final S3DestinationConfig s3Config, final String outputTableName, final AmazonS3 s3) { + final var s3Bucket = s3Config.getBucketName(); + + s3.putObject(s3Bucket, outputTableName, "check-content"); + s3.deleteObject(s3Bucket, outputTableName); + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 165bb7961c10..c1d6f18d99ac 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -12,31 +12,50 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; /** - * This class is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3Config}. It - * has an extra {@code bucketPath} parameter, which is necessary for more delicate data syncing to - * S3. + * An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more delicate data syncing to S3) and {@code partSize} + * (used by certain bulk-load database operations). */ public class S3DestinationConfig { + // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives + // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. + // WARNING: Too large a part size can cause potential OOM errors. + public static final int DEFAULT_PART_SIZE_MB = 10; + private final String endpoint; private final String bucketName; private final String bucketPath; private final String bucketRegion; private final String accessKeyId; private final String secretAccessKey; + private final Integer partSize; private final S3FormatConfig formatConfig; + /** + * The part size should not matter in any use case that depends on this constructor. So the default 10 MB is used. 
+ */ + public S3DestinationConfig( + final String endpoint, + final String bucketName, + final String bucketPath, + final String bucketRegion, + final String accessKeyId, + final String secretAccessKey, + final S3FormatConfig formatConfig) { + this(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, DEFAULT_PART_SIZE_MB, formatConfig); + } + public S3DestinationConfig( - final String endpoint, - final String bucketName, - final String bucketPath, - final String bucketRegion, - final String accessKeyId, - final String secretAccessKey, - final S3FormatConfig formatConfig) { + final String endpoint, + final String bucketName, + final String bucketPath, + final String bucketRegion, + final String accessKeyId, + final String secretAccessKey, + final Integer partSize, + final S3FormatConfig formatConfig) { this.endpoint = endpoint; this.bucketName = bucketName; this.bucketPath = bucketPath; @@ -44,17 +63,33 @@ public S3DestinationConfig( this.accessKeyId = accessKeyId; this.secretAccessKey = secretAccessKey; this.formatConfig = formatConfig; + this.partSize = partSize; } public static S3DestinationConfig getS3DestinationConfig(final JsonNode config) { + var partSize = DEFAULT_PART_SIZE_MB; + if (config.get("part_size") != null) { + partSize = config.get("part_size").asInt(); + } + String bucketPath = null; + if (config.get("s3_bucket_path") != null) { + bucketPath = config.get("s3_bucket_path").asText(); + } + // In the "normal" S3 destination, this is never null. However, the Redshift and Snowflake copy destinations don't set a Format config. + S3FormatConfig format = null; + if (config.get("format") != null) { + format = S3FormatConfigs.getS3FormatConfig(config); + } return new S3DestinationConfig( config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(), config.get("s3_bucket_name").asText(), - config.get("s3_bucket_path").asText(), + bucketPath, config.get("s3_bucket_region").asText(), config.get("access_key_id").asText(), config.get("secret_access_key").asText(), - S3FormatConfigs.getS3FormatConfig(config)); + partSize, + format + ); } public String getEndpoint() { @@ -81,6 +116,10 @@ public String getSecretAccessKey() { return secretAccessKey; } + public Integer getPartSize() { + return partSize; + } + public S3FormatConfig getFormatConfig() { return formatConfig; } @@ -106,13 +145,4 @@ public AmazonS3 getS3Client() { .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .build(); } - - /** - * @return {@link S3Config} for convenience. The part size should not matter in any use case that - * gets an {@link S3Config} from this class. So the default 10 MB is used. 
- */ - public S3Config getS3Config() { - return new S3Config(endpoint, bucketName, accessKeyId, secretAccessKey, bucketRegion, 10); - } - } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java new file mode 100644 index 000000000000..b06bfcfbc190 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java @@ -0,0 +1,52 @@ +package io.airbyte.integrations.destination.s3; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.amazonaws.services.s3.AmazonS3; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mockito; + +public class S3DestinationTest { + + private AmazonS3 s3; + private S3DestinationConfig config; + + @BeforeEach + public void setup() { + s3 = mock(AmazonS3.class); + config = new S3DestinationConfig( + "fake-endpoint", + "fake-bucket", + "fake-bucketPath", + "fake-region", + "fake-accessKeyId", + "fake-secretAccessKey", + null + ); + } + + @Test + public void createsThenDeletesTestFile() { + S3Destination.attemptS3WriteAndDelete(config, "fake-fileToWriteAndDelete", s3); + + // We want to enforce that putObject happens before deleteObject, so use inOrder.verify() + final InOrder inOrder = Mockito.inOrder(s3); + + final ArgumentCaptor testFileCaptor = ArgumentCaptor.forClass(String.class); + inOrder.verify(s3).putObject(eq("fake-bucket"), testFileCaptor.capture(), anyString()); + + final String testFile = testFileCaptor.getValue(); + assertTrue(testFile.startsWith("fake-fileToWriteAndDelete/_airbyte_connection_test_"), "testFile was actually " + testFile); + + inOrder.verify(s3).deleteObject("fake-bucket", testFile); + + verifyNoMoreInteractions(s3); + } +} diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 723f756ad016..5fe4592ffb1f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -11,6 +11,9 @@ WORKDIR /airbyte ENV APPLICATION destination-snowflake +# Needed for JDK17 (in turn, needed on M1 macs) - see https://github.com/snowflakedb/snowflake-jdbc/issues/589#issuecomment-983944767 +ENV DESTINATION_SNOWFLAKE_OPTS "--add-opens java.base/java.nio=ALL-UNNAMED" + COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 diff --git a/airbyte-integrations/connectors/destination-snowflake/README.md b/airbyte-integrations/connectors/destination-snowflake/README.md index 6709d48b7006..b21bda7163d7 100644 --- a/airbyte-integrations/connectors/destination-snowflake/README.md +++ b/airbyte-integrations/connectors/destination-snowflake/README.md @@ -20,4 +20,10 @@ ## For Airbyte employees Put the contents of the `Snowflake Integration Test Config` secret on Rippling under the `Engineering` folder into `secrets/config.json` to be able to run integration tests locally. 
-The query timeout for insert data to table has been updated from 30 minutes to 3 hours. \ No newline at end of file + +1. Put the contents of the `destination snowflake - insert test creds` LastPass secret into `secrets/insert_config.json`. +1. Put the contents of the `destination snowflake - insert staging test creds` secret into `insert_staging_config.json`. +1. Put the contents of the `destination snowflake - gcs copy test creds` secret into `secrets/copy_gcs_config.json` +1. Put the contents of the `destination snowflake - s3 copy test creds` secret into `secrets/copy_s3_config.json` + +The query timeout for insert data to table has been updated from 30 minutes to 3 hours. diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 2cf2ba71e413..c63d242b7d01 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation project(':airbyte-db:lib') implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation project(':airbyte-integrations:connectors:destination-s3') implementation project(':airbyte-protocol:models') integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java index 6089747ca723..b09107bb5448 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java @@ -11,8 +11,8 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.integrations.destination.s3.S3Destination; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.function.Consumer; @@ -21,14 +21,14 @@ public class SnowflakeCopyS3Destination extends CopyDestination { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { return CopyConsumerFactory.create( outputRecordCollector, getDatabase(config), getSqlOperations(), getNameTransformer(), - getS3Config(config), + getS3DestinationConfig(config), catalog, new SnowflakeS3StreamCopierFactory(), getConfiguredSchema(config)); @@ -36,7 +36,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, @Override public void checkPersistence(final JsonNode config) { - S3StreamCopier.attemptS3WriteAndDelete(getS3Config(config)); + 
S3Destination.attemptS3WriteAndDelete(getS3DestinationConfig(config), ""); } @Override @@ -58,9 +58,9 @@ private String getConfiguredSchema(final JsonNode config) { return config.get("schema").asText(); } - private S3Config getS3Config(final JsonNode config) { + private S3DestinationConfig getS3DestinationConfig(final JsonNode config) { final JsonNode loadingMethod = config.get("loading_method"); - return S3Config.getS3Config(loadingMethod); + return S3DestinationConfig.getS3DestinationConfig(loadingMethod); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index 263ea49edca3..d23b5421ff34 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -9,8 +9,8 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.DestinationSyncMode; import java.sql.SQLException; import org.slf4j.Logger; @@ -22,25 +22,25 @@ public class SnowflakeS3StreamCopier extends S3StreamCopier { private static final int FILE_PREFIX_LENGTH = 5; public SnowflakeS3StreamCopier(final String stagingFolder, - final DestinationSyncMode destSyncMode, - final String schema, - final String streamName, - final AmazonS3 client, - final JdbcDatabase db, - final S3Config s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) { + final DestinationSyncMode destSyncMode, + final String schema, + final String streamName, + final AmazonS3 client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) { super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations); } @Override public void copyS3CsvFileIntoTable( - final JdbcDatabase database, - final String s3FileLocation, - final String schema, - final String tableName, - final S3Config s3Config) + final JdbcDatabase database, + final String s3FileLocation, + final String schema, + final String tableName, + final S3DestinationConfig s3Config) throws SQLException { final var copyQuery = String.format( "COPY INTO %s.%s FROM '%s' " diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java index 90a6b6c5ee3c..767c475d429f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java @@ -9,22 +9,22 @@ import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.DestinationSyncMode; public class SnowflakeS3StreamCopierFactory extends S3StreamCopierFactory { @Override public StreamCopier create(final String stagingFolder, - final DestinationSyncMode syncMode, - final String schema, - final String streamName, - final AmazonS3 s3Client, - final JdbcDatabase db, - final S3Config s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) + final DestinationSyncMode syncMode, + final String schema, + final String streamName, + final AmazonS3 s3Client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) throws Exception { return new SnowflakeS3StreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations); } diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 031569d1fa52..baf37448d547 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -25,8 +25,7 @@ Check out common troubleshooting issues for the S3 destination connector on our | S3 Region | string | See [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions) for all region codes. | | Access Key ID | string | AWS/Minio credential. | | Secret Access Key | string | AWS/Minio credential. | -| Format | object | Format specific configuration. See below for details. | -| Part Size | integer | Arg to configure a block size. Max allowed blocks by S3 = 10,000, i.e. max stream size = blockSize \* 10,000 blocks. | +| Format | object | Format specific configuration. See the [spec](/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json) for details. | ⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend you to provision a dedicated S3 resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️
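
Note on the client-construction move: the AmazonS3 builder logic that previously lived in the deleted `S3StreamCopier.getAmazonS3(S3Config)` is now provided by `S3DestinationConfig.getS3Client()` (only the tail of that method is visible in this diff). The sketch below is reconstructed from the removed code to show the intended behavior, including the path-style access and signer override needed for custom endpoints such as Minio; it is illustrative, not a verbatim copy of the new method body.

```java
// Imports used by the sketch; endpoint, bucketRegion, accessKeyId and secretAccessKey are the
// S3DestinationConfig instance fields shown elsewhere in this diff.
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public AmazonS3 getS3Client() {
  final BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);

  if (endpoint.isEmpty()) {
    // Plain AWS S3: region-based client with static credentials.
    return AmazonS3ClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
        .withRegion(bucketRegion)
        .build();
  }

  // Custom endpoints (e.g. Minio) need path-style access and the V4 signer override.
  final ClientConfiguration clientConfiguration = new ClientConfiguration();
  clientConfiguration.setSignerOverride("AWSS3V4SignerType");

  return AmazonS3ClientBuilder.standard()
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, bucketRegion))
      .withPathStyleAccessEnabled(true)
      .withClientConfiguration(clientConfiguration)
      .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
      .build();
}
```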
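
With this change, the Redshift and Snowflake copy destinations parse their S3 settings through the same `S3DestinationConfig.getS3DestinationConfig(JsonNode)` factory, which now tolerates missing `format`, `s3_bucket_path`, and `part_size` keys. A minimal sketch of that call path follows; the key names match the parsing code in this diff, while the class name and all values are hypothetical placeholders.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;

public class CopyDestinationS3ConfigExample {

  public static void main(final String[] args) {
    // Hypothetical copy-destination S3 settings: no "format", "s3_bucket_path", or "part_size" keys,
    // mirroring what the Redshift/Snowflake copy destinations pass in. Values are placeholders.
    final JsonNode copyConfig = Jsons.jsonNode(ImmutableMap.builder()
        .put("s3_bucket_name", "my-staging-bucket")
        .put("s3_bucket_region", "us-east-1")
        .put("access_key_id", "fake-access-key-id")
        .put("secret_access_key", "fake-secret-access-key")
        .build());

    // bucketPath ends up null, partSize falls back to DEFAULT_PART_SIZE_MB (10), formatConfig is null.
    final S3DestinationConfig s3Config = S3DestinationConfig.getS3DestinationConfig(copyConfig);

    // The connection check writes then deletes a marker object; the copy destinations pass an empty bucket path.
    S3Destination.attemptS3WriteAndDelete(s3Config, "");
  }

}
```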