Commit

move S3Config into destination-s3; update dependencies accordingly (#…
edgao authored Dec 10, 2021
1 parent fefd575 commit fc91f67
Showing 21 changed files with 249 additions and 380 deletions.
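
In short, the jdbc copy module's S3Config and its S3 helper methods are replaced by S3DestinationConfig and S3Destination from the destination-s3 connector. A minimal sketch of the post-refactor call sites, pieced together from the diffs below — the wrapper class and the listBuckets call are illustrative only, and the second argument to attemptS3WriteAndDelete is presumably the bucket path prefix, as in the removed copier code:

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;

public class ExampleS3Usage {

  public static void checkAndUse(final S3DestinationConfig s3Config) {
    // Connection check: writes and then deletes a small test object
    // (previously S3StreamCopier.attemptS3WriteAndDelete).
    S3Destination.attemptS3WriteAndDelete(s3Config, "");

    // Client construction now hangs off the config object
    // (previously S3StreamCopier.getAmazonS3(s3Config)).
    final AmazonS3 s3Client = s3Config.getS3Client();
    s3Client.listBuckets().forEach(bucket -> System.out.println(bucket.getName()));
  }
}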
@@ -13,7 +13,7 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
+import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
@@ -30,8 +30,8 @@ public static void main(final String[] args) throws Exception {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
-final ConfiguredAirbyteCatalog catalog,
-final Consumer<AirbyteMessage> outputRecordCollector) {
+final ConfiguredAirbyteCatalog catalog,
+final Consumer<AirbyteMessage> outputRecordCollector) {
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
return CopyConsumerFactory.create(
outputRecordCollector,
@@ -47,7 +47,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
@Override
public void checkPersistence(final JsonNode config) {
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
-S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config());
+S3Destination.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig(), "");
}

@Override
@@ -10,6 +10,7 @@ dependencies {

implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
+implementation project(':airbyte-integrations:connectors:destination-s3')
implementation project(':airbyte-protocol:models')

implementation 'org.apache.commons:commons-lang3:3.11'

This file was deleted.

@@ -6,19 +6,15 @@

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.IOException;
@@ -43,10 +39,6 @@ public abstract class S3StreamCopier implements StreamCopier {

private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default.
private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS;
-// The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives
-// us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
-// WARNING: Too large a part size can cause potential OOM errors.
-public static final int DEFAULT_PART_SIZE_MB = 10;
// It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file.
// The BATCH_SIZE is defined in CopyConsumerFactory.
// The average size of such a file will be about 1 GB.
@@ -57,7 +49,7 @@ public abstract class S3StreamCopier implements StreamCopier {
public static final int MAX_PARTS_PER_FILE = 1000;

protected final AmazonS3 s3Client;
-protected final S3Config s3Config;
+protected final S3DestinationConfig s3Config;
protected final String tmpTableName;
private final DestinationSyncMode destSyncMode;
protected final String schemaName;
@@ -74,15 +66,15 @@ public abstract class S3StreamCopier implements StreamCopier {
private final StagingFilenameGenerator filenameGenerator;

public S3StreamCopier(final String stagingFolder,
-final DestinationSyncMode destSyncMode,
-final String schema,
-final String streamName,
-final String s3FileName,
-final AmazonS3 client,
-final JdbcDatabase db,
-final S3Config s3Config,
-final ExtendedNameTransformer nameTransformer,
-final SqlOperations sqlOperations) {
+final DestinationSyncMode destSyncMode,
+final String schema,
+final String streamName,
+final String s3FileName,
+final AmazonS3 client,
+final JdbcDatabase db,
+final S3DestinationConfig s3Config,
+final ExtendedNameTransformer nameTransformer,
+final SqlOperations sqlOperations) {
this.destSyncMode = destSyncMode;
this.schemaName = schema;
this.streamName = streamName;
@@ -231,58 +223,11 @@ private void closeAndWaitForUpload() throws IOException {
LOGGER.info("All data for {} stream uploaded.", streamName);
}

-public static void attemptS3WriteAndDelete(final S3Config s3Config) {
-  attemptS3WriteAndDelete(s3Config, "");
-}

-public static void attemptS3WriteAndDelete(final S3Config s3Config, final String bucketPath) {
-  final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
-  final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
-  attemptWriteAndDeleteS3Object(s3Config, outputTableName);
-}

-private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) {
-  final var s3 = getAmazonS3(s3Config);
-  final var s3Bucket = s3Config.getBucketName();

-  s3.putObject(s3Bucket, outputTableName, "check-content");
-  s3.deleteObject(s3Bucket, outputTableName);
-}

-public static AmazonS3 getAmazonS3(final S3Config s3Config) {
-  final var endpoint = s3Config.getEndpoint();
-  final var region = s3Config.getRegion();
-  final var accessKeyId = s3Config.getAccessKeyId();
-  final var secretAccessKey = s3Config.getSecretAccessKey();

-  final var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);

-  if (endpoint.isEmpty()) {
-    return AmazonS3ClientBuilder.standard()
-        .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-        .withRegion(s3Config.getRegion())
-        .build();

-  } else {

-    final ClientConfiguration clientConfiguration = new ClientConfiguration();
-    clientConfiguration.setSignerOverride("AWSS3V4SignerType");

-    return AmazonS3ClientBuilder
-        .standard()
-        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
-        .withPathStyleAccessEnabled(true)
-        .withClientConfiguration(clientConfiguration)
-        .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-        .build();
-  }
-}

public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
-String s3FileLocation,
-String schema,
-String tableName,
-S3Config s3Config)
+String s3FileLocation,
+String schema,
+String tableName,
+S3DestinationConfig s3Config)
throws SQLException;

}
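
The client-construction logic removed above is what call sites now reach through S3DestinationConfig.getS3Client() (see the factory change below). A rough sketch of the relocated method, reconstructed from the deleted getAmazonS3 body; the getter names follow the old S3Config, and the real implementation in destination-s3 may differ, for example by caching the client:

// Sketch only: assumes S3DestinationConfig keeps the same getters as the removed S3Config.
public AmazonS3 getS3Client() {
  final BasicAWSCredentials awsCreds = new BasicAWSCredentials(getAccessKeyId(), getSecretAccessKey());

  if (getEndpoint().isEmpty()) {
    // Plain AWS S3: region plus static credentials is enough.
    return AmazonS3ClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
        .withRegion(getRegion())
        .build();
  }

  // Custom endpoint (e.g. MinIO): force path-style access and the V4 signer.
  final ClientConfiguration clientConfiguration = new ClientConfiguration();
  clientConfiguration.setSignerOverride("AWSS3V4SignerType");

  return AmazonS3ClientBuilder.standard()
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(getEndpoint(), getRegion()))
      .withPathStyleAccessEnabled(true)
      .withClientConfiguration(clientConfiguration)
      .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
      .build();
}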
@@ -10,28 +10,29 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

-public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3Config> {
+public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {

/**
* Used by the copy consumer.
*/
@Override
public StreamCopier create(final String configuredSchema,
-final S3Config s3Config,
-final String stagingFolder,
-final ConfiguredAirbyteStream configuredStream,
-final ExtendedNameTransformer nameTransformer,
-final JdbcDatabase db,
-final SqlOperations sqlOperations) {
+final S3DestinationConfig s3Config,
+final String stagingFolder,
+final ConfiguredAirbyteStream configuredStream,
+final ExtendedNameTransformer nameTransformer,
+final JdbcDatabase db,
+final SqlOperations sqlOperations) {
try {
final AirbyteStream stream = configuredStream.getStream();
final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
-final AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
+final AmazonS3 s3Client = s3Config.getS3Client();

return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations);
} catch (final Exception e) {
@@ -43,14 +44,14 @@ public StreamCopier create(final String configuredSchema,
* For specific copier suppliers to implement.
*/
public abstract StreamCopier create(String stagingFolder,
-DestinationSyncMode syncMode,
-String schema,
-String streamName,
-AmazonS3 s3Client,
-JdbcDatabase db,
-S3Config s3Config,
-ExtendedNameTransformer nameTransformer,
-SqlOperations sqlOperations)
+DestinationSyncMode syncMode,
+String schema,
+String streamName,
+AmazonS3 s3Client,
+JdbcDatabase db,
+S3DestinationConfig s3Config,
+ExtendedNameTransformer nameTransformer,
+SqlOperations sqlOperations)
throws Exception;

}

0 comments on commit fc91f67
