Skip to content

Commit

Permalink
combine S3Config and S3DestinationConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 6, 2021
1 parent 9ab7fa7 commit 34dd5f0
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
@Override
public void checkPersistence(final JsonNode config) {
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config());
S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.s3.S3Config;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.IOException;
Expand Down Expand Up @@ -50,7 +50,7 @@ public abstract class S3StreamCopier implements StreamCopier {
public static final int MAX_PARTS_PER_FILE = 1000;

protected final AmazonS3 s3Client;
protected final S3Config s3Config;
protected final S3DestinationConfig s3Config;
protected final String tmpTableName;
private final DestinationSyncMode destSyncMode;
protected final String schemaName;
Expand All @@ -67,15 +67,15 @@ public abstract class S3StreamCopier implements StreamCopier {
private final StagingFilenameGenerator filenameGenerator;

public S3StreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final String s3FileName,
final AmazonS3 client,
final JdbcDatabase db,
final S3Config s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final String s3FileName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
this.destSyncMode = destSyncMode;
this.schemaName = schema;
this.streamName = streamName;
Expand Down Expand Up @@ -224,15 +224,15 @@ private void closeAndWaitForUpload() throws IOException {
LOGGER.info("All data for {} stream uploaded.", streamName);
}

public static void attemptS3WriteAndDelete(final S3Config s3Config) {
public static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config) {
S3Destination.attemptS3WriteAndDelete(s3Config, "");
}

public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
String s3FileLocation,
String schema,
String tableName,
S3Config s3Config)
String s3FileLocation,
String schema,
String tableName,
S3DestinationConfig s3Config)
throws SQLException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,25 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3Config;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3Config> {
public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {

/**
* Used by the copy consumer.
*/
@Override
public StreamCopier create(final String configuredSchema,
final S3Config s3Config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
final JdbcDatabase db,
final SqlOperations sqlOperations) {
final S3DestinationConfig s3Config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
final JdbcDatabase db,
final SqlOperations sqlOperations) {
try {
final AirbyteStream stream = configuredStream.getStream();
final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
Expand All @@ -45,14 +45,14 @@ public StreamCopier create(final String configuredSchema,
* For specific copier suppliers to implement.
*/
public abstract StreamCopier create(String stagingFolder,
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3Config s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations)
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3DestinationConfig s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations)
throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.s3.S3Config;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
Expand Down Expand Up @@ -42,15 +42,15 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
getDatabase(config),
getSqlOperations(),
getNameTransformer(),
getS3Config(config),
getS3DestinationConfig(config),
catalog,
new RedshiftStreamCopierFactory(),
getConfiguredSchema(config));
}

@Override
public void checkPersistence(final JsonNode config) throws Exception {
S3StreamCopier.attemptS3WriteAndDelete(getS3Config(config));
S3StreamCopier.attemptS3WriteAndDelete(getS3DestinationConfig(config));
}

@Override
Expand All @@ -72,8 +72,8 @@ private String getConfiguredSchema(final JsonNode config) {
return config.get("schema").asText();
}

private S3Config getS3Config(final JsonNode config) {
return S3Config.getS3Config(config);
private S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
return S3DestinationConfig.getS3DestinationConfig(config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.redshift.manifest.Entry;
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
import io.airbyte.integrations.destination.s3.S3Config;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -31,14 +31,14 @@ public class RedshiftStreamCopier extends S3StreamCopier {
private String manifestFilePath = null;

public RedshiftStreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3Config s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName,
client, db, s3Config, nameTransformer, sqlOperations);
objectMapper = new ObjectMapper();
Expand All @@ -56,11 +56,11 @@ public void copyStagingFileToTemporaryTable() {

@Override
public void copyS3CsvFileIntoTable(
final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3Config s3Config) {
final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3DestinationConfig s3Config) {
throw new RuntimeException("Redshift Stream Copier should not copy individual files without use of a manifest");
}

Expand Down Expand Up @@ -127,7 +127,7 @@ private void executeCopy(final String manifestPath) {
getFullS3Path(s3Config.getBucketName(), manifestPath),
s3Config.getAccessKeyId(),
s3Config.getSecretAccessKey(),
s3Config.getRegion());
s3Config.getBucketRegion());

Exceptions.toRuntime(() -> db.execute(copyQuery));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3Config;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.DestinationSyncMode;

public class RedshiftStreamCopierFactory extends S3StreamCopierFactory {

@Override
public StreamCopier create(final String stagingFolder,
final DestinationSyncMode syncMode,
final String schema,
final String streamName,
final AmazonS3 s3Client,
final JdbcDatabase db,
final S3Config s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations)
final DestinationSyncMode syncMode,
final String schema,
final String streamName,
final AmazonS3 s3Client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations)
throws Exception {
return new RedshiftStreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static void main(final String[] args) throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
attemptS3WriteAndDelete(S3Config.getS3Config(config), config.get("s3_bucket_path").asText());
attemptS3WriteAndDelete(S3DestinationConfig.getS3DestinationConfig(config), config.get("s3_bucket_path").asText());
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception attempting to access the S3 bucket: ", e);
Expand All @@ -56,23 +56,23 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
return new S3Consumer(S3DestinationConfig.getS3DestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector);
}

public static void attemptS3WriteAndDelete(final S3Config s3Config, final String bucketPath) {
public static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config, final String bucketPath) {
final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteS3Object(s3Config, outputTableName);
}

private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) {
private static void attemptWriteAndDeleteS3Object(final S3DestinationConfig s3Config, final String outputTableName) {
final var s3 = getAmazonS3(s3Config);
final var s3Bucket = s3Config.getBucketName();

s3.putObject(s3Bucket, outputTableName, "check-content");
s3.deleteObject(s3Bucket, outputTableName);
}

public static AmazonS3 getAmazonS3(final S3Config s3Config) {
public static AmazonS3 getAmazonS3(final S3DestinationConfig s3Config) {
final var endpoint = s3Config.getEndpoint();
final var region = s3Config.getRegion();
final var region = s3Config.getBucketRegion();
final var accessKeyId = s3Config.getAccessKeyId();
final var secretAccessKey = s3Config.getSecretAccessKey();

Expand All @@ -81,7 +81,7 @@ public static AmazonS3 getAmazonS3(final S3Config s3Config) {
if (endpoint.isEmpty()) {
return AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(s3Config.getRegion())
.withRegion(s3Config.getBucketRegion())
.build();

} else {
Expand Down
Loading

0 comments on commit 34dd5f0

Please sign in to comment.