move S3Config into destination-s3; update dependencies accordingly #8562

Merged: 11 commits, merged Dec 10, 2021
Changes from 7 commits
@@ -13,7 +13,7 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
@@ -30,8 +30,8 @@ public static void main(final String[] args) throws Exception {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
return CopyConsumerFactory.create(
outputRecordCollector,
@@ -47,7 +47,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
@Override
public void checkPersistence(final JsonNode config) {
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config());
S3Destination.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig(), "");
}

@Override
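For context: checkPersistence above now delegates the connection check to S3Destination.attemptS3WriteAndDelete in the destination-s3 module, replacing the static helper removed from S3StreamCopier further down in this diff. A minimal sketch of what that relocated helper presumably looks like, reconstructed from the removed S3StreamCopier code; its exact placement and signature in destination-s3, and the getter names on S3DestinationConfig, are assumptions:

// Sketch (assumed location: S3Destination in destination-s3), mirroring the helper
// removed from S3StreamCopier below.
// imports: com.amazonaws.services.s3.AmazonS3, java.util.UUID (as in the removed code)
public static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config, final String bucketPath) {
  final String prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
  final String testObjectKey = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
  // Client construction now lives on the config object (see the factory change below).
  final AmazonS3 s3 = s3Config.getS3Client();
  s3.putObject(s3Config.getBucketName(), testObjectKey, "check-content");
  s3.deleteObject(s3Config.getBucketName(), testObjectKey);
}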
@@ -10,6 +10,7 @@ dependencies {

implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:connectors:destination-s3')
implementation project(':airbyte-protocol:models')

implementation 'org.apache.commons:commons-lang3:3.11'

This file was deleted.

@@ -6,19 +6,15 @@

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.IOException;
@@ -43,10 +39,6 @@ public abstract class S3StreamCopier implements StreamCopier {

private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default.
private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS;
// The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives
// us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
// WARNING: Too large a part size can cause potential OOM errors.
public static final int DEFAULT_PART_SIZE_MB = 10;
// It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file.
// The BATCH_SIZE is defined in CopyConsumerFactory.
// The average size of such a file will be about 1 GB.
@@ -57,7 +49,7 @@ public abstract class S3StreamCopier implements StreamCopier {
public static final int MAX_PARTS_PER_FILE = 1000;

protected final AmazonS3 s3Client;
protected final S3Config s3Config;
protected final S3DestinationConfig s3Config;
protected final String tmpTableName;
private final DestinationSyncMode destSyncMode;
protected final String schemaName;
@@ -74,15 +66,15 @@ public abstract class S3StreamCopier implements StreamCopier {
private final StagingFilenameGenerator filenameGenerator;

public S3StreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final String s3FileName,
final AmazonS3 client,
final JdbcDatabase db,
final S3Config s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final String s3FileName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
this.destSyncMode = destSyncMode;
this.schemaName = schema;
this.streamName = streamName;
@@ -231,58 +223,11 @@ private void closeAndWaitForUpload() throws IOException {
LOGGER.info("All data for {} stream uploaded.", streamName);
}

public static void attemptS3WriteAndDelete(final S3Config s3Config) {
attemptS3WriteAndDelete(s3Config, "");
}

public static void attemptS3WriteAndDelete(final S3Config s3Config, final String bucketPath) {
final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteS3Object(s3Config, outputTableName);
}

private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) {
final var s3 = getAmazonS3(s3Config);
final var s3Bucket = s3Config.getBucketName();

s3.putObject(s3Bucket, outputTableName, "check-content");
s3.deleteObject(s3Bucket, outputTableName);
}

public static AmazonS3 getAmazonS3(final S3Config s3Config) {
final var endpoint = s3Config.getEndpoint();
final var region = s3Config.getRegion();
final var accessKeyId = s3Config.getAccessKeyId();
final var secretAccessKey = s3Config.getSecretAccessKey();

final var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);

if (endpoint.isEmpty()) {
return AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(s3Config.getRegion())
.build();

} else {

final ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setSignerOverride("AWSS3V4SignerType");

return AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
.withPathStyleAccessEnabled(true)
.withClientConfiguration(clientConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}
}

public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
String s3FileLocation,
String schema,
String tableName,
S3Config s3Config)
String s3FileLocation,
String schema,
String tableName,
S3DestinationConfig s3Config)
throws SQLException;

}
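The AmazonS3 client construction deleted above (getAmazonS3) no longer lives on the copier; the factory change below obtains the client via s3Config.getS3Client(). A hedged sketch of how that method presumably looks on S3DestinationConfig, carried over from the removed builder logic; the method's placement and the getter names are assumptions:

// Sketch (assumed location: S3DestinationConfig in destination-s3), mirroring the
// builder logic removed from S3StreamCopier above.
// imports: com.amazonaws.* classes exactly as removed from S3StreamCopier above
public AmazonS3 getS3Client() {
  final BasicAWSCredentials awsCreds = new BasicAWSCredentials(getAccessKeyId(), getSecretAccessKey());

  if (getEndpoint() == null || getEndpoint().isEmpty()) {
    return AmazonS3ClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
        .withRegion(getRegion())
        .build();
  }

  // Non-AWS endpoints (e.g. MinIO) need path-style access and the V4 signer override.
  final ClientConfiguration clientConfiguration = new ClientConfiguration();
  clientConfiguration.setSignerOverride("AWSS3V4SignerType");

  return AmazonS3ClientBuilder.standard()
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(getEndpoint(), getRegion()))
      .withPathStyleAccessEnabled(true)
      .withClientConfiguration(clientConfiguration)
      .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
      .build();
}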
@@ -10,28 +10,29 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3Config> {
public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {

/**
* Used by the copy consumer.
*/
@Override
public StreamCopier create(final String configuredSchema,
final S3Config s3Config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
final JdbcDatabase db,
final SqlOperations sqlOperations) {
final S3DestinationConfig s3Config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
final JdbcDatabase db,
final SqlOperations sqlOperations) {
try {
final AirbyteStream stream = configuredStream.getStream();
final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
final AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
final AmazonS3 s3Client = s3Config.getS3Client();

return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations);
} catch (final Exception e) {
@@ -43,14 +44,14 @@ public StreamCopier create(final String configuredSchema,
* For specific copier suppliers to implement.
*/
public abstract StreamCopier create(String stagingFolder,
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3Config s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations)
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3DestinationConfig s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations)
throws Exception;

}
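For downstream connectors, the visible change here is the config type in the abstract create. A hypothetical example, not part of this PR, showing the signature concrete factories now implement; the class name and the COPY body are illustrative, and this assumes copyS3CsvFileIntoTable is the copier's only abstract member, as the diff above suggests (imports as in S3StreamCopierFactory above, plus java.sql.SQLException):

// Hypothetical concrete factory overriding the updated abstract create(...)
// with S3DestinationConfig instead of the removed S3Config.
public class ExampleS3StreamCopierFactory extends S3StreamCopierFactory {

  @Override
  public StreamCopier create(final String stagingFolder,
                             final DestinationSyncMode syncMode,
                             final String schema,
                             final String streamName,
                             final AmazonS3 s3Client,
                             final JdbcDatabase db,
                             final S3DestinationConfig s3Config,
                             final ExtendedNameTransformer nameTransformer,
                             final SqlOperations sqlOperations) {
    // Anonymous copier for illustration; a real connector would supply its own subclass.
    return new S3StreamCopier(stagingFolder, syncMode, schema, streamName,
        /* s3FileName = */ streamName, s3Client, db, s3Config, nameTransformer, sqlOperations) {

      @Override
      public void copyS3CsvFileIntoTable(final JdbcDatabase database,
                                         final String s3FileLocation,
                                         final String schema,
                                         final String tableName,
                                         final S3DestinationConfig s3Config) throws SQLException {
        // Engine-specific COPY statement would be issued here.
      }
    };
  }
}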