Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS S3 and Alternative S3 destination connector feature #4038

Merged
merged 7 commits into from
Jun 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,26 @@

public class S3Config {

private final String endpoint;
private final String bucketName;
private final String accessKeyId;
private final String secretAccessKey;
private final String region;
private final Integer partSize;

public S3Config(String bucketName, String accessKeyId, String secretAccessKey, String region, Integer partSize) {
public S3Config(String endpoint, String bucketName, String accessKeyId, String secretAccessKey, String region, Integer partSize) {
this.endpoint = endpoint;
this.bucketName = bucketName;
this.accessKeyId = accessKeyId;
this.secretAccessKey = secretAccessKey;
this.region = region;
this.partSize = partSize;
}

public String getEndpoint() {
return endpoint;
}

public String getBucketName() {
return bucketName;
}
Expand All @@ -68,6 +74,7 @@ public static S3Config getS3Config(JsonNode config) {
partSize = config.get("part_size").asInt();
}
return new S3Config(
config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(),
config.get("s3_bucket_name").asText(),
config.get("access_key_id").asText(),
config.get("secret_access_key").asText(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.airbyte.db.jdbc.JdbcDatabase;
Expand Down Expand Up @@ -211,13 +213,32 @@ private static void attemptWriteAndDeleteS3Object(S3Config s3Config, String outp
}

public static AmazonS3 getAmazonS3(S3Config s3Config) {
var endpoint = s3Config.getEndpoint();
var region = s3Config.getRegion();
var accessKeyId = s3Config.getAccessKeyId();
var secretAccessKey = s3Config.getSecretAccessKey();

var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
return AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(s3Config.getRegion())
.build();

if (endpoint.isEmpty()) {
return AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(s3Config.getRegion())
.build();

} else {

ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setSignerOverride("AWSS3V4SignerType");

return AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
.withPathStyleAccessEnabled(true)
.withClientConfiguration(clientConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}
}

public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

package io.airbyte.integrations.destination.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -67,12 +69,31 @@ public S3Consumer(S3DestinationConfig s3DestinationConfig,

@Override
protected void startTracked() throws Exception {
AWSCredentials awsCreds = new BasicAWSCredentials(s3DestinationConfig.getAccessKeyId(),
s3DestinationConfig.getSecretAccessKey());
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(s3DestinationConfig.getBucketRegion())
.build();

var endpoint = s3DestinationConfig.getEndpoint();

AWSCredentials awsCreds = new BasicAWSCredentials(s3DestinationConfig.getAccessKeyId(), s3DestinationConfig.getSecretAccessKey());
AmazonS3 s3Client = null;

if (endpoint.isEmpty()) {
s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(s3DestinationConfig.getBucketRegion())
.build();

} else {
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setSignerOverride("AWSS3V4SignerType");

s3Client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, s3DestinationConfig.getBucketRegion()))
.withPathStyleAccessEnabled(true)
.withClientConfiguration(clientConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}

Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

public class S3DestinationConfig {

private final String endpoint;
private final String bucketName;
private final String bucketPath;
private final String bucketRegion;
Expand All @@ -36,12 +37,14 @@ public class S3DestinationConfig {
private final S3FormatConfig formatConfig;

public S3DestinationConfig(
String endpoint,
String bucketName,
String bucketPath,
String bucketRegion,
String accessKeyId,
String secretAccessKey,
S3FormatConfig formatConfig) {
this.endpoint = endpoint;
this.bucketName = bucketName;
this.bucketPath = bucketPath;
this.bucketRegion = bucketRegion;
Expand All @@ -52,6 +55,7 @@ public S3DestinationConfig(

public static S3DestinationConfig getS3DestinationConfig(JsonNode config) {
return new S3DestinationConfig(
config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(),
config.get("s3_bucket_name").asText(),
config.get("s3_bucket_path").asText(),
config.get("s3_bucket_region").asText(),
Expand All @@ -60,6 +64,10 @@ public static S3DestinationConfig getS3DestinationConfig(JsonNode config) {
S3FormatConfigs.getS3FormatConfig(config));
}

public String getEndpoint() {
return endpoint;
}

public String getBucketName() {
return bucketName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
],
"additionalProperties": false,
"properties": {
"s3_endpoint": {
"title": "Endpoint",
"type": "string",
"default": "",
"description": "This is your S3 endpoint url.(if you are working with AWS S3, just leave empty).",
"examples": ["http://localhost:9000"]
},
"s3_bucket_name": {
"title": "S3 Bucket Name",
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

package io.airbyte.integrations.destination.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
Expand Down Expand Up @@ -213,12 +215,28 @@ protected void setup(TestDestinationEnv testEnv) {
this.config = S3DestinationConfig.getS3DestinationConfig(configJson);
LOGGER.info("Test full path: {}/{}", config.getBucketName(), config.getBucketPath());

var endpoint = config.getEndpoint();
AWSCredentials awsCreds = new BasicAWSCredentials(config.getAccessKeyId(),
config.getSecretAccessKey());
this.s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(config.getBucketRegion())
.build();

if (endpoint.isEmpty()) {
this.s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(config.getBucketRegion())
.build();
} else {
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setSignerOverride("AWSS3V4SignerType");

this.s3Client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, config.getBucketRegion()))
.withPathStyleAccessEnabled(true)
.withClientConfiguration(clientConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}

}

@Override
Expand Down
11 changes: 7 additions & 4 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

This destination writes data to S3 bucket.

The Airbyte S3 destination allows you to sync data to AWS S3. Each stream is written to its own directory under the bucket.
The Airbyte S3 destination allows you to sync data to AWS S3/ Minio S3. Each stream is written to its own directory under the bucket.

## Sync Mode

Expand All @@ -18,11 +18,12 @@ The Airbyte S3 destination allows you to sync data to AWS S3. Each stream is wri

| Parameter | Type | Notes |
| :--- | :---: | :--- |
| S3 Endpoint | string | URL to S3, If using AWS S3 just leave blank. |
| S3 Bucket Name | string | Name of the bucket to sync data into. |
| S3 Bucket Path | string | Subdirectory under the above bucket to sync the data into. |
| S3 Region | string | See [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions) for all region codes. |
| Access Key ID | string | AWS credential. |
| Secret Access Key | string | AWS credential. |
| Access Key ID | string | AWS/Minio credential. |
| Secret Access Key | string | AWS/Minio credential. |
| Format | object | Format specific configuration. See below for details. |

⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend you to provision a dedicated S3 resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️
Expand Down Expand Up @@ -104,12 +105,14 @@ With root level flattening, the output CSV is:

### Requirements

1. Allow connections from Airbyte server to your AWS cluster \(if they exist in separate VPCs\).
1. Allow connections from Airbyte server to your AWS S3/ Minio S3 cluster \(if they exist in separate VPCs\).
2. An S3 bucket with credentials \(for the COPY strategy\).

### Setup guide

* Fill up S3 info
* **S3 Endpoint**
* Leave empty if using AWS S3, fill in S3 URL if using Minio S3.
* **S3 Bucket Name**
* See [this](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) to create an S3 bucket.
* **S3 Bucket Region**
Expand Down