diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index e7f8de2da4b76..2d0f3ad151565 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -138,6 +138,7 @@ jobs: ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }} PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }} DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }} + DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }} - run: | docker login -u airbytebot -p ${DOCKER_PASSWORD} ./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }} ${{ github.event.inputs.run-tests }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 8c3984f01c457..479fd0970cba9 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -136,6 +136,7 @@ jobs: ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }} PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }} DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }} + DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }} - run: | ./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }} name: test ${{ github.event.inputs.connector }} diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca8f6566-e555-4b40-943a-545bf123117a.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca8f6566-e555-4b40-943a-545bf123117a.json new file mode 100644 index 0000000000000..4d95d6eabbcea --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca8f6566-e555-4b40-943a-545bf123117a.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a", + "name": "Google Cloud Storage (GCS)", + "dockerRepository": "airbyte/destination-gcs", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs" +} diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index a0f810b5c90e8..038ed4a0fa96a 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -24,6 +24,11 @@ dockerRepository: airbyte/destination-bigquery-denormalized dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery +- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a + name: Google Cloud Storage (GCS) + dockerRepository: airbyte/destination-gcs + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs - destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692 name: Google PubSub dockerRepository: airbyte/destination-pubsub diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index fa81fcb6336c9..0db0cb6d94710 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -127,14 +127,16 @@ # Destinations BigQuery [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-bigquery) + Google Cloud Storage (GCS) 
[![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-gcs) + + Google PubSub [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-pubsub) + Local CSV [![destination-csv](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-csv%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-csv) Local JSON [![destination-local-json](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-local-json%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-local-json) Postgres [![destination-postgres](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-postgres%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-postgres) - Google PubSub [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-pubsub) - Redshift [![destination-redshift](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-redshift%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-redshift) S3 [![destination-s3](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-s3) diff --git a/airbyte-integrations/connectors/destination-gcs/.dockerignore b/airbyte-integrations/connectors/destination-gcs/.dockerignore new file mode 100644 index 0000000000000..65c7d0ad3e73c --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-gcs/Dockerfile b/airbyte-integrations/connectors/destination-gcs/Dockerfile new file mode 100644 index 0000000000000..158ac162a09ff --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-gcs + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-gcs diff --git a/airbyte-integrations/connectors/destination-gcs/README.md b/airbyte-integrations/connectors/destination-gcs/README.md new file mode 100644 index 0000000000000..6ad38446997d7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/README.md @@ -0,0 +1,26 @@ +# Destination Google Cloud Storage (GCS) + +To test the GCS destination, you need a Google Cloud Platform account. + +## Community Contributor + +As a community contributor, you can follow these steps to run integration tests. + +- Create a GCS bucket for testing. +- Generate an [HMAC key](https://cloud.google.com/storage/docs/authentication/hmackeys) for the bucket with read and write permissions. Please note that currently only the HMAC key credential is supported. More credential types will be added in the future. 
+- Paste the bucket and key information into the config files under [`./sample_secrets`](./sample_secrets). +- Rename the directory from `sample_secrets` to `secrets`. +- Feel free to modify the config files with different settings in the acceptance test file (e.g. `GcsCsvDestinationAcceptanceTest.java`, method `getFormatConfig`), as long as they follow the schema defined in [spec.json](src/main/resources/spec.json). + +## Airbyte Employee + +- Access the `destination gcs creds` secrets on Last Pass, and put it in `sample_secrets/config.json`. +- Rename the directory from `sample_secrets` to `secrets`. + +## Add New Output Format +- Add a new enum in `S3Format`. +- Modify `spec.json` to specify the configuration of this new format. +- Update `S3FormatConfigs` to be able to construct a config for this new format. +- Create a new package under `io.airbyte.integrations.destination.gcs`. +- Implement a new `GcsWriter`. The implementation can extend `BaseGcsWriter`. +- Write an acceptance test for the new output format. The test can extend `GcsDestinationAcceptanceTest`. diff --git a/airbyte-integrations/connectors/destination-gcs/build.gradle b/airbyte-integrations/connectors/destination-gcs/build.gradle new file mode 100644 index 0000000000000..58b24c19a7421 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/build.gradle @@ -0,0 +1,38 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.gcs.GcsDestination' +} + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation project(':airbyte-integrations:connectors:destination-s3') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + implementation platform('com.amazonaws:aws-java-sdk-bom:1.12.14') + implementation 'com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.1' + + // csv + implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978' + implementation 'org.apache.commons:commons-csv:1.4' + implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2' + + // parquet + implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0' + implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0' + implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0' + implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10' + + testImplementation 'org.apache.commons:commons-lang3:3.11' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-gcs') +} diff --git a/airbyte-integrations/connectors/destination-gcs/sample_secrets/config.json b/airbyte-integrations/connectors/destination-gcs/sample_secrets/config.json new file mode 100644 index 0000000000000..6340e629e9bbb --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/sample_secrets/config.json @@ -0,0 +1,10 @@ +{ + "gcs_bucket_name": "", + "gcs_bucket_path": "integration-test", + "gcs_bucket_region": "", + "credential": { + "credential_type": "HMAC_KEY", + 
"hmac_key_access_id": "", + "hmac_key_secret": "" + } +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java new file mode 100644 index 0000000000000..b50b7e46b29b6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java @@ -0,0 +1,119 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer { + + private final GcsDestinationConfig gcsDestinationConfig; + private final ConfiguredAirbyteCatalog configuredCatalog; + private final GcsWriterFactory writerFactory; + private final Consumer outputRecordCollector; + private final Map streamNameAndNamespaceToWriters; + + private AirbyteMessage lastStateMessage = null; + + public GcsConsumer(GcsDestinationConfig gcsDestinationConfig, + ConfiguredAirbyteCatalog configuredCatalog, + GcsWriterFactory writerFactory, + Consumer outputRecordCollector) { + this.gcsDestinationConfig = gcsDestinationConfig; + this.configuredCatalog = configuredCatalog; + this.writerFactory = writerFactory; + this.outputRecordCollector = outputRecordCollector; + this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); + } + + @Override + protected void startTracked() throws Exception { + AmazonS3 s3Client = 
GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + + Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); + + for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + S3Writer writer = writerFactory + .create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp); + writer.initialize(); + + AirbyteStream stream = configuredStream.getStream(); + AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair + .fromAirbyteSteam(stream); + streamNameAndNamespaceToWriters.put(streamNamePair, writer); + } + } + + @Override + protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception { + if (airbyteMessage.getType() == Type.STATE) { + this.lastStateMessage = airbyteMessage; + return; + } else if (airbyteMessage.getType() != Type.RECORD) { + return; + } + + AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); + AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair + .fromRecordMessage(recordMessage); + + if (!streamNameAndNamespaceToWriters.containsKey(pair)) { + throw new IllegalArgumentException( + String.format( + "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); + } + + UUID id = UUID.randomUUID(); + streamNameAndNamespaceToWriters.get(pair).write(id, recordMessage); + } + + @Override + protected void close(boolean hasFailed) throws Exception { + for (S3Writer handler : streamNameAndNamespaceToWriters.values()) { + handler.close(hasFailed); + } + // Gcs stream uploader is all or nothing if a failure happens in the destination. + if (!hasFailed) { + outputRecordCollector.accept(lastStateMessage); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java new file mode 100644 index 0000000000000..a028bb5adefc7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory; +import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GcsDestination extends BaseConnector implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(GcsDestination.class); + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new GcsDestination()).run(args); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + try { + GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config); + AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig); + s3Client.putObject(destinationConfig.getBucketName(), "test", "check-content"); + s3Client.deleteObject(destinationConfig.getBucketName(), "test"); + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage()); + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e + .getMessage()); + } + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + GcsWriterFactory formatterFactory = new ProductionWriterFactory(); + return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java new file mode 100644 index 0000000000000..30567aa6c1838 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java @@ -0,0 +1,82 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.gcs.credential.GcsCredentialConfig; +import io.airbyte.integrations.destination.gcs.credential.GcsCredentialConfigs; +import io.airbyte.integrations.destination.s3.S3FormatConfig; +import io.airbyte.integrations.destination.s3.S3FormatConfigs; + +public class GcsDestinationConfig { + + private final String bucketName; + private final String bucketPath; + private final String bucketRegion; + private final GcsCredentialConfig credentialConfig; + private final S3FormatConfig formatConfig; + + public GcsDestinationConfig(String bucketName, + String bucketPath, + String bucketRegion, + GcsCredentialConfig credentialConfig, + S3FormatConfig formatConfig) { + this.bucketName = bucketName; + this.bucketPath = bucketPath; + this.bucketRegion = bucketRegion; + this.credentialConfig = credentialConfig; + this.formatConfig = formatConfig; + } + + public static GcsDestinationConfig getGcsDestinationConfig(JsonNode config) { + return new GcsDestinationConfig( + config.get("gcs_bucket_name").asText(), + config.get("gcs_bucket_path").asText(), + config.get("gcs_bucket_region").asText(), + GcsCredentialConfigs.getCredentialConfig(config), + S3FormatConfigs.getS3FormatConfig(config)); + } + + public String getBucketName() { + return bucketName; + } + + public String getBucketPath() { + return bucketPath; + } + + public String getBucketRegion() { + return bucketRegion; + } + + public GcsCredentialConfig getCredentialConfig() { + return credentialConfig; + } + + public S3FormatConfig getFormatConfig() { + return formatConfig; + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsS3Helper.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsS3Helper.java new file mode 100644 index 0000000000000..d5112b50bbd19 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsS3Helper.java @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig; + +public class GcsS3Helper { + + private static final String GCS_ENDPOINT = "https://storage.googleapis.com"; + + public static AmazonS3 getGcsS3Client(GcsDestinationConfig gcsDestinationConfig) { + GcsHmacKeyCredentialConfig hmacKeyCredential = (GcsHmacKeyCredentialConfig) gcsDestinationConfig.getCredentialConfig(); + BasicAWSCredentials awsCreds = new BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret()); + + return AmazonS3ClientBuilder.standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, gcsDestinationConfig.getBucketRegion())) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java new file mode 100644 index 0000000000000..60b7d30b8a0d9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java @@ -0,0 +1,107 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.avro; + +import alex.mojaki.s3upload.MultiPartOutputStream; +import alex.mojaki.s3upload.StreamTransferManager; +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory; +import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig; +import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.IOException; +import java.sql.Timestamp; +import java.util.UUID; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GcsAvroWriter extends BaseGcsWriter implements S3Writer { + + protected static final Logger LOGGER = LoggerFactory.getLogger(GcsAvroWriter.class); + + private final AvroRecordFactory avroRecordFactory; + private final StreamTransferManager uploadManager; + private final MultiPartOutputStream outputStream; + private final DataFileWriter dataFileWriter; + + public GcsAvroWriter(GcsDestinationConfig config, + AmazonS3 s3Client, + ConfiguredAirbyteStream configuredStream, + Timestamp uploadTimestamp, + Schema schema, + JsonFieldNameUpdater nameUpdater) + throws IOException { + super(config, s3Client, configuredStream); + + String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.AVRO); + String objectKey = String.join("/", outputPrefix, outputFilename); + + LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), + objectKey); + + this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater); + this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client); + // We only need one output stream as we only have one input stream. This is reasonably performant. + this.outputStream = uploadManager.getMultiPartOutputStreams().get(0); + + S3AvroFormatConfig formatConfig = (S3AvroFormatConfig) config.getFormatConfig(); + // The DataFileWriter always uses binary encoding. + // If json encoding is needed in the future, use the GenericDatumWriter directly. 
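+ // The compression codec comes from the Avro format config, and records are streamed straight into the multipart upload's output stream.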
+ this.dataFileWriter = new DataFileWriter<>(new GenericDatumWriter()) + .setCodec(formatConfig.getCodecFactory()) + .create(schema, outputStream); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException { + dataFileWriter.append(avroRecordFactory.getAvroRecord(id, recordMessage)); + } + + @Override + protected void closeWhenSucceed() throws IOException { + dataFileWriter.close(); + outputStream.close(); + uploadManager.complete(); + } + + @Override + protected void closeWhenFail() throws IOException { + dataFileWriter.close(); + outputStream.close(); + uploadManager.abort(); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredential.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredential.java new file mode 100644 index 0000000000000..fe1b0ddb61e85 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredential.java @@ -0,0 +1,29 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.credential; + +public enum GcsCredential { + HMAC_KEY +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfig.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfig.java new file mode 100644 index 0000000000000..78854bc80ed01 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfig.java @@ -0,0 +1,31 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs.credential; + +public interface GcsCredentialConfig { + + GcsCredential getCredentialType(); + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfigs.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfigs.java new file mode 100644 index 0000000000000..2e4d599076c6f --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfigs.java @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs.credential; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; + +public class GcsCredentialConfigs { + + public static GcsCredentialConfig getCredentialConfig(JsonNode config) { + JsonNode credentialConfig = config.get("credential"); + GcsCredential credentialType = GcsCredential.valueOf(credentialConfig.get("credential_type").asText().toUpperCase()); + + if (credentialType == GcsCredential.HMAC_KEY) { + return new GcsHmacKeyCredentialConfig(credentialConfig); + } + throw new RuntimeException("Unexpected credential: " + Jsons.serialize(credentialConfig)); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.java new file mode 100644 index 0000000000000..41afcc258af03 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.credential; + +import com.fasterxml.jackson.databind.JsonNode; + +public class GcsHmacKeyCredentialConfig implements GcsCredentialConfig { + + private final String hmacKeyAccessId; + private final String hmacKeySecret; + + public GcsHmacKeyCredentialConfig(JsonNode credentialConfig) { + this.hmacKeyAccessId = credentialConfig.get("hmac_key_access_id").asText(); + this.hmacKeySecret = credentialConfig.get("hmac_key_secret").asText(); + } + + public String getHmacKeyAccessId() { + return hmacKeyAccessId; + } + + public String getHmacKeySecret() { + return hmacKeySecret; + } + + @Override + public GcsCredential getCredentialType() { + return GcsCredential.HMAC_KEY; + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java new file mode 100644 index 0000000000000..ede22ebce82e5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java @@ -0,0 +1,102 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.csv; + +import alex.mojaki.s3upload.MultiPartOutputStream; +import alex.mojaki.s3upload.StreamTransferManager; +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.csv.CsvSheetGenerator; +import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig; +import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.UUID; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.QuoteMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GcsCsvWriter extends BaseGcsWriter implements S3Writer { + + private static final Logger LOGGER = LoggerFactory.getLogger(GcsCsvWriter.class); + + private final CsvSheetGenerator csvSheetGenerator; + private final StreamTransferManager uploadManager; + private final MultiPartOutputStream outputStream; + private final CSVPrinter csvPrinter; + + public GcsCsvWriter(GcsDestinationConfig config, + AmazonS3 s3Client, + ConfiguredAirbyteStream configuredStream, + Timestamp uploadTimestamp) + throws IOException { + super(config, s3Client, configuredStream); + + S3CsvFormatConfig formatConfig = (S3CsvFormatConfig) config.getFormatConfig(); + this.csvSheetGenerator = CsvSheetGenerator.Factory.create(configuredStream.getStream().getJsonSchema(), formatConfig); + + String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.CSV); + String objectKey = String.join("/", outputPrefix, outputFilename); + + LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), + objectKey); + + this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client); + // We only need one output stream as we only have one input stream. This is reasonably performant. 
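+ // All values are quoted, and the header row comes from the CSV sheet generator built from the stream schema and format config.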
+ this.outputStream = uploadManager.getMultiPartOutputStreams().get(0); + this.csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8), + CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) + .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0]))); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException { + csvPrinter.printRecord(csvSheetGenerator.getDataRow(id, recordMessage)); + } + + @Override + protected void closeWhenSucceed() throws IOException { + csvPrinter.close(); + outputStream.close(); + uploadManager.complete(); + } + + @Override + protected void closeWhenFail() throws IOException { + csvPrinter.close(); + outputStream.close(); + uploadManager.abort(); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java new file mode 100644 index 0000000000000..690493790d3c8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java @@ -0,0 +1,99 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.jsonl; + +import alex.mojaki.s3upload.MultiPartOutputStream; +import alex.mojaki.s3upload.StreamTransferManager; +import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer { + + protected static final Logger LOGGER = LoggerFactory.getLogger(GcsJsonlWriter.class); + + private static final ObjectMapper MAPPER = MoreMappers.initMapper(); + + private final StreamTransferManager uploadManager; + private final MultiPartOutputStream outputStream; + private final PrintWriter printWriter; + + public GcsJsonlWriter(GcsDestinationConfig config, + AmazonS3 s3Client, + ConfiguredAirbyteStream configuredStream, + Timestamp uploadTimestamp) { + super(config, s3Client, configuredStream); + + String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.JSONL); + String objectKey = String.join("/", outputPrefix, outputFilename); + + LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey); + + this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client); + // We only need one output stream as we only have one input stream. This is reasonably performant. 
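+ // Each record is later written as one JSON line with the standard Airbyte columns: record id, emitted timestamp, and the raw data.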
+ this.outputStream = uploadManager.getMultiPartOutputStreams().get(0); + this.printWriter = new PrintWriter(outputStream, true, StandardCharsets.UTF_8); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) { + ObjectNode json = MAPPER.createObjectNode(); + json.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString()); + json.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); + json.set(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getData()); + printWriter.println(Jsons.serialize(json)); + } + + @Override + protected void closeWhenSucceed() { + printWriter.close(); + outputStream.close(); + uploadManager.complete(); + } + + @Override + protected void closeWhenFail() { + printWriter.close(); + outputStream.close(); + uploadManager.abort(); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java new file mode 100644 index 0000000000000..fd2cbce5b1bc5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java @@ -0,0 +1,147 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.parquet; + +import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig; +import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.sql.Timestamp; +import java.util.UUID; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +public class GcsParquetWriter extends BaseGcsWriter implements S3Writer { + + private static final Logger LOGGER = LoggerFactory.getLogger(GcsParquetWriter.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectWriter WRITER = MAPPER.writer(); + + private final Schema schema; + private final JsonFieldNameUpdater nameUpdater; + private final ParquetWriter parquetWriter; + private final JsonAvroConverter converter = new JsonAvroConverter(); + + public GcsParquetWriter(GcsDestinationConfig config, + AmazonS3 s3Client, + ConfiguredAirbyteStream configuredStream, + Timestamp uploadTimestamp, + Schema schema, + JsonFieldNameUpdater nameUpdater) + throws URISyntaxException, IOException { + super(config, s3Client, configuredStream); + this.schema = schema; + this.nameUpdater = nameUpdater; + + String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.PARQUET); + String objectKey = String.join("/", outputPrefix, outputFilename); + LOGGER.info("Storage path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey); + + URI uri = new URI(String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename)); + Path path = new Path(uri); + + LOGGER.info("Full GCS path for stream '{}': {}", stream.getName(), path); + + S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig(); + Configuration hadoopConfig = getHadoopConfig(config); + this.parquetWriter = AvroParquetWriter.builder(HadoopOutputFile.fromPath(path, hadoopConfig)) + .withSchema(schema) + .withCompressionCodec(formatConfig.getCompressionCodec()) + .withRowGroupSize(formatConfig.getBlockSize()) + .withMaxPaddingSize(formatConfig.getMaxPaddingSize()) + .withPageSize(formatConfig.getPageSize()) + .withDictionaryPageSize(formatConfig.getDictionaryPageSize()) + 
.withDictionaryEncoding(formatConfig.isDictionaryEncoding()) + .build(); + } + + public static Configuration getHadoopConfig(GcsDestinationConfig config) { + GcsHmacKeyCredentialConfig hmacKeyCredential = (GcsHmacKeyCredentialConfig) config.getCredentialConfig(); + Configuration hadoopConfig = new Configuration(); + + // the default org.apache.hadoop.fs.s3a.S3AFileSystem does not work for GCS + hadoopConfig.set("fs.s3a.impl", "io.airbyte.integrations.destination.gcs.util.GcsS3FileSystem"); + + // https://stackoverflow.com/questions/64141204/process-data-in-google-storage-on-an-aws-emr-cluster-in-spark + hadoopConfig.set("fs.s3a.access.key", hmacKeyCredential.getHmacKeyAccessId()); + hadoopConfig.set("fs.s3a.secret.key", hmacKeyCredential.getHmacKeySecret()); + hadoopConfig.setBoolean("fs.s3a.path.style.access", true); + hadoopConfig.set("fs.s3a.endpoint", "storage.googleapis.com"); + hadoopConfig.setInt("fs.s3a.list.version", 1); + + return hadoopConfig; + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException { + JsonNode inputData = recordMessage.getData(); + inputData = nameUpdater.getJsonWithStandardizedFieldNames(inputData); + + ObjectNode jsonRecord = MAPPER.createObjectNode(); + jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString()); + jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); + jsonRecord.setAll((ObjectNode) inputData); + + GenericData.Record avroRecord = converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema); + parquetWriter.write(avroRecord); + } + + @Override + public void close(boolean hasFailed) throws IOException { + if (hasFailed) { + LOGGER.warn("Failure detected. Aborting upload of stream '{}'...", stream.getName()); + parquetWriter.close(); + LOGGER.warn("Upload of stream '{}' aborted.", stream.getName()); + } else { + LOGGER.info("Uploading remaining data for stream '{}'.", stream.getName()); + parquetWriter.close(); + LOGGER.info("Upload completed for stream '{}'.", stream.getName()); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsS3FileSystem.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsS3FileSystem.java new file mode 100644 index 0000000000000..e4dc2bcc17d27 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsS3FileSystem.java @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs.util; + +import java.io.IOException; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3AFileSystem; + +/** + * Patch {@link S3AFileSystem} to make it work for GCS. + */ +public class GcsS3FileSystem extends S3AFileSystem { + + /** + * Method {@code doesBucketExistV2} used in the {@link S3AFileSystem#verifyBucketExistsV2} does not + * work for GCS. + */ + @Override + @Retries.RetryTranslated + protected void verifyBucketExistsV2() throws IOException { + super.verifyBucketExists(); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java new file mode 100644 index 0000000000000..8e664c26414f0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java @@ -0,0 +1,160 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.writer; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.HeadBucketRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.s3.S3DestinationConstants; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.io.IOException; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.List; +import java.util.TimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The base implementation takes care of the following: + *
  • Create shared instance variables.
  • + *
  • Create the bucket and prepare the bucket path.
  • + */ +public abstract class BaseGcsWriter implements S3Writer { + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseGcsWriter.class); + + protected final GcsDestinationConfig config; + protected final AmazonS3 s3Client; + protected final AirbyteStream stream; + protected final DestinationSyncMode syncMode; + protected final String outputPrefix; + + protected BaseGcsWriter(GcsDestinationConfig config, + AmazonS3 s3Client, + ConfiguredAirbyteStream configuredStream) { + this.config = config; + this.s3Client = s3Client; + this.stream = configuredStream.getStream(); + this.syncMode = configuredStream.getDestinationSyncMode(); + this.outputPrefix = S3OutputPathHelper.getOutputPrefix(config.getBucketPath(), stream); + } + + /** + *
  • 1. Create bucket if necessary.
  • + *
  • 2. Under OVERWRITE mode, delete all objects with the output prefix.
  • + */ + @Override + public void initialize() { + String bucket = config.getBucketName(); + if (!gcsBucketExist(s3Client, bucket)) { + LOGGER.info("Bucket {} does not exist; creating...", bucket); + s3Client.createBucket(bucket); + LOGGER.info("Bucket {} has been created.", bucket); + } + + if (syncMode == DestinationSyncMode.OVERWRITE) { + LOGGER.info("Overwrite mode"); + List keysToDelete = new LinkedList<>(); + List objects = s3Client.listObjects(bucket, outputPrefix) + .getObjectSummaries(); + for (S3ObjectSummary object : objects) { + keysToDelete.add(new KeyVersion(object.getKey())); + } + + if (keysToDelete.size() > 0) { + LOGGER.info("Purging non-empty output path for stream '{}' under OVERWRITE mode...", stream.getName()); + // Google Cloud Storage doesn't accept request to delete multiple objects + for (KeyVersion keyToDelete : keysToDelete) { + s3Client.deleteObject(bucket, keyToDelete.getKey()); + } + LOGGER.info("Deleted {} file(s) for stream '{}'.", keysToDelete.size(), + stream.getName()); + } + } + } + + /** + * {@link AmazonS3#doesBucketExistV2} should be used to check the bucket existence. However, this + * method does not work for GCS. So we use {@link AmazonS3#headBucket} instead, which will throw an + * exception if the bucket does not exist, or there is no permission to access it. + */ + public boolean gcsBucketExist(AmazonS3 s3Client, String bucket) { + try { + s3Client.headBucket(new HeadBucketRequest(bucket)); + return true; + } catch (Exception e) { + return false; + } + } + + @Override + public void close(boolean hasFailed) throws IOException { + if (hasFailed) { + LOGGER.warn("Failure detected. Aborting upload of stream '{}'...", stream.getName()); + closeWhenFail(); + LOGGER.warn("Upload of stream '{}' aborted.", stream.getName()); + } else { + LOGGER.info("Uploading remaining data for stream '{}'.", stream.getName()); + closeWhenSucceed(); + LOGGER.info("Upload completed for stream '{}'.", stream.getName()); + } + } + + /** + * Operations that will run when the write succeeds. + */ + protected void closeWhenSucceed() throws IOException { + // Do nothing by default + } + + /** + * Operations that will run when the write fails. + */ + protected void closeWhenFail() throws IOException { + // Do nothing by default + } + + // Filename: __0. 
+ public static String getOutputFilename(Timestamp timestamp, S3Format format) { + DateFormat formatter = new SimpleDateFormat(S3DestinationConstants.YYYY_MM_DD_FORMAT_STRING); + formatter.setTimeZone(TimeZone.getTimeZone("UTC")); + return String.format( + "%s_%d_0.%s", + formatter.format(timestamp), + timestamp.getTime(), + format.getFileExtension()); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java new file mode 100644 index 0000000000000..1918881f67205 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs.writer; + +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.sql.Timestamp; + +/** + * Create different {@link GcsWriterFactory} based on {@link GcsDestinationConfig}. 
+ */ +public interface GcsWriterFactory { + + S3Writer create(GcsDestinationConfig config, + AmazonS3 s3Client, + ConfiguredAirbyteStream configuredStream, + Timestamp uploadTimestamp) + throws Exception; + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java new file mode 100644 index 0000000000000..28996732d8107 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java @@ -0,0 +1,86 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs.writer; + +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.avro.GcsAvroWriter; +import io.airbyte.integrations.destination.gcs.csv.GcsCsvWriter; +import io.airbyte.integrations.destination.gcs.jsonl.GcsJsonlWriter; +import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; +import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.sql.Timestamp; +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProductionWriterFactory implements GcsWriterFactory { + + protected static final Logger LOGGER = LoggerFactory.getLogger(ProductionWriterFactory.class); + + @Override + public S3Writer create(GcsDestinationConfig config, + AmazonS3 s3Client, + ConfiguredAirbyteStream configuredStream, + Timestamp uploadTimestamp) + throws Exception { + S3Format format = config.getFormatConfig().getFormat(); + + if (format == S3Format.AVRO || format == S3Format.PARQUET) { + AirbyteStream stream = configuredStream.getStream(); + + JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter(); + Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true); + JsonFieldNameUpdater nameUpdater = new JsonFieldNameUpdater(schemaConverter.getStandardizedNames()); + + LOGGER.info("Paquet schema for stream {}: {}", stream.getName(), avroSchema.toString(false)); + if (nameUpdater.hasNameUpdate()) { + LOGGER.info("The following field names will be standardized: {}", nameUpdater); + } + + if (format == S3Format.AVRO) { + return new GcsAvroWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, nameUpdater); + } else { + return new GcsParquetWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, nameUpdater); + } + } + + if (format == S3Format.CSV) { + return new GcsCsvWriter(config, s3Client, configuredStream, uploadTimestamp); + } + + if (format == S3Format.JSONL) { + return new GcsJsonlWriter(config, s3Client, configuredStream, uploadTimestamp); + } + + throw new RuntimeException("Unexpected GCS destination format: " + format); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json new file mode 100644 index 0000000000000..2e68885449d85 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json @@ -0,0 +1,328 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "GCS Destination Spec", + "type": "object", + "required": [ + "gcs_bucket_name", + "gcs_bucket_path", + "gcs_bucket_region", + "credential", + "format" + ], + "additionalProperties": false, + "properties": { 
+ "gcs_bucket_name": { + "title": "GCS Bucket Name", + "type": "string", + "description": "The name of the GCS bucket.", + "examples": ["airbyte_sync"] + }, + "gcs_bucket_path": { + "description": "Directory under the GCS bucket where data will be written.", + "type": "string", + "examples": ["data_sync/test"] + }, + "gcs_bucket_region": { + "title": "GCS Bucket Region", + "type": "string", + "default": "", + "description": "The region of the GCS bucket.", + "enum": [ + "", + "-- North America --", + "northamerica-northeast1", + "us-central1", + "us-east1", + "us-east4", + "us-west1", + "us-west2", + "us-west3", + "us-west4", + "-- South America --", + "southamerica-east1", + "-- Europe --", + "europe-central2", + "europe-north1", + "europe-west1", + "europe-west2", + "europe-west3", + "europe-west4", + "europe-west6", + "-- Asia --", + "asia-east1", + "asia-east2", + "asia-northeast1", + "asia-northeast2", + "asia-northeast3", + "asia-south1", + "asia-south2", + "asia-southeast1", + "asia-southeast2", + "-- Australia --", + "australia-southeast1", + "australia-southeast2", + "-- Multi-regions --", + "asia", + "eu", + "us", + "-- Dual-regions --", + "asia1", + "eur4", + "nam4" + ] + }, + "credential": { + "title": "Credential", + "type": "object", + "oneOf": [ + { + "title": "HMAC key", + "required": [ + "credential_type", + "hmac_key_access_id", + "hmac_key_secret" + ], + "properties": { + "credential_type": { + "type": "string", + "enum": ["HMAC_KEY"], + "default": "HMAC_KEY" + }, + "hmac_key_access_id": { + "type": "string", + "description": "HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.", + "title": "HMAC Key Access ID", + "airbyte_secret": true, + "examples": ["1234567890abcdefghij1234"] + }, + "hmac_key_secret": { + "type": "string", + "description": "The corresponding secret for the access ID. It is a 40-character base-64 encoded string.", + "title": "HMAC Key Secret", + "airbyte_secret": true, + "examples": ["1234567890abcdefghij1234567890ABCDEFGHIJ"] + } + } + } + ] + }, + "format": { + "title": "Output Format", + "type": "object", + "description": "Output data format", + "oneOf": [ + { + "title": "Avro: Apache Avro", + "required": ["format_type", "compression_codec"], + "properties": { + "format_type": { + "type": "string", + "enum": ["Avro"], + "default": "Avro" + }, + "compression_codec": { + "title": "Compression Codec", + "description": "The compression algorithm used to compress data. 
Default to no compression.", + "type": "object", + "oneOf": [ + { + "title": "no compression", + "required": ["codec"], + "properties": { + "codec": { + "type": "string", + "enum": ["no compression"], + "default": "no compression" + } + } + }, + { + "title": "Deflate", + "required": ["codec", "compression_level"], + "properties": { + "codec": { + "type": "string", + "enum": ["Deflate"], + "default": "Deflate" + }, + "compression_level": { + "title": "Deflate level", + "description": "0: no compression & fastest, 9: best compression & slowest.", + "type": "integer", + "default": 0, + "minimum": 0, + "maximum": 9 + } + } + }, + { + "title": "bzip2", + "required": ["codec"], + "properties": { + "codec": { + "type": "string", + "enum": ["bzip2"], + "default": "bzip2" + } + } + }, + { + "title": "xz", + "required": ["codec", "compression_level"], + "properties": { + "codec": { + "type": "string", + "enum": ["xz"], + "default": "xz" + }, + "compression_level": { + "title": "Compression level", + "description": "See here for details.", + "type": "integer", + "default": 6, + "minimum": 0, + "maximum": 9 + } + } + }, + { + "title": "zstandard", + "required": ["codec", "compression_level"], + "properties": { + "codec": { + "type": "string", + "enum": ["zstandard"], + "default": "zstandard" + }, + "compression_level": { + "title": "Compression level", + "description": "Negative levels are 'fast' modes akin to lz4 or snappy, levels above 9 are generally for archival purposes, and levels above 18 use a lot of memory.", + "type": "integer", + "default": 3, + "minimum": -5, + "maximum": 22 + }, + "include_checksum": { + "title": "Include checksum", + "description": "If true, include a checksum with each data block.", + "type": "boolean", + "default": false + } + } + }, + { + "title": "snappy", + "required": ["codec"], + "properties": { + "codec": { + "type": "string", + "enum": ["snappy"], + "default": "snappy" + } + } + } + ] + } + } + }, + { + "title": "CSV: Comma-Separated Values", + "required": ["format_type", "flattening"], + "properties": { + "format_type": { + "type": "string", + "enum": ["CSV"], + "default": "CSV" + }, + "flattening": { + "type": "string", + "title": "Normalization (Flattening)", + "description": "Whether the input json data should be normalized (flattened) in the output CSV. Please refer to docs for details.", + "default": "No flattening", + "enum": ["No flattening", "Root level flattening"] + } + } + }, + { + "title": "JSON Lines: newline-delimited JSON", + "required": ["format_type"], + "properties": { + "format_type": { + "type": "string", + "enum": ["JSONL"], + "default": "JSONL" + } + } + }, + { + "title": "Parquet: Columnar Storage", + "required": ["format_type"], + "properties": { + "format_type": { + "type": "string", + "enum": ["Parquet"], + "default": "Parquet" + }, + "compression_codec": { + "title": "Compression Codec", + "description": "The compression algorithm used to compress data pages.", + "type": "string", + "enum": [ + "UNCOMPRESSED", + "SNAPPY", + "GZIP", + "LZO", + "BROTLI", + "LZ4", + "ZSTD" + ], + "default": "UNCOMPRESSED" + }, + "block_size_mb": { + "title": "Block Size (Row Group Size) (MB)", + "description": "This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. 
Default: 128 MB.", + "type": "integer", + "default": 128, + "examples": [128] + }, + "max_padding_size_mb": { + "title": "Max Padding Size (MB)", + "description": "Maximum size allowed as padding to align row groups. This is also the minimum size of a row group. Default: 8 MB.", + "type": "integer", + "default": 8, + "examples": [8] + }, + "page_size_kb": { + "title": "Page Size (KB)", + "description": "The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. Default: 1024 KB.", + "type": "integer", + "default": 1024, + "examples": [1024] + }, + "dictionary_page_size_kb": { + "title": "Dictionary Page Size (KB)", + "description": "There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionary. Default: 1024 KB.", + "type": "integer", + "default": 1024, + "examples": [1024] + }, + "dictionary_encoding": { + "title": "Dictionary Encoding", + "description": "Default: true.", + "type": "boolean", + "default": true + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/AvroRecordHelper.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/AvroRecordHelper.java new file mode 100644 index 0000000000000..db7cf31e1d7fe --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/AvroRecordHelper.java @@ -0,0 +1,65 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.util.MoreIterators; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; + +public class AvroRecordHelper { + + public static JsonFieldNameUpdater getFieldNameUpdater(String streamName, String namespace, JsonNode streamSchema) { + JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter(); + schemaConverter.getAvroSchema(streamSchema, streamName, namespace, true); + return new JsonFieldNameUpdater(schemaConverter.getStandardizedNames()); + } + + /** + * Convert an Airbyte JsonNode from Avro / Parquet Record to a plain one. + *
  • Remove the airbyte id and emission timestamp fields.
  • + *
  • Remove null fields that must exist in Parquet but do not in the original Json.
  • This + * function mutates the input Json. + */ + public static JsonNode pruneAirbyteJson(JsonNode input) { + ObjectNode output = (ObjectNode) input; + + // Remove Airbyte columns. + output.remove(JavaBaseConstants.COLUMN_NAME_AB_ID); + output.remove(JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + + // Fields with null values does not exist in the original Json but only in Parquet. + for (String field : MoreIterators.toList(output.fieldNames())) { + if (output.get(field) == null || output.get(field).isNull()) { + output.remove(field); + } + } + + return output; + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..230cd278c91cd --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java @@ -0,0 +1,85 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectReader; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import java.util.LinkedList; +import java.util.List; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest { + + private final JsonAvroConverter converter = new JsonAvroConverter(); + + protected GcsAvroDestinationAcceptanceTest() { + super(S3Format.AVRO); + } + + @Override + protected JsonNode getFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"Avro\",\n" + + " \"compression_codec\": { \"codec\": \"no compression\", \"compression_level\": 5, \"include_checksum\": true }\n" + + "}"); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema) throws Exception { + JsonFieldNameUpdater nameUpdater = AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema); + + List objectSummaries = getAllSyncedObjects(streamName, namespace); + List jsonRecords = new LinkedList<>(); + + for (S3ObjectSummary objectSummary : objectSummaries) { + S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); + try (DataFileReader dataFileReader = new DataFileReader<>( + new SeekableByteArrayInput(object.getObjectContent().readAllBytes()), + new GenericDatumReader<>())) { + ObjectReader jsonReader = MAPPER.reader(); + while (dataFileReader.hasNext()) { + GenericData.Record record = dataFileReader.next(); + byte[] jsonBytes = converter.convertToJson(record); + JsonNode jsonRecord = jsonReader.readTree(jsonBytes); + jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord); + jsonRecords.add(AvroRecordHelper.pruneAirbyteJson(jsonRecord)); + } + } + } + + return jsonRecords; + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..be64006c6744d --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java @@ -0,0 +1,131 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this 
permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.s3.S3Format; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.StreamSupport; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.csv.QuoteMode; + +public class GcsCsvDestinationAcceptanceTest extends GcsDestinationAcceptanceTest { + + public GcsCsvDestinationAcceptanceTest() { + super(S3Format.CSV); + } + + @Override + protected JsonNode getFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"CSV\",\n" + + " \"flattening\": \"Root level flattening\"\n" + + "}"); + } + + /** + * Convert json_schema to a map from field name to field types. 
+ */ + private static Map getFieldTypes(JsonNode streamSchema) { + Map fieldTypes = new HashMap<>(); + JsonNode fieldDefinitions = streamSchema.get("properties"); + Iterator> iterator = fieldDefinitions.fields(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + fieldTypes.put(entry.getKey(), entry.getValue().get("type").asText()); + } + return fieldTypes; + } + + private static JsonNode getJsonNode(Map input, Map fieldTypes) { + ObjectNode json = MAPPER.createObjectNode(); + + if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) { + return Jsons.deserialize(input.get(JavaBaseConstants.COLUMN_NAME_DATA)); + } + + for (Map.Entry entry : input.entrySet()) { + String key = entry.getKey(); + if (key.equals(JavaBaseConstants.COLUMN_NAME_AB_ID) || key + .equals(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) { + continue; + } + String value = entry.getValue(); + if (value == null || value.equals("")) { + continue; + } + String type = fieldTypes.get(key); + switch (type) { + case "boolean" -> json.put(key, Boolean.valueOf(value)); + case "integer" -> json.put(key, Integer.valueOf(value)); + case "number" -> json.put(key, Double.valueOf(value)); + default -> json.put(key, value); + } + } + return json; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + List objectSummaries = getAllSyncedObjects(streamName, namespace); + + Map fieldTypes = getFieldTypes(streamSchema); + List jsonRecords = new LinkedList<>(); + + for (S3ObjectSummary objectSummary : objectSummaries) { + S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); + try (Reader in = new InputStreamReader(object.getObjectContent(), StandardCharsets.UTF_8)) { + Iterable records = CSVFormat.DEFAULT + .withQuoteMode(QuoteMode.NON_NUMERIC) + .withFirstRecordAsHeader() + .parse(in); + StreamSupport.stream(records.spliterator(), false) + .forEach(r -> jsonRecords.add(getJsonNode(r.toMap(), fieldTypes))); + } + } + + return jsonRecords; + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..d4341aee30dfc --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java @@ -0,0 +1,168 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.S3FormatConfig; +import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * When adding a new GCS destination acceptance test, extend this class and do the following: + *
  • Implement {@link #getFormatConfig} that returns a {@link S3FormatConfig}
  • + *
  • Implement {@link #retrieveRecords} that returns the Json records for the test
  • + * + * Under the hood, a {@link io.airbyte.integrations.destination.gcs.GcsDestinationConfig} is + * constructed as follows: + *
  • Retrieve the secrets from "secrets/config.json"
  • + *
  • Get the GCS bucket path from the constructor
  • + *
  • Get the format config from {@link #getFormatConfig}
  • + */ +public abstract class GcsDestinationAcceptanceTest extends DestinationAcceptanceTest { + + protected static final Logger LOGGER = LoggerFactory.getLogger(GcsDestinationAcceptanceTest.class); + protected static final ObjectMapper MAPPER = MoreMappers.initMapper(); + + protected final String secretFilePath = "secrets/config.json"; + protected final S3Format outputFormat; + protected JsonNode configJson; + protected GcsDestinationConfig config; + protected AmazonS3 s3Client; + + protected GcsDestinationAcceptanceTest(S3Format outputFormat) { + this.outputFormat = outputFormat; + } + + protected JsonNode getBaseConfigJson() { + return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))); + } + + @Override + protected String getImageName() { + return "airbyte/destination-gcs:dev"; + } + + @Override + protected JsonNode getConfig() { + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + JsonNode baseJson = getBaseConfigJson(); + JsonNode failCheckJson = Jsons.clone(baseJson); + // invalid credential + ((ObjectNode) failCheckJson).put("access_key_id", "fake-key"); + ((ObjectNode) failCheckJson).put("secret_access_key", "fake-secret"); + return failCheckJson; + } + + /** + * Helper method to retrieve all synced objects inside the configured bucket path. + */ + protected List getAllSyncedObjects(String streamName, String namespace) { + String outputPrefix = S3OutputPathHelper + .getOutputPrefix(config.getBucketPath(), namespace, streamName); + List objectSummaries = s3Client + .listObjects(config.getBucketName(), outputPrefix) + .getObjectSummaries() + .stream() + .sorted(Comparator.comparingLong(o -> o.getLastModified().getTime())) + .collect(Collectors.toList()); + LOGGER.info( + "All objects: {}", + objectSummaries.stream().map(o -> String.format("%s/%s", o.getBucketName(), o.getKey())).collect(Collectors.toList())); + return objectSummaries; + } + + protected abstract JsonNode getFormatConfig(); + + /** + * This method does the following: + *
  • Construct the GCS destination config.
  • + *
  • Construct the GCS client.
  • + */ + @Override + protected void setup(TestDestinationEnv testEnv) { + JsonNode baseConfigJson = getBaseConfigJson(); + // Set a random GCS bucket path for each integration test + JsonNode configJson = Jsons.clone(baseConfigJson); + String testBucketPath = String.format( + "%s_test_%s", + outputFormat.name().toLowerCase(Locale.ROOT), + RandomStringUtils.randomAlphanumeric(5)); + ((ObjectNode) configJson) + .put("gcs_bucket_path", testBucketPath) + .set("format", getFormatConfig()); + this.configJson = configJson; + this.config = GcsDestinationConfig.getGcsDestinationConfig(configJson); + LOGGER.info("Test full path: {}/{}", config.getBucketName(), config.getBucketPath()); + + this.s3Client = GcsS3Helper.getGcsS3Client(config); + } + + /** + * Remove all the S3 output from the tests. + */ + @Override + protected void tearDown(TestDestinationEnv testEnv) { + List keysToDelete = new LinkedList<>(); + List objects = s3Client + .listObjects(config.getBucketName(), config.getBucketPath()) + .getObjectSummaries(); + for (S3ObjectSummary object : objects) { + keysToDelete.add(new KeyVersion(object.getKey())); + } + + if (keysToDelete.size() > 0) { + LOGGER.info("Tearing down test bucket path: {}/{}", config.getBucketName(), + config.getBucketPath()); + // Google Cloud Storage doesn't accept request to delete multiple objects + for (KeyVersion keyToDelete : keysToDelete) { + s3Client.deleteObject(config.getBucketName(), keyToDelete.getKey()); + } + LOGGER.info("Deleted {} file(s).", keysToDelete.size()); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..5753f9a971b0a --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.s3.S3Format; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; + +public class GcsJsonlDestinationAcceptanceTest extends GcsDestinationAcceptanceTest { + + protected GcsJsonlDestinationAcceptanceTest() { + super(S3Format.JSONL); + } + + @Override + protected JsonNode getFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"JSONL\"\n" + + "}"); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + List objectSummaries = getAllSyncedObjects(streamName, namespace); + List jsonRecords = new LinkedList<>(); + + for (S3ObjectSummary objectSummary : objectSummaries) { + S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + jsonRecords.add(Jsons.deserialize(line).get(JavaBaseConstants.COLUMN_NAME_DATA)); + } + } + } + + return jsonRecords; + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..f5ddccd8d44e8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java @@ -0,0 +1,96 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectReader; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter; +import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedList; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetReader; +import tech.allegro.schema.json2avro.converter.JsonAvroConverter; + +public class GcsParquetDestinationAcceptanceTest extends GcsDestinationAcceptanceTest { + + private final JsonAvroConverter converter = new JsonAvroConverter(); + + protected GcsParquetDestinationAcceptanceTest() { + super(S3Format.PARQUET); + } + + @Override + protected JsonNode getFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"Parquet\",\n" + + " \"compression_codec\": \"GZIP\"\n" + + "}"); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException, URISyntaxException { + JsonFieldNameUpdater nameUpdater = AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema); + + List objectSummaries = getAllSyncedObjects(streamName, namespace); + List jsonRecords = new LinkedList<>(); + + for (S3ObjectSummary objectSummary : objectSummaries) { + S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); + URI uri = new URI(String.format("s3a://%s/%s", object.getBucketName(), object.getKey())); + var path = new org.apache.hadoop.fs.Path(uri); + Configuration hadoopConfig = GcsParquetWriter.getHadoopConfig(config); + + try (ParquetReader parquetReader = ParquetReader.builder(new AvroReadSupport<>(), path) + .withConf(hadoopConfig) + .build()) { + ObjectReader jsonReader = MAPPER.reader(); + GenericData.Record record; + while ((record = parquetReader.read()) != null) { + byte[] jsonBytes = converter.convertToJson(record); + JsonNode jsonRecord = jsonReader.readTree(jsonBytes); + jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord); + jsonRecords.add(AvroRecordHelper.pruneAirbyteJson(jsonRecord)); + } + } + } + + return jsonRecords; + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfigTest.java b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfigTest.java new file mode 100644 index 0000000000000..466dbb282a88b --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfigTest.java @@ -0,0 +1,65 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + 
* copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.gcs; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.destination.gcs.credential.GcsCredentialConfig; +import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig; +import io.airbyte.integrations.destination.s3.S3FormatConfig; +import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig; +import java.io.IOException; +import org.junit.jupiter.api.Test; + +class GcsDestinationConfigTest { + + @Test + public void testGetGcsDestinationConfig() throws IOException { + JsonNode configJson = Jsons.deserialize(MoreResources.readResource("test_config.json")); + + GcsDestinationConfig config = GcsDestinationConfig.getGcsDestinationConfig(configJson); + assertEquals("test_bucket", config.getBucketName()); + assertEquals("test_path", config.getBucketPath()); + assertEquals("us-west1", config.getBucketRegion()); + + GcsCredentialConfig credentialConfig = config.getCredentialConfig(); + assertTrue(credentialConfig instanceof GcsHmacKeyCredentialConfig); + + GcsHmacKeyCredentialConfig hmacKeyConfig = (GcsHmacKeyCredentialConfig) credentialConfig; + assertEquals("test_access_id", hmacKeyConfig.getHmacKeyAccessId()); + assertEquals("test_secret", hmacKeyConfig.getHmacKeySecret()); + + S3FormatConfig formatConfig = config.getFormatConfig(); + assertTrue(formatConfig instanceof S3AvroFormatConfig); + + S3AvroFormatConfig avroFormatConfig = (S3AvroFormatConfig) formatConfig; + assertEquals("deflate-5", avroFormatConfig.getCodecFactory().toString()); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test/resources/test_config.json b/airbyte-integrations/connectors/destination-gcs/src/test/resources/test_config.json new file mode 100644 index 0000000000000..e480298f69d18 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/test/resources/test_config.json @@ -0,0 +1,17 @@ +{ + "gcs_bucket_name": "test_bucket", + "gcs_bucket_path": "test_path", + "gcs_bucket_region": "us-west1", + "credential": { + "credential_type": "HMAC_KEY", + "hmac_key_access_id": "test_access_id", + "hmac_key_secret": "test_secret" + }, + "format": { + "format_type": "Avro", + "compression_codec": { + "codec": "Deflate", + "compression_level": 5 + } + } +} diff --git a/airbyte-integrations/connectors/destination-s3/README.md b/airbyte-integrations/connectors/destination-s3/README.md index 6873c3d7290a8..e163606e68a71 100644 --- a/airbyte-integrations/connectors/destination-s3/README.md +++ 
b/airbyte-integrations/connectors/destination-s3/README.md @@ -1,6 +1,6 @@ # S3 Test Configuration -In order to test the D3 destination, you need an AWS account (or alternative S3 account). +In order to test the S3 destination, you need an AWS account (or alternative S3 account). ## Community Contributor @@ -14,8 +14,7 @@ As a community contributor, you will need access to AWS to run the integration t ## Airbyte Employee -- Access the `destination s3 * creds` secrets on Last Pass. The `*` here represents the different file format. -- Replace the `config.json` under `sample_secrets`. +- Access the `destination s3 creds` secrets on Last Pass, and put it in `sample_secrets/config.json`. - Rename the directory from `sample_secrets` to `secrets`. ## Add New Output Format diff --git a/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py index c6806f54c0126..033751492cbf3 100644 --- a/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py @@ -20,6 +20,8 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +# + from datetime import datetime, timezone import pytest @@ -28,9 +30,7 @@ @pytest.fixture def conversation_export(): - return ConversationExport( - start_date=datetime(year=2021, month=7, day=1, hour=12, tzinfo=timezone.utc), batch_size=1, logger=None - ) + return ConversationExport(start_date=datetime(year=2021, month=7, day=1, hour=12, tzinfo=timezone.utc), batch_size=1, logger=None) def test_validate_ms_timestamp_with_valid_input(): @@ -67,14 +67,8 @@ def test_add_days_to_ms_timestamp(): def test_stream_slices_without_state(conversation_export): conversation_export.end_timestamp = 1625270400001 # 2021-07-03 00:00:00 + 1 ms expected_slices = [ - { - 'updated_after': 1625140800000, # 2021-07-01 12:00:00 - 'updated_before': 1625227200000 # 2021-07-02 12:00:00 - }, - { - 'updated_after': 1625227200000, - 'updated_before': 1625270400001 - } + {"updated_after": 1625140800000, "updated_before": 1625227200000}, # 2021-07-01 12:00:00 # 2021-07-02 12:00:00 + {"updated_after": 1625227200000, "updated_before": 1625270400001}, ] actual_slices = conversation_export.stream_slices() assert actual_slices == expected_slices @@ -85,12 +79,7 @@ def test_stream_slices_without_state_large_batch(): start_date=datetime(year=2021, month=7, day=1, hour=12, tzinfo=timezone.utc), batch_size=31, logger=None ) conversation_export.end_timestamp = 1625270400001 # 2021-07-03 00:00:00 + 1 ms - expected_slices = [ - { - 'updated_after': 1625140800000, # 2021-07-01 12:00:00 - 'updated_before': 1625270400001 - } - ] + expected_slices = [{"updated_after": 1625140800000, "updated_before": 1625270400001}] # 2021-07-01 12:00:00 actual_slices = conversation_export.stream_slices() assert actual_slices == expected_slices @@ -107,26 +96,18 @@ def test_stream_slices_with_start_timestamp_larger_than_state(): Test that if start_timestamp is larger than state, then start at start_timestamp. 
""" conversation_export = ConversationExport( - start_date=datetime(year=2021, month=12, day=1, tzinfo=timezone.utc), batch_size=31, - logger=None + start_date=datetime(year=2021, month=12, day=1, tzinfo=timezone.utc), batch_size=31, logger=None ) conversation_export.end_timestamp = 1638360000001 # 2021-12-01 12:00:00 + 1 ms - expected_slices = [ - { - 'updated_after': 1638316800000, # 2021-07-01 12:00:00 - 'updated_before': 1638360000001 - } - ] - actual_slices = conversation_export.stream_slices( - stream_state={'updated_at': 1625220000000} # # 2021-07-02 12:00:00 - ) + expected_slices = [{"updated_after": 1638316800000, "updated_before": 1638360000001}] # 2021-07-01 12:00:00 + actual_slices = conversation_export.stream_slices(stream_state={"updated_at": 1625220000000}) # # 2021-07-02 12:00:00 assert actual_slices == expected_slices def test_get_updated_state_without_state(conversation_export): - assert conversation_export.get_updated_state( - current_stream_state=None, latest_record={'updated_at': 1625263200000} - ) == {'updated_at': 1625140800000} + assert conversation_export.get_updated_state(current_stream_state=None, latest_record={"updated_at": 1625263200000}) == { + "updated_at": 1625140800000 + } def test_get_updated_state_with_bigger_state(conversation_export): diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index f947b36085824..fb3e0842e9e64 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -102,6 +102,7 @@ * [Zoom](integrations/sources/zoom.md) * [Destinations](integrations/destinations/README.md) * [BigQuery](integrations/destinations/bigquery.md) + * [Google Cloud Storage (GCS)](integrations/destinations/gcs.md) * [Google PubSub](integrations/destinations/pubsub.md) * [Local CSV](integrations/destinations/local-csv.md) * [Local JSON](integrations/destinations/local-json.md) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 79f9f9c4a7606..ffb608efd5807 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -84,6 +84,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex | Connector | Grade | |----|----| |[BigQuery](./destinations/bigquery.md)| Certified | +|[Google Cloud Storage (GCS)](./destinations/s3.md)| Alpha | |[Google Pubsub](./destinations/pubsub.md)| Alpha | |[Local CSV](./destinations/local-csv.md)| Certified | |[Local JSON](./destinations/local-json.md)| Certified | diff --git a/docs/integrations/destinations/gcs.md b/docs/integrations/destinations/gcs.md new file mode 100644 index 0000000000000..0e1312e315b4c --- /dev/null +++ b/docs/integrations/destinations/gcs.md @@ -0,0 +1,375 @@ +# Google Cloud Storage + +## Overview + +This destination writes data to GCS bucket. + +The Airbyte GCS destination allows you to sync data to cloud storage buckets. Each stream is written to its own directory under the bucket. + +## Sync Mode + +| Feature | Support | Notes | +| :--- | :---: | :--- | +| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the configured bucket path. | +| Incremental - Append Sync | ✅ | | +| Namespaces | ❌ | Setting a specific bucket path is equivalent to having separate namespaces. | + +## Configuration + +| Parameter | Type | Notes | +| :--- | :---: | :--- | +| GCS Bucket Name | string | Name of the bucket to sync data into. | +| GCS Bucket Path | string | Subdirectory under the above bucket to sync the data into. | +| GCS Region | string | See [here](https://cloud.google.com/storage/docs/locations) for all region codes. 
+ +Currently, only the [HMAC key](https://cloud.google.com/storage/docs/authentication/hmackeys) is supported. More credential types will be added in the future. + +⚠️ Please note that under "Full Refresh Sync" mode, data in the configured bucket and path will be wiped out before each sync. We recommend provisioning a dedicated GCS bucket or path for this sync to prevent unexpected data deletion caused by misconfiguration. ⚠️ + +The full path of the output data is: + +``` +<bucket_name>/<bucket_path>/<source_namespace_if_exists>/<stream_name>/<upload_date>_<upload_millis>_<partition_id>.<format_extension> +``` + +For example: + +``` +testing_bucket/data_output_path/public/users/2021_01_01_1609541171643_0.csv +↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ +| | | | | | | format extension +| | | | | | partition id +| | | | | upload time in millis +| | | | upload date in YYYY-MM-DD +| | | stream name +| | source namespace (if it exists) +| bucket path +bucket name +``` + +Please note that the stream name may contain a prefix if one is configured on the connection. + +The rationale behind this naming pattern is: +1. Each stream has its own directory. +2. The data output files can be sorted by upload time. +3. The upload time consists of a date part and a millis part so that it is both readable and unique. + +Currently, each data sync will only create one file per stream. In the future, output files may be partitioned by size. Each partition is identifiable by the partition ID, which is always 0 for now. + +## Output Schema + +Each stream is written to its own directory according to the configuration. The complete dataset of each stream consists of all the output files under that directory. You can think of the directory as the equivalent of a table in the database world. + +- Under Full Refresh Sync mode, old output files will be purged before new files are created. +- Under Incremental - Append Sync mode, new output files will be added that only contain the new data. + +### Avro + +[Apache Avro](https://avro.apache.org/) serializes data in a compact binary format. Currently, the Airbyte GCS destination's Avro output always uses the [binary encoding](http://avro.apache.org/docs/current/spec.html#binary_encoding), and assumes that all data records follow the same schema. + +#### Configuration + +Here are the available compression codecs: + +- No compression +- `deflate` + - Compression level + - Range `[0, 9]`. Defaults to 0. + - Level 0: no compression & fastest. + - Level 9: best compression & slowest. +- `bzip2` +- `xz` + - Compression level + - Range `[0, 9]`. Defaults to 6. + - Levels 0-3 are fast with medium compression. + - Levels 4-6 are fairly slow with high compression. + - Levels 7-9 are like level 6 but use bigger dictionaries and have higher memory requirements. Unless the uncompressed size of the file exceeds 8 MiB, 16 MiB, or 32 MiB, it is a waste of memory to use the presets 7, 8, or 9, respectively. +- `zstandard` + - Compression level + - Range `[-5, 22]`. Defaults to 3. + - Negative levels are 'fast' modes akin to `lz4` or `snappy`. + - Levels above 9 are generally for archival purposes. + - Levels above 18 use a lot of memory. + - Include checksum + - If set to `true`, a checksum will be included in each data block. +- `snappy`
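
The connector itself is implemented in Java, but the options above are standard Avro container-file codecs, so their effect can be inspected with any Avro library. Below is a minimal Python sketch using `fastavro` (an assumption for illustration; it is not what the connector uses) that writes the same records once uncompressed and once with `deflate`, so you can compare file sizes.

```python
import uuid
from datetime import datetime, timezone

from fastavro import parse_schema, writer

# A minimal record schema; the real schema is derived from the stream's JSON schema.
schema = parse_schema({
    "name": "users",
    "type": "record",
    "fields": [
        {"name": "_airbyte_ab_id", "type": {"type": "string", "logicalType": "uuid"}},
        {"name": "_airbyte_emitted_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "user_id", "type": ["null", "int"], "default": None},
    ],
})

records = [{
    "_airbyte_ab_id": str(uuid.uuid4()),
    "_airbyte_emitted_at": datetime.now(timezone.utc),
    "user_id": 123,
}]

# Write one Avro container file per codec to compare output sizes.
# ("snappy", "bzip2", "xz", and "zstandard" require extra Python packages.)
for codec in ("null", "deflate"):
    with open(f"users_{codec}.avro", "wb") as out:
        writer(out, schema, records, codec=codec)
```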
+ +#### Data schema + +Under the hood, an Airbyte data stream in Json schema is converted to an Avro schema, and then the Json object is converted to an Avro record based on the Avro schema. Because the data stream can come from any data source, the Avro converter in the GCS destination applies the following rules. + +1. Json schema types are mapped to Avro types as follows: + +| Json Data Type | Avro Data Type | +| :---: | :---: | +| string | string | +| number | double | +| integer | int | +| boolean | boolean | +| null | null | +| object | record | +| array | array | + +2. Built-in Json schema formats are not mapped to Avro logical types at this moment. +3. Combined restrictions ("allOf", "anyOf", and "oneOf") will be converted to type unions. The corresponding Avro schema can be less stringent. For example, the following Json schema + + ```json + { + "oneOf": [ + { "type": "string" }, + { "type": "integer" } + ] + } + ``` +will become this in Avro schema: + + ```json + { + "type": ["null", "string", "int"] + } + ``` + +4. Keyword `not` is not supported, as there is no equivalent validation mechanism in Avro schema. +5. Only alphanumeric characters and underscores (`/a-zA-Z0-9_/`) are allowed in a stream or field name. Any special character will be converted to a letter or an underscore. For example, `spécial:character_names` will become `special_character_names`. The original names will be stored in the `doc` property in this format: `_airbyte_original_name:<original_name>`. +6. All fields will be nullable. For example, a `string` Json field will be typed as `["null", "string"]` in Avro. This is necessary because the incoming data stream may have optional fields. +7. For array fields in Json schema, when the `items` property is an array, it means that each element in the array should follow its own schema sequentially. For example, the following specification means the first item in the array should be a string, and the second a number. + + ```json + { + "array_field": { + "type": "array", + "items": [ + { "type": "string" }, + { "type": "number" } + ] + } + } + ``` + +This is not supported in Avro schema. As a compromise, the converter creates a union, ["string", "number"], which is less stringent: + + ```json + { + "name": "array_field", + "type": [ + "null", + { + "type": "array", + "items": ["null", "string", "double"] + } + ], + "default": null + } + ``` + +8. Two Airbyte-specific fields will be added to each Avro record: + +| Field | Schema | Document | +| :--- | :--- | :---: | +| `_airbyte_ab_id` | `uuid` | [link](http://avro.apache.org/docs/current/spec.html#UUID) | +| `_airbyte_emitted_at` | `timestamp-millis` | [link](http://avro.apache.org/docs/current/spec.html#Timestamp+%28millisecond+precision%29) | + +9. Currently `additionalProperties` is not supported. This means that if the source is schemaless (e.g. Mongo) or has flexible fields, they will be ignored. We will have a solution soon. Feel free to submit a new issue if this is blocking for you.
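
As a rough illustration of rules 1 and 6, here is a small Python sketch of the primitive-type mapping and the nullable-union behaviour. It is a simplification under the assumptions stated in the comments; the connector's actual converter also handles records, arrays, and name sanitization. The full worked example below shows how the rules combine on a nested schema.

```python
# Simplified sketch: JSON schema primitive types -> nullable Avro unions.
# Assumes only primitive types; records, arrays, and name sanitization
# (rules 5 and 7 above) are intentionally left out.
JSON_TO_AVRO = {
    "string": "string",
    "number": "double",
    "integer": "int",
    "boolean": "boolean",
    "null": "null",
}

def to_avro_field(name: str, json_type) -> dict:
    """Build a nullable Avro field from a JSON schema 'type' value.

    `json_type` may be a single type name or a list of type names, as produced
    by combined restrictions such as "oneOf".
    """
    types = json_type if isinstance(json_type, list) else [json_type]
    avro_types = ["null"] + [JSON_TO_AVRO[t] for t in types if t != "null"]
    return {"name": name, "type": avro_types, "default": None}

print(to_avro_field("id", "integer"))
# {'name': 'id', 'type': ['null', 'int'], 'default': None}
print(to_avro_field("created_at", ["null", "string"]))
# {'name': 'created_at', 'type': ['null', 'string'], 'default': None}
```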
+ +For example, given the following Json schema: + +```json +{ + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "id": { + "type": "integer" + }, + "user": { + "type": ["null", "object"], + "properties": { + "id": { + "type": "integer" + }, + "field_with_spécial_character": { + "type": "integer" + } + } + }, + "created_at": { + "type": ["null", "string"], + "format": "date-time" + } + } +} +``` + +Its corresponding Avro schema will be: + +```json +{ + "name" : "stream_name", + "type" : "record", + "fields" : [ { + "name" : "_airbyte_ab_id", + "type" : { + "type" : "string", + "logicalType" : "uuid" + } + }, { + "name" : "_airbyte_emitted_at", + "type" : { + "type" : "long", + "logicalType" : "timestamp-millis" + } + }, { + "name" : "id", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "user", + "type" : [ "null", { + "type" : "record", + "name" : "user", + "fields" : [ { + "name" : "id", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "field_with_special_character", + "type" : [ "null", "int" ], + "doc" : "_airbyte_original_name:field_with_spécial_character", + "default" : null + } ] + } ], + "default" : null + }, { + "name" : "created_at", + "type" : [ "null", "string" ], + "default" : null + } ] +} +``` + +### CSV + +Like most of the other Airbyte destination connectors, the output usually has three columns: a UUID, an emission timestamp, and the data blob. With the CSV output, it is possible to normalize (flatten) the data blob to multiple columns. + +| Column | Condition | Description | +| :--- | :--- | :--- | +| `_airbyte_ab_id` | Always exists. | A UUID assigned by Airbyte to each processed record. | +| `_airbyte_emitted_at` | Always exists. | A timestamp representing when the event was pulled from the data source. | +| `_airbyte_data` | When no normalization (flattening) is needed. | All data reside under this column as a json blob. | +| root level fields | When root level normalization (flattening) is selected. | The root level fields are expanded. | + +For example, given the following json object from a source: + +```json +{ + "user_id": 123, + "name": { + "first": "John", + "last": "Doe" + } +} +``` + +With no normalization, the output CSV is: + +| `_airbyte_ab_id` | `_airbyte_emitted_at` | `_airbyte_data` | +| :--- | :--- | :--- | +| `26d73cde-7eb1-4e1e-b7db-a4c03b4cf206` | 1622135805000 | `{ "user_id": 123, "name": { "first": "John", "last": "Doe" } }` | + +With root level normalization, the output CSV is: + +| `_airbyte_ab_id` | `_airbyte_emitted_at` | `user_id` | `name` | +| :--- | :--- | :--- | :--- | +| `26d73cde-7eb1-4e1e-b7db-a4c03b4cf206` | 1622135805000 | 123 | `{ "first": "John", "last": "Doe" }` |
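
To make the flattening behaviour concrete, here is a short Python sketch that builds one CSV row with and without root-level flattening. It only illustrates the column layout described above and is not the connector's implementation; the UUID and timestamp are generated on the spot.

```python
import csv
import io
import json
import uuid
from datetime import datetime, timezone

def to_row(record: dict, flatten: bool) -> dict:
    """Build one output row following the column layout described above."""
    row = {
        "_airbyte_ab_id": str(uuid.uuid4()),
        "_airbyte_emitted_at": int(datetime.now(timezone.utc).timestamp() * 1000),
    }
    if flatten:
        # Root-level flattening: top-level keys become columns; nested objects
        # (such as "name") stay serialized as JSON strings.
        row.update({k: json.dumps(v) if isinstance(v, (dict, list)) else v for k, v in record.items()})
    else:
        row["_airbyte_data"] = json.dumps(record)
    return row

record = {"user_id": 123, "name": {"first": "John", "last": "Doe"}}
for flatten in (False, True):
    row = to_row(record, flatten)
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(row.keys()))
    writer.writeheader()
    writer.writerow(row)
    print(buf.getvalue())
```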
+ +### JSON Lines (JSONL) + +[Json Lines](https://jsonlines.org/) is a text format with one JSON object per line. Each line has a structure as follows: + +```json +{ + "_airbyte_ab_id": "<uuid>", + "_airbyte_emitted_at": "<timestamp_in_millis>", + "_airbyte_data": "<json_record>" +} +``` + +For example, given the following two json objects from a source: + +```json +[ + { + "user_id": 123, + "name": { + "first": "John", + "last": "Doe" + } + }, + { + "user_id": 456, + "name": { + "first": "Jane", + "last": "Roe" + } + } +] +``` + +They will look like this in the output file: + +```jsonl +{ "_airbyte_ab_id": "26d73cde-7eb1-4e1e-b7db-a4c03b4cf206", "_airbyte_emitted_at": "1622135805000", "_airbyte_data": { "user_id": 123, "name": { "first": "John", "last": "Doe" } } } +{ "_airbyte_ab_id": "0a61de1b-9cdd-4455-a739-93572c9a5f20", "_airbyte_emitted_at": "1631948170000", "_airbyte_data": { "user_id": 456, "name": { "first": "Jane", "last": "Roe" } } } +``` + +### Parquet + +#### Configuration + +The following configuration is available to configure the Parquet output: + +| Parameter | Type | Default | Description | +| :--- | :---: | :---: | :--- | +| `compression_codec` | enum | `UNCOMPRESSED` | **Compression algorithm**. Available candidates are: `UNCOMPRESSED`, `SNAPPY`, `GZIP`, `LZO`, `BROTLI`, `LZ4`, and `ZSTD`. | +| `block_size_mb` | integer | 128 (MB) | **Block size (row group size)** in MB. This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. | +| `max_padding_size_mb` | integer | 8 (MB) | **Max padding size** in MB. This is the maximum size allowed as padding to align row groups. This is also the minimum size of a row group. | +| `page_size_kb` | integer | 1024 (KB) | **Page size** in KB. The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. | +| `dictionary_page_size_kb` | integer | 1024 (KB) | **Dictionary page size** in KB. There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size, but for the dictionary. | +| `dictionary_encoding` | boolean | `true` | **Dictionary encoding**. This parameter controls whether dictionary encoding is turned on. | + +These parameters are related to the `ParquetOutputFormat`. See the [Java doc](https://www.javadoc.io/doc/org.apache.parquet/parquet-hadoop/1.12.0/org/apache/parquet/hadoop/ParquetOutputFormat.html) for more details. Also see the [Parquet documentation](https://parquet.apache.org/documentation/latest/#configurations) for its recommended configurations (512 - 1024 MB block size, 8 KB page size). + +#### Data schema + +Under the hood, an Airbyte data stream in Json schema is first converted to an Avro schema, then the Json object is converted to an Avro record, and finally the Avro record is written out in the Parquet format. See the `Data schema` section under the [Avro output](#avro) for rules and limitations. + +## Getting started + +### Requirements + +1. Allow connections from the Airbyte server to Google Cloud Storage \(if they exist in separate VPCs\). +2. A GCS bucket with HMAC key credentials. + +### Setup guide + +* Fill in the GCS info. + * **GCS Bucket Name** + * See [this](https://cloud.google.com/storage/docs/creating-buckets) to create a GCS bucket. + * **GCS Bucket Region** + * **HMAC Key Access ID** + * See [this](https://cloud.google.com/storage/docs/authentication/hmackeys) on how to generate an access key. + * We recommend creating an Airbyte-specific user or service account. This user or account will require read and write permissions to objects in the bucket. + * **HMAC Key Secret** + * The secret corresponding to the above access ID. +* Make sure your GCS bucket is accessible from the machine running Airbyte. + * This depends on your networking setup. + * The easiest way to verify whether Airbyte is able to connect to your GCS bucket is via the check connection tool in the UI.
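
If you want to sanity-check the HMAC key outside Airbyte first, one option is to use any S3-compatible client against GCS's interoperability (XML) endpoint, which is what HMAC keys are designed for. The sketch below uses `boto3` purely as an illustration (it is not part of Airbyte or the connector), with hypothetical placeholder credentials, bucket, and path.

```python
import boto3

# Hypothetical placeholders -- substitute your own bucket, path, and HMAC key pair.
client = boto3.client(
    "s3",
    endpoint_url="https://storage.googleapis.com",  # GCS interoperability endpoint
    aws_access_key_id="GOOG1EXAMPLEACCESSID",
    aws_secret_access_key="exampleSecretExampleSecretExampleSecret0",
)

# Listing one object and writing/deleting a probe object roughly mirrors
# the read and write permissions the connector needs.
client.list_objects_v2(Bucket="testing_bucket", MaxKeys=1)
client.put_object(Bucket="testing_bucket", Key="data_output_path/_airbyte_probe", Body=b"ok")
client.delete_object(Bucket="testing_bucket", Key="data_output_path/_airbyte_probe")
print("HMAC key can list, write, and delete objects in the bucket.")
```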
+ +## CHANGELOG + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2021-07-16 | [#4329](https://github.com/airbytehq/airbyte/pull/4784) | Initial release. | diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index 4d76b90bbde75..5c3aee199c286 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -30,6 +30,7 @@ write_standard_creds destination-snowflake "$SNOWFLAKE_S3_COPY_INTEGRATION_TEST_ write_standard_creds destination-snowflake "$SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS" "copy_gcs_config.json" write_standard_creds destination-redshift "$AWS_REDSHIFT_INTEGRATION_TEST_CREDS" write_standard_creds destination-s3 "$DESTINATION_S3_INTEGRATION_TEST_CREDS" +write_standard_creds destination-gcs "$DESTINATION_GCS_CREDS" write_standard_creds base-normalization "$BIGQUERY_INTEGRATION_TEST_CREDS" "bigquery.json" write_standard_creds base-normalization "$SNOWFLAKE_INTEGRATION_TEST_CREDS" "snowflake.json"