🎉 Destination Redshift (copy): accept bucket path for staging data (a…
edgao authored and schlattk committed Jan 4, 2022
1 parent 9378801 commit 63eb203
Showing 42 changed files with 1,796 additions and 186 deletions.
@@ -150,7 +150,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.21
+  dockerImageTag: 0.3.22
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- name: Rockset
@@ -161,7 +161,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.1.16
+  dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
- name: SFTP-JSON
@@ -2918,7 +2918,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.21"
- dockerImage: "airbyte/destination-redshift:0.3.22"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
@@ -2979,6 +2979,13 @@
>AWS docs</a> for more details."
examples:
- "airbyte.staging"
+s3_bucket_path:
+  title: "S3 Bucket Path"
+  type: "string"
+  description: "The directory under the S3 bucket where data will be written.\
+    \ If not provided, then defaults to the root directory."
+  examples:
+    - "data_sync/test"
s3_bucket_region:
title: "S3 Bucket Region"
type: "string"
@@ -3086,7 +3093,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.1.16"
- dockerImage: "airbyte/destination-s3:0.2.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
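Taken together with the version bumps above (destination-redshift 0.3.22, destination-s3 0.2.0), the new `s3_bucket_path` property lets staging data land under a sub-directory of the bucket instead of its root. Below is a minimal sketch of how such a path might fold into a staging prefix; the helper name and the namespace/stream layout are assumptions for illustration, not the connector's exact code:

```java
// Sketch: how an optional bucket path could combine with per-stream folders.
// The layout "<bucketPath>/<namespace>/<stream>" is an assumption here.
public class StagingPrefixSketch {

  static String stagingPrefix(final String bucketPath, final String namespace, final String streamName) {
    final String streamFolder = String.join("/", namespace, streamName);
    // "If not provided, then defaults to the root directory":
    return (bucketPath == null || bucketPath.isBlank())
        ? streamFolder
        : String.join("/", bucketPath, streamFolder);
  }

  public static void main(final String[] args) {
    System.out.println(stagingPrefix("data_sync/test", "public", "users")); // data_sync/test/public/users
    System.out.println(stagingPrefix(null, "public", "users"));             // public/users
  }
}
```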
@@ -10,6 +10,7 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.jdbc.copy.s3.LegacyS3StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
@@ -23,9 +24,8 @@
import org.slf4j.LoggerFactory;

/**
- * This implementation is similar to
- * {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. The difference is that
- * this implementation creates Parquet staging files, instead of CSV ones.
+ * This implementation is similar to {@link LegacyS3StreamCopier}. The difference is that this
+ * implementation creates Parquet staging files, instead of CSV ones.
* <p>
* </p>
* It does the following operations:
@@ -36,6 +36,7 @@ public class GcsAvroWriter extends BaseGcsWriter implements S3Writer {
private final StreamTransferManager uploadManager;
private final MultiPartOutputStream outputStream;
private final DataFileWriter<GenericData.Record> dataFileWriter;
+  private final String objectKey;

public GcsAvroWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
@@ -47,7 +48,7 @@ public GcsAvroWriter(final GcsDestinationConfig config,
super(config, s3Client, configuredStream);

final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.AVRO);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);

LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
objectKey);
@@ -85,4 +86,9 @@ protected void closeWhenFail() throws IOException {
uploadManager.abort();
}

+  @Override
+  public String getOutputPath() {
+    return objectKey;
+  }

}
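The same change recurs in the CSV, JSONL, and Parquet writers below: the object key, previously a constructor local, is promoted to a field and exposed through the new getOutputPath() accessor. Here is a self-contained sketch of the pattern; the class and the filename format are simplified stand-ins for the real writers, not their actual code:

```java
import java.sql.Timestamp;
import java.time.Instant;

// Minimal sketch of the pattern: compute the object key once in the
// constructor, keep it in a field, and return it from getOutputPath().
public class OutputPathSketch {

  interface S3Writer {
    String getOutputPath();
  }

  static class SketchAvroWriter implements S3Writer {

    private final String objectKey; // was a constructor local before this change

    SketchAvroWriter(final String outputPrefix, final Timestamp uploadTimestamp) {
      // Loosely mirrors BaseGcsWriter.getOutputFilename(timestamp, S3Format.AVRO).
      final String outputFilename = String.format("%1$tY_%1$tm_%1$td_%2$d_0.avro",
          uploadTimestamp, uploadTimestamp.getTime());
      this.objectKey = String.join("/", outputPrefix, outputFilename);
    }

    @Override
    public String getOutputPath() {
      return objectKey;
    }

  }

  public static void main(final String[] args) {
    final S3Writer writer = new SketchAvroWriter(
        "fake-bucketPath/fake_namespace/fake_stream",
        Timestamp.from(Instant.ofEpochMilli(1234)));
    // Prints something like fake-bucketPath/fake_namespace/fake_stream/1970_01_01_1234_0.avro
    System.out.println(writer.getOutputPath());
  }

}
```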
@@ -36,6 +36,7 @@ public class GcsCsvWriter extends BaseGcsWriter implements S3Writer {
private final MultiPartOutputStream outputStream;
private final CSVPrinter csvPrinter;
private final String gcsCsvFileLocation; // this is used in destination-bigquery (GCS upload type)
+  private final String objectKey;

public GcsCsvWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
@@ -48,7 +49,7 @@ public GcsCsvWriter(final GcsDestinationConfig config,
this.csvSheetGenerator = CsvSheetGenerator.Factory.create(configuredStream.getStream().getJsonSchema(), formatConfig);

final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.CSV);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);
gcsCsvFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);

LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
@@ -90,4 +91,9 @@ public CSVPrinter getCsvPrinter() {
return csvPrinter;
}

+  @Override
+  public String getOutputPath() {
+    return objectKey;
+  }

}
@@ -35,6 +35,7 @@ public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer {
private final StreamTransferManager uploadManager;
private final MultiPartOutputStream outputStream;
private final PrintWriter printWriter;
+  private final String objectKey;

public GcsJsonlWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
@@ -43,7 +44,7 @@ public GcsJsonlWriter(final GcsDestinationConfig config,
super(config, s3Client, configuredStream);

final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.JSONL);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);

LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);

@@ -78,4 +79,9 @@ protected void closeWhenFail() {
uploadManager.abort();
}

+  @Override
+  public String getOutputPath() {
+    return objectKey;
+  }

}
@@ -6,7 +6,6 @@

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig;
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
@@ -37,10 +36,10 @@ public class GcsParquetWriter extends BaseGcsWriter implements S3Writer {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsParquetWriter.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final ObjectWriter WRITER = MAPPER.writer();

private final ParquetWriter<Record> parquetWriter;
private final AvroRecordFactory avroRecordFactory;
+  private final String objectKey;

public GcsParquetWriter(final GcsDestinationConfig config,
final AmazonS3 s3Client,
@@ -52,7 +51,7 @@ public GcsParquetWriter(final GcsDestinationConfig config,
super(config, s3Client, configuredStream);

final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.PARQUET);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);
LOGGER.info("Storage path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);

final URI uri = new URI(String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename));
@@ -109,4 +108,9 @@ public void close(final boolean hasFailed) throws IOException {
}
}

+  @Override
+  public String getOutputPath() {
+    return objectKey;
+  }

}
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.gcs.avro;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;

class GcsAvroWriterTest {

  @Test
  public void generatesCorrectObjectPath() throws IOException {
    final GcsAvroWriter writer = new GcsAvroWriter(
        new GcsDestinationConfig(
            "fake-bucket",
            "fake-bucketPath",
            "fake-bucketRegion",
            null,
            new S3AvroFormatConfig(new ObjectMapper().createObjectNode())),
        mock(AmazonS3.class, RETURNS_DEEP_STUBS),
        new ConfiguredAirbyteStream()
            .withStream(new AirbyteStream()
                .withNamespace("fake-namespace")
                .withName("fake-stream")),
        Timestamp.from(Instant.ofEpochMilli(1234)),
        mock(Schema.class),
        null);

    assertEquals("fake-bucketPath/fake_namespace/fake_stream/1970_01_01_1234_0.avro", writer.getOutputPath());
  }

}
@@ -19,6 +19,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'

testImplementation "org.testcontainers:postgresql:1.15.3"
testImplementation "org.mockito:mockito-inline:4.1.0"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation "org.testcontainers:postgresql:1.15.3"
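mockito-inline replaces Mockito's default subclass-based mock maker with an inline bytecode one, which can mock final classes and methods; presumably that is what the new GcsAvroWriterTest needs on its classpath. A toy demonstration of the capability — the UploadClient class here is made up:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// With mockito-core's default mock maker this mock(...) call would fail,
// because it subclasses the target and final classes cannot be subclassed.
// mockito-inline instruments the class instead, so final types become mockable.
public class FinalClassMockSketch {

  static final class UploadClient { // final: un-mockable without mockito-inline

    String upload(final String key) {
      throw new UnsupportedOperationException("talks to a real service");
    }

  }

  public static void main(final String[] args) {
    final UploadClient client = mock(UploadClient.class);
    when(client.upload("data_sync/test/file.avro")).thenReturn("ok");
    System.out.println(client.upload("data_sync/test/file.avro")); // ok
  }
}
```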
@@ -29,7 +29,7 @@ public class CopyConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);

-  private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 mib
+  private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 MiB

public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
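The comment fix is just a units correction: 1024 * 1024 * 1024 / 4 is 268,435,456 bytes, i.e. 256 MiB. A quick check:

```java
// Verifies the constant the comment describes: a quarter of 1 GiB.
public class BatchSizeCheck {

  public static void main(final String[] args) {
    final int maxBatchSizeBytes = 1024 * 1024 * 1024 / 4;
    System.out.println(maxBatchSizeBytes);                      // 268435456
    System.out.println(maxBatchSizeBytes == 256 * 1024 * 1024); // true
  }
}
```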
@@ -61,7 +61,8 @@ public interface StreamCopier {
/**
* Creates the staging file and all the necessary items to write data to this file.
*
-   * @return the name of the staging file
+   * @return A string that uniquely identifies the file, e.g. the filename, or a unique suffix that is
+   *         appended to a shared filename prefix
*/
String prepareStagingFile();

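The reworded contract gives implementations room to return something other than a literal filename. A hedged sketch of the "unique suffix appended to a shared prefix" reading — illustrative only, not the actual copier code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "unique suffix on a shared filename prefix" reading of the
// contract. Real copiers also open an upload stream per file; that is elided.
public class SuffixStagingCopierSketch {

  private final String stagingPrefix; // e.g. "<bucketPath>/<namespace>/<stream>"
  private final List<String> stagingFileSuffixes = new ArrayList<>();

  public SuffixStagingCopierSketch(final String stagingPrefix) {
    this.stagingPrefix = stagingPrefix;
  }

  // Returns only the unique part; callers treat it as an opaque identifier.
  public String prepareStagingFile() {
    final String suffix = String.format("%05d", stagingFileSuffixes.size());
    stagingFileSuffixes.add(suffix);
    return suffix;
  }

  // The full object key is reassembled whenever the file is referenced.
  public String fullObjectKey(final String suffix) {
    return String.join("/", stagingPrefix, suffix + ".csv");
  }

  public static void main(final String[] args) {
    final SuffixStagingCopierSketch copier =
        new SuffixStagingCopierSketch("data_sync/test/public/users");
    final String id = copier.prepareStagingFile();
    System.out.println(copier.fullObjectKey(id)); // data_sync/test/public/users/00000.csv
  }
}
```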