From 34dd5f0b3307e845e96c5b3900c5315800337db9 Mon Sep 17 00:00:00 2001
From: Edward Gao
Date: Mon, 6 Dec 2021 15:23:36 -0800
Subject: [PATCH] combine S3Config and S3DestinationConfig

---
 .../databricks/DatabricksDestination.java     |  2 +-
 .../jdbc/copy/s3/S3StreamCopier.java          | 32 ++++----
 .../jdbc/copy/s3/S3StreamCopierFactory.java   | 32 ++++----
 .../redshift/RedshiftCopyS3Destination.java   | 10 +--
 .../redshift/RedshiftStreamCopier.java        | 30 ++++----
 .../redshift/RedshiftStreamCopierFactory.java | 18 ++---
 .../integrations/destination/s3/S3Config.java | 76 -------------------
 .../destination/s3/S3Destination.java         | 12 +--
 .../destination/s3/S3DestinationConfig.java   | 57 +++++++++-----
 .../snowflake/SnowflakeCopyS3Destination.java | 10 +--
 .../snowflake/SnowflakeS3StreamCopier.java    | 28 +++----
 .../SnowflakeS3StreamCopierFactory.java       | 18 ++---
 12 files changed, 136 insertions(+), 189 deletions(-)
 delete mode 100644 airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Config.java

diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
index baccf3fa2fde..8606b4cf9d47 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
@@ -47,7 +47,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
   @Override
   public void checkPersistence(final JsonNode config) {
     final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
-    S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config());
+    S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig());
   }
 
   @Override
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
index 957890af076b..7b87efcdab15 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
@@ -14,8 +14,8 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
-import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.integrations.destination.s3.S3Destination;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.io.IOException;
@@ -50,7 +50,7 @@ public abstract class S3StreamCopier implements StreamCopier {
   public static final int MAX_PARTS_PER_FILE = 1000;
 
   protected final AmazonS3 s3Client;
-  protected final S3Config s3Config;
+  protected final S3DestinationConfig s3Config;
   protected final String tmpTableName;
   private final DestinationSyncMode destSyncMode;
   protected final String schemaName;
@@ -67,15 +67,15 @@ public abstract class S3StreamCopier implements StreamCopier {
   private final StagingFilenameGenerator filenameGenerator;
 
   public S3StreamCopier(final String stagingFolder,
-      final DestinationSyncMode destSyncMode,
-      final String schema,
-      final String streamName,
-      final String s3FileName,
-      final AmazonS3 client,
-      final JdbcDatabase db,
-      final S3Config s3Config,
-      final ExtendedNameTransformer nameTransformer,
-      final SqlOperations sqlOperations) {
+                        final DestinationSyncMode destSyncMode,
+                        final String schema,
+                        final String streamName,
+                        final String s3FileName,
+                        final AmazonS3 client,
+                        final JdbcDatabase db,
+                        final S3DestinationConfig s3Config,
+                        final ExtendedNameTransformer nameTransformer,
+                        final SqlOperations sqlOperations) {
     this.destSyncMode = destSyncMode;
     this.schemaName = schema;
     this.streamName = streamName;
@@ -224,15 +224,15 @@ private void closeAndWaitForUpload() throws IOException {
     LOGGER.info("All data for {} stream uploaded.", streamName);
   }
 
-  public static void attemptS3WriteAndDelete(final S3Config s3Config) {
+  public static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config) {
     S3Destination.attemptS3WriteAndDelete(s3Config, "");
   }
 
   public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
-      String s3FileLocation,
-      String schema,
-      String tableName,
-      S3Config s3Config)
+                                              String s3FileLocation,
+                                              String schema,
+                                              String tableName,
+                                              S3DestinationConfig s3Config)
       throws SQLException;
 
 }
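Note on the two signature changes above: every call site that previously unwrapped an S3Config now passes the S3DestinationConfig straight through, as the Databricks hunk shows. A minimal sketch of the new call shape, using only the constructor and method introduced by this patch (the literal values are placeholders, and passing a null format config is an assumption that holds only because the copier path never reads it):

    import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
    import io.airbyte.integrations.destination.s3.S3DestinationConfig;

    public class AttemptWriteSketch {

      public static void main(final String[] args) {
        // Placeholder bucket and credentials; a real connector parses these from its JSON config.
        final S3DestinationConfig s3Config = new S3DestinationConfig(
            "",                  // endpoint: empty selects the standard AWS endpoint
            "my-staging-bucket", // bucketName
            "",                  // bucketPath: unused on the copier path
            "us-east-1",         // bucketRegion
            "accessKeyId",
            "secretAccessKey",
            null);               // formatConfig: unused on the copier path (assumption)

        // Previously: attemptS3WriteAndDelete(destinationConfig.getS3Config())
        S3StreamCopier.attemptS3WriteAndDelete(s3Config);
      }
    }
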
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java
index b2375f95c23e..eb20c22ec7ed 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java
@@ -10,25 +10,25 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
-import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.integrations.destination.s3.S3Destination;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import io.airbyte.protocol.models.DestinationSyncMode;
 
-public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3Config> {
+public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {
 
   /**
    * Used by the copy consumer.
    */
   @Override
   public StreamCopier create(final String configuredSchema,
-      final S3Config s3Config,
-      final String stagingFolder,
-      final ConfiguredAirbyteStream configuredStream,
-      final ExtendedNameTransformer nameTransformer,
-      final JdbcDatabase db,
-      final SqlOperations sqlOperations) {
+                             final S3DestinationConfig s3Config,
+                             final String stagingFolder,
+                             final ConfiguredAirbyteStream configuredStream,
+                             final ExtendedNameTransformer nameTransformer,
+                             final JdbcDatabase db,
+                             final SqlOperations sqlOperations) {
     try {
       final AirbyteStream stream = configuredStream.getStream();
       final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
@@ -45,14 +45,14 @@ public StreamCopier create(final String configuredSchema,
    * For specific copier suppliers to implement.
    */
   public abstract StreamCopier create(String stagingFolder,
-      DestinationSyncMode syncMode,
-      String schema,
-      String streamName,
-      AmazonS3 s3Client,
-      JdbcDatabase db,
-      S3Config s3Config,
-      ExtendedNameTransformer nameTransformer,
-      SqlOperations sqlOperations)
+                                      DestinationSyncMode syncMode,
+                                      String schema,
+                                      String streamName,
+                                      AmazonS3 s3Client,
+                                      JdbcDatabase db,
+                                      S3DestinationConfig s3Config,
+                                      ExtendedNameTransformer nameTransformer,
+                                      SqlOperations sqlOperations)
       throws Exception;
 
 }
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java
index 1b0372130f6c..307b4887d3b3 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java
@@ -14,7 +14,7 @@
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
 import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
-import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import java.util.function.Consumer;
@@ -42,7 +42,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
         getDatabase(config),
         getSqlOperations(),
         getNameTransformer(),
-        getS3Config(config),
+        getS3DestinationConfig(config),
         catalog,
         new RedshiftStreamCopierFactory(),
         getConfiguredSchema(config));
@@ -50,7 +50,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
 
   @Override
   public void checkPersistence(final JsonNode config) throws Exception {
-    S3StreamCopier.attemptS3WriteAndDelete(getS3Config(config));
+    S3StreamCopier.attemptS3WriteAndDelete(getS3DestinationConfig(config));
   }
 
   @Override
@@ -72,8 +72,8 @@ private String getConfiguredSchema(final JsonNode config) {
     return config.get("schema").asText();
   }
 
-  private S3Config getS3Config(final JsonNode config) {
-    return S3Config.getS3Config(config);
+  private S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
+    return S3DestinationConfig.getS3DestinationConfig(config);
   }
 
 }
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java
index 216243ef7015..edce6fd09828 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java
@@ -14,7 +14,7 @@
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
 import io.airbyte.integrations.destination.redshift.manifest.Entry;
 import io.airbyte.integrations.destination.redshift.manifest.Manifest;
-import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.util.Optional;
 import java.util.UUID;
@@ -31,14 +31,14 @@ public class RedshiftStreamCopier extends S3StreamCopier {
   private String manifestFilePath = null;
 
   public RedshiftStreamCopier(final String stagingFolder,
-      final DestinationSyncMode destSyncMode,
-      final String schema,
-      final String streamName,
-      final AmazonS3 client,
-      final JdbcDatabase db,
-      final S3Config s3Config,
-      final ExtendedNameTransformer nameTransformer,
-      final SqlOperations sqlOperations) {
+                              final DestinationSyncMode destSyncMode,
+                              final String schema,
+                              final String streamName,
+                              final AmazonS3 client,
+                              final JdbcDatabase db,
+                              final S3DestinationConfig s3Config,
+                              final ExtendedNameTransformer nameTransformer,
+                              final SqlOperations sqlOperations) {
     super(stagingFolder, destSyncMode, schema, streamName,
         Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations);
     objectMapper = new ObjectMapper();
@@ -56,11 +56,11 @@ public void copyStagingFileToTemporaryTable() {
 
   @Override
   public void copyS3CsvFileIntoTable(
-      final JdbcDatabase database,
-      final String s3FileLocation,
-      final String schema,
-      final String tableName,
-      final S3Config s3Config) {
+                                     final JdbcDatabase database,
+                                     final String s3FileLocation,
+                                     final String schema,
+                                     final String tableName,
+                                     final S3DestinationConfig s3Config) {
     throw new RuntimeException("Redshift Stream Copier should not copy individual files without use of a manifest");
   }
 
@@ -127,7 +127,7 @@ private void executeCopy(final String manifestPath) {
         getFullS3Path(s3Config.getBucketName(), manifestPath),
         s3Config.getAccessKeyId(),
         s3Config.getSecretAccessKey(),
-        s3Config.getRegion());
+        s3Config.getBucketRegion());
 
     Exceptions.toRuntime(() -> db.execute(copyQuery));
   }
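For context on the executeCopy hunk: only the argument list of the String.format call is visible above, so the template below is a guess at its shape rather than the connector's actual SQL. It exists to show where the renamed getBucketRegion() value lands in a manifest-based Redshift COPY; buildCopyQuery and the clause order are illustrative only.

    import io.airbyte.integrations.destination.s3.S3DestinationConfig;

    public class CopyQuerySketch {

      // Hypothetical reconstruction of the query the visible arguments feed into.
      static String buildCopyQuery(final String schema,
                                   final String tableName,
                                   final String manifestS3Path,
                                   final S3DestinationConfig s3Config) {
        return String.format(
            "COPY %s.%s FROM '%s' "
                + "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' "
                + "CSV REGION '%s' MANIFEST;",
            schema,
            tableName,
            manifestS3Path,                // getFullS3Path(...) output in the real code
            s3Config.getAccessKeyId(),
            s3Config.getSecretAccessKey(),
            s3Config.getBucketRegion());   // was s3Config.getRegion()
      }
    }
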
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java
index 98eb7f1747f1..d665eb7adb01 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java
@@ -10,21 +10,21 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
-import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.DestinationSyncMode;
 
 public class RedshiftStreamCopierFactory extends S3StreamCopierFactory {
 
   @Override
   public StreamCopier create(final String stagingFolder,
-      final DestinationSyncMode syncMode,
-      final String schema,
-      final String streamName,
-      final AmazonS3 s3Client,
-      final JdbcDatabase db,
-      final S3Config s3Config,
-      final ExtendedNameTransformer nameTransformer,
-      final SqlOperations sqlOperations)
+                             final DestinationSyncMode syncMode,
+                             final String schema,
+                             final String streamName,
+                             final AmazonS3 s3Client,
+                             final JdbcDatabase db,
+                             final S3DestinationConfig s3Config,
+                             final ExtendedNameTransformer nameTransformer,
+                             final SqlOperations sqlOperations)
       throws Exception {
     return new RedshiftStreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations);
   }
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Config.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Config.java
deleted file mode 100644
index 5eb92ec8e558..000000000000
--- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Config.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.s3;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-public class S3Config {
-
-  // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives
-  // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
-  // WARNING: Too large a part size can cause potential OOM errors.
-  public static final int DEFAULT_PART_SIZE_MB = 10;
-
-  private final String endpoint;
-  private final String bucketName;
-  private final String accessKeyId;
-  private final String secretAccessKey;
-  private final String region;
-  private final Integer partSize;
-
-  public S3Config(
-      final String endpoint,
-      final String bucketName,
-      final String accessKeyId,
-      final String secretAccessKey,
-      final String region,
-      final Integer partSize) {
-    this.endpoint = endpoint;
-    this.bucketName = bucketName;
-    this.accessKeyId = accessKeyId;
-    this.secretAccessKey = secretAccessKey;
-    this.region = region;
-    this.partSize = partSize;
-  }
-
-  public String getEndpoint() {
-    return endpoint;
-  }
-
-  public String getBucketName() {
-    return bucketName;
-  }
-
-  public String getAccessKeyId() {
-    return accessKeyId;
-  }
-
-  public String getSecretAccessKey() {
-    return secretAccessKey;
-  }
-
-  public String getRegion() {
-    return region;
-  }
-
-  public Integer getPartSize() {
-    return partSize;
-  }
-
-  public static S3Config getS3Config(final JsonNode config) {
-    var partSize = DEFAULT_PART_SIZE_MB;
-    if (config.get("part_size") != null) {
-      partSize = config.get("part_size").asInt();
-    }
-    return new S3Config(
-        config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(),
-        config.get("s3_bucket_name").asText(),
-        config.get("access_key_id").asText(),
-        config.get("secret_access_key").asText(),
-        config.get("s3_bucket_region").asText(),
-        partSize);
-  }
-
-}
"" : "/"); final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); attemptWriteAndDeleteS3Object(s3Config, outputTableName); } - private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) { + private static void attemptWriteAndDeleteS3Object(final S3DestinationConfig s3Config, final String outputTableName) { final var s3 = getAmazonS3(s3Config); final var s3Bucket = s3Config.getBucketName(); @@ -70,9 +70,9 @@ private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final s3.deleteObject(s3Bucket, outputTableName); } - public static AmazonS3 getAmazonS3(final S3Config s3Config) { + public static AmazonS3 getAmazonS3(final S3DestinationConfig s3Config) { final var endpoint = s3Config.getEndpoint(); - final var region = s3Config.getRegion(); + final var region = s3Config.getBucketRegion(); final var accessKeyId = s3Config.getAccessKeyId(); final var secretAccessKey = s3Config.getSecretAccessKey(); @@ -81,7 +81,7 @@ public static AmazonS3 getAmazonS3(final S3Config s3Config) { if (endpoint.isEmpty()) { return AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(s3Config.getRegion()) + .withRegion(s3Config.getBucketRegion()) .build(); } else { diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 582ff1fc2364..7eb6f699f527 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -14,26 +14,48 @@ import com.fasterxml.jackson.databind.JsonNode; /** - * This class is similar to {@link S3Config}. It has an extra {@code bucketPath} parameter, which is necessary for more delicate data syncing to S3. + * An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more delicate data syncing to S3) and {@code partSize} + * (used by certain bulk-load database operations). */ public class S3DestinationConfig { + // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives + // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. + // WARNING: Too large a part size can cause potential OOM errors. + public static final int DEFAULT_PART_SIZE_MB = 10; + private final String endpoint; private final String bucketName; private final String bucketPath; private final String bucketRegion; private final String accessKeyId; private final String secretAccessKey; + private final Integer partSize; private final S3FormatConfig formatConfig; + /** + * The part size should not matter in any use case that depends on this constructor. So the default 10 MB is used. 
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java
index 582ff1fc2364..7eb6f699f527 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java
+++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java
@@ -14,26 +14,48 @@
 import com.fasterxml.jackson.databind.JsonNode;
 
 /**
- * This class is similar to {@link S3Config}. It has an extra {@code bucketPath} parameter, which is necessary for more delicate data syncing to S3.
+ * An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more delicate data syncing to S3) and {@code partSize}
+ * (used by certain bulk-load database operations).
  */
 public class S3DestinationConfig {
 
+  // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives
+  // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
+  // WARNING: Too large a part size can cause potential OOM errors.
+  public static final int DEFAULT_PART_SIZE_MB = 10;
+
   private final String endpoint;
   private final String bucketName;
   private final String bucketPath;
   private final String bucketRegion;
   private final String accessKeyId;
   private final String secretAccessKey;
+  private final Integer partSize;
   private final S3FormatConfig formatConfig;
 
+  /**
+   * The part size should not matter in any use case that depends on this constructor. So the default 10 MB is used.
+   */
   public S3DestinationConfig(
-      final String endpoint,
-      final String bucketName,
-      final String bucketPath,
-      final String bucketRegion,
-      final String accessKeyId,
-      final String secretAccessKey,
-      final S3FormatConfig formatConfig) {
+                             final String endpoint,
+                             final String bucketName,
+                             final String bucketPath,
+                             final String bucketRegion,
+                             final String accessKeyId,
+                             final String secretAccessKey,
+                             final S3FormatConfig formatConfig) {
+    this(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, DEFAULT_PART_SIZE_MB, formatConfig);
+  }
+
+  public S3DestinationConfig(
+                             final String endpoint,
+                             final String bucketName,
+                             final String bucketPath,
+                             final String bucketRegion,
+                             final String accessKeyId,
+                             final String secretAccessKey,
+                             final Integer partSize,
+                             final S3FormatConfig formatConfig) {
     this.endpoint = endpoint;
     this.bucketName = bucketName;
     this.bucketPath = bucketPath;
@@ -41,9 +63,14 @@ public S3DestinationConfig(
     this.accessKeyId = accessKeyId;
     this.secretAccessKey = secretAccessKey;
     this.formatConfig = formatConfig;
+    this.partSize = partSize;
   }
 
   public static S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
+    var partSize = DEFAULT_PART_SIZE_MB;
+    if (config.get("part_size") != null) {
+      partSize = config.get("part_size").asInt();
+    }
     return new S3DestinationConfig(
         config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(),
         config.get("s3_bucket_name").asText(),
@@ -51,6 +78,7 @@ public static S3DestinationConfig getS3DestinationConfig(final JsonNode config)
         config.get("s3_bucket_region").asText(),
         config.get("access_key_id").asText(),
         config.get("secret_access_key").asText(),
+        partSize,
         S3FormatConfigs.getS3FormatConfig(config));
   }
 
@@ -78,6 +106,10 @@ public String getSecretAccessKey() {
     return secretAccessKey;
   }
 
+  public Integer getPartSize() {
+    return partSize;
+  }
+
   public S3FormatConfig getFormatConfig() {
     return formatConfig;
   }
@@ -103,13 +135,4 @@ public AmazonS3 getS3Client() {
         .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
         .build();
   }
-
-  /**
-   * @return {@link S3Config} for convenience. The part size should not matter in any use case that
-   *         gets an {@link S3Config} from this class. So the default 10 MB is used.
-   */
-  public S3Config getS3Config() {
-    return new S3Config(endpoint, bucketName, accessKeyId, secretAccessKey, bucketRegion, 10);
-  }
-
 }
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java
index db1936484b3b..53e996ce7a32 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java
@@ -12,7 +12,7 @@
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
 import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
-import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import java.util.function.Consumer;
@@ -28,7 +28,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
         getDatabase(config),
         getSqlOperations(),
         getNameTransformer(),
-        getS3Config(config),
+        getS3DestinationConfig(config),
         catalog,
         new SnowflakeS3StreamCopierFactory(),
         getConfiguredSchema(config));
@@ -36,7 +36,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
 
   @Override
   public void checkPersistence(final JsonNode config) {
-    S3StreamCopier.attemptS3WriteAndDelete(getS3Config(config));
+    S3StreamCopier.attemptS3WriteAndDelete(getS3DestinationConfig(config));
   }
 
   @Override
@@ -58,9 +58,9 @@ private String getConfiguredSchema(final JsonNode config) {
     return config.get("schema").asText();
   }
 
-  private S3Config getS3Config(final JsonNode config) {
+  private S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
     final JsonNode loadingMethod = config.get("loading_method");
-    return S3Config.getS3Config(loadingMethod);
+    return S3DestinationConfig.getS3DestinationConfig(loadingMethod);
   }
 
 }
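One Snowflake-specific wrinkle above: the S3 settings live under the loading_method node rather than at the top level of the connector config, so the same factory method is applied to a sub-node, exactly as the patched getS3DestinationConfig helper does. A sketch of that shape; the JSON literal is abbreviated and illustrative, and a real loading_method node must also contain whatever the format parser expects, which this diff does not show:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.airbyte.integrations.destination.s3.S3DestinationConfig;

    public class SnowflakeNestingSketch {

      public static void main(final String[] args) throws Exception {
        // Abbreviated, illustrative config: the S3 keys sit one level down.
        final JsonNode config = new ObjectMapper().readTree(
            "{\"schema\": \"public\", \"loading_method\": {"
                + " \"s3_bucket_name\": \"my-bucket\","
                + " \"s3_bucket_region\": \"us-east-1\","
                + " \"access_key_id\": \"key\","
                + " \"secret_access_key\": \"secret\"}}");

        // Mirrors the patched helper: parse the nested node, not the root config.
        final S3DestinationConfig s3Config =
            S3DestinationConfig.getS3DestinationConfig(config.get("loading_method"));
      }
    }
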
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java
index ee1d0f79dc5e..d23b5421ff34 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java
@@ -10,7 +10,7 @@
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
-import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.sql.SQLException;
 import org.slf4j.Logger;
@@ -22,25 +22,25 @@ public class SnowflakeS3StreamCopier extends S3StreamCopier {
 
   private static final int FILE_PREFIX_LENGTH = 5;
 
   public SnowflakeS3StreamCopier(final String stagingFolder,
-      final DestinationSyncMode destSyncMode,
-      final String schema,
-      final String streamName,
-      final AmazonS3 client,
-      final JdbcDatabase db,
-      final S3Config s3Config,
-      final ExtendedNameTransformer nameTransformer,
-      final SqlOperations sqlOperations) {
+                                 final DestinationSyncMode destSyncMode,
+                                 final String schema,
+                                 final String streamName,
+                                 final AmazonS3 client,
+                                 final JdbcDatabase db,
+                                 final S3DestinationConfig s3Config,
+                                 final ExtendedNameTransformer nameTransformer,
+                                 final SqlOperations sqlOperations) {
     super(stagingFolder, destSyncMode, schema, streamName,
        Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations);
   }
 
   @Override
   public void copyS3CsvFileIntoTable(
-      final JdbcDatabase database,
-      final String s3FileLocation,
-      final String schema,
-      final String tableName,
-      final S3Config s3Config)
+                                     final JdbcDatabase database,
+                                     final String s3FileLocation,
+                                     final String schema,
+                                     final String tableName,
+                                     final S3DestinationConfig s3Config)
       throws SQLException {
     final var copyQuery = String.format(
         "COPY INTO %s.%s FROM '%s' "
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java
index 9bcccf37f48b..767c475d429f 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java
@@ -10,21 +10,21 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
-import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.DestinationSyncMode;
 
 public class SnowflakeS3StreamCopierFactory extends S3StreamCopierFactory {
 
   @Override
   public StreamCopier create(final String stagingFolder,
-      final DestinationSyncMode syncMode,
-      final String schema,
-      final String streamName,
-      final AmazonS3 s3Client,
-      final JdbcDatabase db,
-      final S3Config s3Config,
-      final ExtendedNameTransformer nameTransformer,
-      final SqlOperations sqlOperations)
+                             final DestinationSyncMode syncMode,
+                             final String schema,
+                             final String streamName,
+                             final AmazonS3 s3Client,
+                             final JdbcDatabase db,
+                             final S3DestinationConfig s3Config,
+                             final ExtendedNameTransformer nameTransformer,
+                             final SqlOperations sqlOperations)
       throws Exception {
     return new SnowflakeS3StreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations);
   }
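Taken together, the check path of any of these connectors is now a single round trip through the merged class. An end-to-end sketch under the same assumptions as the earlier snippets (placeholder config JSON; both calls exist in this patch, and getS3Client is a pre-existing method visible in the S3DestinationConfig context lines):

    import com.amazonaws.services.s3.AmazonS3;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
    import io.airbyte.integrations.destination.s3.S3DestinationConfig;

    public class CheckSketch {

      public static void main(final String[] args) throws Exception {
        // The connector config JSON, passed in for illustration.
        final JsonNode config = new ObjectMapper().readTree(args[0]);

        // One object now serves both halves of a copy destination:
        final S3DestinationConfig s3Config = S3DestinationConfig.getS3DestinationConfig(config);

        // ...the raw client for direct S3 writes,
        final AmazonS3 client = s3Config.getS3Client();

        // ...and the JDBC copier's connection test, which used to demand a separate S3Config.
        S3StreamCopier.attemptS3WriteAndDelete(s3Config);
      }
    }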