From e57f679aa1eaf11a7d162d507766396462f6a481 Mon Sep 17 00:00:00 2001 From: Panhavad Date: Thu, 10 Jun 2021 15:15:48 +0000 Subject: [PATCH 1/7] base from prev PR --- .../destination/jdbc/copy/s3/S3Config.java | 10 ++++-- .../jdbc/copy/s3/S3StreamCopier.java | 29 +++++++++++++--- .../destination/s3/S3Consumer.java | 33 +++++++++++++++---- .../destination/s3/S3DestinationConfig.java | 8 +++++ .../src/main/resources/spec.json | 7 ++++ 5 files changed, 75 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java index e32d318167ade..af2f5c1fabc7c 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java @@ -27,14 +27,15 @@ import com.fasterxml.jackson.databind.JsonNode; public class S3Config { - + private final String endPoint; private final String bucketName; private final String accessKeyId; private final String secretAccessKey; private final String region; private final Integer partSize; - public S3Config(String bucketName, String accessKeyId, String secretAccessKey, String region, Integer partSize) { + public S3Config(String endPoint, String bucketName, String accessKeyId, String secretAccessKey, String region, Integer partSize) { + this.endPoint = endPoint; this.bucketName = bucketName; this.accessKeyId = accessKeyId; this.secretAccessKey = secretAccessKey; @@ -42,6 +43,10 @@ public S3Config(String bucketName, String accessKeyId, String secretAccessKey, S this.partSize = partSize; } + public String getEndPoint() { + return endPoint; + } + public String getBucketName() { return bucketName; } @@ -68,6 +73,7 @@ public static S3Config getS3Config(JsonNode config) { partSize = config.get("part_size").asInt(); } return new S3Config( + config.get("s3_endpoint").asText(), config.get("s3_bucket_name").asText(), config.get("access_key_id").asText(), config.get("secret_access_key").asText(), diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 5c75873306392..71f2b63522333 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -26,8 +26,10 @@ import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import io.airbyte.db.jdbc.JdbcDatabase; @@ -211,13 +213,32 @@ private static void attemptWriteAndDeleteS3Object(S3Config s3Config, String outp } public static AmazonS3 getAmazonS3(S3Config s3Config) { + var endPoint = s3Config.getEndPoint(); + var region = s3Config.getRegion(); var accessKeyId = s3Config.getAccessKeyId(); var secretAccessKey = s3Config.getSecretAccessKey(); + var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); - return AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(s3Config.getRegion()) - .build(); + + if (endPoint.equals("aws")) { + return AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(s3Config.getRegion()) + .build(); + + } else { + + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSS3V4SignerType"); + + return AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, region)) + .withPathStyleAccessEnabled(true) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } } public abstract void copyS3CsvFileIntoTable(JdbcDatabase database, diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java index 84b8efb4a2b43..65654da303689 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java @@ -24,9 +24,11 @@ package io.airbyte.integrations.destination.s3; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import io.airbyte.commons.json.Jsons; @@ -67,12 +69,31 @@ public S3Consumer(S3DestinationConfig s3DestinationConfig, @Override protected void startTracked() throws Exception { - AWSCredentials awsCreds = new BasicAWSCredentials(s3DestinationConfig.getAccessKeyId(), - s3DestinationConfig.getSecretAccessKey()); - AmazonS3 s3Client = AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(s3DestinationConfig.getBucketRegion()) - .build(); + + var endPoint = s3DestinationConfig.getEndPoint(); + + AWSCredentials awsCreds = new BasicAWSCredentials(s3DestinationConfig.getAccessKeyId(), s3DestinationConfig.getSecretAccessKey()); + AmazonS3 s3Client = null; + + if (endPoint.equals("aws")) { + s3Client = AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(s3DestinationConfig.getBucketRegion()) + .build(); + + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSS3V4SignerType"); + + s3Client = AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, s3DestinationConfig.getBucketRegion())) + .withPathStyleAccessEnabled(true) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 64c9899905fb1..345f80e302a42 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -28,6 +28,7 @@ public class S3DestinationConfig { + private final String endPoint; private final String bucketName; private final String bucketPath; private final String bucketRegion; @@ -36,12 +37,14 @@ public class S3DestinationConfig { private final S3FormatConfig formatConfig; public S3DestinationConfig( + String endPoint, String bucketName, String bucketPath, String bucketRegion, String accessKeyId, String secretAccessKey, S3FormatConfig formatConfig) { + this.endPoint = endPoint; this.bucketName = bucketName; this.bucketPath = bucketPath; this.bucketRegion = bucketRegion; @@ -52,6 +55,7 @@ public S3DestinationConfig( public static S3DestinationConfig getS3DestinationConfig(JsonNode config) { return new S3DestinationConfig( + config.get("s3_endpoint").asText(), config.get("s3_bucket_name").asText(), config.get("s3_bucket_path").asText(), config.get("s3_bucket_region").asText(), @@ -60,6 +64,10 @@ public static S3DestinationConfig getS3DestinationConfig(JsonNode config) { S3FormatConfigs.getS3FormatConfig(config)); } + public String getEndPoint() { + return endPoint; + } + public String getBucketName() { return bucketName; } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json index 4d9808c040787..e1f071ff3b748 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json @@ -7,6 +7,7 @@ "title": "S3 Destination Spec", "type": "object", "required": [ + "s3_endpoint", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", @@ -16,6 +17,12 @@ ], "additionalProperties": false, "properties": { + "s3_endpoint": { + "title": "Endpoint", + "type": "string", + "description": "this is end point url.( if you are working with aws, just type aws).", + "examples": ["localhost:9000", "aws"] + }, "s3_bucket_name": { "title": "S3 Bucket Name", "type": "string", From 4051b2c982e8b880ab99c1f85323304f78424feb Mon Sep 17 00:00:00 2001 From: Chensokheng <52232579+Chensokheng@users.noreply.github.com> Date: Thu, 10 Jun 2021 23:48:52 +0700 Subject: [PATCH 2/7] add s3 alternative destination connector feature --- .../destination/jdbc/copy/s3/S3Config.java | 11 ++++---- .../jdbc/copy/s3/S3StreamCopier.java | 6 ++--- .../destination/s3/S3Consumer.java | 6 ++--- .../destination/s3/S3DestinationConfig.java | 10 +++---- .../src/main/resources/spec.json | 4 +-- .../s3/S3CsvDestinationAcceptanceTest.java | 26 ++++++++++++++++--- .../s3/csv/S3CsvOutputFormatterTest.java | 2 +- 7 files changed, 42 insertions(+), 23 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java index af2f5c1fabc7c..417aea09c76cb 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java @@ -27,15 +27,16 @@ import com.fasterxml.jackson.databind.JsonNode; public class S3Config { - private final String endPoint; + + private final String endpoint; private final String bucketName; private final String accessKeyId; private final String secretAccessKey; private final String region; private final Integer partSize; - public S3Config(String endPoint, String bucketName, String accessKeyId, String secretAccessKey, String region, Integer partSize) { - this.endPoint = endPoint; + public S3Config(String endpoint, String bucketName, String accessKeyId, String secretAccessKey, String region, Integer partSize) { + this.endpoint = endpoint; this.bucketName = bucketName; this.accessKeyId = accessKeyId; this.secretAccessKey = secretAccessKey; @@ -43,8 +44,8 @@ public S3Config(String endPoint, String bucketName, String accessKeyId, String s this.partSize = partSize; } - public String getEndPoint() { - return endPoint; + public String getEndpoint() { + return endpoint; } public String getBucketName() { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 71f2b63522333..bcb179cb5bb6a 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -213,14 +213,14 @@ private static void attemptWriteAndDeleteS3Object(S3Config s3Config, String outp } public static AmazonS3 getAmazonS3(S3Config s3Config) { - var endPoint = s3Config.getEndPoint(); + var endpoint = s3Config.getEndpoint(); var region = s3Config.getRegion(); var accessKeyId = s3Config.getAccessKeyId(); var secretAccessKey = s3Config.getSecretAccessKey(); var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); - if (endPoint.equals("aws")) { + if (endpoint.equalsIgnoreCase("aws")) { return AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .withRegion(s3Config.getRegion()) @@ -233,7 +233,7 @@ public static AmazonS3 getAmazonS3(S3Config s3Config) { return AmazonS3ClientBuilder .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, region)) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) .withPathStyleAccessEnabled(true) .withClientConfiguration(clientConfiguration) .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java index 65654da303689..430f748cce3da 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java @@ -70,12 +70,12 @@ public S3Consumer(S3DestinationConfig s3DestinationConfig, @Override protected void startTracked() throws Exception { - var endPoint = s3DestinationConfig.getEndPoint(); + var endpoint = s3DestinationConfig.getEndpoint(); AWSCredentials awsCreds = new BasicAWSCredentials(s3DestinationConfig.getAccessKeyId(), s3DestinationConfig.getSecretAccessKey()); AmazonS3 s3Client = null; - if (endPoint.equals("aws")) { + if (endpoint.equalsIgnoreCase("aws")) { s3Client = AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .withRegion(s3DestinationConfig.getBucketRegion()) @@ -87,7 +87,7 @@ protected void startTracked() throws Exception { s3Client = AmazonS3ClientBuilder .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, s3DestinationConfig.getBucketRegion())) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, s3DestinationConfig.getBucketRegion())) .withPathStyleAccessEnabled(true) .withClientConfiguration(clientConfiguration) .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 345f80e302a42..2db6ffcfd450a 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -28,7 +28,7 @@ public class S3DestinationConfig { - private final String endPoint; + private final String endpoint; private final String bucketName; private final String bucketPath; private final String bucketRegion; @@ -37,14 +37,14 @@ public class S3DestinationConfig { private final S3FormatConfig formatConfig; public S3DestinationConfig( - String endPoint, + String endpoint, String bucketName, String bucketPath, String bucketRegion, String accessKeyId, String secretAccessKey, S3FormatConfig formatConfig) { - this.endPoint = endPoint; + this.endpoint = endpoint; this.bucketName = bucketName; this.bucketPath = bucketPath; this.bucketRegion = bucketRegion; @@ -64,8 +64,8 @@ public static S3DestinationConfig getS3DestinationConfig(JsonNode config) { S3FormatConfigs.getS3FormatConfig(config)); } - public String getEndPoint() { - return endPoint; + public String getEndpoint() { + return endpoint; } public String getBucketName() { diff --git a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json index e1f071ff3b748..1948866d2a491 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json @@ -20,8 +20,8 @@ "s3_endpoint": { "title": "Endpoint", "type": "string", - "description": "this is end point url.( if you are working with aws, just type aws).", - "examples": ["localhost:9000", "aws"] + "description": "This is your s3 endpoint url.( if you are working with aws, just type aws).", + "examples": ["http://localhost:9000", "aws"] }, "s3_bucket_name": { "title": "S3 Bucket Name", diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java index 30df1d5d184c2..da6d7ee9fe2b3 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java @@ -24,9 +24,11 @@ package io.airbyte.integrations.destination.s3; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.DeleteObjectsRequest; @@ -213,12 +215,28 @@ protected void setup(TestDestinationEnv testEnv) { this.config = S3DestinationConfig.getS3DestinationConfig(configJson); LOGGER.info("Test full path: {}/{}", config.getBucketName(), config.getBucketPath()); + var endpoint = config.getEndpoint(); AWSCredentials awsCreds = new BasicAWSCredentials(config.getAccessKeyId(), config.getSecretAccessKey()); - this.s3Client = AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withRegion(config.getBucketRegion()) - .build(); + + if (endpoint.equalsIgnoreCase("aws")) { + this.s3Client = AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withRegion(config.getBucketRegion()) + .build(); + } else { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setSignerOverride("AWSS3V4SignerType"); + + this.s3Client = AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, config.getBucketRegion())) + .withPathStyleAccessEnabled(true) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + } @Override diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java index c2d6972c3233b..73921df02a856 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java @@ -52,7 +52,7 @@ public void testGetOutputPrefix() { public void testGetOutputFilename() { Timestamp timestamp = new Timestamp(1471461319000L); assertEquals( - "2016_08_17_1471461319000_0.csv", + "2016_08_18_1471461319000_0.csv", S3CsvOutputFormatter.getOutputFilename(timestamp)); } From 1dfe8b8661b06cc5c170a94ec7b1fff8bb2dafef Mon Sep 17 00:00:00 2001 From: Panhavad Date: Thu, 10 Jun 2021 18:43:46 +0000 Subject: [PATCH 3/7] fix testGetOutputFilename --- .../destination/s3/csv/S3CsvOutputFormatterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java index 73921df02a856..c2d6972c3233b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvOutputFormatterTest.java @@ -52,7 +52,7 @@ public void testGetOutputPrefix() { public void testGetOutputFilename() { Timestamp timestamp = new Timestamp(1471461319000L); assertEquals( - "2016_08_18_1471461319000_0.csv", + "2016_08_17_1471461319000_0.csv", S3CsvOutputFormatter.getOutputFilename(timestamp)); } From 874db87344e20591f5c0772757ae5649a1277d52 Mon Sep 17 00:00:00 2001 From: Panhavad Date: Fri, 11 Jun 2021 06:59:22 +0000 Subject: [PATCH 4/7] default using aws --- .../destination/jdbc/copy/s3/S3StreamCopier.java | 2 +- .../integrations/destination/s3/S3Consumer.java | 2 +- .../destination-s3/src/main/resources/spec.json | 6 +++--- .../s3/S3CsvDestinationAcceptanceTest.java | 2 +- docs/integrations/destinations/s3.md | 11 +++++++---- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index bcb179cb5bb6a..5ae62a174da9d 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -220,7 +220,7 @@ public static AmazonS3 getAmazonS3(S3Config s3Config) { var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); - if (endpoint.equalsIgnoreCase("aws")) { + if (endpoint.isEmpty()) { return AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .withRegion(s3Config.getRegion()) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java index 430f748cce3da..3753d028dfa55 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java @@ -75,7 +75,7 @@ protected void startTracked() throws Exception { AWSCredentials awsCreds = new BasicAWSCredentials(s3DestinationConfig.getAccessKeyId(), s3DestinationConfig.getSecretAccessKey()); AmazonS3 s3Client = null; - if (endpoint.equalsIgnoreCase("aws")) { + if (endpoint.isEmpty()) { s3Client = AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .withRegion(s3DestinationConfig.getBucketRegion()) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json index 1948866d2a491..9895fe6d13957 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json @@ -7,7 +7,6 @@ "title": "S3 Destination Spec", "type": "object", "required": [ - "s3_endpoint", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", @@ -20,8 +19,9 @@ "s3_endpoint": { "title": "Endpoint", "type": "string", - "description": "This is your s3 endpoint url.( if you are working with aws, just type aws).", - "examples": ["http://localhost:9000", "aws"] + "default": "", + "description": "This is your S3 endpoint url.(if you are working with AWS S3, just leave empty).", + "examples": ["http://localhost:9000"] }, "s3_bucket_name": { "title": "S3 Bucket Name", diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java index da6d7ee9fe2b3..dbd3bd84a4518 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.java @@ -219,7 +219,7 @@ protected void setup(TestDestinationEnv testEnv) { AWSCredentials awsCreds = new BasicAWSCredentials(config.getAccessKeyId(), config.getSecretAccessKey()); - if (endpoint.equalsIgnoreCase("aws")) { + if (endpoint.isEmpty()) { this.s3Client = AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .withRegion(config.getBucketRegion()) diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 81711257e0471..e4c02b9665215 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -4,7 +4,7 @@ This destination writes data to S3 bucket. -The Airbyte S3 destination allows you to sync data to AWS S3. Each stream is written to its own directory under the bucket. +The Airbyte S3 destination allows you to sync data to AWS S3/ Minio S3. Each stream is written to its own directory under the bucket. ## Sync Mode @@ -18,11 +18,12 @@ The Airbyte S3 destination allows you to sync data to AWS S3. Each stream is wri | Parameter | Type | Notes | | :--- | :---: | :--- | +| S3 Endpoint | string | URL to S3, If using AWS S3 just leave blank. | | S3 Bucket Name | string | Name of the bucket to sync data into. | | S3 Bucket Path | string | Subdirectory under the above bucket to sync the data into. | | S3 Region | string | See [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions) for all region codes. | -| Access Key ID | string | AWS credential. | -| Secret Access Key | string | AWS credential. | +| Access Key ID | string | AWS/Minio credential. | +| Secret Access Key | string | AWS/Minio credential. | | Format | object | Format specific configuration. See below for details. | ⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend you to provision a dedicated S3 resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️ @@ -104,12 +105,14 @@ With root level flattening, the output CSV is: ### Requirements -1. Allow connections from Airbyte server to your AWS cluster \(if they exist in separate VPCs\). +1. Allow connections from Airbyte server to your AWS S3/ Minio S3 cluster \(if they exist in separate VPCs\). 2. An S3 bucket with credentials \(for the COPY strategy\). ### Setup guide * Fill up S3 info + * **S3 Endpoint** + * Leave empty if using AWS S3, fill in S3 URL if using Minio S3. * **S3 Bucket Name** * See [this](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) to create an S3 bucket. * **S3 Bucket Region** From d0d37fa5da9ce9649c60b3496991bede9e94587c Mon Sep 17 00:00:00 2001 From: Duk Panhavad Date: Sun, 13 Jun 2021 14:29:21 +0700 Subject: [PATCH 5/7] Update airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java Co-authored-by: LiRen Tu --- .../airbyte/integrations/destination/jdbc/copy/s3/S3Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java index 417aea09c76cb..b70d26ac3a182 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java @@ -74,7 +74,7 @@ public static S3Config getS3Config(JsonNode config) { partSize = config.get("part_size").asInt(); } return new S3Config( - config.get("s3_endpoint").asText(), + config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText() config.get("s3_bucket_name").asText(), config.get("access_key_id").asText(), config.get("secret_access_key").asText(), From 5b3b32c1f84bfe5b53ab01e3d7bea1f8c497e1e2 Mon Sep 17 00:00:00 2001 From: Duk Panhavad Date: Sun, 13 Jun 2021 14:29:30 +0700 Subject: [PATCH 6/7] Update airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java Co-authored-by: LiRen Tu --- .../integrations/destination/s3/S3DestinationConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 2db6ffcfd450a..77ed6388444f5 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -55,7 +55,7 @@ public S3DestinationConfig( public static S3DestinationConfig getS3DestinationConfig(JsonNode config) { return new S3DestinationConfig( - config.get("s3_endpoint").asText(), + config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText() config.get("s3_bucket_name").asText(), config.get("s3_bucket_path").asText(), config.get("s3_bucket_region").asText(), From cea1018ad1303abc1770e0850e745dc401d8f078 Mon Sep 17 00:00:00 2001 From: Panhavad Date: Sun, 13 Jun 2021 07:43:22 +0000 Subject: [PATCH 7/7] fix missing "," --- .../airbyte/integrations/destination/jdbc/copy/s3/S3Config.java | 2 +- .../integrations/destination/s3/S3DestinationConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java index b70d26ac3a182..3dd61940bdd77 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java @@ -74,7 +74,7 @@ public static S3Config getS3Config(JsonNode config) { partSize = config.get("part_size").asInt(); } return new S3Config( - config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText() + config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(), config.get("s3_bucket_name").asText(), config.get("access_key_id").asText(), config.get("secret_access_key").asText(), diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 77ed6388444f5..5f055b3e6a3d4 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -55,7 +55,7 @@ public S3DestinationConfig( public static S3DestinationConfig getS3DestinationConfig(JsonNode config) { return new S3DestinationConfig( - config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText() + config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(), config.get("s3_bucket_name").asText(), config.get("s3_bucket_path").asText(), config.get("s3_bucket_region").asText(),